
S3 Usage Notes

Published: at 06:06 PM

At work I'm constantly dealing with S3: all sorts of deletions and other odds and ends, so I figured I'd start a post to keep a record of them.

cli

aws s3 sync s3://my-aws-s3/data/ ./ --exclude "*" --include "myreport_202506*.csv"
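The --exclude "*" --include pair makes sync download only the matching report files (for aws s3 commands, filters that appear later take precedence over earlier ones); running with --dryrun first is a safe way to preview which objects would actually be transferred.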

python

from datetime import datetime, timedelta
import boto3

bucket = boto3.resource('s3').Bucket('myfold')
start_time = datetime(2025, 5, 31, 2, 0)
end_time = datetime(2025, 6, 9, 23, 0)
current_time = start_time

while current_time <= end_time:
    # delete everything under this hourly partition prefix
    prefix = f"month={current_time.strftime('%m')}/dt={current_time.strftime('%Y-%m-%d')}/hr={current_time.strftime('%Y-%m-%d-%H')}/"
    try:
        bucket.objects.filter(Prefix=prefix).delete()
    except Exception as e:
        print(f"failed to delete {prefix}: {e}")
    current_time += timedelta(hours=1)

A separate script for moving a full day of hourly partitions between two accounts' buckets:
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
import os
import sys
import boto3

# Check command-line arguments
if len(sys.argv) != 2:
    print("Usage: python main.py YYYY-MM-DD")
    sys.exit(1)

input_date_str = sys.argv[1]

try:
    input_date = datetime.datetime.strptime(input_date_str, "%Y-%m-%d")
except ValueError:
    print("日期格式錯誤,請使用 YYYY-MM-DD")
    sys.exit(1)

# Extract year, month, and date strings
year = input_date.strftime("%Y")
month = input_date.strftime("%m")
dt = input_date.strftime("%Y-%m-%d")

# AWS S3 configuration
source_bucket = 'source_bucket'
target_bucket = 'target_bucket'

source_s3 = boto3.client(
    's3',
    aws_access_key_id='xxx',
    aws_secret_access_key='xxxx',
    region_name='ap-northeast-1'
)

target_s3 = boto3.client(
    's3',
    aws_access_key_id='xxxxx',
    aws_secret_access_key='xxxxx',
    region_name='ap-northeast-1'
)

# Log file (date appended automatically)
log_file = f'transfer_log_{dt}.txt'

def log(msg):
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{timestamp}] {msg}'
    print(line)
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(line + '\n')

def list_all_objects(bucket, prefix, s3_client):
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            yield obj['Key']

def object_exists(bucket, key, s3_client):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except s3_client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        else:
            raise

# Each account uses its own credentials, so a server-side copy_object is not
# possible here; the object is streamed through this machine with get + put.
def copy_object_between_accounts(source_bucket, source_key, target_bucket, target_key):
    response = source_s3.get_object(Bucket=source_bucket, Key=source_key)
    body = response['Body'].read()
    target_s3.put_object(Bucket=target_bucket, Key=target_key, Body=body)

# Rebuild the key under the target prefix; by default '_copy_' is inserted
# before the file extension (the commented-out line keeps the name as-is)
def get_target_key(source_key, source_prefix, target_prefix):
    relative_path = source_key[len(source_prefix):]
    base, ext = os.path.splitext(relative_path)
    new_filename = f"{base}_copy_{ext}"
    # new_filename = f"{base}{ext}"
    return target_prefix + new_filename

# Copy one object from the source bucket to the target bucket and log the result
def copy_single_file(source_key, source_prefix, target_prefix):
    target_key = get_target_key(source_key, source_prefix, target_prefix)
    try:
        target_s3.put_object(
            Bucket=target_bucket,
            Key=target_key,
            Body=source_s3.get_object(Bucket=source_bucket, Key=source_key)['Body'].read()
        )
        log(f'✅ Copy succeeded: {source_key} ➡️ {target_key}')
    except Exception as e:
        log(f'❌ Copy failed: {source_key} ➡️ {target_key} | error: {e}')

# List one hour's type=success partition and copy its files with 10 worker threads
def copy_hourly_objects(hour):
    hr_str = f"{dt}-{hour:02d}"
    source_prefix = f"myfold/year={year}/month={month}/dt={dt}/hr={hr_str}/type=success/"
    target_prefix = source_prefix
    keys = list(list_all_objects(source_bucket, source_prefix, source_s3))
    if not keys:
        log(f'⚠️ No data under {source_prefix}')
        return

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(copy_single_file, key, source_prefix, target_prefix) for key in keys]
        for future in as_completed(futures):
            future.result()  # re-raise any exception from a worker


if __name__ == '__main__':
    for hour in range(24):
        log(f'==== Processing hour {hour:02d} ====')
        copy_hourly_objects(hour)
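Run it for a single day, e.g. python main.py 2025-06-01: the script walks hours 00 through 23, copies each hour's type=success partition in parallel, and writes everything to transfer_log_2025-06-01.txt.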

duckdb
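One handy pattern is querying the CSV reports on S3 directly with DuckDB instead of downloading them first. A minimal sketch using the duckdb Python package and its httpfs extension (the bucket path, region, and credential settings below are placeholders, not a real setup):

import duckdb

con = duckdb.connect()
# httpfs lets DuckDB read s3:// paths directly
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
con.execute("SET s3_region='ap-northeast-1'")  # placeholder region
# s3_access_key_id / s3_secret_access_key can be set the same way for private buckets
rows = con.execute(
    "SELECT count(*) FROM read_csv_auto('s3://my-aws-s3/data/myreport_20250601.csv')"  # placeholder path
).fetchall()
print(rows)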

What to do when the data has been archived to Glacier

  1. Restore it (the restored copy comes with a new expiry time), then quickly copy it out to a new path before that expiry so a permanent copy exists (see the sketch below).
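A minimal boto3 sketch of that restore-then-copy flow (the bucket, key, and 7-day window are placeholders):

import boto3

s3 = boto3.client('s3')
bucket, key = 'my-archived-bucket', 'myfold/dt=2025-06-01/part-0000.csv'  # placeholders

# 1. Ask S3 to restore the archived object; Days controls how long the
#    temporary restored copy stays readable before it expires again.
s3.restore_object(
    Bucket=bucket,
    Key=key,
    RestoreRequest={'Days': 7, 'GlacierJobParameters': {'Tier': 'Standard'}},
)

# 2. The restore runs asynchronously; head_object reports
#    ongoing-request="false" in the Restore field once it is done.
status = s3.head_object(Bucket=bucket, Key=key).get('Restore', '')
if 'ongoing-request="false"' in status:
    # 3. Copy to a new key in STANDARD storage so a non-archived copy
    #    survives after the restored original expires again.
    s3.copy_object(
        Bucket=bucket,
        Key=key + '.restored-copy',
        CopySource={'Bucket': bucket, 'Key': key},
        StorageClass='STANDARD',
    )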