
airflow usage notes

Published: at 03:39 PM

airflow

docker-compose.yml

Grab the base file from here.

Then switch to building a slightly customized Airflow image from a Dockerfile; point docker-compose at it by re-enabling the commented-out "build: ." line in place of "image:".

FROM apache/airflow:2.10.3
COPY requirements.txt .
# Re-pin apache-airflow to the base image's version (AIRFLOW_VERSION is set
# by the base image) so pip cannot up- or downgrade it while resolving deps
RUN pip install apache-airflow==${AIRFLOW_VERSION} -r requirements.txt
(screenshot: airflow-docker-file.png)

The test-connection-disabled problem

Add this under environment in docker-compose.yml (as of Airflow 2.7 the feature defaults to Disabled; the valid values are Disabled, Enabled, and Hidden):

    AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'

If you need to connect to MSSQL, you have to go the self-built Dockerfile image route above, and add this to requirements.txt:

apache-airflow-providers-microsoft-mssql[common.sql]==4.0.0
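Once the provider is installed, something like this should work for reading from MSSQL. A minimal sketch of my own, not from the original notes; "mssql_default" and "your_table" are assumed placeholders, and the connection has to be created first:

from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

def fetch_rows():
    # "mssql_default" is a hypothetical connection ID; create it in the
    # Airflow UI (Admin > Connections) before running this
    hook = MsSqlHook(mssql_conn_id="mssql_default")
    # get_records runs the query and returns all rows as a list of tuples
    return hook.get_records("SELECT TOP 10 * FROM your_table")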

Newer-version gotchas

How to write the schedule

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.timetables.trigger import CronTriggerTimetable

with DAG(
    "your_dag",
    description="your_dag",
    schedule=CronTriggerTimetable(
        cron="* * * * *",  # every minute; the expression is evaluated in the timezone below
        timezone="Asia/Taipei",  # run the cron in Taipei time (UTC+8)
    ),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,  # on restart, skip the runs missed while the scheduler was down
    tags=["test"],
) as dag:
    # a placeholder task so the DAG body is valid
    PythonOperator(
        task_id="your_task",
        python_callable=lambda: print("hello"),
    )
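The original comment about 15:00 UTC mapping to 23:00 Taipei fits a daily schedule rather than the every-minute cron above; a minimal sketch of that variant (my own addition):

# Fires once a day at 23:00 Asia/Taipei (= 15:00 UTC); the cron expression
# is evaluated in the timetable's timezone, not in UTC
daily_schedule = CronTriggerTimetable(
    cron="0 23 * * *",
    timezone="Asia/Taipei",
)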

GCS to BQ

Single file

    # from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bq",
        gcp_conn_id="{{YOUR_GCP_CONNECT_ID}}",
        bucket="{{YOUR_GCS_BUCKET_NAME}}",  # your GCS bucket name
        source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}"],  # path to your file
        destination_project_dataset_table="{{BQ_PROJECT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}",  # full BigQuery table name
        schema_fields={{YOUR_BQ_TABLE_FORMAT}},  # e.g. the BQ_FORMAT list below
        source_format="NEWLINE_DELIMITED_JSON",  # CSV, JSON, AVRO, etc. are also supported
        write_disposition="WRITE_APPEND",  # append to the table (WRITE_TRUNCATE would overwrite it)
        max_bad_records=999,
        time_partitioning={
            "type": "DAY"  # partition type; DAY means day-partitioned
        },
    )
BQ_FORMAT = [
    {
        "name": "id",
        "type": "STRING",
        "mode": "NULLABLE"
    },
    {
        "name": "partts",
        "type": "INTEGER",
        "mode": "NULLABLE"
    },
    # ... more fields
]
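One detail worth flagging (my own note, not from the original): with no "field" key, "type": "DAY" partitions by ingestion time. To partition on a column instead, name it, and the column must be DATE, TIMESTAMP, or DATETIME:

time_partitioning={
    "type": "DAY",
    "field": "created_at",  # hypothetical TIMESTAMP column; omit to keep ingestion-time partitioning
}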

Single folder, multiple files

    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bq",
        gcp_conn_id="{{YOUR_GCP_CONNECT_ID}}",
        bucket="{{YOUR_GCS_BUCKET_NAME}}",  # your GCS bucket name
        source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}/*.gz"],  # folder path plus a "*" wildcard
        destination_project_dataset_table="{{BQ_PROJECT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}",  # full BigQuery table name
        schema_fields={{YOUR_BQ_TABLE_FORMAT}},
        source_format="NEWLINE_DELIMITED_JSON",  # CSV, JSON, AVRO, etc. are also supported
        write_disposition="WRITE_APPEND",  # append to the table (WRITE_TRUNCATE would overwrite it)
        max_bad_records=999,
        time_partitioning={
            "type": "DAY"  # partition type; DAY means day-partitioned
        },
    )

Multiple folders, multiple files?

    load_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="load_gcs_to_bq",
        gcp_conn_id="{{YOUR_GCP_CONNECT_ID}}",
        bucket="{{YOUR_GCS_BUCKET_NAME}}",  # your GCS bucket name
        source_objects=["{{YOUR_GCS_FILE_PATH_AND_FILE_NAME}}/**/*.gz"],  # recursive "**" glob -- see below
        destination_project_dataset_table="{{BQ_PROJECT_NAME}}.{{BQ_DATA_SET_NAME}}.{{BQ_TABLE_NAME}}",  # full BigQuery table name
        schema_fields={{YOUR_BQ_TABLE_FORMAT}},
        source_format="NEWLINE_DELIMITED_JSON",  # CSV, JSON, AVRO, etc. are also supported
        write_disposition="WRITE_APPEND",  # append to the table (WRITE_TRUNCATE would overwrite it)
        max_bad_records=999,
        time_partitioning={
            "type": "DAY"  # partition type; DAY means day-partitioned
        },
    )

Thought that would work?

Nope.

You get:

  File "/home/airflow/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.NotFound: 404 Not found: Uris
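My best guess at the cause, worth checking against the BigQuery load documentation: load-job source URIs only accept a single "*" wildcard, so a recursive "**" glob matches nothing:

# Assumption based on BigQuery load-job wildcard rules, not from the original
# notes; the "dt=..." folder is a hypothetical example
source_objects = ["test/raw/dt=2024-01-01/*.gz"]  # a single "*" wildcard works
# source_objects = ["test/raw/**/*.gz"]           # "**" fails: 404 Not found: Uris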

So what if we run a GCS list first, pull out the file names, and hand those to gcs_to_bq?

    # list every GCS object matching the prefix
    list_gcs_files = GCSListObjectsOperator(
        task_id="list_gcs_files",
        bucket=BUCKET_NAME,
        prefix=PREFIX,
        gcp_conn_id="self_gcs",
    )

    # load the listed files into BigQuery
    load_gcs_to_bq = GCSToBigQueryOperator(
        task_id="load_gcs_to_bq",
        bucket=BUCKET_NAME,
        source_objects="{{ task_instance.xcom_pull(task_ids='list_gcs_files') }}",
        destination_project_dataset_table=BQ_TABLE,
        schema_fields=BQ_FORMAT,
        source_format="NEWLINE_DELIMITED_JSON",
        write_disposition="WRITE_APPEND",
        gcp_conn_id="self_gcs",
        time_partitioning={
            "type": "DAY"  # partition type; DAY means day-partitioned
        },
    )

First error: the combined source URIs can't exceed 5,000 characters:

  File "/home/airflow/.local/lib/python3.12/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/*********/jobs?prettyPrint=false: Source URI is too long: ******, 'test/raw..., must be less than 5000 characters

Tried grabbing fewer files; now it complains about the ',' character. Presumably the templated xcom_pull renders the Python list as one big string, commas and all, so BigQuery sees a single malformed URI:

  File "/home/airflow/.local/lib/python3.12/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/*********/jobs?prettyPrint=false: Source URI must not contain the ',' character: ********
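A sketch of a workaround, my own take rather than something from the original notes: set render_template_as_native_obj=True on the DAG so xcom_pull renders as a real Python list instead of a comma-joined string, or, to also dodge the 5,000-character cap, chunk the listing and fan out one load per chunk with dynamic task mapping. The chunk size of 100 is a guess to tune:

from airflow.decorators import task

@task
def chunk_gcs_files(files, chunk_size=100):
    # split the listed object names into batches small enough that the
    # joined source URIs stay under BigQuery's 5000-character limit
    return [files[i : i + chunk_size] for i in range(0, len(files), chunk_size)]

chunks = chunk_gcs_files(list_gcs_files.output)

# one mapped load task per chunk; each chunk arrives as a real Python list,
# so no comma-joined string ever reaches BigQuery
load_gcs_to_bq = GCSToBigQueryOperator.partial(
    task_id="load_gcs_to_bq",
    bucket=BUCKET_NAME,
    destination_project_dataset_table=BQ_TABLE,
    schema_fields=BQ_FORMAT,
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_APPEND",
    gcp_conn_id="self_gcs",
    time_partitioning={"type": "DAY"},
).expand(source_objects=chunks)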

Thanks~