Skip to content

如何安裝 clickhouse

Published: at 上午07:39

匯入資料

python

當資料是 CSV、經 gz 壓縮且檔案數量很多的情境

import requests
from pathlib import Path

# ClickHouSe HTTP endpoint; basic-auth credentials are embedded in the URL.
CLICKHOUSE_URL = "http://default:123456@localhost:8123/"
# Destination table for every INSERT issued by this script.
TABLE_NAME = "mrt"
# Directory scanned recursively for *.csv.gz archives.
DATA_DIR = "./data"

def upload_file(file_path):
    """Upload one gzip-compressed CSV file to ClickHouse over the HTTP API.

    The archive is streamed as-is; the ``Content-Encoding: gzip`` header
    tells ClickHouse to decompress it server-side, so nothing is gunzipped
    locally.

    Args:
        file_path: Path to a ``*.csv.gz`` file whose header row matches the
            target table's columns (FORMAT CSVWithNames).
    """
    print(f"Uploading {file_path}")

    try:
        with open(file_path, "rb") as f:
            r = requests.post(
                CLICKHOUSE_URL,
                headers={"Content-Encoding": "gzip"},
                params={
                    "query": f"INSERT INTO {TABLE_NAME} FORMAT CSVWithNames",
                    # Larger insert blocks reduce part churn on big files.
                    "max_insert_block_size": 1000000
                },
                data=f,  # stream the file body instead of loading it into memory
                timeout=600
            )
    except requests.exceptions.RequestException as e:
        # A timeout or connection error on one file should not abort the
        # whole batch — report it and let the caller move on (same policy
        # as insert_gz_file in the second script).
        print(f"Exception uploading {file_path}: {e}")
        return

    if r.status_code != 200:
        print("Error:", r.text)
    else:
        print("Success")

def main():
    """Walk DATA_DIR recursively and push every gzip-compressed CSV found."""
    archives = Path(DATA_DIR).rglob("*.csv.gz")
    for archive in archives:
        upload_file(str(archive))

# Script entry point: upload every *.csv.gz found under DATA_DIR.
if __name__ == "__main__":
    main()

當內容是 JSON、經 gz 壓縮、結構雜亂且分散在多層資料夾的情境


-- Landing table: each row stores one raw JSON document as an opaque string.
-- Fields are extracted later with JSONExtract* functions.
CREATE TABLE raw_json_data
(
    raw String
)
ENGINE = MergeTree
ORDER BY tuple();
-- Sanity check: pull a couple of fields out of the stored JSON strings.
SELECT 
    JSONExtractString(raw, 'name') AS name,
    JSONExtractUInt(raw, 'id') AS id
FROM raw_json_data
LIMIT 10;
import os
import requests
import gzip
import json
from pathlib import Path

# ====== Configuration ======
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_PORT = 8123
CLICKHOUSE_USER = "default"
CLICKHOUSE_PASSWORD = "123456"
TABLE_NAME = "raw_json_data"
DATA_DIR = "./data"  # path of the folder holding the JSON.gz files
# ===========================

# HTTP API URL (credentials embedded as basic auth).
CLICKHOUSE_URL = f"http://{CLICKHOUSE_USER}:{CLICKHOUSE_PASSWORD}@{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}/"

def insert_gz_file(file_path):
    """Stream a single json.gz archive into ClickHouse over HTTP.

    The file is sent still-compressed; the Content-Encoding header lets the
    server decompress it, and LineAsString stores each line as one raw row.
    """
    print(f"Uploading {file_path} ...")
    insert_query = f"INSERT INTO {TABLE_NAME} FORMAT LineAsString"
    with open(file_path, "rb") as payload:
        try:
            response = requests.post(
                CLICKHOUSE_URL,
                headers={"Content-Encoding": "gzip"},
                params={"query": insert_query},
                data=payload,
                timeout=300,  # generous limit so large archives don't time out
            )
        except Exception as e:
            print(f"Exception uploading {file_path}: {e}")
            return
    if response.status_code == 200:
        print(f"Uploaded {file_path} successfully!")
    else:
        print(f"Error: {response.text}")

def scan_and_upload(directory):
    """Recursively walk *directory* and upload every .json.gz archive found."""
    for archive in Path(directory).rglob("*.json.gz"):
        insert_gz_file(str(archive))

def parse_sample_json(file_path, n=5):
    """Print up to *n* decoded JSON records from a .json.gz file.

    Handy for eyeballing the field structure before writing extraction
    queries; lines that fail to parse are reported instead of raising.
    """
    print(f"\nParsing sample from {file_path}:")
    with gzip.open(file_path, "rt", encoding="utf-8") as handle:
        for line_no, line in enumerate(handle):
            if line_no >= n:
                break
            try:
                record = json.loads(line)
            except Exception as e:
                print(f"Invalid JSON: {line} ({e})")
            else:
                print(record)

if __name__ == "__main__":
    # 1. Preview the structure of one sample file before bulk loading.
    for sample_file in Path(DATA_DIR).rglob("*.json.gz"):
        parse_sample_json(sample_file)
        break  # inspect only the first sample

    # 2. Upload every JSON.gz under the data directory.
    scan_and_upload(DATA_DIR)

建立 materialized view

-- Materialized view collapsing all events of a session into a single row.
-- POPULATE backfills it from the rows already in raw_json_data; subsequent
-- inserts into raw_json_data are folded in automatically.
create materialized view bidder engine = AggregatingMergeTree()
order by
sessionId POPULATE as
select
	JSONExtractString(raw, 'sessionId') as sessionId,
	-- MAX keeps the non-empty payload for each event kind within a session.
	MAX(JSONExtractString(raw, 'impreEvent')) as impreEvent,
	MAX(JSONExtractString(raw, 'bidRequest')) as bidRequest,
	MAX(JSONExtractString(raw, 'bidResponse')) as bidResponse,
	MAX(JSONExtractString(raw, 'clickEvent')) as clickEvent,
	-- ts is epoch milliseconds; convert to DateTime64(3) per event type.
	maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'request') as request_ts,
	maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'impres') as impres_ts,
	maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'click') as click_ts,
	maxIf(toDateTime64(toInt64(ts)/ 1000, 3), type = 'response') as response_ts
from
	(
	-- Inner query: tag each raw JSON row by which event field it carries.
	select
		JSONExtractString(raw, 'ts') ts,
		case
			when JSONHas(raw, 'clickEvent') then 'click'
			when JSONHas(raw, 'impreEvent') then 'impres'
			when JSONHas(raw, 'bidResponse') then 'response'
			when JSONHas(raw, 'bidRequest') then 'request'
			else 'other'
		end as type
,
		raw
	from
		raw_json_data)
group by
	sessionId;

JSONExtractString