| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
"""
V4 决策快照分析 — 实测观测用
用法:
    cd /Users/liulidong/project/agent/Agent
    .venv/bin/python3 examples/auto_put_ad_mini/analyze_snapshot.py --date 20260415
输入:examples/auto_put_ad_mini/outputs/decisions_history/{date}/snapshot.json
输出:
    - stdout:可读报告
    - 同目录 quality_report.json:结构化指标
评估维度(9 项):
    1. 决策分布(action / source / LLM 占比)
    2. bid_down 多样性(dimension 种类数 = 上一轮瓶颈)
    3. rule_alignment 分布 + override 率
    4. reasoning_chain 质量(覆盖率、长度、引用信号)
    5. 按 tier 分布
    6. 信号健康度(decay/bid_down/bid_up 分布)
    7. 同桶决策一致性(抽样熵)
    8. 上下文规模估算
    9. 质量评级判定
"""
import argparse
import json
import math
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
- HERE = Path(__file__).resolve().parent
- # ═══════════════════════════════════════════
- # 工具函数
- # ═══════════════════════════════════════════
def _num_stats(values: List[Any]) -> Optional[Dict[str, float]]:
    """Summary stats (count/mean/min/max and index-based quartiles) for *values*.

    None values and float NaNs are discarded first; returns None when nothing
    numeric remains. All figures are rounded to 4 decimal places.
    """
    cleaned: List[float] = []
    for v in values:
        if v is None:
            continue
        if isinstance(v, float) and math.isnan(v):
            continue
        cleaned.append(float(v))
    if not cleaned:
        return None
    cleaned.sort()
    n = len(cleaned)

    def pick(idx: int) -> float:
        # Quartiles are simple order-statistic picks, not interpolated.
        return round(cleaned[idx], 4)

    return {
        "count": n,
        "mean": round(sum(cleaned) / n, 4),
        "min": pick(0),
        "max": pick(n - 1),
        "p25": pick(max(0, n // 4)),
        "p50": pick(n // 2),
        "p75": pick(min(n - 1, 3 * n // 4)),
    }
def _mean(values: List[Any]) -> Optional[float]:
    """Arithmetic mean rounded to 4 dp; None/NaN entries are ignored.

    Returns None when no usable numbers remain.
    """
    usable = [float(v) for v in values
              if v is not None and not (isinstance(v, float) and math.isnan(v))]
    if not usable:
        return None
    return round(sum(usable) / len(usable), 4)
def _nonzero_rate(values: List[Any]) -> Optional[float]:
    """Fraction (3 dp) of usable values that are strictly positive.

    None/NaN entries are excluded from the denominator; returns None when
    nothing usable remains.
    """
    usable = [v for v in values
              if v is not None and not (isinstance(v, float) and math.isnan(v))]
    if not usable:
        return None
    positive = sum(1 for v in usable if float(v) > 0)
    return round(positive / len(usable), 3)
def load_snapshot(date: str) -> Dict[str, Any]:
    """Load outputs/decisions_history/{date}/snapshot.json relative to HERE.

    Raises:
        FileNotFoundError: when the snapshot file does not exist.
    """
    path = HERE / "outputs" / "decisions_history" / date / "snapshot.json"
    if not path.exists():
        raise FileNotFoundError(f"快照不存在: {path}")
    return json.loads(path.read_text(encoding="utf-8"))
- # ═══════════════════════════════════════════
- # 分析模块
- # ═══════════════════════════════════════════
def action_distribution(decisions: List[Dict]) -> Dict[str, Any]:
    """Action/source frequency breakdown plus the share of LLM-made decisions.

    Percentages are rounded to 3 dp; with an empty input the percentage
    fields degrade to {} / 0 rather than dividing by zero.
    """
    total = len(decisions)
    actions = Counter(d.get("action", "unknown") for d in decisions)
    sources = Counter(d.get("source", "unknown") for d in decisions)
    # Decisions count as LLM-made under any of the three LLM source labels.
    llm_total = sources["llm"] + sources["llm_main"] + sources["llm_override"]
    return {
        "total": total,
        "action_counts": dict(actions),
        "action_pct": ({k: round(v / total, 3) for k, v in actions.items()}
                       if total else {}),
        "source_counts": dict(sources),
        "llm_decision_count": llm_total,
        "llm_decision_pct": round(llm_total / total, 3) if total else 0,
    }
def bid_down_diversity(decisions: List[Dict]) -> Dict[str, Any]:
    """Diversity profile of bid_down decisions.

    Reports the dimension variety (the previous round's bottleneck), the
    bid_down_scenario mix, and stats of the recommended change percentages.
    """
    downs = [d for d in decisions if d.get("action") == "bid_down"]
    dims: Counter = Counter()
    scenarios: Counter = Counter()
    pcts: List[Any] = []
    for d in downs:
        dims[d.get("dimension", "unknown")] += 1
        scores = d.get("signal_scores") or {}
        scenarios[scores.get("bid_down_scenario", "unknown")] += 1
        pct = d.get("recommended_change_pct")
        if pct is not None:
            pcts.append(pct)
    return {
        "total": len(downs),
        "dimension_count": len(dims),
        "dimensions": dict(dims.most_common(10)),
        "scenarios": dict(scenarios.most_common()),
        "pct_stats": _num_stats(pcts),
    }
def rule_alignment_dist(decisions: List[Dict]) -> Dict[str, Any]:
    """rule_alignment distribution, override rate, and sample override reasons.

    auto_pause decisions are hard rule floors and are excluded; the override
    rate is computed over decisions whose alignment label is known.
    """
    counts: Counter = Counter()
    reasons: List[str] = []
    for d in decisions:
        if d.get("source") == "auto_pause":
            continue
        label = d.get("rule_alignment") or "unknown"
        counts[label] += 1
        if label == "override" and d.get("override_reason"):
            reasons.append(str(d["override_reason"]))
    known = sum(n for label, n in counts.items() if label != "unknown")
    override_pct = round(counts.get("override", 0) / known, 3) if known else None
    return {
        "counts": dict(counts),
        "override_pct": override_pct,
        "override_reason_samples": reasons[:5],
    }
def reasoning_chain_quality(decisions: List[Dict]) -> Dict[str, Any]:
    """Coverage, length stats, and top cited signals of LLM reasoning chains.

    auto_pause decisions are rule-made (not LLM output) and are skipped.
    """
    chain_lengths: List[int] = []
    cited: Counter = Counter()
    llm_total = 0
    covered = 0
    for d in decisions:
        if d.get("source") == "auto_pause":
            continue
        llm_total += 1
        chain = d.get("reasoning_chain")
        if not (isinstance(chain, list) and chain):
            continue
        covered += 1
        chain_lengths.append(len(chain))
        for step in chain:
            if isinstance(step, dict):
                cited[str(step.get("signal", "unknown"))] += 1
    return {
        "llm_decisions": llm_total,
        "with_reasoning_chain": covered,
        "reasoning_chain_rate": round(covered / llm_total, 3) if llm_total else 0,
        "length_stats": _num_stats(chain_lengths),
        "top_signals": dict(cited.most_common(10)),
    }
def tier_breakdown(decisions: List[Dict]) -> Dict[str, Any]:
    """Per-audience-tier decision counts, action mix, and average signal scores.

    Tiers are emitted in sorted order; decisions without a tier fall under
    "default".
    """
    by_tier: Dict[str, List[Dict]] = {}
    for d in decisions:
        tier = str((d.get("input_signals") or {}).get("audience_tier", "default"))
        by_tier.setdefault(tier, []).append(d)
    result: Dict[str, Any] = {}
    for tier in sorted(by_tier):
        items = by_tier[tier]
        scores = [d.get("signal_scores") or {} for d in items]
        result[tier] = {
            "count": len(items),
            "actions": dict(Counter(d.get("action", "unknown") for d in items)),
            "avg_decay_score": _mean([s.get("decay_score") for s in scores]),
            "avg_bid_down_score": _mean([s.get("bid_down_score") for s in scores]),
            "avg_bid_up_score": _mean([s.get("bid_up_score") for s in scores]),
        }
    return result
def signal_health(decisions: List[Dict]) -> Dict[str, Any]:
    """Distribution stats and nonzero rates for the three core signal scores."""
    series: Dict[str, List[Any]] = {
        "decay_score": [],
        "bid_down_score": [],
        "bid_up_score": [],
    }
    for d in decisions:
        scores = d.get("signal_scores") or {}
        for name, values in series.items():
            values.append(scores.get(name))
    return {
        "decay_score": _num_stats(series["decay_score"]),
        "bid_down_score": _num_stats(series["bid_down_score"]),
        "bid_up_score": _num_stats(series["bid_up_score"]),
        "decay_nonzero_rate": _nonzero_rate(series["decay_score"]),
        "bid_down_nonzero_rate": _nonzero_rate(series["bid_down_score"]),
        "bid_up_nonzero_rate": _nonzero_rate(series["bid_up_score"]),
    }
def consistency_check(decisions: List[Dict]) -> Dict[str, Any]:
    """Action-consistency check within comparable buckets.

    Decisions are grouped by (tier, ROI bucket of width 0.2, decay bucket of
    width 0.2); for each bucket holding >= 2 decisions the Shannon entropy of
    its action distribution is computed. Higher entropy = less consistent
    decisions for near-identical inputs. Buckets with entropy > 0.8 are
    surfaced (top 5 by entropy).
    """
    buckets: Dict[Tuple[str, float, float], List[str]] = {}
    for d in decisions:
        scores = d.get("signal_scores") or {}
        signals = d.get("input_signals") or {}
        roi_raw = signals.get("动态ROI_7日均值")
        decay_raw = scores.get("decay_score")
        if roi_raw is None or decay_raw is None:
            continue
        try:
            roi = float(roi_raw)
            decay = float(decay_raw)
        except (TypeError, ValueError):
            continue
        if math.isnan(roi) or math.isnan(decay):
            continue
        key = (
            str(signals.get("audience_tier") or "default"),
            round(math.floor(roi / 0.2) * 0.2, 1),
            round(math.floor(decay / 0.2) * 0.2, 1),
        )
        buckets.setdefault(key, []).append(str(d.get("action", "unknown")))

    entropies: List[float] = []
    noisy: List[Dict[str, Any]] = []
    for (tier, roi_b, decay_b), actions in buckets.items():
        if len(actions) < 2:
            continue  # singleton buckets carry no consistency signal
        n = len(actions)
        freq = Counter(actions)
        ent = -sum((c / n) * math.log2(c / n) for c in freq.values() if c > 0)
        entropies.append(ent)
        if ent > 0.8:
            noisy.append({
                "bucket": f"tier={tier}, roi~{roi_b}, decay~{decay_b}",
                "n": n,
                "actions": dict(freq),
                "entropy": round(ent, 3),
            })
    noisy.sort(key=lambda b: b["entropy"], reverse=True)
    return {
        "buckets_with_gt1": sum(1 for actions in buckets.values() if len(actions) >= 2),
        "avg_entropy": round(sum(entropies) / len(entropies), 3) if entropies else 0,
        "max_entropy": round(max(entropies), 3) if entropies else 0,
        "high_entropy_buckets": noisy[:5],
    }
def context_size_est(decisions: List[Dict]) -> Dict[str, Any]:
    """Rough input-token estimate per LLM tier batch.

    Uses the average JSON byte size of (at most) the first five
    non-auto_pause decisions as the per-review cost, adds an ~8K-byte
    system prompt, and converts bytes to tokens at ~3 bytes/token
    (Chinese text runs 2-3 bytes per token).
    """
    reviewable = [d for d in decisions if d.get("source") != "auto_pause"]
    per_tier: Counter = Counter(
        str((d.get("input_signals") or {}).get("audience_tier", "default"))
        for d in reviewable
    )
    sample = reviewable[:5]
    avg_bytes = (sum(len(json.dumps(d, ensure_ascii=False)) for d in sample) / len(sample)
                 if sample else 0)
    estimates: Dict[str, Dict[str, int]] = {}
    for tier, count in per_tier.items():
        total_bytes = count * avg_bytes + 8000
        estimates[tier] = {
            "review_count": int(count),
            "est_bytes": int(total_bytes),
            "est_tokens": int(total_bytes / 3),
        }
    return {
        "avg_review_bytes": int(avg_bytes),
        "by_tier": estimates,
        "max_tier_tokens": max((e["est_tokens"] for e in estimates.values()), default=0),
    }
- # ═══════════════════════════════════════════
- # 报告构建 + 判定
- # ═══════════════════════════════════════════
def build_report(snapshot: Dict) -> Dict:
    """Run every analysis module over the snapshot's decisions.

    Returns a dict with the snapshot metadata followed by one section per
    analyzer, in the order the readable report prints them.
    """
    decisions = snapshot.get("decisions", [])
    analyzers = (
        ("action_distribution", action_distribution),
        ("bid_down_diversity", bid_down_diversity),
        ("rule_alignment", rule_alignment_dist),
        ("reasoning_chain", reasoning_chain_quality),
        ("tier_breakdown", tier_breakdown),
        ("signal_health", signal_health),
        ("consistency", consistency_check),
        ("context_size_est", context_size_est),
    )
    report: Dict = {"metadata": snapshot.get("metadata", {})}
    for section, analyze in analyzers:
        report[section] = analyze(decisions)
    return report
def assess(report: Dict) -> Tuple[str, List[str]]:
    """Grade the report: return (verdict, warning flags).

    Each threshold mirrors a target printed in the readable report. Verdict
    priority when flags exist: token pressure first, then bucket-entropy
    instability, otherwise prompt/weight tuning.
    """
    flags: List[str] = []

    # bid_down diversity (previous round's bottleneck)
    dim_kinds = report["bid_down_diversity"]["dimension_count"]
    if dim_kinds < 3:
        flags.append(f"bid_down dimension 种类 {dim_kinds}(< 3,多样性不足)")

    # Share of decisions the LLM actually made (auto_pause doesn't count)
    llm_share = report["action_distribution"]["llm_decision_pct"]
    if llm_share < 0.7:
        flags.append(f"LLM 决策占比 {llm_share}(< 70%,规则压过 LLM)")

    # Override rate: too low = LLM rubber-stamps rules; too high = bad signals
    override = report["rule_alignment"]["override_pct"]
    if override is not None and override < 0.05:
        flags.append(f"override 率 {override}(< 5%,LLM 不敢推翻规则)")
    elif override is not None and override > 0.4:
        flags.append(f"override 率 {override}(> 40%,signal_scores 可能不准)")

    # reasoning_chain schema adherence
    chain_rate = report["reasoning_chain"]["reasoning_chain_rate"]
    if chain_rate < 0.95:
        flags.append(f"reasoning_chain 缺失率 {round(1 - chain_rate, 3)}(LLM 未遵循 schema)")

    # Same-bucket consistency
    worst_entropy = report["consistency"]["max_entropy"]
    if worst_entropy > 1.2:
        flags.append(f"同桶决策熵最大 {worst_entropy}(> 1.2,稳定性不足)")

    # Context budget per tier batch
    peak_tokens = report["context_size_est"]["max_tier_tokens"]
    if peak_tokens > 100_000:
        flags.append(f"最大 tier token {peak_tokens}(> 100K,需细分桶)")

    # Verdict branching (keyword match on the flag texts, by priority)
    if not flags:
        return "pass: 架构够用,可进入后验采集下一 plan", flags
    if any("token" in f for f in flags):
        return "refine_tier: tier 细分桶 / signal_scores 精简", flags
    if any("决策熵" in f for f in flags):
        return "deep_judge: 考虑对边缘案例单条深判", flags
    return "tune_prompt: 调 prompt 或 DECAY_WEIGHTS", flags
- # ═══════════════════════════════════════════
- # 可读输出
- # ═══════════════════════════════════════════
def print_report(report: Dict, assessment: Tuple[str, List[str]]) -> None:
    """Print the human-readable 9-section analysis report to stdout.

    Args:
        report: structured report as produced by build_report().
        assessment: (verdict, flags) pair as produced by assess().
    """
    meta = report["metadata"]
    # Header: snapshot metadata
    print("=" * 70)
    print("V4 决策快照分析报告")
    print(f"决策日期 : {meta.get('decision_date')}")
    print(f"Agent 版本 : {meta.get('agent_version')}")
    print(f"LLM 模型 : {meta.get('llm_model')}")
    print(f"决策总数 : {meta.get('decision_count')}")
    print(f"运行时间 : {meta.get('run_timestamp')}")
    print("=" * 70)
    # 1. Action / source distribution
    print("\n【1. 决策分布】")
    ad = report["action_distribution"]
    print(f" Action: {ad['action_counts']}")
    print(f" Source: {ad['source_counts']}")
    print(f" LLM 决策占比: {ad['llm_decision_pct'] * 100:.1f}% (目标 >= 70%)")
    # 2. bid_down diversity (previous round's bottleneck)
    print("\n【2. bid_down 多样性(上一轮瓶颈)】")
    bd = report["bid_down_diversity"]
    print(f" bid_down 总数: {bd['total']}")
    print(f" Dimension 种类: {bd['dimension_count']} (目标 >= 4)")
    if bd["dimensions"]:
        print(f" Dimensions: {bd['dimensions']}")
    if bd["scenarios"]:
        print(f" Scenarios: {bd['scenarios']}")
    if bd["pct_stats"]:
        print(f" 推荐幅度: mean={bd['pct_stats']['mean']}, "
              f"p25/50/75={bd['pct_stats']['p25']}/{bd['pct_stats']['p50']}/{bd['pct_stats']['p75']}")
    # 3. rule_alignment (does the LLM ever overrule the rules?)
    print("\n【3. rule_alignment(LLM 是否推翻规则)】")
    ra = report["rule_alignment"]
    print(f" 分布: {ra['counts']}")
    print(f" override 率: {ra['override_pct']} (目标 10% ~ 25%)")
    if ra["override_reason_samples"]:
        print(f" override_reason 样本:")
        # Show at most 3 samples, truncated to 80 chars each
        for i, r in enumerate(ra["override_reason_samples"][:3], 1):
            print(f" [{i}] {r[:80]}")
    # 4. reasoning_chain quality
    print("\n【4. reasoning_chain 质量】")
    rc = report["reasoning_chain"]
    print(f" LLM 决策数: {rc['llm_decisions']}")
    print(f" 含 chain: {rc['with_reasoning_chain']} ({rc['reasoning_chain_rate'] * 100:.1f}%) (目标 >= 95%)")
    if rc["length_stats"]:
        print(f" chain 长度: mean={rc['length_stats']['mean']}, p50={rc['length_stats']['p50']}")
    if rc["top_signals"]:
        print(f" Top 引用信号: {rc['top_signals']}")
    # 5. Per-tier breakdown
    print("\n【5. 按 tier 分布】")
    for tier, info in report["tier_breakdown"].items():
        print(f" {tier:8s} count={info['count']:3d} actions={info['actions']}")
        print(f" avg: decay={info['avg_decay_score']} "
              f"bd={info['avg_bid_down_score']} bu={info['avg_bid_up_score']}")
    # 6. Signal health (each section printed only when stats exist)
    print("\n【6. 信号健康度】")
    sh = report["signal_health"]
    if sh["decay_score"]:
        s = sh["decay_score"]
        print(f" decay_score : mean={s['mean']} max={s['max']} nonzero={sh['decay_nonzero_rate']}")
    if sh["bid_down_score"]:
        s = sh["bid_down_score"]
        print(f" bid_down_score : mean={s['mean']} max={s['max']} nonzero={sh['bid_down_nonzero_rate']}")
    if sh["bid_up_score"]:
        s = sh["bid_up_score"]
        print(f" bid_up_score : mean={s['mean']} max={s['max']} nonzero={sh['bid_up_nonzero_rate']}")
    # 7. Same-bucket consistency (entropy)
    print("\n【7. 同桶决策一致性】")
    c = report["consistency"]
    print(f" 有效桶(>= 2 广告): {c['buckets_with_gt1']}")
    print(f" 平均熵: {c['avg_entropy']} max: {c['max_entropy']} (告警 > 1.2)")
    if c["high_entropy_buckets"]:
        print(" 高熵桶 Top 3:")
        for b in c["high_entropy_buckets"][:3]:
            print(f" {b['bucket']} n={b['n']} {b['actions']} entropy={b['entropy']}")
    # 8. Context-size estimate
    print("\n【8. 上下文规模估算】")
    cs = report["context_size_est"]
    print(f" 平均 review 字节: {cs['avg_review_bytes']}")
    print(f" 最大 tier token: {cs['max_tier_tokens']} (告警 > 100K)")
    for tier, info in cs["by_tier"].items():
        print(f" {tier:8s} {info['review_count']:3d} ads → ~{info['est_tokens']} tokens")
    # 9. Verdict + flags
    print("\n【9. 判定】")
    case, flags = assessment
    print(f" → {case}")
    if flags:
        for f in flags:
            print(f" • {f}")
    else:
        print(" ✅ 所有关键指标达标")
- # ═══════════════════════════════════════════
- # Entry
- # ═══════════════════════════════════════════
def main() -> None:
    """CLI entry point: load the snapshot, analyze, print, and persist the report.

    Exits with status 1 when the snapshot for the requested date is missing.
    """
    parser = argparse.ArgumentParser(description="V4 决策快照分析")
    parser.add_argument("--date", required=True, help="决策日期 YYYYMMDD")
    args = parser.parse_args()

    try:
        snapshot = load_snapshot(args.date)
    except FileNotFoundError as err:
        print(f"❌ {err}")
        sys.exit(1)

    report = build_report(snapshot)
    assessment = assess(report)
    print_report(report, assessment)

    # Persist the structured report next to the snapshot.
    out_path = HERE / "outputs" / "decisions_history" / args.date / "quality_report.json"
    payload = {
        "report": report,
        "assessment": {"case": assessment[0], "flags": assessment[1]},
    }
    out_path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"\n✅ 报告落盘: {out_path}")


if __name__ == "__main__":
    main()
|