creative_metrics.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. """
  2. 创意级指标工具 — auto_put_ad_mini (Step 2)
  3. 职责:
  4. - 按 creative_id 聚合最近 N 天 ROI(简单 ROI,非 f_7日动态ROI)
  5. - 为"负向决策前做创意归因检查"提供数据支撑
  6. 使用定位(重要):
  7. - 本阶段 Agent 决策范围是【广告级】:调出价 / 暂停广告
  8. - 创意变更【不在】本阶段范围内(由后续 创意策略 Agent 负责)
  9. - 创意级数据的真正目的:在 LLM 准备做 bid_down / pause 决策之前做归因检查
  10. * 防止广告级误判:ROI 低可能只是"某个创意拖后腿",此时应 hold 等创意层优化
  11. * 提供决策理由佐证:reason 明确说"整体问题 vs 某创意问题"
  12. 两个工具:
  13. - calculate_creative_metrics:批量聚合所有广告的创意级 ROI(Mode 1 工作流 Step 2.5)
  14. - get_creative_context:对单个广告做创意归因诊断(LLM 决策时按需调用)
  15. """
  16. import logging
  17. from datetime import datetime, timedelta
  18. from pathlib import Path
  19. from typing import List, Optional
  20. import pandas as pd
  21. from agent.tools import tool
  22. from agent.tools.models import ToolContext, ToolResult
  23. logger = logging.getLogger(__name__)
  24. _MINI_DIR = Path(__file__).resolve().parent.parent
  25. _MERGED_DIR = _MINI_DIR / "outputs" / "merged"
  26. _CREATIVE_METRICS_DIR = _MINI_DIR / "outputs" / "creative_metrics"
  27. # ===== 内部工具函数 =====
  28. def _load_merged_window(end_date_str: str, days: int) -> Optional[pd.DataFrame]:
  29. """加载最近 days 天的 merged CSV,concat 后返回。"""
  30. try:
  31. end_dt = datetime.strptime(end_date_str, "%Y%m%d")
  32. except ValueError:
  33. logger.error("非法 end_date: %s", end_date_str)
  34. return None
  35. dfs = []
  36. for i in range(days):
  37. date = (end_dt - timedelta(days=days - 1 - i)).strftime("%Y%m%d")
  38. csv_path = _MERGED_DIR / f"merged_{date}.csv"
  39. if not csv_path.exists():
  40. logger.warning("创意聚合:merged 数据缺失 %s", date)
  41. continue
  42. df = pd.read_csv(csv_path, dtype={"ad_id": str, "creative_id": str, "account_id": str})
  43. dfs.append(df)
  44. if not dfs:
  45. return None
  46. return pd.concat(dfs, ignore_index=True)
  47. def _aggregate_creative_metrics(df: pd.DataFrame, days: int) -> pd.DataFrame:
  48. """按 (ad_id, creative_id) 聚合。"""
  49. if df.empty:
  50. return pd.DataFrame()
  51. # 保证关键列存在
  52. required = ["ad_id", "creative_id", "cost", "bizdate"]
  53. for col in required:
  54. if col not in df.columns:
  55. logger.error("创意聚合:缺少必需列 %s", col)
  56. return pd.DataFrame()
  57. # 安全数值转换
  58. for col in ["cost", "总收入", "首层小程序打开数", "裂变0层回流数", "view_count", "valid_click_count"]:
  59. if col in df.columns:
  60. df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
  61. # 过滤掉空 creative_id(有些广告状态行可能 creative_id 为空)
  62. df = df[df["creative_id"].notna() & (df["creative_id"].astype(str).str.strip() != "")]
  63. # 取 creative_name(若有变更取最新)
  64. df_sorted = df.sort_values("bizdate")
  65. creative_name_latest = (
  66. df_sorted.groupby(["ad_id", "creative_id"])["creative_name"].last()
  67. if "creative_name" in df.columns
  68. else None
  69. )
  70. # 创意首次出现日期(用于 age)
  71. creative_first_date = (
  72. df_sorted.groupby(["ad_id", "creative_id"])["bizdate"].min()
  73. )
  74. agg_cols = {"cost": "sum"}
  75. if "总收入" in df.columns:
  76. agg_cols["总收入"] = "sum"
  77. if "首层小程序打开数" in df.columns:
  78. agg_cols["首层小程序打开数"] = "sum"
  79. if "裂变0层回流数" in df.columns:
  80. agg_cols["裂变0层回流数"] = "sum"
  81. if "view_count" in df.columns:
  82. agg_cols["view_count"] = "sum"
  83. if "valid_click_count" in df.columns:
  84. agg_cols["valid_click_count"] = "sum"
  85. active_days = df.groupby(["ad_id", "creative_id"]).apply(
  86. lambda g: g[g["cost"] > 0]["bizdate"].nunique()
  87. )
  88. result = df.groupby(["ad_id", "creative_id"]).agg(agg_cols).reset_index()
  89. result[f"active_days_{days}d"] = result.apply(
  90. lambda r: int(active_days.get((r["ad_id"], r["creative_id"]), 0)), axis=1
  91. )
  92. # 重命名聚合列
  93. rename_map = {
  94. "cost": f"cost_{days}d",
  95. "总收入": f"revenue_{days}d",
  96. "首层小程序打开数": f"open_count_{days}d",
  97. "裂变0层回流数": f"fission0_{days}d",
  98. "view_count": f"view_count_{days}d",
  99. "valid_click_count": f"click_count_{days}d",
  100. }
  101. result = result.rename(columns={k: v for k, v in rename_map.items() if k in result.columns})
  102. # 附加 creative_name 与 creative_age_days
  103. if creative_name_latest is not None:
  104. result["creative_name"] = result.apply(
  105. lambda r: creative_name_latest.get((r["ad_id"], r["creative_id"]), ""), axis=1
  106. )
  107. else:
  108. result["creative_name"] = ""
  109. def _age_days(row):
  110. first = creative_first_date.get((row["ad_id"], row["creative_id"]))
  111. if not first:
  112. return None
  113. try:
  114. first_dt = datetime.strptime(str(first), "%Y%m%d")
  115. today = datetime.now()
  116. return max((today - first_dt).days, 0)
  117. except ValueError:
  118. return None
  119. result["creative_age_days"] = result.apply(_age_days, axis=1)
  120. # 简单 ROI
  121. revenue_col = f"revenue_{days}d"
  122. cost_col = f"cost_{days}d"
  123. if revenue_col in result.columns:
  124. result[f"roi_{days}d"] = result.apply(
  125. lambda r: round(r[revenue_col] / r[cost_col], 4) if r[cost_col] > 0 else None,
  126. axis=1,
  127. )
  128. else:
  129. result[f"roi_{days}d"] = None
  130. # 该创意在所属广告中的消耗占比
  131. ad_total = result.groupby("ad_id")[cost_col].transform("sum")
  132. result["cost_share"] = result.apply(
  133. lambda r: round(r[cost_col] / ad_total.loc[r.name], 4) if ad_total.loc[r.name] > 0 else 0,
  134. axis=1,
  135. )
  136. return result
  137. def _classify_attribution(creatives_for_ad: pd.DataFrame, cost_col: str, roi_col: str) -> str:
  138. """
  139. 基于单个广告下的创意表现,判定归因类型。
  140. 返回值对应 skill 中定义的 4 类 attribution_hint:
  141. - "single_laggard" : 单创意占 >60% 消耗且 ROI 远低于其他 → 建议 hold
  142. - "balanced_bad" : 多创意均衡但整体 ROI 差 → 真广告级问题,保持原负向决策
  143. - "new_cold_start" : 主要消耗集中在 age < 7 天的新创意 → 建议 hold
  144. - "mixed" : 其他情况,LLM 自行综合判断
  145. """
  146. # 仅考虑近期有消耗的创意
  147. active = creatives_for_ad[creatives_for_ad[cost_col] > 0].copy()
  148. if active.empty:
  149. return "mixed"
  150. active_sorted = active.sort_values(cost_col, ascending=False)
  151. top = active_sorted.iloc[0]
  152. top_share = float(top["cost_share"] or 0)
  153. # 新创意冷启动:主导创意年龄 < 7 天
  154. top_age = top.get("creative_age_days")
  155. if top_share > 0.6 and top_age is not None and top_age < 7:
  156. return "new_cold_start"
  157. # 单创意拖后腿:主导创意占 >60% 且 ROI 明显低于其他活跃创意
  158. if top_share > 0.6 and len(active_sorted) >= 2:
  159. others = active_sorted.iloc[1:]
  160. top_roi = top.get(roi_col)
  161. other_roi = others[roi_col].dropna()
  162. if top_roi is not None and not other_roi.empty:
  163. other_mean = other_roi.mean()
  164. # 主导创意 ROI 显著低于其他(低于其他均值的 70%)
  165. if other_mean > 0 and top_roi < other_mean * 0.7:
  166. return "single_laggard"
  167. # 多创意均衡且整体差(≥3 个活跃创意,且占比相对均衡)
  168. if len(active_sorted) >= 3 and top_share < 0.5:
  169. return "balanced_bad"
  170. return "mixed"
  171. # ===== 工具 1:批量计算创意级指标 =====
  172. @tool(description="按 creative_id 聚合最近 N 天 ROI(供广告决策做创意归因检查)")
  173. async def calculate_creative_metrics(
  174. ctx: ToolContext,
  175. days: int = 7,
  176. end_date: str = "yesterday",
  177. ) -> ToolResult:
  178. """
  179. 聚合创意级 ROI 指标。
  180. 数据源:outputs/merged/merged_*.csv(最近 N 天),按 (ad_id, creative_id) 聚合:
  181. - cost_{N}d, revenue_{N}d, open_count_{N}d, fission0_{N}d
  182. - roi_{N}d = revenue_{N}d / cost_{N}d (简单 ROI,不是 f_7日动态ROI)
  183. - cost_share = 该创意在所属广告中的消耗占比
  184. - creative_age_days = 该创意首次出现至今天数
  185. - active_days_{N}d = 近 N 天有消耗的天数
  186. 输出:outputs/creative_metrics/creative_metrics_{end_date}.csv
  187. Args:
  188. days: 聚合窗口(默认 7 天)
  189. end_date: 结束日期(YYYYMMDD 或 "yesterday")
  190. Returns:
  191. ToolResult 含 CSV 路径 + 总览统计
  192. """
  193. try:
  194. if end_date == "yesterday":
  195. end_dt = datetime.now() - timedelta(days=1)
  196. else:
  197. end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
  198. end_date_str = end_dt.strftime("%Y%m%d")
  199. df = _load_merged_window(end_date_str, days)
  200. if df is None or df.empty:
  201. return ToolResult(
  202. title="创意指标聚合失败",
  203. output=f"未找到任何 merged 数据({_MERGED_DIR},最近 {days} 天)",
  204. )
  205. result = _aggregate_creative_metrics(df, days)
  206. if result.empty:
  207. return ToolResult(
  208. title="创意指标聚合失败",
  209. output="聚合结果为空(可能 creative_id 全为空)",
  210. )
  211. _CREATIVE_METRICS_DIR.mkdir(parents=True, exist_ok=True)
  212. out_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv"
  213. result.to_csv(out_path, index=False, encoding="utf-8-sig")
  214. # 总览统计
  215. total = len(result)
  216. active = int((result[f"cost_{days}d"] > 0).sum())
  217. ad_count = result["ad_id"].nunique()
  218. summary_lines = [
  219. f"✅ 创意级指标聚合完成({days} 天窗口)",
  220. f" 输出文件:{out_path}",
  221. f" 创意总数:{total}(其中 {active} 个近 {days} 天有消耗)",
  222. f" 覆盖广告:{ad_count} 条",
  223. ]
  224. return ToolResult(
  225. title=f"创意级指标聚合({days}天)",
  226. output="\n".join(summary_lines),
  227. metadata={
  228. "csv_path": str(out_path),
  229. "total_creatives": total,
  230. "active_creatives": active,
  231. "ad_count": ad_count,
  232. "days": days,
  233. },
  234. )
  235. except Exception as e:
  236. logger.exception("calculate_creative_metrics 失败")
  237. return ToolResult(title="创意指标聚合异常", output=f"错误:{e}")
  238. # ===== 工具 2:单广告归因检查 =====
  239. @tool(description="对准备做负向决策(降价/暂停)的广告,做创意归因检查")
  240. async def get_creative_context(
  241. ctx: ToolContext,
  242. ad_id: str,
  243. days: int = 7,
  244. end_date: str = "yesterday",
  245. ) -> ToolResult:
  246. """
  247. 对指定广告做创意归因诊断。
  248. 使用时机(严格):
  249. - LLM 初步打算判 bid_down / pause 的广告才调用
  250. - 不对 hold / bid_up 的广告调用(避免无意义开销)
  251. Args:
  252. ad_id: 广告 ID
  253. days: 归因窗口(默认 7 天,需要已先跑过 calculate_creative_metrics)
  254. end_date: 结束日期(YYYYMMDD 或 "yesterday")
  255. Returns:
  256. ToolResult.data:
  257. {
  258. "ad_id": ...,
  259. "total_creatives": int,
  260. "active_creatives_7d": int,
  261. "dominant_creative_pct": float,
  262. "attribution_hint": "single_laggard" | "balanced_bad" | "new_cold_start" | "mixed",
  263. "creatives": [...按 cost_share 降序...]
  264. }
  265. """
  266. try:
  267. if end_date == "yesterday":
  268. end_dt = datetime.now() - timedelta(days=1)
  269. else:
  270. end_dt = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
  271. end_date_str = end_dt.strftime("%Y%m%d")
  272. metrics_path = _CREATIVE_METRICS_DIR / f"creative_metrics_{end_date_str}.csv"
  273. if not metrics_path.exists():
  274. return ToolResult(
  275. title="创意归因失败",
  276. output=f"创意指标 CSV 不存在:{metrics_path}\n请先运行 calculate_creative_metrics。",
  277. )
  278. df = pd.read_csv(metrics_path, dtype={"ad_id": str, "creative_id": str})
  279. ad_str = str(ad_id)
  280. ad_df = df[df["ad_id"] == ad_str].copy()
  281. if ad_df.empty:
  282. return ToolResult(
  283. title="创意归因:广告无创意数据",
  284. output=f"广告 {ad_id} 在创意指标表中无记录(可能该广告未产生创意级消耗)",
  285. metadata={
  286. "ad_id": ad_str,
  287. "total_creatives": 0,
  288. "active_creatives_7d": 0,
  289. "attribution_hint": "mixed",
  290. "creatives": [],
  291. },
  292. )
  293. cost_col = f"cost_{days}d"
  294. roi_col = f"roi_{days}d"
  295. # 基础统计
  296. total = len(ad_df)
  297. active = int((ad_df[cost_col] > 0).sum())
  298. ad_df_sorted = ad_df.sort_values(cost_col, ascending=False)
  299. dominant_pct = (
  300. float(ad_df_sorted.iloc[0]["cost_share"]) if not ad_df_sorted.empty else 0.0
  301. )
  302. attribution_hint = _classify_attribution(ad_df, cost_col, roi_col)
  303. # 构建返回创意列表(按 cost_share 降序)
  304. creatives_list = []
  305. for _, row in ad_df_sorted.iterrows():
  306. creatives_list.append({
  307. "creative_id": str(row["creative_id"]),
  308. "creative_name": str(row.get("creative_name", "") or ""),
  309. "cost": round(float(row[cost_col]), 2) if cost_col in row else 0.0,
  310. "revenue": round(float(row.get(f"revenue_{days}d", 0)), 2),
  311. "roi": (
  312. round(float(row[roi_col]), 4)
  313. if roi_col in row and pd.notna(row[roi_col])
  314. else None
  315. ),
  316. "cost_share": round(float(row.get("cost_share", 0)), 4),
  317. "creative_age_days": (
  318. int(row["creative_age_days"])
  319. if pd.notna(row.get("creative_age_days"))
  320. else None
  321. ),
  322. "active_days": int(row.get(f"active_days_{days}d", 0) or 0),
  323. })
  324. # 生成人类可读摘要
  325. summary_lines = [
  326. f"广告 {ad_id} 创意归因(近 {days} 天)",
  327. f" 总创意数:{total}(活跃 {active})",
  328. f" 主导创意占比:{dominant_pct*100:.1f}%",
  329. f" 归因判定:{attribution_hint}",
  330. ]
  331. for c in creatives_list[:5]:
  332. roi_str = f"{c['roi']:.2f}" if c["roi"] is not None else "N/A"
  333. summary_lines.append(
  334. f" - {c['creative_id']}: cost={c['cost']:.0f}元, ROI={roi_str}, 占比={c['cost_share']*100:.1f}%, age={c['creative_age_days']}天"
  335. )
  336. return ToolResult(
  337. title=f"创意归因 ad_id={ad_id}",
  338. output="\n".join(summary_lines),
  339. metadata={
  340. "ad_id": ad_str,
  341. "total_creatives": total,
  342. "active_creatives_7d": active,
  343. "dominant_creative_pct": round(dominant_pct, 4),
  344. "attribution_hint": attribution_hint,
  345. "creatives": creatives_list,
  346. },
  347. )
  348. except Exception as e:
  349. logger.exception("get_creative_context 失败")
  350. return ToolResult(title="创意归因异常", output=f"错误:{e}")