Quellcode durchsuchen

docs(v3-m5): 新增 V3 M5 Implementation Briefs(并发批处理)

5 份(Index+M5A/B/C/D,对标 M4 颗粒度 11 节;两岗只读交叉验证:证据零误+架构审查据反馈补 3 处):
- M5A 并发配置:walk_policy 拍 gemini_calls_per_run_cap=200 + 新增 gemini_max_workers=4(loader 解包零特例)
- M5B 判定批并发:recall_decision 唯一 analyze 循环用 ThreadPoolExecutor 并发 + 按 offset 回收(judgments[offset]=future.result());组装/落盘留主线程串行→产物逐条等价串行;_safe_analyze 兜底不抛;三处调用方零改
- M5C 配额闸:QuotaCappedGeminiVideoClient wrapper 单点注入(零签名改)+ per-run 计数跨三次 recall 累计 + 按 offset 确定性截断(非完成序)+ evidence/run_event 可观测
- M5D 一致性:test_concurrency_consistency.py 新建(串/并行逐条等价+Jittered 暴露乱序+cap 截断+兜底);R9 固定 run_id 注入(3 处);max_workers 经 monkeypatch _resolve_max_workers 注入(config_overrides 够不到 walk_policy);墙钟口径作废旧 83min 改 Gemini 24s/条×N

已拍板:cap=200 / max_workers=4 / frontier 同层并发推迟 / DB 池·限流锁不做。三原则严守。

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee vor 2 Tagen
Ursprung
Commit
2968ab5b9c

+ 75 - 0
tech_documents/工程落地/v3_implementation_briefs/M5/00_M5_Brief_Index.md

@@ -0,0 +1,75 @@
+# V3 M5 并发批处理 Brief Index
+
+状态:本目录覆盖 V3 M5——把**串行逐条 Gemini 判定**改成**批并发**。验收**硬门槛=并发结果与串行逐条完全一致**(确定性),软目标=判定段墙钟大幅下降。核心:`recall_decision.py` 是全仓**唯一** Gemini 调用循环(初始 recall 节点 + walk_engine 两处 recall 都汇聚于此),且它**先内存收集、最后一次性落盘**——故只并发 `analyze()`、按 offset 顺序回收,产物逐条等价串行,DB 写路径完全不被并发触碰。**判定/游走口径(M2/M3/M4)不动**。对标 M4 颗粒度。
+
+## 目标定位
+
+当前管线纯串行:`recall_decision.run`(19)逐条 `gemini_video_client.analyze`(40),每条 ~24s(实测 memory/video-multimodal-analysis),N≈20–40 条 → 判定段墙钟分钟级线性累积。M5 做完:recall 循环内 `ThreadPoolExecutor` 并发 analyze、按 offset 回收;`QuotaCappedGeminiVideoClient` wrapper 单点注入封顶单 run Gemini 调用;并发不引入任何结果变化(同决策/同血缘/同 walk_actions 指纹)。**frontier 同层并发推迟**(作者循环受限流+只 2 作者,提速近零)。
+
+## 子 brief 执行口径(A→D 线性,A 先行因 B/C 读 A 的配置)
+
+| 子 brief | 文件 | 核心交付 | 类型 |
+|---|---|---|---|
+| **M5A** | `M5A_Concurrency_Config.md` | walk_policy.json 拍 `gemini_calls_per_run_cap=200` + 新增 `gemini_concurrency.max_workers=4`;loader 解包/校验 + 单测 | 配置+代码 |
+| **M5B** | `M5B_Recall_Batch_Concurrency.md` | `recall_decision.run` 内 ThreadPool 并发 analyze + 按 offset 回收;组装/落盘不动 | 代码 |
+| **M5C** | `M5C_Quota_Cap_And_Observability.md` | `QuotaCappedGeminiVideoClient` wrapper + 单点注入 + 确定性截断(按 offset)+ 可观测 | 代码 |
+| **M5D** | `M5D_Consistency_And_Wallclock.md` | `test_concurrency_consistency.py` 新建 + R9 固定 run_id 注入 + Jittered fake;墙钟口径重述 | 测试 |
+
+## 不处理范围(红线)
+
+- **frontier 同层并发**(已拍板推迟):作者循环只 2 作者、受 12s 限流同 bucket 串行、提速近零、确定性风险高;M5 只做判定并发(已覆盖全 run 所有 Gemini 成本)。
+- **DB 连接池不做**(并发段只跑 analyze、落盘仍主线程串行,DB 写路径不被并发触碰);**RateLimiter 不加锁**(判定走 CDN+OpenRouter,不经 crawapi 限流)。
+- 不改判定/游走口径(并发不得引入任何结果变化);不改 graph 编排;不改 recall/walk_engine 的 `run` 签名;不改 DB 表结构。
+- 不动 M4 的 walk_policy 既有键(只增 cap 定值 + max_workers)。
+
+## 已拍板(2026-06-12,09 计划 M5,本里程碑内嵌)
+
+1. **`gemini_calls_per_run_cap = 200`**(上限约 $4.4/run;远高于典型 N≈20–40,封顶失控;M7 真负标定)。
+2. **判定并发度 `max_workers = 4`**(IO 密集、不走 15s 限流;M7 实测调)。
+3. **frontier 同层并发推迟**(明确不做,见不处理范围)。
+4. **DB 连接池不做 / RateLimiter 不加锁**(并发段不写 DB、判定不经限流——09 列的两拍板项可消)。
+
+## 开工前必须拍板
+
+- 无。
+
+## 现有证据(共享,子 brief 各自细引)
+
+- **唯一 analyze 循环**:`recall_decision.py:37-48`——`for offset,item in enumerate(...): judgment=gemini_video_client.analyze(item,media,source_context)`(40)、`recall_evidence_id=f"recall_{start_index+offset:03d}"`(39)、updated_items/updated_bundles/evidence_rows 按 offset append;循环后 50-51 `runtime.append_jsonl` 一次性落盘。`run` 签名 19。
+- **三处调 pattern_recall.run 汇聚**:graph.py:109(初始 recall_pattern 节点)、walk_engine.py:200(`_execute_query_batch`)、walk_engine.py:496(`_expand_authors`),都只传 `gemini_video_client`+`start_index`,并发封在 run 内 → 三处零改。
+- **analyze 可并发**:`gemini_video.py:106-142` 纯 IO(video_fetch 下载+ffmpeg+httpx.post OpenRouter)、self 全常量无共享态、失败返回 `_fail`(status=failed)不抛;`video_fetch` subprocess 无全局态/临时文件竞争,走 CDN 不经 crawapi 限流。
+- **无现成并发**:全仓无 ThreadPool/asyncio/concurrent.futures(query_variant/gemini_video 均同步 httpx.post)→ 用标准库 `concurrent.futures` 新建。
+- **wrapper 注入单点**:`run_service.py:54` `self._gemini_video_client=...`、`:139-144` 装进 `RunDependencies(gemini_video_client=...)` → 流到 graph/walk_engine 所有 analyze。`start_run`(:98)每 run 跑一次,`run_id=f"v1_run_{uuid4().hex[:12]}"`(:99)。
+- **cap/并发度配置**:walk_policy.json `global.gemini_calls_per_run_cap {value:null,tbd:true}`;`WalkGraphStore.load_policy` 经 `_unwrap`(walk_graph_json.py:22-25)解纯值;walk_engine.py:63 `store.load_policy()` 是同款读取路径。
+- **GeminiVideoClient Protocol**:`interfaces.py:63-69` `analyze(content,media,source_context)->dict` → wrapper 实现它即对调用方透明。
+- **可观测落点(现有字段够用,零 schema 改)**:`_fail` 的 status/reason → `recall_decision._build_evidence_row`(109)写进 `pattern_recall_evidence.evidence_summary`;`RunService._append_lifecycle_event`(356-386)自由结构(run 级汇总);被截断内容无判定 → `_terminal_stage` `decision_by_target_id.get` 缺项自然跳过(M4 已有语义,validate_run 不误判)。
+- **一致性资产**:`tests/test_walk_profile_degradation.py:19` `_fingerprint`(7 字段组不含 wa_id 因 run_id 随机)+ `tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json`;`FakeGeminiVideoClient`(gemini_helpers.py)按 content_id 确定性返回,**`self.calls.append`(48)无锁须补**;`replay_case`(replay_harness.py:63)支持注入 gemini_video_client、**不支持固定 run_id**(R9);`test_concurrency_consistency.py` 不存在。
+
+## 数据合同(汇总)
+
+- 并发后 `discovered_content_items`/`pattern_recall_evidence`/`rule_decisions`/`walk_actions 指纹`/`source_path_records` 与串行逐条等价(固定 run_id 下全 jsonl 逐字节相等)。
+- max_workers/cap 经 `WalkGraphStore.load_policy()` 读纯值(兜底 4/200);recall/graph/walk_engine 签名零改。
+- 配额命中:超额内容 judgment.status=failed/reason=`gemini_quota_exhausted`(逐条落 evidence)+ run 末汇总 run_event;被截断内容无判定 → 不产终端动作(优雅退化)。
+
+## 验证命令(汇总)
+
+```bash
+uv run pytest tests/test_walk_graph_config.py -q                       # M5A 配置 loader
+uv run pytest tests/test_concurrency_consistency.py -q                 # M5B/C/D 并发==串行 + 配额 + 兜底
+uv run pytest tests/test_walk_profile_degradation.py -q                # 指纹基线(应全绿)
+uv run pytest -q                                                       # 全量(并发不改结果 → 全绿)
+```
+
+## 失败归因(汇总)
+
+- 并发结果飘/偶发崩:offset 回收错位(误按 as_completed 完成序 append)、或 future.result() 未经 `_safe_analyze` 兜底在主线程抛。
+- 配额截断不确定:按完成序而非 offset 预判截断。
+- FakeGemini flaky:`self.calls.append` 未加锁。
+- 墙钟无提升:判定段加速比被 12s 限流的抓取段稀释 → baseline 必须分段报告(判定段 vs 整 run)。
+
+## sub-agent 交叉验证要点
+
+- 确认唯一 analyze 循环并发化无遗漏、组装/落盘仍主线程按 offset、recall_evidence_id 编号与串行逐字节等价。
+- 确认确定性截断按 offset 预判(非完成序)、wrapper 单点注入零签名改、per-run 计数跨初始+walk 两次 recall 共享。
+- 确认 DB 池/限流锁/graph 编排/判定口径均不动、frontier 推迟理由成立。
+- 确认一致性测试逐条比对四类产物 + Jittered fake 暴露乱序、3 项拍板内嵌、三原则。

+ 68 - 0
tech_documents/工程落地/v3_implementation_briefs/M5/M5A_Concurrency_Config.md

@@ -0,0 +1,68 @@
+# M5A 并发配置地基 Implementation Brief
+
+状态:本 brief 只覆盖「walk_policy.json 拍板 `gemini_calls_per_run_cap=200`(第 5 个也是最后一个 TBD_V3)+ 新增 `gemini_concurrency.max_workers=4`;loader 解包/校验 + 单测」。**先于 M5B/M5C**(它们从 `load_policy()` 取这两个值)。仿 M4A 写法,纯配置 + loader,零行为变更。
+
+## 目标
+
+把单 run Gemini 调用上限与判定并发度落成 walk_policy.json 的纯值配置,经 `WalkGraphStore.load_policy()` 解包供 M5B(并发度)与 M5C(配额)读取——单一真相源,recall/walk_engine/graph 签名零改。
+
+## 现有证据
+
+- `tech_documents/数据接口与来源/walk_policy.json` `global.gemini_calls_per_run_cap` 现为 `{ "value": null, "provenance": "TBD_V3:...(归 M5)", "tbd": true }`(M4A 留的最后一个 TBD)。
+- `content_agent/integrations/walk_graph_json.py`:`_unwrap`(22-25)`if isinstance(value,dict) and "value" in value: return value["value"]`;`_unwrap_policy`(约 75-85)对 `raw["global"]` 每键 `_unwrap`;`load_policy`(约 57-64)读+解包+`_validate_policy`+raise。`global` 现有键 max_total_actions_per_run/max_depth/max_reseed_rounds/gemini_calls_per_run_cap。
+- `walk_engine.py:63` `policy = store.load_policy()`——recall 可同款 `WalkGraphStore().load_policy()` 读。
+- `tests/test_walk_graph_config.py`:M4A 已有 `test_policy_unwraps_pinned_values` 等 6 例,断言 global 拍定值。
+
+## 修改范围
+
+- 改 `tech_documents/数据接口与来源/walk_policy.json`:① `gemini_calls_per_run_cap` 的 `value:null→200`、`tbd:true→false`、provenance 注明"拍板 2026-06-12:上限约 $4.4/run,M7 真负标定";② `global` 增 `gemini_concurrency.max_workers=4`(纯值或 `{value:4,provenance,tbd:false}` 包裹皆可,loader 兼容)。
+- 改 `tests/test_walk_graph_config.py`:加断言 `gemini_calls_per_run_cap==200`、`gemini_concurrency.max_workers==4`(或扩 `test_policy_unwraps_pinned_values`)。
+
+## 不修改范围
+
+- 不改 `_unwrap`/`_unwrap_policy` 逻辑(已能解 `{value}` 包裹;若 max_workers 写嵌套 dict 需确认 `_unwrap_policy` 对 `gemini_concurrency` 子 dict 的处理——见涉及文件)。
+- 不动 walk_policy 其余键(edge_budgets/edge_permissions/budget_tiers/dedup 等 M4 已拍)。
+- 不接 loop/recall(M5B/C 才 import 使用);不引新依赖。
+
+## 涉及文件 / 函数 / 类
+
+- `tech_documents/数据接口与来源/walk_policy.json`
+  - `global.gemini_calls_per_run_cap`:`{ "value": 200, "provenance": "拍板 2026-06-12...", "tbd": false }`。
+  - `global.gemini_concurrency`:`{ "max_workers": 4, "provenance": "拍板 2026-06-12:IO 密集判定并发度,M7 实测调", "tbd": false }`。**注意**:若做成嵌套 dict,`_unwrap_policy` 当前对 `global` 每键调 `_unwrap`——`gemini_concurrency` 整个 dict 无顶层 `value` 键会被原样返回(子 dict 不解包),M5B 读 `policy["global"]["gemini_concurrency"]["max_workers"]` 即可;若想 `_unwrap` 友好,也可平铺为 `global.gemini_max_workers: 4`(纯值)。**推荐平铺 `gemini_max_workers`**(解包零特例、读取最简)。
+- `content_agent/integrations/walk_graph_json.py`
+  - 若平铺 `gemini_max_workers`:`_unwrap` 已覆盖,**可不改 loader**;`_validate_policy` 若对 global 键做白名单则加两键(确认是否有白名单,无则零改)。
+- `tests/test_walk_graph_config.py`
+  - `test_policy_pins_gemini_cap_and_workers`:`load_policy()["global"]["gemini_calls_per_run_cap"]==200`、`["gemini_max_workers"]==4`。
+
+## 数据合同
+
+- `WalkGraphStore.load_policy()["global"]["gemini_calls_per_run_cap"]` == 200(int 纯值)。
+- `...["gemini_max_workers"]` == 4(int 纯值)。
+- 二者均 `tbd:false`;walk_policy 其余键不变;config 不参与 config gate(walk_policy 非规则包,不过 build_config_from_excel)。
+
+## 实施步骤
+
+1. 改 walk_policy.json:cap→200/tbd:false;global 加 `gemini_max_workers:4`(平铺,解包零特例)。
+2. 确认 `_unwrap`/`_validate_policy` 是否需认新键(平铺则 `_unwrap` 已覆盖;有白名单则补)。
+3. 加 `tests/test_walk_graph_config.py` 断言。
+4. `uv run pytest tests/test_walk_graph_config.py -q` 绿;`uv run pytest -q` 零回归(尚无人读新键)。
+
+## 验证命令
+
+```bash
+uv run pytest tests/test_walk_graph_config.py -q
+uv run python -c "from content_agent.integrations.walk_graph_json import WalkGraphStore; g=WalkGraphStore().load_policy()['global']; print(g['gemini_calls_per_run_cap'], g['gemini_max_workers'])"   # 200 4
+uv run pytest -q                                   # 零回归
+```
+
+## 失败归因
+
+- 读出来是 dict 而非 int:max_workers 做了嵌套 dict 又没在 `_unwrap_policy` 解 → 平铺为 `gemini_max_workers` 纯值,或在 loader 显式解包子 dict。
+- 既有回归:误在本 brief 就改 recall 去读新键(那是 M5B);本 brief 只落配置+单测。
+- 校验报错:`_validate_policy` 有 global 键白名单却没加新键。
+
+## sub-agent 交叉验证要点
+
+- 确认 cap=200/max_workers=4 落成纯值、tbd:false、provenance 注明拍板日期与依据。
+- 确认 loader 解包路径(平铺则 `_unwrap` 已覆盖)、未改 `_unwrap` 核心逻辑、未动 walk_policy 既有键。
+- 确认 `uv run pytest -q` 零回归(M5B/C 才消费新键)。

+ 85 - 0
tech_documents/工程落地/v3_implementation_briefs/M5/M5B_Recall_Batch_Concurrency.md

@@ -0,0 +1,85 @@
+# M5B 判定批并发 Implementation Brief
+
+状态:本 brief 是 M5 核心。覆盖「`recall_decision.run` 内 `ThreadPoolExecutor` 并发 `analyze`、按 offset 回收;组装与落盘完全不动」。**在 M5A 之后**(读 max_workers)。**行为等价硬约束**:产物与串行逐条相等(固定 run_id 下全 jsonl 逐字节相同)。本 brief **不接配额**(M5C 协同),先把"并发==串行"立住。
+
+## 目标
+
+把 recall_decision 那唯一的 Gemini 循环从"逐条串行 analyze"改成"批并发 analyze + 按 offset 回收",覆盖全 run 所有判定(初始 recall + walk 两处 recall 都走它)。并发只发生在纯 IO 的 `analyze`;ID 编号、三个 list 的 append、`append_jsonl` 全部留主线程、按 offset 串行 → 与串行实现逐条等价。
+
+## 现有证据
+
+- `recall_decision.py` 循环(37-48):`for offset,item in enumerate(discovered_content_items): media=...; recall_evidence_id=f"recall_{start_index+offset:03d}"(39); judgment=gemini_video_client.analyze(item,media,source_context)(40); updated_items.append(...)(42); updated_bundles.append(_update_evidence_bundle(evidence_bundles[offset],...))(43-45); evidence_rows.append(_build_evidence_row(...))(46-48)`;循环后 `runtime.append_jsonl(... evidence_rows)`(50)、`(... updated_items)`(51);`return {...}`(52-56)。
+- `analyze`(gemini_video.py:106-142)纯 IO、self 全常量、失败返回 `_fail`(35-42,status=failed)不抛。
+- 三处调用方:graph.py:109、walk_engine.py:200/496,只传 `gemini_video_client`+`start_index`——并发封在 run 内,三处零改。
+- `evidence_bundles[offset]` 索引安全:上游传入的 `evidence_bundles` 与 `discovered_content_items` 等长(同批 content_discovery 产出),组装循环仍按 offset 单线程取,无越界、无竞争。
+- max_workers 来源:M5A 落 `global.gemini_max_workers=4`,`WalkGraphStore().load_policy()` 读(walk_engine.py:63 同款)。
+- 无现成并发范式 → 用标准库 `concurrent.futures.ThreadPoolExecutor` + `as_completed`。
+
+## 修改范围
+
+- 改 `content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py`:`run` 内把 37-48 单循环拆为「并发 analyze 收集 judgments[] + 串行组装」两段;新增模块级 `_safe_analyze` 与 `_resolve_max_workers`。
+- 不改 `run` 签名、不改 `_build_pattern_match_result`/`_update_discovered_item`/`_update_evidence_bundle`/`_build_evidence_row`、不改 50-56 落盘与返回。
+
+## 不修改范围
+
+- 不接配额(M5C);本阶段全部内容都判(无截断)。
+- 不改三处调用方(graph/walk_engine)、不改 graph 编排、不改 DB/落盘路径(仍主线程一次性 append_jsonl)。
+- 不改 gemini_video.py/video_fetch.py(analyze 已并发安全);不加 RateLimiter 锁(判定不经限流)。
+- 不引第三方依赖(仅标准库 concurrent.futures)。
+
+## 涉及文件 / 函数 / 类
+
+- `content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py`
+  - 顶部 `from concurrent.futures import ThreadPoolExecutor, as_completed`。
+  - 新 `_resolve_max_workers() -> int`:`try: return int(WalkGraphStore().load_policy()["global"]["gemini_max_workers"]) except Exception: return 4`(兜底 4,避免配置缺失炸 run)。
+  - 新 `_safe_analyze(client, item, media, source_context) -> dict`:`try: return client.analyze(...) except Exception as exc: return {fit_senior_50plus:False, fit_confidence:0.0, relevance_score:0.0, reason:f"analyze_raised:{type(exc).__name__}", status:"failed"}`——future 内任何意外都转 _fail,绝不让主线程 `future.result()` 抛。
+  - 改 `run`:保持 30-36(created_at/media_by_content_id/三空 list)不动;插入并发段:
+    ```
+    judgments: list[dict | None] = [None] * len(discovered_content_items)
+    with ThreadPoolExecutor(max_workers=_resolve_max_workers()) as pool:
+        future_to_offset = {
+            pool.submit(_safe_analyze, gemini_video_client, item,
+                        media_by_content_id.get(item["platform_content_id"], {}), source_context): offset
+            for offset, item in enumerate(discovered_content_items)
+        }
+        for future in as_completed(future_to_offset):
+            judgments[future_to_offset[future]] = future.result()   # 按 offset 归位,与完成序无关
+    ```
+    再保留**原组装循环**(主线程、按 offset 串行),只把第 40 行 `analyze` 调用换成 `judgment = judgments[offset]`,其余 39/41-48 一字不改;50-56 落盘+返回不动。
+  - worker 线程**只返回 judgment**,绝不 append 共享 list(updated_items/evidence_rows 全在主线程组装)。
+
+## 数据合同
+
+- 同输入(同 discovered_content_items + 同确定性 FakeGemini)→ `judgments[offset]` 与串行第 offset 次 analyze 结果相同 → recall_evidence_id 编号、三 list 内容/顺序、append_jsonl 字节与串行逐条等价。
+- `_resolve_max_workers` 返回 ≥1;max_workers=1 即退化为串行(供 M5D 对照)。
+- analyze 抛错经 `_safe_analyze` 转 status=failed(与串行时 analyze 自吞降级语义一致),run 不崩。
+
+## 实施步骤
+
+1. 确认 M5A 的 `gemini_max_workers` 可 `load_policy()` 读到。
+2. 加 `_safe_analyze`/`_resolve_max_workers`,import concurrent.futures。
+3. `run` 内插并发收集段(submit 全部→按 offset 归位 judgments),组装循环改读 `judgments[offset]`。
+4. `uv run pytest tests/test_replay_gemini_seam.py tests/test_case_replay.py -q`(现有判定/回放应全绿——确定性 fake 下并发结果==串行)。
+5. `uv run pytest -q` 全绿(M5D 才加并发专测)。
+
+## 验证命令
+
+```bash
+uv run pytest tests/test_case_replay.py tests/test_replay_gemini_seam.py -q   # 现有判定回放(并发不改结果)
+uv run pytest tests/test_walk_profile_degradation.py -q                       # 指纹基线全绿
+uv run pytest -q                                                              # 全量
+```
+
+## 失败归因
+
+- 并发结果飘/张冠李戴:误用 `as_completed` 完成序直接 append updated_items,而非 `judgments[offset]=future.result()` 按 offset 归位(串行测试碰巧过、并发偶发 → M5D 的 Jittered fake 必踩)。
+- 一条判定崩炸整 run:`future.result()` 未经 `_safe_analyze` 兜底,意外异常在主线程抛(串行时 analyze 自吞不会崩)。
+- 顺序错乱:worker 线程碰了主线程的共享 list(必须 worker 只返值、append 全留主线程)。
+- 配置缺失炸 run:`_resolve_max_workers` 没兜底(必须 try/except→4)。
+
+## sub-agent 交叉验证要点
+
+- 确认 `run` 签名不变、三处调用方零改、并发只在 analyze、组装/落盘仍主线程按 offset。
+- 确认 `judgments[offset]=future.result()` 按 offset 归位(非完成序)、worker 不碰共享 list、`_safe_analyze` 兜底不抛。
+- 确认 max_workers 经 load_policy 读 + 兜底 4、max_workers=1 退化串行可供对照。
+- 确认现有回放/判定测试全绿(并发未引入结果变化)。

+ 82 - 0
tech_documents/工程落地/v3_implementation_briefs/M5/M5C_Quota_Cap_And_Observability.md

@@ -0,0 +1,82 @@
+# M5C 配额闸 + 可观测 Implementation Brief
+
+状态:本 brief 覆盖「`QuotaCappedGeminiVideoClient` wrapper 单点注入 + run-scoped 计数 + **确定性截断**(按 offset)+ 配额命中可观测」。**在 M5A(读 cap)+ M5B(截断逻辑落在 B 改的循环里)之后**。wrapper 实现 `GeminiVideoClient` Protocol,对所有调用方透明,签名零改——符合"将来变动最少化"。
+
+## 目标
+
+给单 run Gemini 调用封顶(cap=200),防并发放大成本。用 wrapper client 在 run_service 单点注入→自动覆盖全 run 所有 analyze(初始 recall + walk 两处);截断**按 offset 在提交线程池前预判**(非完成序),保证串行/并发截断边界相同;命中可观测(evidence reason + run_event)。
+
+## 现有证据
+
+- `GeminiVideoClient` Protocol:`interfaces.py:63-69` `analyze(content,media,source_context)->dict`。wrapper 实现它即对 recall/walk_engine 透明。
+- 注入单点:`run_service.py:54` `self._gemini_video_client = gemini_video_client or _DeterministicGeminiVideoClient()`;`:139-144` `RunDependencies(... gemini_video_client=self._gemini_video_client)`;`start_run`(:98)每 run 一次。
+- `_fail`(gemini_video.py:35-42):`{fit_senior_50plus:False, fit_confidence:0.0, relevance_score:0.0, reason:<arg>, status:"failed"}`——wrapper 复用它产 quota_exhausted 结构。
+- cap 来源:M5A `global.gemini_calls_per_run_cap=200`,`WalkGraphStore().load_policy()` 读。
+- 可观测落点:`recall_decision._build_evidence_row`(109)把 judgment 的 status/reason 写进 `pattern_recall_evidence.evidence_summary`(reason 逐条可追);`RunService._append_lifecycle_event`(356-386)event_type/message/raw_payload 自由结构(run 级汇总);被截断内容无判定 → walk_engine `_terminal_stage` `decision_by_target_id.get` 缺项自然跳过(M4 "无判定不产终端动作"语义,validate_run 不误判)。
+- M5B 已把 recall 循环拆成「并发收集 judgments + 串行组装」——截断在收集段提交前按 offset 决定。
+
+## 修改范围
+
+- 新建 `content_agent/integrations/gemini_quota.py`(或在 gemini_video.py 内加类):`QuotaCappedGeminiVideoClient`。
+- 改 `content_agent/run_service.py`:`start_run` 入口用 cap 包装 `self._gemini_video_client` 成 per-run wrapper 实例注入 deps(每 run new,避免跨 run 计数泄漏)。
+- 改 `recall_decision.py`(与 M5B 协同):收集段提交前读 `remaining_quota()`,offset<remaining 提交、其余按 offset 直接 `_fail("gemini_quota_exhausted")`;末尾 `consume(min(remaining,len))`。
+- (可选)run 末发 `gemini_quota_exhausted` 汇总 run_event;按需在 decision_reason_codes catalog 加 `gemini_quota_exhausted`(若有 reason 白名单校验)。
+
+## 不修改范围
+
+- 不改 `GeminiVideoClient` Protocol、不改真/假 client 内部(wrapper 包在外层)。
+- 不改 recall/walk_engine/graph 的 run 签名(wrapper 经 deps 流入,透明)。
+- 不改判定口径/DB schema;不动 cap 之外的 walk_policy 键。
+- wrapper 不做并发(它只计数+转发;并发在 recall_decision 的 ThreadPool)。
+
+## 涉及文件 / 函数 / 类
+
+- `content_agent/integrations/gemini_quota.py`(新建,~40 行)
+  - `class QuotaCappedGeminiVideoClient`(实现 GeminiVideoClient Protocol):
+    - `__init__(inner: GeminiVideoClient, cap: int | None)`:`self._inner=inner; self._cap=cap; self._used=0; self._lock=threading.Lock()`。
+    - `remaining_quota() -> int`:`with self._lock: return (self._cap - self._used) if self._cap is not None else 1_000_000`(无 cap=不限)。
+    - `consume(n: int) -> None`:`with self._lock: self._used += n`(recall 预判后预扣)。
+    - `analyze(content,media,source_context) -> dict`:**backstop**——`with self._lock: over = self._cap is not None and self._used > self._cap`;`return _fail("gemini_quota_exhausted") if over else self._inner.analyze(...)`(正常路径靠 recall 预判,这里仅防漏判;复用 gemini_video._fail)。
+- `content_agent/run_service.py`
+  - `start_run`(98)入口:`cap = WalkGraphStore().load_policy()["global"]["gemini_calls_per_run_cap"]`;`gemini_client = QuotaCappedGeminiVideoClient(self._gemini_video_client, cap)`;`RunDependencies(... gemini_video_client=gemini_client)`(每 run new → per-run 计数)。
+- `content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py`(与 M5B 协同)
+  - 收集段提交前:`remaining = getattr(gemini_video_client, "remaining_quota", lambda: 1_000_000)()`(兼容非 wrapper 的测试 client);`for offset,item: if offset < remaining: submit; else: judgments[offset]=_fail("gemini_quota_exhausted")`;循环后 `consume(submitted)`,其中 `submitted = min(remaining, len(items))`(=实际提交条数;失败的 Gemini 调用也算消耗配额)。
+  - **跨三次 recall 累计**:run_bounded_walk 内初始 recall + query_batch + author_batch 三次调 pattern_recall.run,每次 `remaining_quota()` 读 `cap - used`(已被前次 consume 递减)→ 累计封顶,确定性递减。
+  - `_fail` 在 recall 侧也需可用:复用 gemini_video 的 `_fail` 或本地等价(返回同结构 status=failed/reason)。
+
+## 数据合同
+
+- 单 run 跨"初始 recall + walk query batch + walk author batch"三次 recall 共享同一 wrapper 实例 → 累计 analyze 调用 ≤ cap。
+- 截断确定性:同输入→同 remaining 序列→同 offset 截断边界→串行/并发逐条相同(完成序无关)。
+- 超额内容:judgment.status=failed/reason=`gemini_quota_exhausted` 落 evidence_summary;无判定 → 不进 rule_judgment、不产终端 walk_action(优雅退化)。
+- cap=null(M5A 默认非此)→ remaining=∞,等价不限额(向后兼容)。
+
+## 实施步骤
+
+1. 写 `QuotaCappedGeminiVideoClient`(Lock 计数 + remaining/consume/analyze backstop,复用 _fail)。
+2. `run_service.start_run` 每 run 包装注入(读 cap)。
+3. recall_decision 收集段加按 offset 预判截断 + consume(与 M5B 同处改)。
+4. (可选)run 末 `_append_lifecycle_event("gemini_quota_exhausted", raw_payload={cap,submitted,truncated})`;核对 reason catalog 是否白名单(不校验则零成本)。
+5. `uv run pytest -q` 全绿(配额专测在 M5D)。
+
+## 验证命令
+
+```bash
+uv run pytest tests/test_concurrency_consistency.py -q   # M5D 的 cap=2 确定性截断测试
+uv run pytest -q                                         # 全量(默认 cap=200 远高于测试 N,不截断 → 现有测试不受影响)
+uv run python -c "from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient; print('import ok')"
+```
+
+## 失败归因
+
+- 截断不确定:按 future 完成序而非 offset 预判(并发下哪几条先完成不定)→ 必须提交前按 offset 决定。
+- 跨 run 计数泄漏:wrapper 在 `__init__`(54)建一次复用 → 必须 `start_run` 每 run new 新实例(或 reset)。
+- 现有测试被误截断:cap 默认应远高于测试 N(200);若测试 client 无 remaining_quota,recall 侧 getattr 兜底 ∞。
+- 配额内容仍进下游:被截断内容应 status=failed 不产 decision;若 _terminal_stage 报 KeyError 则是没用 `.get` 跳过(M4 已是 .get,确认未回退)。
+
+## sub-agent 交叉验证要点
+
+- 确认 wrapper 实现 Protocol、单点注入、per-run 计数(每 run new)、跨三次 recall 共享。
+- 确认截断按 offset 预判(非完成序)、wrapper analyze 仅作 backstop、复用 _fail 语义。
+- 确认可观测落 evidence reason + run_event、reason catalog 处理、被截断内容优雅退化不误判。
+- 确认 recall/walk_engine/graph 签名零改、判定口径/DB schema 不动。

+ 88 - 0
tech_documents/工程落地/v3_implementation_briefs/M5/M5D_Consistency_And_Wallclock.md

@@ -0,0 +1,88 @@
+# M5D 一致性 + 墙钟验证 Implementation Brief
+
+状态:本 brief 是 M5 收尾。覆盖「新建 `test_concurrency_consistency.py`(并发==串行 + 配额 + 兜底)+ R9 固定 run_id 注入 + Jittered fake 暴露乱序 + 墙钟口径重述」。M5A/B/C 全落地后跑,全量 pytest 绿(并发不改结果)。
+
+## 目标
+
+把 M5 的验收硬门槛"并发结果与串行逐条完全一致"做成可重复跑的测试:同确定性 fake、固定 run_id、max_workers=1 vs 4 全 jsonl 逐字节相等;Jittered fake 让"乱序完成"成测试常态以暴露 offset 错位;cap 截断确定性;analyze 抛错兜底。顺带兑现 R9(固定 run_id 注入)并重述墙钟 baseline。
+
+## 现有证据
+
+- `tests/test_walk_profile_degradation.py:19` `_fingerprint(walk_actions)` = sorted([edge_id,from,to,walk_action,walk_status,budget_tier,reason_code])——7 字段组**不含 wa_id**(注释:run_id 随机);`tests/fixtures/snapshots/real_id45/walk_actions_fingerprint.json` 7 行基线。
+- `replay_case`(replay_harness.py:63):`replay_case(case_id, *, runtime_root, cases_dir=CASES_DIR, config_overrides=None, gemini_video_client=None)`;内部 `service.start_run(RunStartRequest(...))`,run_id 由 service 随机生成(run_service.py:99 `f"v1_run_{uuid4().hex[:12]}"`)——**不支持固定 run_id**(R9)。
+- `FakeGeminiVideoClient`(gemini_helpers.py):按 content_id 确定性返回(`result_by_content_id`/`default_result`);`self.calls.append`(48)**无锁**(并发下顺序非确定)。`fake_gemini_pool/review/fail` 工厂。
+- real_id45(4 item douyin)/sph_caihong(5 item shipinhao)fixture;`RunStartRequest`(schemas.py)。
+- 无 `test_concurrency_consistency.py`。
+
+## 修改范围
+
+- 新建 `tests/test_concurrency_consistency.py`(5 例)。
+- 改 `tests/replay_harness.py` + `content_agent/run_service.py`(`start_run`)+ `RunStartRequest`(schemas.py):加可选 `run_id`(最小 3 处,兑现 R9)。
+- 改 `tests/gemini_helpers.py`:`FakeGeminiVideoClient.calls.append` 加 `threading.Lock`;新增 `JitteredFakeGeminiVideoClient`。
+- (可选)R9 后给 `_fingerprint` 加 wa_id 字段(固定 run_id 下可跨 run 钉死)——若加则重钉快照。
+
+## 不修改范围
+
+- 不改 recall/walk_engine/gemini_quota 实现(M5B/C);本 brief 只测 + 测试基建。
+- 不削弱测试(无 assert True/skip/xfail);一致性断言用实跑产物。
+- 不改判定口径/DB schema;现有测试不得为迁就并发而改断言(并发==串行,应天然全绿)。
+
+## 涉及文件 / 函数 / 类(测试用例清单)
+
+- `tests/gemini_helpers.py`
+  - `FakeGeminiVideoClient.__init__` 加 `self._lock = threading.Lock()`;`analyze` 内 `with self._lock: self.calls.append(...)`(消并发 flaky)。
+  - `class JitteredFakeGeminiVideoClient(FakeGeminiVideoClient)`:`analyze` 内 `time.sleep((int(sha1(content_id)[:4],16)%10)/1000)` 后调 super——确定性制造"完成序≠提交序",返回值仍按 content_id 确定。
+- `content_agent/run_service.py` / `schemas.py` / `tests/replay_harness.py`(R9)
+  - `RunStartRequest` 加 `run_id: str | None = None`;`start_run`(99)`run_id = request.run_id or f"v1_run_{uuid4().hex[:12]}"`;`replay_case` 加 `run_id: str | None = None` 透传。
+- `tests/test_concurrency_consistency.py`(新建)
+  - `test_serial_vs_concurrent_recall_identical`:同 fake、同固定 run_id,`config_overrides` 设 max_workers=1 与 4 各跑一次,断言 discovered_content_items/pattern_recall_evidence/rule_decisions/walk_actions 全 jsonl 排序后逐条相等。
+  - `test_jittered_completion_preserves_offset_order`:用 `JitteredFakeGeminiVideoClient`,断言每条 recall_evidence_id ↔ platform_content_id 对齐(offset 未错位)。
+  - `test_quota_cap_deterministic_truncation`:cap=2、≥5 条内容,断言前 2 条有判定、其余 reason=`gemini_quota_exhausted`,且 max_workers=1/4 截断边界相同。
+  - `test_quota_exhaustion_is_observable`:命中 cap → evidence_summary 含 quota reason(+ run_event 若实现)。
+  - `test_analyze_exception_does_not_break_run`:注入 analyze 抛错的 fake → `_safe_analyze` 兜底 status=failed,run state status=success。
+  - **max_workers 注入(关键)**:`replay_case` 的 `config_overrides` 当前只注入 `policy_store`,**够不到 walk_policy 的 max_workers**(它由 recall_decision 从 `WalkGraphStore().load_policy()` 读磁盘)。故串/并行对照用 `monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda: 1/4)` 注入(确定、最小);不要依赖 config_overrides。
+
+## 数据合同
+
+- 固定 run_id + 确定性 fake 下,串行(workers=1)与并发(workers=4)产出的四类 jsonl 逐条相等;Jittered 变体亦相等(证明 offset 归位正确)。
+- cap 截断按 offset 确定,串/并行边界一致;命中可观测。
+- 现有全量测试绿(并发不改结果);快照(若加 wa_id)由实跑重钉,不手编。
+
+## 实施步骤
+
+1. R9:`RunStartRequest`/`start_run`/`replay_case` 加可选 run_id(3 处)。
+2. gemini_helpers:calls 加锁 + 写 `JitteredFakeGeminiVideoClient`。
+3. 写 `test_concurrency_consistency.py` 5 例(实跑取产物比对,不手编)。
+4. (可选)`_fingerprint` 加 wa_id + 重钉 `walk_actions_fingerprint.json`。
+5. `uv run pytest -q` 全绿 + 墙钟口径写入(见下)。
+
+## 墙钟口径(重述,M7 实测核)
+
+- 旧 V2 83min 是 decode 主导(M2 已删 decode),**作废,不作分母**。
+- 串/并行对照在测试里靠 monkeypatch `_resolve_max_workers`(=1 串行、=4 并发),不经 config_overrides。
+- M5 串行:`T_serial ≈ N × 24s`(Gemini ~24s/条实测,N=初始发现+翻页/作者增量,受 edge_budgets 翻页≤3/作者≤2×作品≤3 约束,N≈20–40)。
+- M5 并发:`T_concurrent ≈ ceil(N/max_workers) × 24s + 组装/落盘串行开销`(max_workers=4)。
+- 加速比目标 **分段报告**:判定段(应接近 ×4 上限)vs 整 run(被 12s 限流抓取段稀释);整 run 目标 < 10min(09 §10)。一致性是 must、墙钟是 should(M7 真跑核,写进 run_event raw_payload)。
+
+## 验证命令
+
+```bash
+uv run pytest tests/test_concurrency_consistency.py -q
+uv run pytest tests/test_walk_profile_degradation.py tests/test_case_replay.py -q   # 指纹/回放全绿
+uv run pytest -q                                                                    # 全量(并发不改结果)
+```
+
+## 失败归因
+
+- 串/并行不等:offset 回收错位(M5B)或截断按完成序(M5C)——Jittered + cap 测试定位。
+- Jittered 测试 flaky:`FakeGeminiVideoClient.calls.append` 未加锁。
+- run_id 注入没生效:`start_run` 仍无条件随机(须 `request.run_id or ...`)。
+- 快照对不上:手编而非实跑产物;或加 wa_id 后未重钉。
+- 现有测试被迫改断言:说明并发引入了结果变化(真回归),应回查 M5B/C 而非改测试。
+
+## sub-agent 交叉验证要点
+
+- 确认一致性测试逐条比对四类产物 + Jittered 暴露乱序 + cap 确定性截断 + 兜底,全部实跑断言、无削弱。
+- 确认 R9 固定 run_id 三处最小接入、calls 加锁。
+- 确认现有全量测试绿(并发未改结果)、墙钟口径分段报告且作废旧 83min。
+- 确认未改 M5B/C 实现、未改判定口径/DB schema。