""" 创意级指标工具 — auto_put_ad_mini (Step 2) 职责: - 按 creative_id 聚合最近 N 天 ROI(简单 ROI,非 动态 ROI (7日均值)) - 为"负向决策前做创意归因检查"提供数据支撑 使用定位(重要): - 本阶段 Agent 决策范围是【广告级】:调出价 / 暂停广告 - 创意变更【不在】本阶段范围内(由后续 创意策略 Agent 负责) - 创意级数据的真正目的:在 LLM 准备做 bid_down / pause 决策之前做归因检查 * 防止广告级误判:ROI 低可能只是"某个创意拖后腿",此时应 hold 等创意层优化 * 提供决策理由佐证:reason 明确说"整体问题 vs 某创意问题" 两个工具: - calculate_creative_metrics:批量聚合所有广告的创意级 ROI(Mode 1 工作流 Step 2.5) - get_creative_context:对单个广告做创意归因诊断(LLM 决策时按需调用) """ import logging from datetime import datetime, timedelta from pathlib import Path from typing import List, Optional import pandas as pd from agent.tools import tool from agent.tools.models import ToolContext, ToolResult logger = logging.getLogger(__name__) _MINI_DIR = Path(__file__).resolve().parent.parent _MERGED_DIR = _MINI_DIR / "outputs" / "merged" _CREATIVE_METRICS_DIR = _MINI_DIR / "outputs" / "creative_metrics" # ===== 内部工具函数 ===== def _load_merged_window(end_date_str: str, days: int) -> Optional[pd.DataFrame]: """加载最近 days 天的 merged CSV,concat 后返回。""" try: end_dt = datetime.strptime(end_date_str, "%Y%m%d") except ValueError: logger.error("非法 end_date: %s", end_date_str) return None dfs = [] for i in range(days): date = (end_dt - timedelta(days=days - 1 - i)).strftime("%Y%m%d") csv_path = _MERGED_DIR / f"merged_{date}.csv" if not csv_path.exists(): logger.warning("创意聚合:merged 数据缺失 %s", date) continue df = pd.read_csv(csv_path, dtype={"ad_id": str, "creative_id": str, "account_id": str}) dfs.append(df) if not dfs: return None return pd.concat(dfs, ignore_index=True) def _aggregate_creative_metrics(df: pd.DataFrame, days: int) -> pd.DataFrame: """按 (ad_id, creative_id) 聚合。""" if df.empty: return pd.DataFrame() # 保证关键列存在 required = ["ad_id", "creative_id", "cost", "bizdate"] for col in required: if col not in df.columns: logger.error("创意聚合:缺少必需列 %s", col) return pd.DataFrame() # 安全数值转换 for col in ["cost", "总收入", "首层小程序打开数", "裂变0层回流数", "view_count", "valid_click_count"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) # 过滤掉空 creative_id(有些广告状态行可能 creative_id 为空) df = df[df["creative_id"].notna() & (df["creative_id"].astype(str).str.strip() != "")] # 取 creative_name(若有变更取最新) df_sorted = df.sort_values("bizdate") creative_name_latest = ( df_sorted.groupby(["ad_id", "creative_id"])["creative_name"].last() if "creative_name" in df.columns else None ) # 创意首次出现日期(用于 age) creative_first_date = ( df_sorted.groupby(["ad_id", "creative_id"])["bizdate"].min() ) agg_cols = {"cost": "sum"} if "总收入" in df.columns: agg_cols["总收入"] = "sum" if "首层小程序打开数" in df.columns: agg_cols["首层小程序打开数"] = "sum" if "裂变0层回流数" in df.columns: agg_cols["裂变0层回流数"] = "sum" if "view_count" in df.columns: agg_cols["view_count"] = "sum" if "valid_click_count" in df.columns: agg_cols["valid_click_count"] = "sum" active_days = df.groupby(["ad_id", "creative_id"]).apply( lambda g: g[g["cost"] > 0]["bizdate"].nunique() ) result = df.groupby(["ad_id", "creative_id"]).agg(agg_cols).reset_index() result[f"active_days_{days}d"] = result.apply( lambda r: int(active_days.get((r["ad_id"], r["creative_id"]), 0)), axis=1 ) # 重命名聚合列 rename_map = { "cost": f"cost_{days}d", "总收入": f"revenue_{days}d", "首层小程序打开数": f"open_count_{days}d", "裂变0层回流数": f"fission0_{days}d", "view_count": f"view_count_{days}d", "valid_click_count": f"click_count_{days}d", } result = result.rename(columns={k: v for k, v in rename_map.items() if k in result.columns}) # 附加 creative_name 与 creative_age_days if creative_name_latest is not None: result["creative_name"] = result.apply( lambda r: creative_name_latest.get((r["ad_id"], r["creative_id"]), ""), axis=1 ) else: result["creative_name"] = "" def _age_days(row): first = creative_first_date.get((row["ad_id"], row["creative_id"])) if not first: return None try: first_dt = datetime.strptime(str(first), "%Y%m%d") today = datetime.now() return max((today - first_dt).days, 0) except ValueError: return None result["creative_age_days"] = result.apply(_age_days, axis=1) # 简单 ROI revenue_col = f"revenue_{days}d" cost_col = f"cost_{days}d" if revenue_col in result.columns: result[f"roi_{days}d"] = result.apply( lambda r: round(r[revenue_col] / r[cost_col], 4) if r[cost_col] > 0 else None, axis=1, ) else: result[f"roi_{days}d"] = None # 该创意在所属广告中的消耗占比 ad_total = result.groupby("ad_id")[cost_col].transform("sum") result["cost_share"] = result.apply( lambda r: round(r[cost_col] / ad_total.loc[r.name], 4) if ad_total.loc[r.name] > 0 else 0, axis=1, ) return result def _classify_attribution(creatives_for_ad: pd.DataFrame, cost_col: str, roi_col: str) -> str: """ 基于单个广告下的创意表现,判定归因类型。 返回值对应 skill 中定义的 4 类 attribution_hint: - "single_laggard" : 单创意占 >60% 消耗且 ROI 远低于其他 → 建议 hold - "balanced_bad" : 多创意均衡但整体 ROI 差 → 真广告级问题,保持原负向决策 - "new_cold_start" : 主要消耗集中在 age < 7 天的新创意 → 建议 hold - "mixed" : 其他情况,LLM 自行综合判断 """ # 仅考虑近期有消耗的创意 active = creatives_for_ad[creatives_for_ad[cost_col] > 0].copy() if active.empty: return "mixed" active_sorted = active.sort_values(cost_col, ascending=False) top = active_sorted.iloc[0] top_share = float(top["cost_share"] or 0) # 新创意冷启动:主导创意年龄 < 7 天 top_age = top.get("creative_age_days") if top_share > 0.6 and top_age is not None and top_age < 7: return "new_cold_start" # 单创意拖后腿:主导创意占 >60% 且 ROI 明显低于其他活跃创意 if top_share > 0.6 and len(active_sorted) >= 2: others = active_sorted.iloc[1:] top_roi = top.get(roi_col) other_roi = others[roi_col].dropna() if top_roi is not None and not other_roi.empty: other_mean = other_roi.mean() # 主导创意 ROI 显著低于其他(低于其他均值的 70%) if other_mean > 0 and top_roi < other_mean * 0.7: return "single_laggard" # 多创意均衡且整体差(≥3 个活跃创意,且占比相对均衡) if len(active_sorted) >= 3 and top_share < 0.5: return "balanced_bad" return "mixed" # ===== 工具 1:批量计算创意级指标 ===== @tool(description="按 creative_id 聚合最近 N 天 ROI(供广告决策做创意归因检查)") async def calculate_creative_metrics( ctx: ToolContext = None, days: int = 7, end_date: str = "yesterday", ) -> ToolResult: """ 聚合创意级 ROI 指标。 数据源:outputs/merged/merged_*.csv(最近 N 天),按 (ad_id, creative_id) 聚合: - cost_{N}d, revenue_{N}d, open_count_{N}d, fission0_{N}d - roi_{N}d = revenue_{N}d / cost_{N}d (简单 ROI,不是 动态 ROI (7日均值)) - cost_share = 该创意在所属广告中的消耗占比 - creative_age_days = 该创意首次出现至今天数 - active_days_{N}d = 近 N 天有消耗的天数 输出:outputs/creative_metrics/creative_metrics_{end_date}.csv Args: days: 聚合窗口(默认 7 天) end_date: 结束日期(YYYYMMDD 或 "yesterday") Returns: ToolResult 含 CSV 路径 + 总览统计 """ try: if end_date == "yesterday": end_dt = datetime.now() - timedelta(days=1) else: end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d") end_date_str = end_dt.strftime("%Y%m%d") df = _load_merged_window(end_date_str, days) if df is None or df.empty: return ToolResult( title="创意指标聚合失败", output=f"未找到任何 merged 数据({_MERGED_DIR},最近 {days} 天)", ) result = _aggregate_creative_metrics(df, days) if result.empty: return ToolResult( title="创意指标聚合失败", output="聚合结果为空(可能 creative_id 全为空)", ) _CREATIVE_METRICS_DIR.mkdir(parents=True, exist_ok=True) out_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv" result.to_csv(out_path, index=False, encoding="utf-8-sig") # 总览统计 total = len(result) active = int((result[f"cost_{days}d"] > 0).sum()) ad_count = result["ad_id"].nunique() summary_lines = [ f"✅ 创意级指标聚合完成({days} 天窗口)", f" 输出文件:{out_path}", f" 创意总数:{total}(其中 {active} 个近 {days} 天有消耗)", f" 覆盖广告:{ad_count} 条", ] return ToolResult( title=f"创意级指标聚合({days}天)", output="\n".join(summary_lines), metadata={ "csv_path": str(out_path), "total_creatives": total, "active_creatives": active, "ad_count": ad_count, "days": days, }, ) except Exception as e: logger.exception("calculate_creative_metrics 失败") return ToolResult(title="创意指标聚合异常", output=f"错误:{e}") # ===== 工具 2:单广告归因检查 ===== @tool(description="对准备做负向决策(降价/暂停)的广告,做创意归因检查") async def get_creative_context( ctx: ToolContext = None, ad_id: str = "", days: int = 7, end_date: str = "yesterday", ) -> ToolResult: """ 对指定广告做创意归因诊断。 使用时机(严格): - LLM 初步打算判 bid_down / pause 的广告才调用 - 不对 hold / bid_up 的广告调用(避免无意义开销) Args: ad_id: 广告 ID days: 归因窗口(默认 7 天,需要已先跑过 calculate_creative_metrics) end_date: 结束日期(YYYYMMDD 或 "yesterday") Returns: ToolResult.data: { "ad_id": ..., "total_creatives": int, "active_creatives_7d": int, "dominant_creative_pct": float, "attribution_hint": "single_laggard" | "balanced_bad" | "new_cold_start" | "mixed", "creatives": [...按 cost_share 降序...] } """ try: if end_date == "yesterday": end_dt = datetime.now() - timedelta(days=1) else: end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d") end_date_str = end_dt.strftime("%Y%m%d") metrics_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv" if not metrics_path.exists(): return ToolResult( title="创意归因失败", output=f"创意指标 CSV 不存在:{metrics_path}\n请先运行 calculate_creative_metrics。", ) df = pd.read_csv(metrics_path, dtype={"ad_id": str, "creative_id": str}) ad_str = str(ad_id) ad_df = df[df["ad_id"] == ad_str].copy() if ad_df.empty: return ToolResult( title="创意归因:广告无创意数据", output=f"广告 {ad_id} 在创意指标表中无记录(可能该广告未产生创意级消耗)", metadata={ "ad_id": ad_str, "total_creatives": 0, "active_creatives_7d": 0, "attribution_hint": "mixed", "creatives": [], }, ) cost_col = f"cost_{days}d" roi_col = f"roi_{days}d" # 基础统计 total = len(ad_df) active = int((ad_df[cost_col] > 0).sum()) ad_df_sorted = ad_df.sort_values(cost_col, ascending=False) dominant_pct = ( float(ad_df_sorted.iloc[0]["cost_share"]) if not ad_df_sorted.empty else 0.0 ) attribution_hint = _classify_attribution(ad_df, cost_col, roi_col) # 构建返回创意列表(按 cost_share 降序) creatives_list = [] for _, row in ad_df_sorted.iterrows(): creatives_list.append({ "creative_id": str(row["creative_id"]), "creative_name": str(row.get("creative_name", "") or ""), "cost": round(float(row[cost_col]), 2) if cost_col in row else 0.0, "revenue": round(float(row.get(f"revenue_{days}d", 0)), 2), "roi": ( round(float(row[roi_col]), 4) if roi_col in row and pd.notna(row[roi_col]) else None ), "cost_share": round(float(row.get("cost_share", 0)), 4), "creative_age_days": ( int(row["creative_age_days"]) if pd.notna(row.get("creative_age_days")) else None ), "active_days": int(row.get(f"active_days_{days}d", 0) or 0), }) # 生成人类可读摘要 summary_lines = [ f"广告 {ad_id} 创意归因(近 {days} 天)", f" 总创意数:{total}(活跃 {active})", f" 主导创意占比:{dominant_pct*100:.1f}%", f" 归因判定:{attribution_hint}", ] for c in creatives_list[:5]: roi_str = f"{c['roi']:.2f}" if c["roi"] is not None else "N/A" summary_lines.append( f" - {c['creative_id']}: cost={c['cost']:.0f}元, ROI={roi_str}, 占比={c['cost_share']*100:.1f}%, age={c['creative_age_days']}天" ) return ToolResult( title=f"创意归因 ad_id={ad_id}", output="\n".join(summary_lines), metadata={ "ad_id": ad_str, "total_creatives": total, "active_creatives_7d": active, "dominant_creative_pct": round(dominant_pct, 4), "attribution_hint": attribution_hint, "creatives": creatives_list, }, ) except Exception as e: logger.exception("get_creative_context 失败") return ToolResult(title="创意归因异常", output=f"错误:{e}")