posterior_collector.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. """
  2. 后验数据采集 — 为进化奠基(V4 Layer 5)
  3. 职责:
  4. - 给定决策日期,回采 t+1d / t+3d / t+7d 的广告 ROI 和消耗
  5. - 输出"决策 ↔ 后验"配对 CSV
  6. - 自动给决策打 outcome_label(good/bad/neutral/executed)
  7. - 回填 snapshot.json 的 posterior_* 字段
  8. ⚠️ 本轮(auto_put_ad_mini_v4)不在主流程调用;下阶段反馈环对接。
  9. - 接口签名固化:collect_posterior_data(decision_date, posterior_days, update_snapshot)
  10. - 数据 schema 固化:posterior.csv 字段顺序见下文 POSTERIOR_CSV_COLUMNS
  11. - 决策 outcome_label 标注规则固化:见 _label_outcome()
  12. ──────────────────────────────────────────────────────────────────────
  13. 后续接入路径(下阶段反馈环可直接对接):
  14. 1. 每日定时任务:collect_posterior_data(yesterday, 1) → 滚动回采
  15. 2. T+3 / T+7 定时任务:再回采一次(数据稳定后)
  16. 3. 累计 100~500 条带后验的样本后:
  17. - 训练 Few-shot 示例池(outcome=good 的高质量决策)
  18. - 用回归校准 DECAY_WEIGHTS 等规则权重
  19. - 生成决策质量监控仪表盘(精确率/召回率)
  20. ──────────────────────────────────────────────────────────────────────
  21. """
  22. import json
  23. import logging
  24. import sys
  25. from datetime import datetime, timedelta
  26. from pathlib import Path
  27. from typing import Any, Dict, List, Optional
  28. import pandas as pd
  29. from agent.tools import tool
  30. from agent.tools.models import ToolContext, ToolResult
# Directory two levels above this file (the parent of the directory that
# contains this module). All "outputs/..." paths in this module are resolved
# relative to it.
_MINI_DIR = Path(__file__).resolve().parent.parent
# Put that directory at the front of sys.path (once) so that modules located
# there can be imported.
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# ═══════════════════════════════════════════
# Data schema definition (frozen; do not change it in the next phase)
# ═══════════════════════════════════════════
# Column order of posterior.csv. It is passed as ``columns=`` when the output
# DataFrame is built, so it fixes both the set and the order of CSV columns.
POSTERIOR_CSV_COLUMNS: List[str] = [
    # Decision identity
    "ad_id",
    "decision_date",
    "action",
    "dimension",
    "source",  # rule_pause / llm_main / llm_override
    # Snapshot taken at decision time
    "decision_roi_7d",  # value of 动态ROI_7日均值 at decision time
    "decision_cost_7d_avg",
    "decision_bid_amount",
    # Posterior measurements (multiple windows)
    "posterior_t1d_roi",
    "posterior_t1d_cost",
    "posterior_t3d_roi",
    "posterior_t3d_cost",
    "posterior_t7d_roi",
    "posterior_t7d_cost",
    # Derived relative changes
    "roi_change_t3d_pct",  # (posterior_t3d - decision) / decision
    "cost_change_t3d_pct",
    # Label + scoring provenance (JSON strings, convenient for per-signal statistics)
    "outcome_label",  # good / bad / neutral / executed / unknown
    "signal_scores_json",
    "reasoning_chain_json",
    "rule_alignment",  # agree / override
]
  65. # ═══════════════════════════════════════════
  66. # 内部工具函数
  67. # ═══════════════════════════════════════════
  68. def _safe_pct_change(new_v: Optional[float], old_v: Optional[float]) -> Optional[float]:
  69. """安全计算百分比变化,None / 0 兜底。"""
  70. if new_v is None or old_v is None:
  71. return None
  72. try:
  73. if abs(old_v) < 1e-9:
  74. return None
  75. return round((new_v - old_v) / old_v, 4)
  76. except Exception:
  77. return None
  78. def _label_outcome(
  79. decision_action: str,
  80. decision_roi: Optional[float],
  81. decision_cost: Optional[float],
  82. posterior_t3d_roi: Optional[float],
  83. posterior_t3d_cost: Optional[float],
  84. ) -> str:
  85. """
  86. 后验标注规则(固化,下阶段不要改):
  87. - action = pause:
  88. posterior_t3d_cost ≈ 0 → executed(说明确实暂停了)
  89. posterior_t3d_cost > 0 → bad(暂停决策没生效或被复活)
  90. - action = bid_down:
  91. ROI 提升 → good
  92. ROI 持平 → neutral
  93. ROI 下降 → bad
  94. - action = bid_up:
  95. cost 提升 + ROI 不掉 → good
  96. cost 不变 → neutral
  97. ROI 大跌 → bad
  98. - action = hold / needs_llm: 默认 neutral
  99. """
  100. if posterior_t3d_roi is None and posterior_t3d_cost is None:
  101. return "unknown"
  102. act = (decision_action or "").lower()
  103. if act == "pause":
  104. if posterior_t3d_cost is None:
  105. return "unknown"
  106. return "executed" if posterior_t3d_cost < 1.0 else "bad"
  107. if act == "bid_down":
  108. roi_chg = _safe_pct_change(posterior_t3d_roi, decision_roi)
  109. if roi_chg is None:
  110. return "unknown"
  111. if roi_chg > 0.05:
  112. return "good"
  113. if roi_chg < -0.05:
  114. return "bad"
  115. return "neutral"
  116. if act == "bid_up":
  117. cost_chg = _safe_pct_change(posterior_t3d_cost, decision_cost)
  118. roi_chg = _safe_pct_change(posterior_t3d_roi, decision_roi)
  119. if cost_chg is None:
  120. return "unknown"
  121. if roi_chg is not None and roi_chg < -0.10:
  122. return "bad"
  123. if cost_chg > 0.10:
  124. return "good"
  125. return "neutral"
  126. return "neutral"
  127. def _date_offset(date_str: str, days: int) -> str:
  128. """YYYYMMDD + N 天 → YYYYMMDD"""
  129. dt = datetime.strptime(date_str, "%Y%m%d") + timedelta(days=days)
  130. return dt.strftime("%Y%m%d")
  131. def _load_metrics_for_date(date_str: str) -> Optional[pd.DataFrame]:
  132. """
  133. 尝试加载某一天的 metrics 快照(用于回采当天 ROI / cost)。
  134. 优先策略(下阶段可扩展):
  135. 1. outputs/metrics_history/{date}.csv
  136. 2. outputs/metrics_temp.csv(如果它的 end_date 等于 date)
  137. 3. None(无数据)
  138. """
  139. history_csv = _MINI_DIR / "outputs" / "metrics_history" / f"{date_str}.csv"
  140. if history_csv.exists():
  141. try:
  142. return pd.read_csv(history_csv)
  143. except Exception as e:
  144. logger.warning("加载历史 metrics 失败 %s: %s", history_csv, e)
  145. # fallback: temp(仅当日期吻合)
  146. temp_csv = _MINI_DIR / "outputs" / "metrics_temp.csv"
  147. if temp_csv.exists():
  148. try:
  149. df = pd.read_csv(temp_csv)
  150. if "end_date" in df.columns:
  151. if str(df["end_date"].iloc[0]) == date_str:
  152. return df
  153. except Exception:
  154. pass
  155. return None
  156. def _lookup_posterior(
  157. df: Optional[pd.DataFrame],
  158. ad_id: int,
  159. ) -> Dict[str, Optional[float]]:
  160. """从 metrics DataFrame 提取某广告的 ROI / cost。"""
  161. if df is None or df.empty:
  162. return {"roi": None, "cost": None}
  163. try:
  164. row = df[df["ad_id"] == ad_id]
  165. if row.empty:
  166. return {"roi": None, "cost": None}
  167. r = row.iloc[0]
  168. roi_val = r.get("动态ROI_7日均值")
  169. cost_val = r.get("cost_7d_avg")
  170. return {
  171. "roi": float(roi_val) if pd.notna(roi_val) else None,
  172. "cost": float(cost_val) if pd.notna(cost_val) else None,
  173. }
  174. except Exception as e:
  175. logger.warning("查找广告 %s 后验数据失败: %s", ad_id, e)
  176. return {"roi": None, "cost": None}
  177. # ═══════════════════════════════════════════
  178. # 主接口:后验采集(固化签名)
  179. # ═══════════════════════════════════════════
  180. @tool(description="V4 后验数据采集(决策 → 执行后效果配对)— 本轮预留接口,主流程不调用")
  181. async def collect_posterior_data(
  182. ctx: ToolContext = None,
  183. decision_date: str = "",
  184. posterior_days: int = 7,
  185. update_snapshot: bool = True,
  186. ) -> ToolResult:
  187. """
  188. 后验数据采集 — 决策 ↔ 执行后效果配对。
  189. Args:
  190. ctx: 工具上下文
  191. decision_date: 决策日期 YYYYMMDD(即 snapshot.json 的目录名)
  192. posterior_days: 采集多少天后验(默认 7)
  193. update_snapshot: 是否回写 snapshot.json 的 posterior_* 字段(默认 True)
  194. 流程:
  195. 1. 加载 outputs/decisions_history/{decision_date}/snapshot.json
  196. 2. 对每条决策的 ad_id,分别加载 t+1d / t+3d / t+7d 的 metrics
  197. 3. 计算 posterior_t1d/t3d/t7d 的 roi/cost
  198. 4. 给决策打 outcome_label(_label_outcome 规则)
  199. 5. 写 outputs/decisions_history/{decision_date}/posterior.csv
  200. 6. (可选)回写 snapshot.json 的 posterior_* 和 outcome_label
  201. Returns:
  202. ToolResult 含统计 summary:good/bad/neutral/executed/unknown 的分布
  203. ⚠️ 本轮不在 prompts/system.prompt 中引用;仅供下阶段反馈环对接。
  204. """
  205. try:
  206. snap_dir = _MINI_DIR / "outputs" / "decisions_history" / decision_date
  207. snap_path = snap_dir / "snapshot.json"
  208. if not snap_path.exists():
  209. return ToolResult(
  210. title="collect_posterior_data 失败",
  211. output=f"决策快照不存在: {snap_path}",
  212. )
  213. snapshot = json.loads(snap_path.read_text(encoding="utf-8"))
  214. decisions = snapshot.get("decisions", [])
  215. if not decisions:
  216. return ToolResult(
  217. title="collect_posterior_data",
  218. output=f"快照 {decision_date} 无决策记录",
  219. )
  220. # 预加载 t+1d / t+3d / t+7d 的 metrics
  221. t1d_date = _date_offset(decision_date, 1)
  222. t3d_date = _date_offset(decision_date, 3)
  223. t7d_date = _date_offset(decision_date, 7)
  224. df_t1d = _load_metrics_for_date(t1d_date)
  225. df_t3d = _load_metrics_for_date(t3d_date)
  226. df_t7d = _load_metrics_for_date(t7d_date) if posterior_days >= 7 else None
  227. rows: List[Dict[str, Any]] = []
  228. outcome_dist: Dict[str, int] = {}
  229. for d in decisions:
  230. ad_id_raw = d.get("ad_id")
  231. try:
  232. ad_id_int = int(ad_id_raw)
  233. except (ValueError, TypeError):
  234. continue
  235. # 决策快照本体
  236. input_signals = d.get("input_signals") or {}
  237. decision_roi = input_signals.get("动态ROI_7日均值")
  238. decision_cost = input_signals.get("cost_7d_avg")
  239. decision_bid = input_signals.get("bid_amount")
  240. # 各窗口后验
  241. p1 = _lookup_posterior(df_t1d, ad_id_int)
  242. p3 = _lookup_posterior(df_t3d, ad_id_int)
  243. p7 = _lookup_posterior(df_t7d, ad_id_int) if df_t7d is not None else {"roi": None, "cost": None}
  244. # 标签
  245. label = _label_outcome(
  246. decision_action=d.get("action"),
  247. decision_roi=decision_roi,
  248. decision_cost=decision_cost,
  249. posterior_t3d_roi=p3["roi"],
  250. posterior_t3d_cost=p3["cost"],
  251. )
  252. outcome_dist[label] = outcome_dist.get(label, 0) + 1
  253. # 写一行
  254. rows.append({
  255. "ad_id": ad_id_int,
  256. "decision_date": decision_date,
  257. "action": d.get("action"),
  258. "dimension": d.get("dimension"),
  259. "source": d.get("source"),
  260. "decision_roi_7d": decision_roi,
  261. "decision_cost_7d_avg": decision_cost,
  262. "decision_bid_amount": decision_bid,
  263. "posterior_t1d_roi": p1["roi"],
  264. "posterior_t1d_cost": p1["cost"],
  265. "posterior_t3d_roi": p3["roi"],
  266. "posterior_t3d_cost": p3["cost"],
  267. "posterior_t7d_roi": p7["roi"],
  268. "posterior_t7d_cost": p7["cost"],
  269. "roi_change_t3d_pct": _safe_pct_change(p3["roi"], decision_roi),
  270. "cost_change_t3d_pct": _safe_pct_change(p3["cost"], decision_cost),
  271. "outcome_label": label,
  272. "signal_scores_json": json.dumps(d.get("signal_scores") or {}, ensure_ascii=False),
  273. "reasoning_chain_json": json.dumps(d.get("reasoning_chain") or [], ensure_ascii=False),
  274. "rule_alignment": d.get("rule_alignment"),
  275. })
  276. # 回写 snapshot
  277. if update_snapshot:
  278. d["posterior_t1d"] = p1
  279. d["posterior_t3d"] = p3
  280. d["posterior_t7d"] = p7
  281. d["outcome_label"] = label
  282. # 输出 posterior.csv
  283. out_csv = snap_dir / "posterior.csv"
  284. df_out = pd.DataFrame(rows, columns=POSTERIOR_CSV_COLUMNS)
  285. df_out.to_csv(out_csv, index=False, encoding="utf-8-sig")
  286. # 回写 snapshot
  287. if update_snapshot:
  288. snapshot["metadata"] = snapshot.get("metadata", {})
  289. snapshot["metadata"]["posterior_collected_at"] = datetime.now().isoformat()
  290. snapshot["metadata"]["posterior_days"] = posterior_days
  291. snap_path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2), encoding="utf-8")
  292. # 汇总
  293. lines = [
  294. f"决策日期: {decision_date}",
  295. f"采集窗口: t+1d={t1d_date}, t+3d={t3d_date}, t+7d={t7d_date}",
  296. f"决策总数: {len(decisions)}",
  297. f"配对 CSV: {out_csv}",
  298. "",
  299. "outcome_label 分布:",
  300. ]
  301. for k, v in sorted(outcome_dist.items(), key=lambda x: -x[1]):
  302. lines.append(f" {k}: {v}")
  303. return ToolResult(
  304. title=f"后验采集完成({decision_date})",
  305. output="\n".join(lines),
  306. metadata={
  307. "decision_date": decision_date,
  308. "posterior_csv": str(out_csv),
  309. "snapshot_updated": update_snapshot,
  310. "decision_count": len(decisions),
  311. "outcome_distribution": outcome_dist,
  312. "posterior_dates": {"t1d": t1d_date, "t3d": t3d_date, "t7d": t7d_date},
  313. },
  314. )
  315. except Exception as e:
  316. logger.error("collect_posterior_data 失败: %s", e, exc_info=True)
  317. return ToolResult(title="collect_posterior_data 失败", output=str(e))