| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- """
- 预算模块测试脚本 — 仅测试计算逻辑,不调用腾讯广告 API
- 测试内容:
- 1. 效率数据查询(ODPS)+ 广告状态查询(ODPS)
- 2. 分位数阈值计算(ROI P70/P30 + 消耗 P50)
- 3. ROI × 跑量 二维分类 + 5 种动作决策
- 4. 缩量/扩量/持平三种场景
- 运行方式:
- cd /path/to/Agent
- python examples/auto_put_ad/test_budget.py --budget 100000 # 缩量
- python examples/auto_put_ad/test_budget.py --budget 250000 # 扩量
- python examples/auto_put_ad/test_budget.py --budget 175000 # 持平
- """
- import argparse
- import sys
- from collections import Counter
- from datetime import datetime, timedelta
- from pathlib import Path
- import pandas as pd
- # 项目根目录加入 sys.path
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from examples.auto_put_ad.odps_module import ODPSClient
- from examples.auto_put_ad.tools.budget_calc import (
- MIN_BID,
- MAX_BID,
- _build_efficiency_sql,
- _classify_ad,
- _compute_thresholds,
- _decide_action,
- _determine_strategy,
- _parse_bizdate,
- )
def _planned_bid(bid, action, adj_ratio):
    """Return the bid to display after applying *adj_ratio*.

    Args:
        bid: Current bid, or None when the ad has no known bid. A falsy bid
            (None or 0) is treated as "no bid".
        action: Action name produced by ``_decide_action``.
        adj_ratio: Relative adjustment, e.g. 0.1 for +10%.

    Returns:
        None when there is no current bid. For "increase"/"decrease" the
        adjusted bid, truncated to int and clamped to [MIN_BID, MAX_BID].
        For every other action the current bid as an int.
    """
    if not bid:
        return None
    if action in ("increase", "decrease"):
        return max(MIN_BID, min(MAX_BID, int(float(bid) * (1 + adj_ratio))))
    return int(float(bid))


def _print_action_group(act, label, rows, preview=8):
    """Print one action bucket as a table, showing at most *preview* rows."""
    if not rows:
        return
    print(f"\n【{label}({act})- {len(rows)}个】")
    header = f" {'ad_id':<15} {'ROI':>4} {'量':>4} {'效率分':>8} {'消耗(元)':>10} {'当前出价':>8} {'新出价':>8} {'幅度':>6}"
    print(header)
    print(" " + "-" * (len(header) - 2))
    for item in rows[:preview]:
        bid_str = str(item["current_bid"]) if item["current_bid"] else "—"
        new_str = str(item["new_bid"]) if item["new_bid"] else "—"
        print(f" {item['ad_id']:<15} {item['roi_level']:>4} {item['volume_level']:>4} "
              f"{item['efficiency']:>8} {item['cost']:>10,.0f} {bid_str:>8} {new_str:>8} {item['adj_ratio']:>6}")
    if len(rows) > preview:
        print(f" ... 还有 {len(rows)-preview} 个")


def run_budget_test(bizdate: str = "yesterday", total_budget_yuan: float = 100_000) -> None:
    """Dry-run the budget decision pipeline and print the resulting bid plan.

    Steps performed:
      1. Query per-ad efficiency data through ``ODPSClient``.
      2. Query bid/status rows for those ads from
         ``loghubods.ad_put_tencent_ad``.
      3. Merge both, compute ``efficiency = fission0_count / cost`` and keep
         ads with ``open_count >= 100`` as the valid set.
      4. Compute thresholds, derive the scale ratio
         (budget / summed cost of the valid set) and the strategy.
      5. Classify every valid ad and decide an action plus a new bid.
      6. Print the plan grouped by action.

    The function only reads from ODPS and prints; it returns nothing and
    applies no bid change anywhere.

    Args:
        bizdate: Data date, forwarded unchanged to ``_parse_bizdate``.
        total_budget_yuan: Today's total budget in yuan.
    """
    print("=" * 70)
    print(" 预算计算测试(ROI × 跑量 二维矩阵,不执行 API)")
    print("=" * 70)

    biz, biz_dash = _parse_bizdate(bizdate)
    print(f"\n数据日期 : {biz}({biz_dash})")
    print(f"今日预算 : {total_budget_yuan:,.0f} 元")

    # -- 1. Efficiency data from ODPS ------------------------------------
    print("\n[Step 1] 连接 ODPS + 查询效率数据...")
    client = ODPSClient(project="loghubods")
    df_eff = client.execute_sql(_build_efficiency_sql(biz, biz_dash))
    if df_eff.empty:
        print(f"❌ 昨日({biz})效率数据为空")
        return
    print(f" → 效率数据: {len(df_eff)} 个广告")

    # -- 2. Current bid / status of those ads ----------------------------
    print("[Step 2] 查询广告出价/状态...")
    ad_ids = [int(x) for x in df_eff["ad_id"].dropna().unique() if str(x) != "nan"]
    if not ad_ids:
        # An empty list would render as "IN ()", which is not valid SQL.
        print("❌ 效率数据中没有有效的 ad_id,无法查询广告状态")
        return
    # Every element went through int(), so the joined text is digits only.
    id_list = ",".join(map(str, ad_ids))
    sql_status = f"""
        SELECT ad_id, ad_name, account_id, bid_amount, day_amount, ad_status, optimization_goal
        FROM loghubods.ad_put_tencent_ad
        WHERE ad_id IN ({id_list})
    """
    df_status = client.execute_sql(sql_status)
    print(f" → 广告状态: {len(df_status)} 个")

    # -- 3. Merge + efficiency score -------------------------------------
    # Normalise both keys to nullable Int64 so the merge keys compare equal.
    df_eff["ad_id"] = df_eff["ad_id"].astype(float).astype("Int64")
    df_status["ad_id"] = df_status["ad_id"].astype(float).astype("Int64")

    # Rows without an ad_id cannot be matched to a bid and would make
    # int(row["ad_id"]) fail further down, so drop them here.
    missing_id = df_eff["ad_id"].isna()
    if missing_id.any():
        print(f" → 跳过 ad_id 为空的记录: {int(missing_id.sum())} 条")
        df_eff = df_eff[~missing_id]

    # NOTE(review): a left merge duplicates an ad if the status table holds
    # several rows for one ad_id — confirm the table is unique per ad.
    df = pd.merge(
        df_eff,
        df_status[["ad_id", "bid_amount", "day_amount", "ad_status"]],
        on="ad_id",
        how="left",
    )
    df["efficiency"] = df.apply(
        lambda r: r["fission0_count"] / r["cost"] if r["cost"] and r["cost"] > 0 else None,
        axis=1,
    )

    # A missing open_count fails both ">= 100" and "< 100"; use one mask and
    # its complement so such rows are counted as "no sample", not lost.
    has_sample = (df["open_count"] >= 100).fillna(False).astype(bool)
    df_valid = (
        df[has_sample]
        .copy()
        .sort_values("efficiency", ascending=False)
        .reset_index(drop=True)
    )
    df_nosample = df[~has_sample].copy()
    print(f" → 有效广告: {len(df_valid)},样本不足: {len(df_nosample)}")
    if df_valid.empty:
        # Quantile thresholds over zero ads carry no information.
        print("❌ 没有 open_count >= 100 的广告,无法计算分位数阈值")
        return

    # -- 4. Quantile thresholds ------------------------------------------
    thresholds = _compute_thresholds(df_valid)
    print("\n[Step 3] 分位数阈值:")
    print(f" ROI P70 = {thresholds['roi_p70']:.4f}")
    print(f" ROI P30 = {thresholds['roi_p30']:.4f}")
    print(f" 消耗 P50 = {thresholds['cost_p50']:.0f} 元")

    # -- 5. Strategy ------------------------------------------------------
    yesterday_total = float(df_valid["cost"].sum())
    # Without any spend there is no baseline; fall back to a neutral ratio.
    scale_ratio = total_budget_yuan / yesterday_total if yesterday_total > 0 else 1.0
    strategy = _determine_strategy(scale_ratio)
    direction = "缩量" if scale_ratio < 1 else "扩量" if scale_ratio > 1 else "持平"
    delta_pct = abs(1 - scale_ratio) * 100
    print("\n[Step 4] 策略判断:")
    print(f" 昨日消耗 : {yesterday_total:,.0f} 元")
    print(f" scale_ratio : {scale_ratio:.2f}({direction} {delta_pct:.0f}%)")
    print(f" 策略 : {strategy}")

    # -- 6. Per-ad decision ----------------------------------------------
    print("\n[Step 5] 二维矩阵决策...")
    results = []
    for _, row in df_valid.iterrows():
        # Ads without a computable efficiency are scored as 0.
        eff = float(row["efficiency"]) if pd.notna(row["efficiency"]) else 0.0
        cost = float(row["cost"])
        roi_level, volume_level = _classify_ad(eff, cost, thresholds)
        action, adj_ratio = _decide_action(roi_level, volume_level, strategy)
        bid = row["bid_amount"] if pd.notna(row["bid_amount"]) else None
        results.append({
            "ad_id": int(row["ad_id"]),
            "roi_level": roi_level,
            "volume_level": volume_level,
            "efficiency": round(eff, 4),
            "cost": round(cost, 2),
            "current_bid": int(float(bid)) if bid else None,
            "new_bid": _planned_bid(bid, action, adj_ratio),
            "adj_ratio": f"{adj_ratio:+.0%}" if adj_ratio != 0 else "—",
            "action": action,
            "ad_status": str(row["ad_status"]) if pd.notna(row.get("ad_status")) else "",
        })

    # -- 7. Report --------------------------------------------------------
    print("\n" + "=" * 70)
    print(f" 出价调整方案({direction} {delta_pct:.0f}%)")
    print(f" 昨日消耗: {yesterday_total:,.0f} 元 → 今日预算: {total_budget_yuan:,.0f} 元")
    print(f" 策略: {strategy}")
    print(f" 阈值: ROI P70={thresholds['roi_p70']:.4f}, P30={thresholds['roi_p30']:.4f}, 消耗 P50={thresholds['cost_p50']:.0f}元")
    print("=" * 70)

    action_labels = [
        ("keep", "保持不动"),
        ("increase", "提价放量"),
        ("decrease", "降价控量"),
        ("close", "建议关停"),
        ("observe", "观察不动"),
    ]
    # Group once instead of re-scanning the result list for every action.
    by_action = {}
    for item in results:
        by_action.setdefault(item["action"], []).append(item)

    for act, label in action_labels:
        _print_action_group(act, label, by_action.get(act, []))

    if len(df_nosample) > 0:
        print(f"\n【样本不足 - {len(df_nosample)}个,本次不操作】")

    summary = " / ".join(
        f"{label}:{len(by_action.get(act, []))}" for act, label in action_labels
    )
    print(f"\n合计:{summary} / 样本不足:{len(df_nosample)}")
    print("\n✅ 计算完成(未调用腾讯广告 API)")
    print("=" * 70)
if __name__ == "__main__":
    # Command-line entry point: read the data date and today's budget, then
    # run the dry-run calculation once with those values.
    cli = argparse.ArgumentParser(description="预算模块计算测试(二维矩阵)")
    cli.add_argument(
        "--date",
        default="yesterday",
        help="数据日期,YYYYMMDD 或 yesterday",
    )
    cli.add_argument(
        "--budget",
        type=float,
        default=100_000,
        help="今日总预算(元)",
    )
    options = cli.parse_args()
    run_budget_test(bizdate=options.date, total_budget_yuan=options.budget)
|