"""V4 decision snapshot analysis — for observing live runs.

Usage:
    cd /Users/liulidong/project/agent/Agent
    .venv/bin/python3 examples/auto_put_ad_mini/analyze_snapshot.py --date 20260415

Input:
    examples/auto_put_ad_mini/outputs/decisions_history/{date}/snapshot.json

Output:
    - stdout: human-readable report
    - quality_report.json in the same directory: structured metrics

Evaluation dimensions (9):
    1. Decision distribution (action / source / LLM share)
    2. bid_down diversity (number of distinct dimensions; last round's bottleneck)
    3. rule_alignment distribution + override rate
    4. reasoning_chain quality (coverage, length, cited signals)
    5. Breakdown by tier
    6. Signal health (decay / bid_down / bid_up distributions)
    7. Same-bucket decision consistency (entropy)
    8. Context size estimate
    9. Quality verdict
"""

import argparse
import json
import math
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

HERE = Path(__file__).resolve().parent


# ═══════════════════════════════════════════
# Utility functions
# ═══════════════════════════════════════════

def _clean_numbers(values: List[Any]) -> List[float]:
    """Convert *values* to floats, dropping None, NaN and non-numeric entries.

    Shared by the statistics helpers so they all filter identically. Values
    that ``float()`` cannot convert are skipped instead of raising, because
    the snapshot is external JSON and a single malformed score should not
    abort the whole report.
    """
    cleaned: List[float] = []
    for raw in values:
        if raw is None:
            continue
        try:
            number = float(raw)
        except (TypeError, ValueError):
            continue
        if math.isnan(number):
            continue
        cleaned.append(number)
    return cleaned


def _num_stats(values: List[Any]) -> Optional[Dict[str, float]]:
    """Return count/mean/min/max/p25/p50/p75 of *values*, or None if empty.

    Percentiles are picked by index from the sorted sample (no interpolation).
    """
    ordered = sorted(_clean_numbers(values))
    if not ordered:
        return None
    n = len(ordered)
    return {
        "count": n,
        "mean": round(sum(ordered) / n, 4),
        "min": round(ordered[0], 4),
        "max": round(ordered[-1], 4),
        "p25": round(ordered[max(0, n // 4)], 4),
        "p50": round(ordered[n // 2], 4),
        "p75": round(ordered[min(n - 1, 3 * n // 4)], 4),
    }


def _mean(values: List[Any]) -> Optional[float]:
    """Return the mean of the usable numbers in *values* (4 decimals), or None."""
    nums = _clean_numbers(values)
    if not nums:
        return None
    return round(sum(nums) / len(nums), 4)


def _nonzero_rate(values: List[Any]) -> Optional[float]:
    """Return the share of usable numbers in *values* that are > 0, or None."""
    nums = _clean_numbers(values)
    if not nums:
        return None
    positives = sum(1 for x in nums if x > 0)
    return round(positives / len(nums), 3)


def _audience_tier(decision: Dict) -> str:
    """Return the decision's audience tier label, ``"default"`` when absent/empty.

    Centralised so every analysis groups by exactly the same tier label.
    """
    signals = decision.get("input_signals") or {}
    return str(signals.get("audience_tier") or "default")


def _bucket_floor(value: float, width: float = 0.2) -> float:
    """Return the lower edge of the *width*-wide bucket containing *value*.

    The quotient is rounded before flooring because binary floating point
    makes e.g. ``0.6 / 0.2 == 2.9999999999999996``, which would otherwise put
    0.6 into the 0.4 bucket.
    """
    index = math.floor(round(value / width, 9))
    return round(index * width, 1)


def load_snapshot(date: str) -> Dict[str, Any]:
    """Load ``outputs/decisions_history/{date}/snapshot.json`` next to this file.

    Raises:
        FileNotFoundError: if the snapshot file does not exist.
    """
    snap_path = HERE / "outputs" / "decisions_history" / date / "snapshot.json"
    if not snap_path.exists():
        raise FileNotFoundError(f"快照不存在: {snap_path}")
    return json.loads(snap_path.read_text(encoding="utf-8"))


# ═══════════════════════════════════════════
# Analysis modules
# ═══════════════════════════════════════════

def action_distribution(decisions: List[Dict]) -> Dict[str, Any]:
    """Count decisions per action and per source, plus the LLM-sourced share."""
    total = len(decisions)
    action_counter: Counter = Counter(d.get("action", "unknown") for d in decisions)
    source_counter: Counter = Counter(d.get("source", "unknown") for d in decisions)

    # Sources that count as "decided by the LLM".
    llm_keys = {"llm", "llm_main", "llm_override"}
    llm_count = sum(cnt for src, cnt in source_counter.items() if src in llm_keys)

    return {
        "total": total,
        "action_counts": dict(action_counter),
        "action_pct": (
            {k: round(v / total, 3) for k, v in action_counter.items()} if total else {}
        ),
        "source_counts": dict(source_counter),
        "llm_decision_count": llm_count,
        "llm_decision_pct": round(llm_count / total, 3) if total else 0,
    }


def bid_down_diversity(decisions: List[Dict]) -> Dict[str, Any]:
    """Summarise the variety of ``bid_down`` decisions (dimensions, scenarios, pct)."""
    bid_down = [d for d in decisions if d.get("action") == "bid_down"]
    dim_counter = Counter(d.get("dimension", "unknown") for d in bid_down)
    scenario_counter = Counter(
        (d.get("signal_scores") or {}).get("bid_down_scenario", "unknown")
        for d in bid_down
    )
    pcts = [
        d.get("recommended_change_pct")
        for d in bid_down
        if d.get("recommended_change_pct") is not None
    ]
    return {
        "total": len(bid_down),
        "dimension_count": len(dim_counter),
        "dimensions": dict(dim_counter.most_common(10)),
        "scenarios": dict(scenario_counter.most_common()),
        "pct_stats": _num_stats(pcts),
    }


def rule_alignment_dist(decisions: List[Dict]) -> Dict[str, Any]:
    """Return the rule_alignment distribution, override rate and reason samples.

    ``override_pct`` is computed over decisions with a known alignment only and
    is None when there are none.
    """
    align_counter: Counter = Counter()
    override_reasons: List[str] = []
    for d in decisions:
        # auto_pause is a hard rule floor and is excluded from alignment stats.
        if d.get("source") == "auto_pause":
            continue
        align = d.get("rule_alignment") or "unknown"
        align_counter[align] += 1
        if align == "override" and d.get("override_reason"):
            override_reasons.append(str(d["override_reason"]))

    known_total = sum(cnt for label, cnt in align_counter.items() if label != "unknown")
    override_pct = (
        round(align_counter.get("override", 0) / known_total, 3) if known_total else None
    )
    return {
        "counts": dict(align_counter),
        "override_pct": override_pct,
        "override_reason_samples": override_reasons[:5],
    }


def reasoning_chain_quality(decisions: List[Dict]) -> Dict[str, Any]:
    """Measure reasoning_chain coverage, length and most-cited signals."""
    lengths: List[int] = []
    signal_counter: Counter = Counter()
    has_chain = 0
    total_llm = 0
    for d in decisions:
        # Only decisions that are not auto_pause are considered here.
        if d.get("source") == "auto_pause":
            continue
        total_llm += 1
        chain = d.get("reasoning_chain")
        if not isinstance(chain, list) or not chain:
            continue
        has_chain += 1
        lengths.append(len(chain))
        for item in chain:
            if isinstance(item, dict):
                signal_counter[str(item.get("signal", "unknown"))] += 1

    return {
        "llm_decisions": total_llm,
        "with_reasoning_chain": has_chain,
        "reasoning_chain_rate": round(has_chain / total_llm, 3) if total_llm else 0,
        "length_stats": _num_stats(lengths),
        "top_signals": dict(signal_counter.most_common(10)),
    }


def tier_breakdown(decisions: List[Dict]) -> Dict[str, Any]:
    """Group decisions by audience tier; report action counts and mean scores."""
    tier_groups: Dict[str, List[Dict]] = {}
    for d in decisions:
        tier_groups.setdefault(_audience_tier(d), []).append(d)

    out: Dict[str, Any] = {}
    for tier, items in sorted(tier_groups.items()):
        scores = [(d.get("signal_scores") or {}) for d in items]
        out[tier] = {
            "count": len(items),
            "actions": dict(Counter(d.get("action", "unknown") for d in items)),
            "avg_decay_score": _mean([s.get("decay_score") for s in scores]),
            "avg_bid_down_score": _mean([s.get("bid_down_score") for s in scores]),
            "avg_bid_up_score": _mean([s.get("bid_up_score") for s in scores]),
        }
    return out


def signal_health(decisions: List[Dict]) -> Dict[str, Any]:
    """Return distribution stats and >0 rates for the three signal scores."""
    scores = [(d.get("signal_scores") or {}) for d in decisions]
    decay = [s.get("decay_score") for s in scores]
    bd = [s.get("bid_down_score") for s in scores]
    bu = [s.get("bid_up_score") for s in scores]
    return {
        "decay_score": _num_stats(decay),
        "bid_down_score": _num_stats(bd),
        "bid_up_score": _num_stats(bu),
        "decay_nonzero_rate": _nonzero_rate(decay),
        "bid_down_nonzero_rate": _nonzero_rate(bd),
        "bid_up_nonzero_rate": _nonzero_rate(bu),
    }


def consistency_check(decisions: List[Dict]) -> Dict[str, Any]:
    """Same-bucket decision consistency.

    Decisions are grouped by (tier, ROI bucket of width 0.2, decay bucket of
    width 0.2) and the Shannon entropy of the action distribution is computed
    for every group with at least two decisions. Higher entropy means less
    consistent decisions. Decisions whose ROI or decay score is missing,
    non-numeric, NaN or infinite are skipped.
    """
    buckets: Dict[Tuple[str, float, float], List[str]] = {}
    for d in decisions:
        scores = d.get("signal_scores") or {}
        inputs = d.get("input_signals") or {}
        roi = inputs.get("动态ROI_7日均值")
        decay = scores.get("decay_score")
        if roi is None or decay is None:
            continue
        try:
            roi_f = float(roi)
            decay_f = float(decay)
        except (TypeError, ValueError):
            continue
        # isfinite also rejects +/-inf, which math.floor cannot handle.
        if not (math.isfinite(roi_f) and math.isfinite(decay_f)):
            continue
        key = (_audience_tier(d), _bucket_floor(roi_f), _bucket_floor(decay_f))
        buckets.setdefault(key, []).append(str(d.get("action", "unknown")))

    high_entropy: List[Dict[str, Any]] = []
    entropies: List[float] = []
    for key, actions in buckets.items():
        n = len(actions)
        if n < 2:
            continue
        counts = Counter(actions)
        entropy = -sum((v / n) * math.log2(v / n) for v in counts.values() if v > 0)
        entropies.append(entropy)
        if entropy > 0.8:
            high_entropy.append({
                "bucket": f"tier={key[0]}, roi~{key[1]}, decay~{key[2]}",
                "n": n,
                "actions": dict(counts),
                "entropy": round(entropy, 3),
            })

    high_entropy.sort(key=lambda x: x["entropy"], reverse=True)
    return {
        "buckets_with_gt1": len(entropies),
        "avg_entropy": round(sum(entropies) / len(entropies), 3) if entropies else 0,
        "max_entropy": round(max(entropies), 3) if entropies else 0,
        "high_entropy_buckets": high_entropy[:5],
    }


def context_size_est(decisions: List[Dict]) -> Dict[str, Any]:
    """Estimate the input token volume of each per-tier LLM batch.

    The average serialized size is sampled from the first five non-auto_pause
    decisions and measured in UTF-8 bytes (not characters), so that non-ASCII
    text is not undercounted by the bytes-per-token heuristic below.
    """
    llm_reviews = [d for d in decisions if d.get("source") != "auto_pause"]
    tier_counts: Counter = Counter(_audience_tier(d) for d in llm_reviews)

    sample_sizes = [
        len(json.dumps(d, ensure_ascii=False).encode("utf-8"))
        for d in llm_reviews[:5]
    ]
    avg_bytes = sum(sample_sizes) / len(sample_sizes) if sample_sizes else 0

    by_tier_est: Dict[str, Dict[str, int]] = {}
    for tier, cnt in tier_counts.items():
        # Heuristic: 1 token ~= 3 bytes (Chinese text is roughly 2-3 bytes per
        # token), plus 8K for the system prompt.
        bytes_est = cnt * avg_bytes + 8000
        by_tier_est[tier] = {
            "review_count": int(cnt),
            "est_bytes": int(bytes_est),
            "est_tokens": int(bytes_est / 3),
        }

    max_tier_tokens = max((v["est_tokens"] for v in by_tier_est.values()), default=0)
    return {
        "avg_review_bytes": int(avg_bytes),
        "by_tier": by_tier_est,
        "max_tier_tokens": max_tier_tokens,
    }


# ═══════════════════════════════════════════
# Report building + assessment
# ═══════════════════════════════════════════

def build_report(snapshot: Dict) -> Dict:
    """Run every analysis module over the snapshot's decisions.

    A missing or null ``decisions`` / ``metadata`` entry is treated as empty,
    and entries in ``decisions`` that are not dicts are ignored.
    """
    decisions = [d for d in (snapshot.get("decisions") or []) if isinstance(d, dict)]
    return {
        "metadata": snapshot.get("metadata") or {},
        "action_distribution": action_distribution(decisions),
        "bid_down_diversity": bid_down_diversity(decisions),
        "rule_alignment": rule_alignment_dist(decisions),
        "reasoning_chain": reasoning_chain_quality(decisions),
        "tier_breakdown": tier_breakdown(decisions),
        "signal_health": signal_health(decisions),
        "consistency": consistency_check(decisions),
        "context_size_est": context_size_est(decisions),
    }


def assess(report: Dict) -> Tuple[str, List[str]]:
    """Return ``(verdict, flags)`` for the decision-quality assessment.

    Verdict priority when any flag is raised: token budget exceeded, then
    same-bucket entropy too high, otherwise prompt tuning.
    """
    flags: List[str] = []

    # bid_down diversity
    bd_div = report["bid_down_diversity"]["dimension_count"]
    if bd_div < 3:
        flags.append(f"bid_down dimension 种类 {bd_div}(< 3,多样性不足)")

    # Share of LLM decisions (auto_pause does not count as LLM)
    llm_pct = report["action_distribution"]["llm_decision_pct"]
    if llm_pct < 0.7:
        flags.append(f"LLM 决策占比 {llm_pct}(< 70%,规则压过 LLM)")

    # Override rate
    ra = report["rule_alignment"]["override_pct"]
    if ra is not None:
        if ra < 0.05:
            flags.append(f"override 率 {ra}(< 5%,LLM 不敢推翻规则)")
        elif ra > 0.4:
            flags.append(f"override 率 {ra}(> 40%,signal_scores 可能不准)")

    # reasoning_chain coverage
    rc_rate = report["reasoning_chain"]["reasoning_chain_rate"]
    if rc_rate < 0.95:
        flags.append(f"reasoning_chain 缺失率 {round(1 - rc_rate, 3)}(LLM 未遵循 schema)")

    # Consistency
    max_ent = report["consistency"]["max_entropy"]
    entropy_exceeded = max_ent > 1.2
    if entropy_exceeded:
        flags.append(f"同桶决策熵最大 {max_ent}(> 1.2,稳定性不足)")

    # Token budget
    max_tok = report["context_size_est"]["max_tier_tokens"]
    token_exceeded = max_tok > 100_000
    if token_exceeded:
        flags.append(f"最大 tier token {max_tok}(> 100K,需细分桶)")

    # Verdict: decided from the checks themselves rather than by matching
    # substrings of the flag messages.
    if not flags:
        return "pass: 架构够用,可进入后验采集下一 plan", flags
    if token_exceeded:
        return "refine_tier: tier 细分桶 / signal_scores 精简", flags
    if entropy_exceeded:
        return "deep_judge: 考虑对边缘案例单条深判", flags
    return "tune_prompt: 调 prompt 或 DECAY_WEIGHTS", flags


# ═══════════════════════════════════════════
# Human-readable output
# ═══════════════════════════════════════════

def print_report(report: Dict, assessment: Tuple[str, List[str]]) -> None:
    """Print the report produced by ``build_report`` and its verdict to stdout."""
    meta = report["metadata"]
    print("=" * 70)
    print("V4 决策快照分析报告")
    print(f"决策日期 : {meta.get('decision_date')}")
    print(f"Agent 版本 : {meta.get('agent_version')}")
    print(f"LLM 模型 : {meta.get('llm_model')}")
    print(f"决策总数 : {meta.get('decision_count')}")
    print(f"运行时间 : {meta.get('run_timestamp')}")
    print("=" * 70)

    # 1. Decision distribution
    print("\n【1. 决策分布】")
    ad = report["action_distribution"]
    print(f" Action: {ad['action_counts']}")
    print(f" Source: {ad['source_counts']}")
    print(f" LLM 决策占比: {ad['llm_decision_pct'] * 100:.1f}% (目标 >= 70%)")

    # 2. bid_down diversity
    print("\n【2. bid_down 多样性(上一轮瓶颈)】")
    bd = report["bid_down_diversity"]
    print(f" bid_down 总数: {bd['total']}")
    print(f" Dimension 种类: {bd['dimension_count']} (目标 >= 4)")
    if bd["dimensions"]:
        print(f" Dimensions: {bd['dimensions']}")
    if bd["scenarios"]:
        print(f" Scenarios: {bd['scenarios']}")
    if bd["pct_stats"]:
        print(f" 推荐幅度: mean={bd['pct_stats']['mean']}, "
              f"p25/50/75={bd['pct_stats']['p25']}/{bd['pct_stats']['p50']}/{bd['pct_stats']['p75']}")

    # 3. rule_alignment
    print("\n【3. rule_alignment(LLM 是否推翻规则)】")
    ra = report["rule_alignment"]
    print(f" 分布: {ra['counts']}")
    print(f" override 率: {ra['override_pct']} (目标 10% ~ 25%)")
    if ra["override_reason_samples"]:
        print(" override_reason 样本:")
        for i, r in enumerate(ra["override_reason_samples"][:3], 1):
            print(f" [{i}] {r[:80]}")

    # 4. reasoning_chain
    print("\n【4. reasoning_chain 质量】")
    rc = report["reasoning_chain"]
    print(f" LLM 决策数: {rc['llm_decisions']}")
    print(f" 含 chain: {rc['with_reasoning_chain']} ({rc['reasoning_chain_rate'] * 100:.1f}%) (目标 >= 95%)")
    if rc["length_stats"]:
        print(f" chain 长度: mean={rc['length_stats']['mean']}, p50={rc['length_stats']['p50']}")
    if rc["top_signals"]:
        print(f" Top 引用信号: {rc['top_signals']}")

    # 5. Breakdown by tier
    print("\n【5. 按 tier 分布】")
    for tier, info in report["tier_breakdown"].items():
        print(f" {tier:8s} count={info['count']:3d} actions={info['actions']}")
        print(f" avg: decay={info['avg_decay_score']} "
              f"bd={info['avg_bid_down_score']} bu={info['avg_bid_up_score']}")

    # 6. Signal health
    print("\n【6. 信号健康度】")
    sh = report["signal_health"]
    for score_key, rate_key in (
        ("decay_score", "decay_nonzero_rate"),
        ("bid_down_score", "bid_down_nonzero_rate"),
        ("bid_up_score", "bid_up_nonzero_rate"),
    ):
        s = sh[score_key]
        if s:
            print(f" {score_key} : mean={s['mean']} max={s['max']} nonzero={sh[rate_key]}")

    # 7. Same-bucket consistency
    print("\n【7. 同桶决策一致性】")
    c = report["consistency"]
    print(f" 有效桶(>= 2 广告): {c['buckets_with_gt1']}")
    print(f" 平均熵: {c['avg_entropy']} max: {c['max_entropy']} (告警 > 1.2)")
    if c["high_entropy_buckets"]:
        print(" 高熵桶 Top 3:")
        for b in c["high_entropy_buckets"][:3]:
            print(f" {b['bucket']} n={b['n']} {b['actions']} entropy={b['entropy']}")

    # 8. Context size
    print("\n【8. 上下文规模估算】")
    cs = report["context_size_est"]
    print(f" 平均 review 字节: {cs['avg_review_bytes']}")
    print(f" 最大 tier token: {cs['max_tier_tokens']} (告警 > 100K)")
    for tier, info in cs["by_tier"].items():
        print(f" {tier:8s} {info['review_count']:3d} ads → ~{info['est_tokens']} tokens")

    # 9. Verdict
    print("\n【9. 判定】")
    case, flags = assessment
    print(f" → {case}")
    if flags:
        for f in flags:
            print(f" • {f}")
    else:
        print(" ✅ 所有关键指标达标")


# ═══════════════════════════════════════════
# Entry
# ═══════════════════════════════════════════

def main() -> None:
    """CLI entry: analyse one day's snapshot, print it, and write quality_report.json."""
    parser = argparse.ArgumentParser(description="V4 决策快照分析")
    parser.add_argument("--date", required=True, help="决策日期 YYYYMMDD")
    args = parser.parse_args()

    try:
        snapshot = load_snapshot(args.date)
    except FileNotFoundError as e:
        print(f"❌ {e}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        # A truncated or corrupt snapshot is reported like a missing one
        # instead of surfacing a raw traceback.
        print(f"❌ 快照 JSON 解析失败: {e}")
        sys.exit(1)

    report = build_report(snapshot)
    assessment = assess(report)
    print_report(report, assessment)

    # Persist the structured report next to the snapshot.
    out_dir = HERE / "outputs" / "decisions_history" / args.date
    out_path = out_dir / "quality_report.json"
    out_path.write_text(
        json.dumps(
            {
                "report": report,
                "assessment": {"case": assessment[0], "flags": assessment[1]},
            },
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )
    print(f"\n✅ 报告落盘: {out_path}")


if __name__ == "__main__":
    main()