匯入資料
python
情境:資料為多個以 gzip 壓縮的 CSV 檔(*.csv.gz)
import requests
from pathlib import Path
# ClickHouse HTTP endpoint; the user name and password are embedded in the URL.
# NOTE(review): credentials are hard-coded here — consider loading them from the environment.
CLICKHOUSE_URL = "http://default:123456@localhost:8123/"
# Name of the table the INSERT statement targets.
TABLE_NAME = "mrt"
# Root folder that main() searches recursively for *.csv.gz files.
DATA_DIR = "./data"
def upload_file(file_path):
    """Send one gzip-compressed CSV file to ClickHouse over HTTP.

    The file is posted still compressed, with a ``Content-Encoding: gzip``
    header, as the body of an ``INSERT INTO TABLE_NAME FORMAT CSVWithNames``
    query. The outcome is reported on stdout; nothing is returned.

    A file that cannot be opened, or a request that fails at the network
    level, is reported and skipped so that a caller looping over many files
    is not aborted by a single failure.

    Args:
        file_path: Path of the ``.csv.gz`` file to upload.
    """
    print(f"Uploading {file_path}")
    try:
        # Hand requests the open file object so the body is read from the
        # file rather than being built as one in-memory bytes value.
        with open(file_path, "rb") as f:
            r = requests.post(
                CLICKHOUSE_URL,
                headers={"Content-Encoding": "gzip"},
                params={
                    "query": f"INSERT INTO {TABLE_NAME} FORMAT CSVWithNames",
                    "max_insert_block_size": 1000000,
                },
                data=f,
                timeout=600,
            )
    except (OSError, requests.RequestException) as e:
        # Same reporting style as the JSON uploader: log and move on.
        print(f"Exception uploading {file_path}: {e}")
        return

    if r.status_code != 200:
        print("Error:", r.text)
    else:
        print("Success")
def main():
    """Upload every ``*.csv.gz`` file found anywhere under ``DATA_DIR``."""
    csv_archives = Path(DATA_DIR).rglob("*.csv.gz")
    for archive in map(str, csv_archives):
        upload_file(archive)


# Run the bulk upload only when this file is executed as a script.
if __name__ == "__main__":
    main()
情境:資料為 gzip 壓縮的 JSON 檔(*.json.gz),內容結構雜亂,且分散在多層資料夾中
- DDL
-- Single-column table: every row is one String value named raw.
CREATE TABLE raw_json_data
(
raw String
)
ENGINE = MergeTree
-- An empty tuple means no sorting key is declared.
ORDER BY tuple();
- 如何取出
-- Example: extract the 'name' and 'id' fields from the JSON text in raw at query time.
SELECT
JSONExtractString(raw, 'name') AS name,
JSONExtractUInt(raw, 'id') AS id
FROM raw_json_data
LIMIT 10;
- 如何匯入資料
import os
import requests
import gzip
import json
from pathlib import Path
# ====== Configuration ======
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 8123
CLICKHOUSE_USER = "default"
CLICKHOUSE_PASSWORD = "123456"
TABLE_NAME = "raw_json_data"
DATA_DIR = "./data" # path to the folder holding your JSON.gz files
# ===========================
# HTTP API URL, built as user:password@host:port from the settings above.
# NOTE(review): the password is hard-coded above — consider reading it from the environment.
CLICKHOUSE_URL = f"http://{CLICKHOUSE_USER}:{CLICKHOUSE_PASSWORD}@{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/"
def insert_gz_file(file_path):
    """Upload a single ``json.gz`` file to ClickHouse.

    The compressed file is posted unchanged as the body of an
    ``INSERT ... FORMAT LineAsString`` query. Success, an HTTP error, or any
    exception raised while sending is reported on stdout; nothing is returned.

    Args:
        file_path: Path of the ``.json.gz`` file to upload.
    """
    print(f"Uploading {file_path} ...")
    with open(file_path, "rb") as gz_stream:
        try:
            insert_statement = f"INSERT INTO {TABLE_NAME} FORMAT LineAsString"
            response = requests.post(
                CLICKHOUSE_URL,
                params={"query": insert_statement},
                headers={"Content-Encoding": "gzip"},
                data=gz_stream,
                timeout=300,  # generous limit so large files do not time out
            )
            if response.status_code == 200:
                print(f"Uploaded {file_path} successfully!")
            else:
                print(f"Error: {response.text}")
        except Exception as e:
            # Best effort: report the failure and let the caller continue.
            print(f"Exception uploading {file_path}: {e}")
def scan_and_upload(directory):
    """Recursively scan *directory* and upload every ``.json.gz`` file found.

    Args:
        directory: Root folder to search.
    """
    gz_files = Path(directory).rglob("*.json.gz")
    for gz_file in map(str, gz_files):
        insert_gz_file(gz_file)
def _show_json_line(line):
    """Print *line* decoded as JSON, or an ``Invalid JSON`` notice if that fails."""
    try:
        print(json.loads(line))
    except Exception as e:
        print(f"Invalid JSON: {line} ({e})")


def parse_sample_json(file_path, n=5):
    """Print the first *n* lines of a gzipped JSON-lines file to inspect its fields.

    Each line is decoded independently; a line that does not decode is
    reported together with the error instead of stopping the preview.

    Args:
        file_path: Path of the ``.json.gz`` file to sample.
        n: Maximum number of lines to show.
    """
    print(f"\nParsing sample from {file_path}:")
    with gzip.open(file_path, "rt", encoding="utf-8") as sample:
        for position, text in enumerate(sample):
            if position >= n:
                break
            _show_json_line(text)
if __name__ == "__main__":
    # 1) Optionally preview a single sample file to see the field structure.
    first_sample = next(Path(DATA_DIR).rglob("*.json.gz"), None)
    if first_sample is not None:
        parse_sample_json(first_sample)

    # 2) Upload every JSON.gz file found under the data folder.
    scan_and_upload(DATA_DIR)
建立 materialized view
-- Materialized view that groups raw_json_data rows by sessionId and keeps, per session,
-- the max of each event payload plus the latest timestamp seen for each event type.
-- NOTE(review): POPULATE backfills from rows already in raw_json_data; rows inserted while
-- the backfill runs may be missed — confirm this is acceptable.
-- NOTE(review): the selected columns are plain max()/maxIf() results, not AggregateFunction or
-- SimpleAggregateFunction columns, so AggregatingMergeTree may not combine rows for the same
-- sessionId across separate inserts as intended — verify, and re-aggregate at query time if needed.
create materialized view bidder engine = AggregatingMergeTree()
order by
sessionId POPULATE as
select
JSONExtractString(raw, 'sessionId') as sessionId,
-- NOTE(review): if these event fields are nested JSON objects rather than strings,
-- JSONExtractString may return '' and JSONExtractRaw may be what is wanted — confirm against the data.
MAX(JSONExtractString(raw, 'impreEvent')) as impreEvent,
MAX(JSONExtractString(raw, 'bidRequest')) as bidRequest,
MAX(JSONExtractString(raw, 'bidResponse')) as bidResponse,
MAX(JSONExtractString(raw, 'clickEvent')) as clickEvent,
-- ts is divided by 1000 before conversion — presumably epoch milliseconds; TODO confirm.
maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'request') as request_ts,
maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'impres') as impres_ts,
maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'click') as click_ts,
maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'response') as response_ts
from
(
select
-- NOTE(review): assumes 'ts' is stored as a JSON string — confirm; a numeric ts may need a different extractor.
JSONExtractString(raw, 'ts') ts,
-- Label each row by the first matching key, checked in this order:
-- clickEvent, impreEvent, bidResponse, bidRequest; anything else is 'other'.
case
when JSONHas(raw, 'clickEvent') then 'click'
when JSONHas(raw, 'impreEvent') then 'impres'
when JSONHas(raw, 'bidResponse') then 'response'
when JSONHas(raw, 'bidRequest') then 'request'
else 'other'
end as type
,
raw
from
raw_json_data)
group by
sessionId;
JSONExtractString