# analyze_snapshot.py (~19 KB)
  1. """
  2. V4 决策快照分析 — 实测观测用
  3. 用法:
  4. cd /Users/liulidong/project/agent/Agent
  5. .venv/bin/python3 examples/auto_put_ad_mini/analyze_snapshot.py --date 20260415
  6. 输入:examples/auto_put_ad_mini/outputs/decisions_history/{date}/snapshot.json
  7. 输出:
  8. - stdout:可读报告
  9. - 同目录 quality_report.json:结构化指标
  10. 评估维度(9 项):
  11. 1. 决策分布(action / source / LLM 占比)
  12. 2. bid_down 多样性(dimension 种类数 = 上一轮瓶颈)
  13. 3. rule_alignment 分布 + override 率
  14. 4. reasoning_chain 质量(覆盖率、长度、引用信号)
  15. 5. 按 tier 分布
  16. 6. 信号健康度(decay/bid_down/bid_up 分布)
  17. 7. 同桶决策一致性(抽样熵)
  18. 8. 上下文规模估算
  19. 9. 质量评级判定
  20. """
  21. import argparse
  22. import json
  23. import math
  24. import sys
  25. from collections import Counter
  26. from pathlib import Path
  27. from typing import Any, Dict, List, Optional, Tuple
# Directory containing this script; snapshot inputs and report outputs are resolved relative to it.
HERE = Path(__file__).resolve().parent
  29. # ═══════════════════════════════════════════
  30. # 工具函数
  31. # ═══════════════════════════════════════════
  32. def _num_stats(values: List[Any]) -> Optional[Dict[str, float]]:
  33. nums = [float(v) for v in values
  34. if v is not None and not (isinstance(v, float) and math.isnan(v))]
  35. if not nums:
  36. return None
  37. nums_sorted = sorted(nums)
  38. n = len(nums_sorted)
  39. return {
  40. "count": n,
  41. "mean": round(sum(nums_sorted) / n, 4),
  42. "min": round(nums_sorted[0], 4),
  43. "max": round(nums_sorted[-1], 4),
  44. "p25": round(nums_sorted[max(0, n // 4)], 4),
  45. "p50": round(nums_sorted[n // 2], 4),
  46. "p75": round(nums_sorted[min(n - 1, 3 * n // 4)], 4),
  47. }
  48. def _mean(values: List[Any]) -> Optional[float]:
  49. nums = [float(v) for v in values
  50. if v is not None and not (isinstance(v, float) and math.isnan(v))]
  51. return round(sum(nums) / len(nums), 4) if nums else None
  52. def _nonzero_rate(values: List[Any]) -> Optional[float]:
  53. nums = [v for v in values
  54. if v is not None and not (isinstance(v, float) and math.isnan(v))]
  55. if not nums:
  56. return None
  57. return round(sum(1 for v in nums if float(v) > 0) / len(nums), 3)
  58. def load_snapshot(date: str) -> Dict[str, Any]:
  59. snap_path = HERE / "outputs" / "decisions_history" / date / "snapshot.json"
  60. if not snap_path.exists():
  61. raise FileNotFoundError(f"快照不存在: {snap_path}")
  62. return json.loads(snap_path.read_text(encoding="utf-8"))
  63. # ═══════════════════════════════════════════
  64. # 分析模块
  65. # ═══════════════════════════════════════════
  66. def action_distribution(decisions: List[Dict]) -> Dict[str, Any]:
  67. total = len(decisions)
  68. action_counter: Counter = Counter()
  69. source_counter: Counter = Counter()
  70. for d in decisions:
  71. action_counter[d.get("action", "unknown")] += 1
  72. source_counter[d.get("source", "unknown")] += 1
  73. llm_keys = {"llm", "llm_main", "llm_override"}
  74. llm_count = sum(v for k, v in source_counter.items() if k in llm_keys)
  75. return {
  76. "total": total,
  77. "action_counts": dict(action_counter),
  78. "action_pct": {k: round(v / total, 3) for k, v in action_counter.items()} if total else {},
  79. "source_counts": dict(source_counter),
  80. "llm_decision_count": llm_count,
  81. "llm_decision_pct": round(llm_count / total, 3) if total else 0,
  82. }
  83. def bid_down_diversity(decisions: List[Dict]) -> Dict[str, Any]:
  84. bid_down = [d for d in decisions if d.get("action") == "bid_down"]
  85. dim_counter = Counter(d.get("dimension", "unknown") for d in bid_down)
  86. scenario_counter = Counter(
  87. (d.get("signal_scores") or {}).get("bid_down_scenario", "unknown")
  88. for d in bid_down
  89. )
  90. pcts = [d.get("recommended_change_pct") for d in bid_down
  91. if d.get("recommended_change_pct") is not None]
  92. return {
  93. "total": len(bid_down),
  94. "dimension_count": len(dim_counter),
  95. "dimensions": dict(dim_counter.most_common(10)),
  96. "scenarios": dict(scenario_counter.most_common()),
  97. "pct_stats": _num_stats(pcts),
  98. }
  99. def rule_alignment_dist(decisions: List[Dict]) -> Dict[str, Any]:
  100. align_counter: Counter = Counter()
  101. override_reasons: List[str] = []
  102. for d in decisions:
  103. # 跳过 auto_pause(规则硬底线,不参与 align 统计)
  104. if d.get("source") == "auto_pause":
  105. continue
  106. align = d.get("rule_alignment") or "unknown"
  107. align_counter[align] += 1
  108. if align == "override" and d.get("override_reason"):
  109. override_reasons.append(str(d["override_reason"]))
  110. known_total = sum(v for k, v in align_counter.items() if k != "unknown")
  111. return {
  112. "counts": dict(align_counter),
  113. "override_pct": (round(align_counter.get("override", 0) / known_total, 3)
  114. if known_total else None),
  115. "override_reason_samples": override_reasons[:5],
  116. }
  117. def reasoning_chain_quality(decisions: List[Dict]) -> Dict[str, Any]:
  118. lengths: List[int] = []
  119. signal_counter: Counter = Counter()
  120. has_chain = 0
  121. total_llm = 0
  122. for d in decisions:
  123. # 只看非 auto_pause 的决策(LLM 产出的)
  124. if d.get("source") == "auto_pause":
  125. continue
  126. total_llm += 1
  127. chain = d.get("reasoning_chain")
  128. if isinstance(chain, list) and len(chain) > 0:
  129. has_chain += 1
  130. lengths.append(len(chain))
  131. for item in chain:
  132. if isinstance(item, dict):
  133. signal_counter[str(item.get("signal", "unknown"))] += 1
  134. return {
  135. "llm_decisions": total_llm,
  136. "with_reasoning_chain": has_chain,
  137. "reasoning_chain_rate": round(has_chain / total_llm, 3) if total_llm else 0,
  138. "length_stats": _num_stats(lengths),
  139. "top_signals": dict(signal_counter.most_common(10)),
  140. }
  141. def tier_breakdown(decisions: List[Dict]) -> Dict[str, Any]:
  142. tier_groups: Dict[str, List[Dict]] = {}
  143. for d in decisions:
  144. tier = (d.get("input_signals") or {}).get("audience_tier", "default")
  145. tier_groups.setdefault(str(tier), []).append(d)
  146. out = {}
  147. for tier, items in sorted(tier_groups.items()):
  148. action_counter = Counter(d.get("action", "unknown") for d in items)
  149. ss = [(d.get("signal_scores") or {}) for d in items]
  150. out[tier] = {
  151. "count": len(items),
  152. "actions": dict(action_counter),
  153. "avg_decay_score": _mean([s.get("decay_score") for s in ss]),
  154. "avg_bid_down_score": _mean([s.get("bid_down_score") for s in ss]),
  155. "avg_bid_up_score": _mean([s.get("bid_up_score") for s in ss]),
  156. }
  157. return out
  158. def signal_health(decisions: List[Dict]) -> Dict[str, Any]:
  159. decay, bd, bu = [], [], []
  160. for d in decisions:
  161. ss = d.get("signal_scores") or {}
  162. decay.append(ss.get("decay_score"))
  163. bd.append(ss.get("bid_down_score"))
  164. bu.append(ss.get("bid_up_score"))
  165. return {
  166. "decay_score": _num_stats(decay),
  167. "bid_down_score": _num_stats(bd),
  168. "bid_up_score": _num_stats(bu),
  169. "decay_nonzero_rate": _nonzero_rate(decay),
  170. "bid_down_nonzero_rate": _nonzero_rate(bd),
  171. "bid_up_nonzero_rate": _nonzero_rate(bu),
  172. }
  173. def consistency_check(decisions: List[Dict]) -> Dict[str, Any]:
  174. """
  175. 同桶决策一致性:
  176. 按 (tier, roi bucket 0.2, decay bucket 0.2) 分组,计算每组 action 分布熵。
  177. 熵越高 = 决策越不一致。
  178. """
  179. buckets: Dict[Tuple[str, float, float], List[str]] = {}
  180. for d in decisions:
  181. ss = d.get("signal_scores") or {}
  182. inp = d.get("input_signals") or {}
  183. tier = str(inp.get("audience_tier") or "default")
  184. roi = inp.get("动态ROI_7日均值")
  185. decay = ss.get("decay_score")
  186. if roi is None or decay is None:
  187. continue
  188. try:
  189. roi_f = float(roi)
  190. decay_f = float(decay)
  191. if math.isnan(roi_f) or math.isnan(decay_f):
  192. continue
  193. except (TypeError, ValueError):
  194. continue
  195. roi_b = round(math.floor(roi_f / 0.2) * 0.2, 1)
  196. decay_b = round(math.floor(decay_f / 0.2) * 0.2, 1)
  197. key = (tier, roi_b, decay_b)
  198. buckets.setdefault(key, []).append(str(d.get("action", "unknown")))
  199. high_entropy: List[Dict[str, Any]] = []
  200. entropies: List[float] = []
  201. for key, actions in buckets.items():
  202. if len(actions) < 2:
  203. continue
  204. c = Counter(actions)
  205. n = len(actions)
  206. entropy = -sum((v / n) * math.log2(v / n) for v in c.values() if v > 0)
  207. entropies.append(entropy)
  208. if entropy > 0.8:
  209. high_entropy.append({
  210. "bucket": f"tier={key[0]}, roi~{key[1]}, decay~{key[2]}",
  211. "n": n,
  212. "actions": dict(c),
  213. "entropy": round(entropy, 3),
  214. })
  215. high_entropy.sort(key=lambda x: x["entropy"], reverse=True)
  216. return {
  217. "buckets_with_gt1": sum(1 for _, a in buckets.items() if len(a) >= 2),
  218. "avg_entropy": round(sum(entropies) / len(entropies), 3) if entropies else 0,
  219. "max_entropy": round(max(entropies), 3) if entropies else 0,
  220. "high_entropy_buckets": high_entropy[:5],
  221. }
  222. def context_size_est(decisions: List[Dict]) -> Dict[str, Any]:
  223. """估算每次 LLM tier 批次的输入 token 量"""
  224. tier_counts: Counter = Counter()
  225. for d in decisions:
  226. if d.get("source") == "auto_pause":
  227. continue
  228. tier = str((d.get("input_signals") or {}).get("audience_tier", "default"))
  229. tier_counts[tier] += 1
  230. sample_reviews = [d for d in decisions if d.get("source") != "auto_pause"][:5]
  231. avg_bytes = 0
  232. if sample_reviews:
  233. avg_bytes = sum(len(json.dumps(d, ensure_ascii=False)) for d in sample_reviews) / len(sample_reviews)
  234. by_tier_est: Dict[str, Dict[str, int]] = {}
  235. for tier, cnt in tier_counts.items():
  236. # 1 token ≈ 3 字节(中文偏 2~3 字节/token)+ 8K 系统 prompt
  237. bytes_est = cnt * avg_bytes + 8000
  238. by_tier_est[tier] = {
  239. "review_count": int(cnt),
  240. "est_bytes": int(bytes_est),
  241. "est_tokens": int(bytes_est / 3),
  242. }
  243. max_tier_tokens = max((v["est_tokens"] for v in by_tier_est.values()), default=0)
  244. return {
  245. "avg_review_bytes": int(avg_bytes),
  246. "by_tier": by_tier_est,
  247. "max_tier_tokens": max_tier_tokens,
  248. }
  249. # ═══════════════════════════════════════════
  250. # 报告构建 + 判定
  251. # ═══════════════════════════════════════════
  252. def build_report(snapshot: Dict) -> Dict:
  253. decisions = snapshot.get("decisions", [])
  254. return {
  255. "metadata": snapshot.get("metadata", {}),
  256. "action_distribution": action_distribution(decisions),
  257. "bid_down_diversity": bid_down_diversity(decisions),
  258. "rule_alignment": rule_alignment_dist(decisions),
  259. "reasoning_chain": reasoning_chain_quality(decisions),
  260. "tier_breakdown": tier_breakdown(decisions),
  261. "signal_health": signal_health(decisions),
  262. "consistency": consistency_check(decisions),
  263. "context_size_est": context_size_est(decisions),
  264. }
  265. def assess(report: Dict) -> Tuple[str, List[str]]:
  266. """决策质量评级判定"""
  267. flags: List[str] = []
  268. # bid_down 多样性
  269. bd_div = report["bid_down_diversity"]["dimension_count"]
  270. if bd_div < 3:
  271. flags.append(f"bid_down dimension 种类 {bd_div}(< 3,多样性不足)")
  272. # LLM 决策占比(auto_pause 不算 LLM)
  273. llm_pct = report["action_distribution"]["llm_decision_pct"]
  274. if llm_pct < 0.7:
  275. flags.append(f"LLM 决策占比 {llm_pct}(< 70%,规则压过 LLM)")
  276. # override 率
  277. ra = report["rule_alignment"]["override_pct"]
  278. if ra is not None:
  279. if ra < 0.05:
  280. flags.append(f"override 率 {ra}(< 5%,LLM 不敢推翻规则)")
  281. elif ra > 0.4:
  282. flags.append(f"override 率 {ra}(> 40%,signal_scores 可能不准)")
  283. # reasoning_chain
  284. rc_rate = report["reasoning_chain"]["reasoning_chain_rate"]
  285. if rc_rate < 0.95:
  286. flags.append(f"reasoning_chain 缺失率 {round(1 - rc_rate, 3)}(LLM 未遵循 schema)")
  287. # 一致性
  288. max_ent = report["consistency"]["max_entropy"]
  289. if max_ent > 1.2:
  290. flags.append(f"同桶决策熵最大 {max_ent}(> 1.2,稳定性不足)")
  291. # token
  292. max_tok = report["context_size_est"]["max_tier_tokens"]
  293. if max_tok > 100_000:
  294. flags.append(f"最大 tier token {max_tok}(> 100K,需细分桶)")
  295. # 分支判定
  296. if not flags:
  297. return "pass: 架构够用,可进入后验采集下一 plan", flags
  298. for f in flags:
  299. if "token" in f:
  300. return "refine_tier: tier 细分桶 / signal_scores 精简", flags
  301. for f in flags:
  302. if "决策熵" in f:
  303. return "deep_judge: 考虑对边缘案例单条深判", flags
  304. return "tune_prompt: 调 prompt 或 DECAY_WEIGHTS", flags
  305. # ═══════════════════════════════════════════
  306. # 可读输出
  307. # ═══════════════════════════════════════════
def print_report(report: Dict, assessment: Tuple[str, List[str]]) -> None:
    """Print the human-readable quality report to stdout.

    Sections 1-8 render the corresponding keys of *report*; section 9
    prints the verdict string and warning flags from *assessment*
    (a (case, flags) tuple as produced by assess()).
    NOTE(review): the printed strings are Chinese by design and must not
    be altered — they are the tool's user-facing output.
    """
    meta = report["metadata"]
    print("=" * 70)
    print("V4 决策快照分析报告")
    print(f"决策日期 : {meta.get('decision_date')}")
    print(f"Agent 版本 : {meta.get('agent_version')}")
    print(f"LLM 模型 : {meta.get('llm_model')}")
    print(f"决策总数 : {meta.get('decision_count')}")
    print(f"运行时间 : {meta.get('run_timestamp')}")
    print("=" * 70)
    # 1. Decision distribution (action/source mix, LLM share)
    print("\n【1. 决策分布】")
    ad = report["action_distribution"]
    print(f" Action: {ad['action_counts']}")
    print(f" Source: {ad['source_counts']}")
    print(f" LLM 决策占比: {ad['llm_decision_pct'] * 100:.1f}% (目标 >= 70%)")
    # 2. bid_down diversity (the previous round's bottleneck)
    print("\n【2. bid_down 多样性(上一轮瓶颈)】")
    bd = report["bid_down_diversity"]
    print(f" bid_down 总数: {bd['total']}")
    print(f" Dimension 种类: {bd['dimension_count']} (目标 >= 4)")
    if bd["dimensions"]:
        print(f" Dimensions: {bd['dimensions']}")
    if bd["scenarios"]:
        print(f" Scenarios: {bd['scenarios']}")
    if bd["pct_stats"]:
        print(f" 推荐幅度: mean={bd['pct_stats']['mean']}, "
              f"p25/50/75={bd['pct_stats']['p25']}/{bd['pct_stats']['p50']}/{bd['pct_stats']['p75']}")
    # 3. rule_alignment (does the LLM ever override the rules?)
    print("\n【3. rule_alignment(LLM 是否推翻规则)】")
    ra = report["rule_alignment"]
    print(f" 分布: {ra['counts']}")
    print(f" override 率: {ra['override_pct']} (目标 10% ~ 25%)")
    if ra["override_reason_samples"]:
        print(f" override_reason 样本:")
        for i, r in enumerate(ra["override_reason_samples"][:3], 1):
            print(f" [{i}] {r[:80]}")
    # 4. reasoning_chain quality (coverage, length, cited signals)
    print("\n【4. reasoning_chain 质量】")
    rc = report["reasoning_chain"]
    print(f" LLM 决策数: {rc['llm_decisions']}")
    print(f" 含 chain: {rc['with_reasoning_chain']} ({rc['reasoning_chain_rate'] * 100:.1f}%) (目标 >= 95%)")
    if rc["length_stats"]:
        print(f" chain 长度: mean={rc['length_stats']['mean']}, p50={rc['length_stats']['p50']}")
    if rc["top_signals"]:
        print(f" Top 引用信号: {rc['top_signals']}")
    # 5. Per-tier breakdown
    print("\n【5. 按 tier 分布】")
    for tier, info in report["tier_breakdown"].items():
        print(f" {tier:8s} count={info['count']:3d} actions={info['actions']}")
        print(f" avg: decay={info['avg_decay_score']} "
              f"bd={info['avg_bid_down_score']} bu={info['avg_bid_up_score']}")
    # 6. Signal health (score distributions / non-zero rates)
    print("\n【6. 信号健康度】")
    sh = report["signal_health"]
    if sh["decay_score"]:
        s = sh["decay_score"]
        print(f" decay_score : mean={s['mean']} max={s['max']} nonzero={sh['decay_nonzero_rate']}")
    if sh["bid_down_score"]:
        s = sh["bid_down_score"]
        print(f" bid_down_score : mean={s['mean']} max={s['max']} nonzero={sh['bid_down_nonzero_rate']}")
    if sh["bid_up_score"]:
        s = sh["bid_up_score"]
        print(f" bid_up_score : mean={s['mean']} max={s['max']} nonzero={sh['bid_up_nonzero_rate']}")
    # 7. Same-bucket consistency (entropy of action mix per bucket)
    print("\n【7. 同桶决策一致性】")
    c = report["consistency"]
    print(f" 有效桶(>= 2 广告): {c['buckets_with_gt1']}")
    print(f" 平均熵: {c['avg_entropy']} max: {c['max_entropy']} (告警 > 1.2)")
    if c["high_entropy_buckets"]:
        print(" 高熵桶 Top 3:")
        for b in c["high_entropy_buckets"][:3]:
            print(f" {b['bucket']} n={b['n']} {b['actions']} entropy={b['entropy']}")
    # 8. Context size estimate (per-tier token budget)
    print("\n【8. 上下文规模估算】")
    cs = report["context_size_est"]
    print(f" 平均 review 字节: {cs['avg_review_bytes']}")
    print(f" 最大 tier token: {cs['max_tier_tokens']} (告警 > 100K)")
    for tier, info in cs["by_tier"].items():
        print(f" {tier:8s} {info['review_count']:3d} ads → ~{info['est_tokens']} tokens")
    # 9. Verdict and warning flags
    print("\n【9. 判定】")
    case, flags = assessment
    print(f" → {case}")
    if flags:
        for f in flags:
            print(f" • {f}")
    else:
        print(" ✅ 所有关键指标达标")
  397. # ═══════════════════════════════════════════
  398. # Entry
  399. # ═══════════════════════════════════════════
  400. def main() -> None:
  401. parser = argparse.ArgumentParser(description="V4 决策快照分析")
  402. parser.add_argument("--date", required=True, help="决策日期 YYYYMMDD")
  403. args = parser.parse_args()
  404. try:
  405. snapshot = load_snapshot(args.date)
  406. except FileNotFoundError as e:
  407. print(f"❌ {e}")
  408. sys.exit(1)
  409. report = build_report(snapshot)
  410. assessment = assess(report)
  411. print_report(report, assessment)
  412. # 落盘
  413. out_dir = HERE / "outputs" / "decisions_history" / args.date
  414. out_path = out_dir / "quality_report.json"
  415. out_path.write_text(
  416. json.dumps(
  417. {
  418. "report": report,
  419. "assessment": {"case": assessment[0], "flags": assessment[1]},
  420. },
  421. ensure_ascii=False,
  422. indent=2,
  423. ),
  424. encoding="utf-8",
  425. )
  426. print(f"\n✅ 报告落盘: {out_path}")
# Run the CLI only when executed as a script (not on import).
if __name__ == "__main__":
    main()