airflow
docker-compose.yml
改從dockerfile 打包自己稍微客製化的 airflowimage
- 新增一個Dockerfile 並且新增一個requirement.txt (裡面寫要裝得套件)
FROM apache/airflow:2.10.3
ADD requirements.txt .
RUN pip install apache-airflow==${AIRFLOW_VERSION} -r requirements.txt
- 然後將docker-compose.yml image 部分註解 build . 註解拿掉

test-connection disabled 的問題
- 一般設定完 db connection 都會做測試連線 但airflow 因安全問題 test 按鈕是disalbed 的 所以要
在docker-compose.yml 中 environment 的地方增加
AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'
如果需要連線 mssql
需要走 dockerfile 自包image 的做法
並且在requirements.txt 中新增
apache-airflow-providers-microsoft-mssql[common.sql]==4.0.0
新版問題
- 如果是 2.10.4版本的話 dag task 會報錯 無法執行 進去work 看才發現有錯誤
排程寫法
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.timetables.trigger import CronTriggerTimetable
with DAG(
"your_dag",
description="your_dag",
schedule=CronTriggerTimetable(
cron="* * * * *", # UTC 時間晚上 3 點對應 UTC+8 時區晚上 11 點
timezone="Asia/Taipei", # 指定時區為台北(UTC+8)
),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False, # 重啟時 忽略 重啟時間和停止時間差的作業
tags=["test"],
) as dag:
GCS TO bq
單一檔案
load_gcs_to_bigquery = GCSToBigQueryOperator(
task_id="load_gcs_to_bq",
gcp_conn_id='{{YOUR_GCP_CONNECT_ID}}',
bucket="{{YOUR_GCS_BUCKET_NAME}}", # 替換為你的 GCS bucket 名稱
# source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}"], # 替換為你的文件路徑
destination_project_dataset_table=" {{BQ_PROEJCT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}", # BigQuery 表的完整名稱
schema_fields={{YOUR_BQ_TABLE_FORMAT}},
source_format="NEWLINE_DELIMITED_JSON", # 支援的格式包括 CSV、JSON、AVRO 等
write_disposition="WRITE_APPEND", # 覆蓋表內容
max_bad_records=999,
time_partitioning={
"type": "DAY" # 分區類型,DAY 表示按天分區
},
)
- BQ_FORMAT_EXAMPLE
BQ_FORMAT = [
{
"name": "id",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "partts",
"type": "INTEGER",
"mode": "NULLABLE"
}
....
]
單一資料夾 多檔案
load_gcs_to_bigquery = GCSToBigQueryOperator(
task_id="load_gcs_to_bq",
gcp_conn_id='{{YOUR_GCP_CONNECT_ID}}',
bucket="{{YOUR_GCS_BUCKET_NAME}}", # 替換為你的 GCS bucket 名稱
# source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}/*.gz"], # 替換為你的文件路徑
destination_project_dataset_table=" {{BQ_PROEJCT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}", # BigQuery 表的完整名稱
schema_fields={{YOUR_BQ_TABLE_FORMAT}},
source_format="NEWLINE_DELIMITED_JSON", # 支援的格式包括 CSV、JSON、AVRO 等
write_disposition="WRITE_APPEND", # 覆蓋表內容
max_bad_records=999,
time_partitioning={
"type": "DAY" # 分區類型,DAY 表示按天分區
},
)
多資料夾多檔案?
load_gcs_to_bigquery = GCSToBigQueryOperator(
task_id="load_gcs_to_bq",
gcp_conn_id='{{YOUR_GCP_CONNECT_ID}}',
bucket="{{YOUR_GCS_BUCKET_NAME}}", # 替換為你的 GCS bucket 名稱
# source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}/**/*.gz"], # 替換為你的文件路徑
destination_project_dataset_table=" {{BQ_PROEJCT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}", # BigQuery 表的完整名稱
schema_fields={{YOUR_BQ_TABLE_FORMAT}},
source_format="NEWLINE_DELIMITED_JSON", # 支援的格式包括 CSV、JSON、AVRO 等
write_disposition="WRITE_APPEND", # 覆蓋表內容
max_bad_records=999,
time_partitioning={
"type": "DAY" # 分區類型,DAY 表示按天分區
},
)
以為這樣可以嗎?
不行
會出現
File "/home/airflow/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 261, in result
raise self._exception
google.api_core.exceptions.NotFound: 404 Not found: Uris
那如果寫gcs list 然後把檔案名稱弄出來 再傳遞給 gcs_to_bq呢
# 列出所有符合條件的 GCS 檔案
list_gcs_files = GCSListObjectsOperator(
task_id="list_gcs_files",
bucket=BUCKET_NAME,
prefix=PREFIX,
gcp_conn_id="self_gcs",
)
# 將檔案加載到 BigQuery
load_gcs_to_bq = GCSToBigQueryOperator(
task_id="load_gcs_to_bq",
bucket=BUCKET_NAME,
source_objects="{{ task_instance.xcom_pull(task_ids='list_gcs_files') }}",
destination_project_dataset_table=BQ_TABLE,
schema_fields=BQ_FORMAT,
source_format="NEWLINE_DELIMITED_JSON",
write_disposition="WRITE_APPEND",
gcp_conn_id="self_gcs",
time_partitioning={
"type": "DAY" # 分區類型,DAY 表示按天分區
},
)
第一次出現 字不能太長 限制 5000 字元
File "/home/airflow/.local/lib/python3.12/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/*********/jobs?prettyPrint=false: Source URI is too long: ******, 'test/raw..., must be less than 5000 characters
試試看先不要抓這麼多檔案 結果出現有 ’ , ‘
File "/home/airflow/.local/lib/python3.12/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/*********/jobs?prettyPrint=false: Source URI must not contain the ',' character: ********
謝謝~