Skip to content

利用淘寶公開數據練習開發推薦模型使用fastapi,lightgbm,duckdb實作推薦api

Published: at 下午06:16

利用淘寶公開數據練習開發推薦模型使用fastapi,lightgbm,duckdb實作推薦api

最近看到淘寶(taobao)有公開數據一億筆資料

想做練習看看 如何做到推薦模型

git repo

資料結構觀察

筆數 100,150,808

1,2268318,2520377,pv,1511544070
1,2333346,2520771,pv,1511561733
1,2576651,149192,pv,1511572885
1,3830808,4181361,pv,1511593493
1,4365585,2520377,pv,1511596146
1,4606018,2735466,pv,1511616481
1,230380,411153,pv,1511644942
1,3827899,2920476,pv,1511713473
1,3745169,2891509,pv,1511725471
1,1531036,2920476,pv,1511733732
1,2266567,4145813,pv,1511741471
1,2951368,1080785,pv,1511750828
列名称	说明
用户ID	整数类型,序列化后的用户ID
商品ID	整数类型,序列化后的商品ID
商品类目ID	整数类型,序列化后的商品所属类目ID
行为类型	字符串,枚举类型,包括('pv', 'buy', 'cart', 'fav')
时间戳	行为发生的时间戳

利用 duckdb 分析 產出 訓練資料(做特徵)

  1. 預設已經會使用duckdb,使用範例

  2. 加入csv title

大檔案 3GB多 於是使用 vi 將 csv header 加上 (雖然還是跑得很慢)

user_id,product_id,catalog_id,action,ts
  1. 載入資料到duckdb
-- Load the raw behavior log into DuckDB in one pass.
-- NOTE(review): every later query reads `logs_clean`, but only `logs` is
-- created here — confirm there is an omitted cleaning step, or rename.
-- NOTE(review): read_csv_auto on a .zip archive may not be supported
-- (DuckDB handles gzip/zstd transparently) — confirm, or unzip first.
CREATE TABLE logs AS
SELECT *
FROM read_csv_auto('UserBehavior.csv.zip');
  1. 建立 Label(Ranking 的答案) 買過些甚麼東西

買 = 1 其他 = 0

-- Ranking label per (user, product): 1 if the user ever bought it, else 0.
CREATE TABLE label_table AS
SELECT
    user_id,
    product_id,
    -- booleans cast cleanly to 0/1; MAX keeps 1 if any event was a buy
    MAX(CAST(action = 'buy' AS INTEGER)) AS label
FROM logs_clean
GROUP BY user_id, product_id;
  1. User Feature 使用者購買率
-- Per-user activity volume and buy conversion rate.
CREATE TABLE user_features AS
SELECT
    user_id,
    COUNT(*) AS user_actions,
    COUNT(*) FILTER (WHERE action = 'buy') AS user_buy_cnt,
    -- * 1.0 forces float division instead of integer division
    COUNT(*) FILTER (WHERE action = 'buy') * 1.0 / COUNT(*) AS user_buy_rate
FROM logs_clean
GROUP BY user_id;
  1. Product Feature 商品熱門度
-- Per-product popularity (total events) and buy conversion rate.
CREATE TABLE product_features AS
SELECT
    product_id,
    COUNT(*) AS product_popularity,
    COUNT(*) FILTER (WHERE action = 'buy') * 1.0 / COUNT(*) AS product_buy_rate
FROM logs_clean
GROUP BY product_id;
  1. User × Product Feature 使用者看過幾次商品
-- How often, and how recently, each user touched each product.
CREATE TABLE interaction_features AS
SELECT
    user_id,
    product_id,
    COUNT(*) AS view_cnt,      -- all event types counted, not only 'pv'
    MAX(ts) AS last_view_ts    -- unix timestamp of the most recent event
FROM logs_clean
GROUP BY user_id, product_id;
  1. 合併
-- Assemble the training set: one row per labeled (user, product) pair,
-- enriched with user-, product-, and pair-level features.
CREATE TABLE training_table AS
SELECT
    l.user_id,
    l.product_id,
    u.user_buy_rate,
    p.product_popularity,
    i.view_cnt,
    l.label
FROM label_table AS l
JOIN user_features AS u
  ON u.user_id = l.user_id
JOIN product_features AS p
  ON p.product_id = l.product_id
JOIN interaction_features AS i
  ON i.user_id = l.user_id
 AND i.product_id = l.product_id;
  1. 看一下
select * from training_table limit 40;

使用者id , 商品 id , 使用者購買率,商品受歡迎程度,看過幾次 ,有沒有買過

user_idproduct_iduser_buy_rateproduct_popularityview_cntlabel
75205533619950.0068027210884353742210
75206035398130.0510
75208039751410.00843881856540084330610
75208020806900.008438818565400843133310
75210243424670.016393442622950824110
75210333192440.01515151515151515233420
75215030346960.012658227848101266453810
75215129989470.0330
75221728779020.02110
75223148536260.0108108108108108118510
75226940244090.0403410
75230723660140.03076923076923077205710
75233912695790.023809523809523808336210
75235919569050.0188679245283018863110
75236434639790.026410
75236826758590.067120
7523719537700.03846153846153846478110
7523733484460.03846153846153846413710
  1. 匯出
-- Export the training set for the Python training step; DuckDB lets a
-- table name stand in for SELECT * when copying a whole table.
COPY training_table
TO 'training.parquet'
(FORMAT PARQUET);

訓練模型

main.py

import duckdb
import pandas as pd
from lightgbm import LGBMClassifier
import joblib

print("正在從 Parquet 讀取資料...")
# Load the feature table produced by the DuckDB pipeline.
df = duckdb.read_parquet("training.parquet").df()

# ===============================
# Feature / Label
# ===============================
# NOTE(review): X still contains user_id / product_id, so the model can
# memorize raw IDs (won't generalize to unseen users/items) — confirm
# this is intended for the demo.
X = df.drop(columns=["label"])
y = df["label"]

print(f"開始訓練模型... 總資料量: {len(df)} 筆")

# 100 boosted trees with LightGBM defaults; trained on the full table
# (no train/validation split in this demo).
model = LGBMClassifier(n_estimators=100)
model.fit(X, y)

# ===============================
# ✅ Persist the trained model
# ===============================
joblib.dump(model, "lgbm_model.pkl")

# Save the exact feature column order the serving side must reproduce.
joblib.dump(X.columns.tolist(), "features.pkl")

print("✅ model.pkl + features.pkl 已儲存")
duckdb==1.4.4
joblib==1.5.3
lightgbm==4.6.0
numpy==2.4.2
pandas==3.0.1
python-dateutil==2.9.0.post0
scikit-learn==1.8.0
scipy==1.17.1
six==1.17.0
threadpoolctl==3.6.0
tzdata==2025.3
uv pip install -r requirements.txt

如何測試模型

test.py

import joblib
import pandas as pd
import duckdb

# 1. Load the trained model.
model_path = "./lgbm_model.pkl"
model = joblib.load(model_path)
print(f"✅ 已載入模型: {model_path}")

# 2. Read the data to score (the training parquet doubles as demo input).
# Column names and order must match what the model was trained with.
print("🚀 讀取待預測資料...")
new_df = duckdb.read_parquet("./training.parquet").df()

# Drop the label column if present; the model only takes features.
X_new = new_df.drop(columns=["label"]) if "label" in new_df.columns else new_df

# 3. Predict: hard 0/1 class decisions...
predictions = model.predict(X_new)

# ...and P(class=1), a ranking-friendly score.
probabilities = model.predict_proba(X_new)[:, 1]

# 4. Attach results back onto the original rows.
new_df['prediction'] = predictions
new_df['score'] = probabilities

print("📊 預測完成!前 5 筆結果:")
print(new_df[['prediction', 'score']].head())

# 5. Export the full scored table.
new_df.to_csv("predictions_result.csv", index=False)


print("💾 預測結果已存至 predictions_result.csv")

threshold = 0.01  # any score above 1% is flagged as a potential buyer
new_df['potential_buyer'] = (new_df['score'] > threshold).astype(int)

top_potential = new_df.sort_values('score', ascending=False).head(100)

# BUG FIX: this previously exported the whole new_df, so top_potential.csv
# was identical to predictions_result.csv and `top_potential` was unused.
# Write the sorted top-100 slice instead.
top_potential.to_csv("top_potential.csv", index=False)


print("💾 預測結果已存至 top_potential.csv")
uv run test.py

看資料

head -n 10 top_potential.csv 
user_id,product_id,user_buy_rate,product_popularity,view_cnt,label,prediction,score,potential_buyer
752055,3361995,0.006802721088435374,22,1,0,0,0.001953612870347798,0
752060,3539813,0.0,5,1,0,0,1.183486612355564e-06,0
752080,3975141,0.008438818565400843,306,1,0,0,0.0023560931749064197,0
752080,2080690,0.008438818565400843,1333,1,0,0,0.0023220314090510253,0
752102,4342467,0.01639344262295082,41,1,0,0,0.005052367426943275,0
752103,3319244,0.015151515151515152,334,2,0,0,0.03660798523751377,1
752150,3034696,0.012658227848101266,4538,1,0,0,0.0018015359387412201,0
752151,2998947,0.0,3,3,0,0,1.160701650510975e-06,0
752217,2877902,0.0,21,1,0,0,1.183486612355564e-06,0

使用fastapi 掛載起模型 執行推薦api

 uv pip install "fastapi[standard]"

檔案名稱 myapp.py 不要叫 fastapi.py 會壞掉喔

# import joblib

# model = joblib.load("./lgbm_model.pkl")

# print("模型已成功載入!")
# print(type(model))

from fastapi import FastAPI, HTTPException
import joblib
import duckdb
import pandas as pd
from lightgbm import LGBMClassifier

# ===============================
# 1) Load model artifacts once at process startup
# ===============================
print("🚀 Loading model...")

# Artifacts produced by the training script (main.py).
MODEL_PATH = "./lgbm_model.pkl"    # trained LGBMClassifier
FEATURE_PATH = "./features.pkl"    # feature column order the model expects
DATA_PATH = "./training.parquet"   # precomputed per-user/product features

model = joblib.load(MODEL_PATH)
feature_columns = joblib.load(FEATURE_PATH)
# Entire feature table is held in memory; fine for a demo, revisit for
# the full 100M-row dataset.
df = duckdb.read_parquet(DATA_PATH).df()

print("✅ Model Ready")

# ===============================
# 2) Create the FastAPI application
# ===============================
app = FastAPI(
    title="LightGBM Recommendation API",
    version="1.0"
)

# ===============================
# 3️⃣ Health Check
# ===============================
@app.get("/health")
def health():
    """Liveness probe — always reports the service as up."""
    return {"status": "ok"}

# ===============================
# 4️⃣ 取得使用者特徵
# ===============================
def load_feature_by_vid(vid: str):
    """Return a copy of the feature rows for one user.

    Args:
        vid: user id taken from the URL path (a string).

    Returns:
        A DataFrame slice of the in-memory feature table for that user;
        empty when the user is unknown OR the id is not numeric.

    BUG FIX: ``int(vid)`` used to raise ValueError on a non-numeric path
    segment, surfacing as HTTP 500; an empty frame now lets the caller
    answer 404 instead.
    """
    try:
        user_id = int(vid)
    except ValueError:
        # Same shape/columns as a real miss, so callers need no change.
        return df.iloc[0:0].copy()
    return df[df["user_id"] == user_id].copy()


# ===============================
# 5️⃣ 排序推薦
# ===============================
def rank_items(feature_df):
    """Score candidate rows with the model and return them best-first.

    Adds a ``score`` column (P(buy)) to *feature_df* in place, then
    returns the rows sorted by that score, descending.
    """
    if feature_df.empty:
        return feature_df

    # Restrict to the columns the model was trained on, in saved order.
    model_input = feature_df[feature_columns]

    # Probability of the positive class ("buy") is the ranking score.
    feature_df["score"] = model.predict_proba(model_input)[:, 1]

    return feature_df.sort_values("score", ascending=False)


# ===============================
# 6️⃣ 推薦 API
# ===============================
@app.get("/recommend/{vid}")
def recommend(vid: str, top_k: int = 20):
    """Return the *top_k* highest-scoring products for one user.

    Raises:
        HTTPException: 404 when no feature rows exist for *vid*.
    """
    feature_df = load_feature_by_vid(vid)

    if feature_df.empty:
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )

    ranked = rank_items(feature_df)
    top_rows = ranked.head(top_k)
    result = top_rows[["user_id", "product_id", "score"]].to_dict("records")

    return {
        "vid": vid,
        "top_k": top_k,
        "recommendations": result
    }
uvicorn myapp:app  --reload  --host 0.0.0.0 --port 9999
curl -X 'GET' 'http://localhost:9999/recommend/752055?top_k=2' -H 'accept: application/json'

結果

{
  "vid": "752055",
  "top_k": 2,
  "recommendations": [
    {
      "user_id": 752055,
      "product_id": 4422413,
      "score": 0.19915600225401725
    },
    {
      "user_id": 752055,
      "product_id": 4158852,
      "score": 0.0972484069020572
    }
  ]

但請求多次 都還是會是一樣的結果

在真正的 推薦系統 如果這樣 讓客戶一直看到 一樣的東西 應該會被哭哭

所以大概就是 多取個幾個正相關的幾品 然後 打亂重排 再給客戶看吧

這樣在local的請求耗時 大概六十毫秒 效率該是不錯的

但是 是不是真的有效 可能要再驗證

ref