At work I keep running into all sorts of S3 chores (deletions and so on), so I figured I'd start a post to collect them.
cli
- Download files whose names match a pattern (I want to grab the files under one path whose names contain myreport_202506)
aws s3 sync s3://my-aws-s3/data/ ./ --exclude "*" --include "myreport_202506*.csv"
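Filter order matters here: --exclude "*" drops everything first, and the later --include adds the matching files back, because later filters take precedence. If you need the same thing from Python instead of the CLI, a rough boto3 sketch (bucket and prefix taken from the command above, with a plain fnmatch on the filename) could look like:

import fnmatch
import os
import boto3

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='my-aws-s3', Prefix='data/'):
    for obj in page.get('Contents', []):
        key = obj['Key']
        # keep only filenames matching the report pattern
        if fnmatch.fnmatch(os.path.basename(key), 'myreport_202506*.csv'):
            s3.download_file('my-aws-s3', key, os.path.basename(key))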
python
- Dynamically delete the files under certain time-partitioned folders
from datetime import datetime, timedelta
import boto3
bucket = boto3.resource('s3').Bucket('myfold')
start_time = datetime(2025, 5, 31, 2, 0)
end_time = datetime(2025, 6, 9, 23, 0)
current_time = start_time
while current_time <= end_time:
    prefix = f"month={current_time.strftime('%m')}/dt={current_time.strftime('%Y-%m-%d')}/hr={current_time.strftime('%Y-%m-%d-%H')}/"
    try:
        # delete every object under this hourly partition prefix (batched by boto3)
        bucket.objects.filter(Prefix=prefix).delete()
    except Exception as e:
        print(f"delete failed for {prefix}: {e}")
    current_time += timedelta(hours=1)
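Before actually deleting, it can be worth printing what a given prefix matches. A small sketch along the same lines (bucket name and partition layout copied from the loop above; the example prefix is made up):

import boto3

bucket = boto3.resource('s3').Bucket('myfold')
prefix = 'month=05/dt=2025-05-31/hr=2025-05-31-02/'  # one hourly partition, for illustration
# dry run: list what would be deleted instead of calling .delete()
for obj in bucket.objects.filter(Prefix=prefix):
    print(obj.key, obj.size)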
- Dynamically copy data from bucket A to bucket B when the path layout is roughly the same
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
import os
import sys
import boto3

# check command-line arguments
if len(sys.argv) != 2:
    print("Usage: python main.py YYYY-MM-DD")
    sys.exit(1)

input_date_str = sys.argv[1]
try:
    input_date = datetime.datetime.strptime(input_date_str, "%Y-%m-%d")
except ValueError:
    print("Invalid date format, please use YYYY-MM-DD")
    sys.exit(1)

# extract the year / month / date strings used in the partition paths
year = input_date.strftime("%Y")
month = input_date.strftime("%m")
dt = input_date.strftime("%Y-%m-%d")

# AWS S3 settings (one client per account)
source_bucket = 'source_bucket'
target_bucket = 'target_bucket'
source_s3 = boto3.client(
    's3',
    aws_access_key_id='xxx',
    aws_secret_access_key='xxxx',
    region_name='ap-northeast-1'
)
target_s3 = boto3.client(
    's3',
    aws_access_key_id='xxxxx',
    aws_secret_access_key='xxxxx',
    region_name='ap-northeast-1'
)

# log file (date appended automatically)
log_file = f'transfer_log_{dt}.txt'

def log(msg):
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = f'[{timestamp}] {msg}'
    print(line)
    with open(log_file, 'a', encoding='utf-8') as f:
        f.write(line + '\n')

def list_all_objects(bucket, prefix, s3_client):
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            yield obj['Key']

def object_exists(bucket, key, s3_client):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
        return True
    except s3_client.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        else:
            raise

def copy_object_between_accounts(source_bucket, source_key, target_bucket, target_key):
    response = source_s3.get_object(Bucket=source_bucket, Key=source_key)
    body = response['Body'].read()
    target_s3.put_object(Bucket=target_bucket, Key=target_key, Body=body)

def get_target_key(source_key, source_prefix, target_prefix):
    relative_path = source_key[len(source_prefix):]
    base, ext = os.path.splitext(relative_path)
    new_filename = f"{base}_copy{ext}"
    # new_filename = f"{base}{ext}"  # keep the original filename instead
    return target_prefix + new_filename

def copy_single_file(source_key, source_prefix, target_prefix):
    target_key = get_target_key(source_key, source_prefix, target_prefix)
    try:
        target_s3.put_object(
            Bucket=target_bucket,
            Key=target_key,
            Body=source_s3.get_object(Bucket=source_bucket, Key=source_key)['Body'].read()
        )
        log(f'✅ copied: {source_key} ➡️ {target_key}')
    except Exception as e:
        log(f'❌ copy failed: {source_key} ➡️ {target_key} | error: {e}')

def copy_hourly_objects(hour):
    hr_str = f"{dt}-{hour:02d}"
    source_prefix = f"myfold/year={year}/month={month}/dt={dt}/hr={hr_str}/type=success/"
    target_prefix = source_prefix
    keys = list(list_all_objects(source_bucket, source_prefix, source_s3))
    if not keys:
        log(f'⚠️ no data under {source_prefix}')
        return
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(copy_single_file, key, source_prefix, target_prefix) for key in keys]
        for future in as_completed(futures):
            future.result()  # re-raise any exception from the worker

if __name__ == '__main__':
    for hour in range(24):
        log(f'==== processing hour {hour:02d} ====')
        copy_hourly_objects(hour)
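One caveat with the script above: get_object + put_object reads each file fully into memory, which is fine for small partition files but not great for big ones. boto3's managed copy can handle the transfer instead (multipart for large objects) and accepts a SourceClient for the cross-account case. A hedged sketch that reuses the clients and helpers defined above, not a drop-in tested replacement:

def copy_single_file_managed(source_key, source_prefix, target_prefix):
    # same role as copy_single_file, but boto3 manages the transfer itself
    target_key = get_target_key(source_key, source_prefix, target_prefix)
    target_s3.copy(
        CopySource={'Bucket': source_bucket, 'Key': source_key},
        Bucket=target_bucket,
        Key=target_key,
        SourceClient=source_s3,  # read side uses the source account's credentials
    )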
duckdb
What to do if the data has been archived (Glacier)
- Restore it with a new expiry window, then quickly copy it out to a new path while the restored copy is still readable
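In boto3 terms that is a restore_object call, a wait for the restore to finish, and then a copy to a new key; the copy is written as a normal STANDARD object, so it stays readable after the temporary restore expires. A minimal sketch, with the bucket and key names made up for illustration:

import boto3

s3 = boto3.client('s3')
bucket, key = 'myfold', 'archived/part-000.parquet'  # hypothetical archived object

# ask S3 to make the Glacier object readable for 7 days
s3.restore_object(
    Bucket=bucket, Key=key,
    RestoreRequest={'Days': 7, 'GlacierJobParameters': {'Tier': 'Standard'}},
)

# head_object exposes the restore status; ongoing-request="false" means it is done
resp = s3.head_object(Bucket=bucket, Key=key)
if 'ongoing-request="false"' in resp.get('Restore', ''):
    # copy it out to a new key as a regular STANDARD object so it does not expire
    s3.copy_object(
        Bucket=bucket, Key='restored/part-000.parquet',
        CopySource={'Bucket': bucket, 'Key': key},
        StorageClass='STANDARD',
    )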