""" 预算模块测试脚本 — 仅测试计算逻辑,不调用腾讯广告 API 测试内容: 1. 效率数据查询(ODPS)+ 广告状态查询(ODPS) 2. 分位数阈值计算(ROI P70/P30 + 消耗 P50) 3. ROI × 跑量 二维分类 + 5 种动作决策 4. 缩量/扩量/持平三种场景 运行方式: cd /path/to/Agent python examples/auto_put_ad/test_budget.py --budget 100000 # 缩量 python examples/auto_put_ad/test_budget.py --budget 250000 # 扩量 python examples/auto_put_ad/test_budget.py --budget 175000 # 持平 """ import argparse import sys from collections import Counter from datetime import datetime, timedelta from pathlib import Path import pandas as pd # 项目根目录加入 sys.path project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from examples.auto_put_ad.odps_module import ODPSClient from examples.auto_put_ad.tools.budget_calc import ( MIN_BID, MAX_BID, _build_efficiency_sql, _classify_ad, _compute_thresholds, _decide_action, _determine_strategy, _parse_bizdate, ) def run_budget_test(bizdate: str = "yesterday", total_budget_yuan: float = 100_000): print("=" * 70) print(" 预算计算测试(ROI × 跑量 二维矩阵,不执行 API)") print("=" * 70) biz, biz_dash = _parse_bizdate(bizdate) print(f"\n数据日期 : {biz}({biz_dash})") print(f"今日预算 : {total_budget_yuan:,.0f} 元") # ── 1. ODPS 查询 ──────────────────────────────────────── print("\n[Step 1] 连接 ODPS + 查询效率数据...") client = ODPSClient(project="loghubods") sql_efficiency = _build_efficiency_sql(biz, biz_dash) df_eff = client.execute_sql(sql_efficiency) if df_eff.empty: print(f"❌ 昨日({biz})效率数据为空"); return print(f" → 效率数据: {len(df_eff)} 个广告") # ── 2. 广告状态 ───────────────────────────────────────── print("[Step 2] 查询广告出价/状态...") ad_ids = [int(x) for x in df_eff["ad_id"].dropna().unique() if str(x) != "nan"] sql_status = f""" SELECT ad_id, ad_name, account_id, bid_amount, day_amount, ad_status, optimization_goal FROM loghubods.ad_put_tencent_ad WHERE ad_id IN ({",".join(map(str, ad_ids))}) """ df_status = client.execute_sql(sql_status) print(f" → 广告状态: {len(df_status)} 个") # ── 3. 合并 + 效率分 ─────────────────────────────────── df_eff["ad_id"] = df_eff["ad_id"].astype(float).astype("Int64") df_status["ad_id"] = df_status["ad_id"].astype(float).astype("Int64") df = pd.merge(df_eff, df_status[["ad_id", "bid_amount", "day_amount", "ad_status"]], on="ad_id", how="left") df["efficiency"] = df.apply( lambda r: r["fission0_count"] / r["cost"] if r["cost"] and r["cost"] > 0 else None, axis=1, ) df_valid = df[df["open_count"] >= 100].copy().sort_values("efficiency", ascending=False).reset_index(drop=True) df_nosample = df[df["open_count"] < 100].copy() print(f" → 有效广告: {len(df_valid)},样本不足: {len(df_nosample)}") # ── 4. 分位数阈值 ────────────────────────────────────── thresholds = _compute_thresholds(df_valid) print(f"\n[Step 3] 分位数阈值:") print(f" ROI P70 = {thresholds['roi_p70']:.4f}") print(f" ROI P30 = {thresholds['roi_p30']:.4f}") print(f" 消耗 P50 = {thresholds['cost_p50']:.0f} 元") # ── 5. 策略判断 ───────────────────────────────────────── yesterday_total = float(df_valid["cost"].sum()) scale_ratio = total_budget_yuan / yesterday_total if yesterday_total > 0 else 1.0 strategy = _determine_strategy(scale_ratio) direction = "缩量" if scale_ratio < 1 else "扩量" if scale_ratio > 1 else "持平" print(f"\n[Step 4] 策略判断:") print(f" 昨日消耗 : {yesterday_total:,.0f} 元") print(f" scale_ratio : {scale_ratio:.2f}({direction} {abs(1-scale_ratio)*100:.0f}%)") print(f" 策略 : {strategy}") # ── 6. 二维矩阵决策 ──────────────────────────────────── print(f"\n[Step 5] 二维矩阵决策...") results = [] for _, row in df_valid.iterrows(): eff = float(row["efficiency"]) if pd.notna(row["efficiency"]) else 0.0 cost = float(row["cost"]) roi_level, volume_level = _classify_ad(eff, cost, thresholds) action, adj_ratio = _decide_action(roi_level, volume_level, strategy) bid = row["bid_amount"] if pd.notna(row["bid_amount"]) else None new_bid = None if bid and action in ("increase", "decrease"): new_bid = max(MIN_BID, min(MAX_BID, int(float(bid) * (1 + adj_ratio)))) elif bid: new_bid = int(float(bid)) results.append({ "ad_id": int(row["ad_id"]), "roi_level": roi_level, "volume_level": volume_level, "efficiency": round(eff, 4), "cost": round(cost, 2), "current_bid": int(float(bid)) if bid else None, "new_bid": new_bid, "adj_ratio": f"{adj_ratio:+.0%}" if adj_ratio != 0 else "—", "action": action, "ad_status": str(row["ad_status"]) if pd.notna(row.get("ad_status")) else "", }) # ── 7. 结果展示 ──────────────────────────────────────── print("\n" + "=" * 70) print(f" 出价调整方案({direction} {abs(1-scale_ratio)*100:.0f}%)") print(f" 昨日消耗: {yesterday_total:,.0f} 元 → 今日预算: {total_budget_yuan:,.0f} 元") print(f" 策略: {strategy}") print(f" 阈值: ROI P70={thresholds['roi_p70']:.4f}, P30={thresholds['roi_p30']:.4f}, 消耗 P50={thresholds['cost_p50']:.0f}元") print("=" * 70) action_labels = [ ("keep", "保持不动"), ("increase", "提价放量"), ("decrease", "降价控量"), ("close", "建议关停"), ("observe", "观察不动"), ] for act, label in action_labels: sub = [r for r in results if r["action"] == act] if not sub: continue print(f"\n【{label}({act})- {len(sub)}个】") header = f" {'ad_id':<15} {'ROI':>4} {'量':>4} {'效率分':>8} {'消耗(元)':>10} {'当前出价':>8} {'新出价':>8} {'幅度':>6}" print(header) print(" " + "-" * (len(header) - 2)) for item in sub[:8]: bid_str = str(item["current_bid"]) if item["current_bid"] else "—" new_str = str(item["new_bid"]) if item["new_bid"] else "—" print(f" {item['ad_id']:<15} {item['roi_level']:>4} {item['volume_level']:>4} " f"{item['efficiency']:>8} {item['cost']:>10,.0f} {bid_str:>8} {new_str:>8} {item['adj_ratio']:>6}") if len(sub) > 8: print(f" ... 还有 {len(sub)-8} 个") if len(df_nosample) > 0: print(f"\n【样本不足 - {len(df_nosample)}个,本次不操作】") counts = Counter(r["action"] for r in results) summary = " / ".join(f"{label}:{counts.get(act, 0)}" for act, label in action_labels) print(f"\n合计:{summary} / 样本不足:{len(df_nosample)}") print("\n✅ 计算完成(未调用腾讯广告 API)") print("=" * 70) if __name__ == "__main__": parser = argparse.ArgumentParser(description="预算模块计算测试(二维矩阵)") parser.add_argument("--date", default="yesterday", help="数据日期,YYYYMMDD 或 yesterday") parser.add_argument("--budget", type=float, default=100_000, help="今日总预算(元)") args = parser.parse_args() run_budget_test(bizdate=args.date, total_budget_yuan=args.budget)