Przeglądaj źródła

refactor(auto_put_ad_mini): 决策引擎 context 瘦身 + 稳定性加固

核心改动:
- zero_spend_ads 传 LLM 从全量列表(1206 条×110B ≈ 106KB)改为 count + 10 条样本(~1.3KB),每次 LLM 调用节省 ~50k tokens
- ROI 基准从均值改为中位数(避免少数高 ROI 广告拉高基线)
- need_review_ads 按 audience_tier 分组输出 review_by_tier / tier_batches,便于后续分批评估
- apply_decisions 新增 ad_id 类型统一(int64)+ source 优先级去重,避免 LLM 决策与规则默认值重复

参数重命名:min_spend_for_class_a → min_spend_for_zero_spend(语义对齐新的类别命名)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
刘立冬 3 tygodni temu
rodzic
commit
2f28241ffa
1 zmienionych plików z 71 dodań i 19 usunięć
  1. 71 19
      examples/auto_put_ad_mini/tools/ad_decision.py

+ 71 - 19
examples/auto_put_ad_mini/tools/ad_decision.py

@@ -278,20 +278,20 @@ async def get_ads_for_review(
     metrics_csv: str = "",
     end_date: str = "yesterday",
     roi_review_factor: float = 0.8,
-    min_spend_for_class_a: float = 10.0,
+    min_spend_for_zero_spend: float = 10.0,
 ) -> ToolResult:
     """
     不做决策,将广告分为三类,返回结构化摘要供 LLM 推理。
 
-    类别 A【已确认异常,建议直接关停】:7日均消耗 < 10元(几乎零活动)
-    类别 B【待LLM评估】:消耗有意义但指标异常(ROI偏低或衰退信号)
-    类别 C【正常运行】:仅返回摘要统计
+    【零消耗待关停】:7日均消耗 < 10元(几乎零活动),规则直接关停
+    【待评估(候选)】:消耗有意义但指标异常(ROI偏低或衰退信号),需LLM评估
+    【正常运行】:无异常信号,仅返回摘要统计
 
     Args:
         metrics_csv: ROI 指标 CSV 路径(calculate_roi_metrics 输出)
         end_date: 结束日期
-        roi_review_factor: 动态ROI < 全体均值 × 此值 → 进入 B 类(默认 0.8)
-        min_spend_for_class_a: 7日均消耗低于此值(元)→ A 类(默认 10.0)
+        roi_review_factor: 动态ROI < 全体均值 × 此值 → 进入 待评估(候选)(默认 0.8)
+        min_spend_for_zero_spend: 7日均消耗低于此值(元)→ 零消耗待关停(默认 10.0)
     """
     try:
         # 加载策略参数(动态阈值,不写死在代码中)
@@ -324,7 +324,7 @@ async def get_ads_for_review(
                 by_tier_goal = portfolio_data.get("by_tier_goal", {})
                 logger.info(f"✅ 从 {portfolio_file.name} 加载了 {len(by_tier_stats)} 个人群包 + {len(by_tier_goal)} 个tier+goal组的统计数据")
             else:
-                logger.warning(f"未找到 portfolio_summary 文件: {portfolio_file},将使用全局均值兜底")
+                logger.warning(f"未找到 portfolio_summary 文件: {portfolio_file}(请确认 calculate_roi_metrics 已正常运行)")
                 by_tier_goal = {}
         except Exception as e:
             logger.warning(f"读取人群包统计数据失败,使用空字典兜底: {e}")
@@ -350,7 +350,7 @@ async def get_ads_for_review(
 
         # 全体 ROI 分布
         roi_series = df["动态ROI_7日均值"].dropna()
-        roi_mean = float(roi_series.mean()) if len(roi_series) > 0 else 0.0
+        roi_mean = float(roi_series.median()) if len(roi_series) > 0 else 0.0
         roi_p25 = float(roi_series.quantile(0.25)) if len(roi_series) > 0 else 0.0
         roi_p50 = float(roi_series.quantile(0.50)) if len(roi_series) > 0 else 0.0
         roi_p75 = float(roi_series.quantile(0.75)) if len(roi_series) > 0 else 0.0
@@ -378,7 +378,7 @@ async def get_ads_for_review(
 
             # 零消耗待关停:7日均消耗 < 10元,几乎无活动(强规则,仍保留)
             # ⚠️ 但需要年龄保护:≤7天的广告不适用零消耗规则
-            if cost_7d_avg < min_spend_for_class_a:
+            if cost_7d_avg < min_spend_for_zero_spend:
                 # 检查广告年龄
                 if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
                     # 4-7天(早期成长期)或≤3天(冷启动期):保护,不关停
@@ -421,7 +421,8 @@ async def get_ads_for_review(
             else:
                 ad_fission = float(ad_fission)
 
-            tier = _extract_audience_tier(str(row.get("ad_name", "")))
+            # 人群包名称(优先用 audience_tier=package_name,兜底用 ad_name 提取 R 层级)
+            tier = str(row.get("audience_tier", "")) or _extract_audience_tier(str(row.get("ad_name", "")))
             tier_stats = by_tier_stats.get(tier, {})
             tier_fission_mean = tier_stats.get("fission_mean")
 
@@ -668,12 +669,36 @@ async def get_ads_for_review(
             normal_ads_count += 1
 
         import json
+
+        # ═══════════════════════════════════════════
+        # 按 audience_tier 分组 need_review_ads(用于子 Agent 并行评估)
+        # ═══════════════════════════════════════════
+        review_by_tier: Dict[str, List[Dict]] = {}
+        for ad in need_review_ads:
+            tier = str(ad.get("audience_tier", "default") or "default")
+            review_by_tier.setdefault(tier, []).append(ad)
+
+        # tier 分组摘要(便于 LLM 快速判断是否需要分发子 Agent)
+        tier_batches = sorted(
+            [
+                {
+                    "audience_tier": t,
+                    "count": len(ads),
+                    "ad_ids": [a.get("ad_id") for a in ads],
+                }
+                for t, ads in review_by_tier.items()
+            ],
+            key=lambda x: -x["count"],
+        )
+
         result = {
             "summary": {
                 "total": len(df),
                 "zero_spend_ads": len(zero_spend_ads),
                 "need_review_ads": len(need_review_ads),
                 "normal_ads": normal_ads_count,
+                "tier_groups": len(review_by_tier),  # 并发批次数
+                "max_batch_size": max((b["count"] for b in tier_batches), default=0),
             },
             "distribution": {
                 "roi_mean": round(roi_mean, 4),
@@ -700,20 +725,30 @@ async def get_ads_for_review(
                 "bid_down_line": round(roi_mean * params["BID_DOWN_ROI_FACTOR"], 4),
                 "bid_up_line": round(roi_mean * params["BID_UP_ROI_FACTOR"], 4),
             },
-            "zero_spend_ads": zero_spend_ads,
+            # 零消耗广告由规则全自动处理,LLM 无需逐条决策
+            # 仅传入规模 + 10 条样本(供 LLM 追溯形态,避免 1000+ 条名单挤占 context)
+            "zero_spend_ads_count": len(zero_spend_ads),
+            "zero_spend_ads_samples": zero_spend_ads[:10],
             "need_review_ads": need_review_ads,
+            # ★ 新增:按 tier 分组(用于 agent(task=[...]) 并发评估)
+            "review_by_tier": review_by_tier,
+            "tier_batches": tier_batches,
         }
 
         output_json = json.dumps(result, ensure_ascii=False, indent=2)
 
         return ToolResult(
-            title=f"广告分类(零消耗:{len(zero_spend_ads)} 待评估:{len(need_review_ads)} 正常:{normal_ads_count})",
+            title=(
+                f"广告分类(零消耗:{len(zero_spend_ads)} 待评估:{len(need_review_ads)} "
+                f"分 {len(review_by_tier)} 个 tier 批次 正常:{normal_ads_count})"
+            ),
             output=output_json,
             metadata={
                 "total": len(df),
                 "zero_spend_ads": len(zero_spend_ads),
                 "need_review_ads": len(need_review_ads),
                 "normal_ads": normal_ads_count,
+                "tier_groups": len(review_by_tier),
                 "roi_mean": roi_mean,
                 "end_date": end_date,
             },
@@ -737,19 +772,19 @@ async def apply_decisions(
     metrics_csv: str = "",
 ) -> ToolResult:
     """
-    接收 LLM 的决策,合并 A 类广告(自动关停)和 C 类广告(自动保持),保存到 llm_decisions_{date}.csv。
+    接收 LLM 的决策,合并【零消耗待关停】(自动关停)和【正常运行】(自动保持)广告,保存到 llm_decisions_{date}.csv。
 
     决策分类:
       - 零消耗待关停:7日均消耗 < 10元,几乎无活动 → 规则判断自动关停
-      - 待优化评估:ROI 偏低、衰退信号、出价调整候选 → 智能判断
+      - 待评估(候选):ROI 偏低、衰退信号、出价调整候选 → 智能判断
       - 正常运行:ROI 正常且无异常信号 → 规则判断自动保持
 
     Args:
-        decisions: JSON 字符串,LLM 输出的"待优化评估"类广告决策列表
+        decisions: JSON 字符串,LLM 输出的【待评估(候选)】广告决策列表
                    格式:[{"ad_id": 123, "action": "pause"/"hold"/"bid_up"/"bid_down",
                            "dimension": "...", "reason": "...", "confidence": "high"/"medium"/"low"}]
         end_date: 结束日期
-        metrics_csv: ROI 指标 CSV 路径(用于获取 A/C 类广告)
+        metrics_csv: ROI 指标 CSV 路径(用于获取【零消耗待关停】和【正常运行】广告)
     """
     import json
 
@@ -808,6 +843,11 @@ async def apply_decisions(
         # 合并 LLM 决策(标注来源 + 添加cost_7d_avg用于排序 + 冷启动期决策过滤)
         for item in llm_list:
             item["source"] = "智能判断"
+            # ★ 关键修复:统一 ad_id 为 int,避免 int vs string 导致去重失败
+            try:
+                item["ad_id"] = int(item["ad_id"])
+            except (ValueError, TypeError):
+                pass
             ad_id = item.get("ad_id")
             action = item.get("action", "hold")
 
@@ -905,10 +945,22 @@ async def apply_decisions(
 
         df_out = pd.DataFrame(all_decisions)
 
+        # ★ 统一 ad_id 为 int64,确保后续 merge/去重不因类型不匹配而失败
+        df_out["ad_id"] = pd.to_numeric(df_out["ad_id"], errors="coerce").astype("Int64")
+
+        # ===== 去重:同一 ad_id 只保留优先级最高的决策 =====
+        # 优先级:智能判断 > 规则判断(LLM 的判断优先于规则默认值)
+        source_priority = {"智能判断": 0, "llm_modified": 1, "规则判断": 2}
+        df_out["_source_rank"] = df_out["source"].map(source_priority).fillna(9)
+        df_out = df_out.sort_values("_source_rank").drop_duplicates(subset=["ad_id"], keep="first")
+        df_out = df_out.drop(columns=["_source_rank"])
+        logger.info(f"去重后决策数: {len(df_out)}(智能判断优先)")
+
         # ===== 关键修复:合并 metrics CSV 中的字段 =====
         # 从 metrics CSV 补充 ad_name, ad_age_days, cost_7d_avg, 动态ROI 等字段
         try:
             df_metrics_full = pd.read_csv(metrics_csv)
+            df_metrics_full["ad_id"] = pd.to_numeric(df_metrics_full["ad_id"], errors="coerce").astype("Int64")
             # 选择需要合并的列(OUTPUT_COLUMNS中定义的所有列)
             merge_cols = [
                 "ad_id", "account_id", "ad_name", "audience_tier", "create_time", "ad_age_days",
@@ -1063,9 +1115,9 @@ async def query_ad_detail(
         # 计算广告年龄
         ad_age_days = _calculate_ad_age_days(row.get("create_time"))
 
-        # 全局 ROI 分布
+        # 全局 ROI 分布(使用中位数作为基准,避免被少数高ROI广告拉高)
         roi_series = df["动态ROI_7日均值"].dropna()
-        roi_mean = float(roi_series.mean()) if len(roi_series) > 0 else 0.0
+        roi_mean = float(roi_series.median()) if len(roi_series) > 0 else 0.0
 
         roi_low_line = roi_mean * ROI_LOW_FACTOR
         bid_down_line = roi_mean * BID_DOWN_ROI_FACTOR
@@ -1104,7 +1156,7 @@ async def query_ad_detail(
 
         # 全局上下文
         global_context = {
-            "全体动态ROI均值": round(roi_mean, 4),
+            "全体动态ROI基准(中位数)": round(roi_mean, 4),
             "ROI关停线": round(roi_low_line, 4),
             "ROI降价线": round(bid_down_line, 4),
             "ROI提价线": round(bid_up_line, 4),