Просмотр исходного кода

fix(walk): dedupe author works against discovered contents; add E2E test plan

- 真实 E2E(v1_run_e6ba21f7543b,demand 45)抓到的新 bug:作者近期作品包含
  首轮已发现的同一条视频,_execute_author_edges 缺 existing_content_ids 去重
  (查询批次有、作者边漏),撞 DB 唯一索引 uk_ca_items_run_policy_content。
  V1 不可见(作者边接口损坏从未真实返回作品),M5 修好 blogger 后首次暴露
- 修复:作者作品按已发现内容去重(跨作者累积 seen),过滤后再取每作者前 3 条
- 回归测试 test_author_works_skip_already_discovered_content;302 passed
- 新增 07_E2E真实跑测计划.md:全库 130 需求 seed 实测分布(id=45 为最小,
  1 seed→2 query)、7 个真实接口清单与验证点、decode/分类绑定检测项、
  执行手册、V1 基线对照断言、首跑实测结果(M3/M6 真实生效证据)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee 3 дней назад
Родитель
Сommit
143eb2e53e

+ 14 - 1
content_agent/business_modules/walk_engine.py

@@ -238,6 +238,13 @@ def _execute_author_edges(
     decision_by_content_id = _decision_by_content_id(rule_decisions)
     decision_by_content_id = _decision_by_content_id(rule_decisions)
     binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
     binding, _ = _resolve_edge_binding("author_to_works", walk_strategy)
     author_items = _unique_authors(discovered_content_items)[:2]
     author_items = _unique_authors(discovered_content_items)[:2]
+    # 作者近期作品天然可能包含首轮已发现的同一条视频;不去重会撞
+    # uk_ca_items_run_policy_content 唯一索引(真实 E2E v1_run_e6ba21f7543b 实证)。
+    seen_content_ids = {
+        str(item.get("platform_content_id"))
+        for item in discovered_content_items
+        if item.get("platform_content_id")
+    }
     platform_results: list[dict[str, Any]] = []
     platform_results: list[dict[str, Any]] = []
     walk_actions: list[dict[str, Any]] = []
     walk_actions: list[dict[str, Any]] = []
     for item in author_items:
     for item in author_items:
@@ -321,7 +328,13 @@ def _execute_author_edges(
                 raw_extra=_decision_context(decision),
                 raw_extra=_decision_context(decision),
             )
             )
         )
         )
-        for index, work in enumerate(works[:3], start=1):
+        new_works = [
+            work
+            for work in works
+            if str(work.get("platform_content_id") or "") and str(work.get("platform_content_id")) not in seen_content_ids
+        ]
+        for index, work in enumerate(new_works[:3], start=1):
+            seen_content_ids.add(str(work["platform_content_id"]))
             platform_results.append(
             platform_results.append(
                 {
                 {
                     **work,
                     **work,

+ 93 - 0
tech_documents/工程落地/07_E2E真实跑测计划.md

@@ -0,0 +1,93 @@
+# 07 端到端(E2E)真实跑测计划
+
+状态:V2 收官验收的正式跑测计划。参照 V1 真实 E2E 记录(06 计划附录 A:id=45 / v1_run_faaf9a1d0ad6,2026-06-08)与服务器 e2e_logs(id=18/45/70)。本文数据均来自真实 DB / 真实接口实测。
+
+更新时间:2026-06-10
+
+## 1. 切入点选择(search term 最小)
+
+全库 130 个需求的 seed 数实测分布(2026-06-10,直接解析 `demand_content.ext_data.evidence_pack.seed_terms`):
+
+```text
+1 个 seed: 5 条(id=45, 78, 88, 119, 129 —— 全部为「中医养生」)
+2 个 seed: 104 条(主流,如 id=1/3/4「爱国情感+人物故事」)
+3-6 个 seed: 21 条
+```
+
+**首选 id=45**:1 个 seed →(1 原词 + 1 LLM 变体)= **2 条初始 query,全库最小**;且与 V1 基线 `v1_run_faaf9a1d0ad6` 同源,V2 受控变化可逐项对照。同构备选:78/88/119/129(同 seed,可用于复测/对照而不污染 45 的对照链)。
+
+## 2. 真实接口清单(全部真调,零 mock)
+
+来源:`.env`(服务器已验证 from_env 全部客户端构造成功)+ `tech_documents/数据接口与来源/external_data_sources_registry.json`。
+
+| # | 接口 | 调用点 | 真实性验证点 |
+|---|---|---|---|
+| 1 | MySQL `content-deconstruction-supply`(192.168.82.27) | demand 读取 + 21 张 `content_agent_*` 表双写 | DB validator pass;run 后逐表行数核对 |
+| 2 | Query LLM(`CONTENT_AGENT_QUERY_LLM_*`) | search_intent 生成变体 query | q_002 的 generation_method=llm_variant,prompt_version 落 decision 链 |
+| 3 | Crawapi 抖音关键词搜索(`/crawler/dou_yin/keyword`) | platform_access→search(),12s 限流 bucket=douyin_search | HTTP 200/code 0;限流命中则 PLATFORM_RATE_LIMITED(M5 分类) |
+| 4 | Crawapi 点赞画像(`/crawler/dou_yin/re_dian_bao/video_like_portrait`) | 每条内容画像,失败重试 2 次 | portrait_available / age_50_plus_level 落 evidence |
+| 5 | Crawapi 作者作品(`/crawler/dou_yin/blogger`,M5) | 游走作者边(KEEP/ADD 才走),bucket=douyin_blogger | **已单独 live smoke 通过(2026-06-10:200/code 0/22 条作品)** |
+| 6 | AIGC decode(submit `/aigc/api/task/decode` + result) | pattern recall 逐条真实解码 | M6 五类 decode 事件落 run_events;decode_status/strong_terms |
+| 7 | 分类树 match-paths v2(`/api/search/categories/match-paths/v2`) | decode 强词 → 分类路径绑定 | matched_category_paths 落 evidence;v2 双结构解析(M5C) |
+
+## 3. decode 与分类树绑定的检测项
+
+- decode 链:`evt_decode_{cid}_submitted/polling/succeeded|failed|timeout_{attempt}` 全程落 run_events;超时落 pending(decode_timeout_20m)不卡 run,末尾最小补跑一轮(M6B)
+- 量化指标(每次跑测记录):decode 成功率、平均轮询次数/耗时、超时率、失败原因分布(decode_client_error / bad_shape / missing_strong_terms)
+- 分类绑定:strong_terms→match_paths 命中率;`matched_category_paths` 与需求 `category_bindings` 的回扣判定(`_can_explain_pattern`)通过率;`pattern_recall ∈ {matched, no_match, pending, rejected}` 分布
+- 红线:decode/绑定失败只影响该内容的判定结果,不得让 run 失败
+
+## 4. 跑测执行手册(服务器 192.168.82.27,/home/sam/ContentFindAgentNew)
+
+```bash
+# 预检(每次跑测前)
+.venv/bin/python scripts/validate_content_agent_db.py     # DB 21 表
+.venv/bin/python scripts/run_config_gate.py                # 配置闸
+.venv/bin/python -m pytest -q                              # 301 基线
+# 发起(脚本在仓库根,nohup 后台,日志进 e2e_logs/)
+nohup .venv/bin/python run_e2e_v2_tmp.py > e2e_logs/e2e_<id>_v2_<ts>.log 2>&1 &
+# 监控(M6 实时事件,跑到哪一目了然)
+SELECT event_id, status, created_at FROM content_agent_run_events
+  WHERE run_id=<rid> ORDER BY id DESC LIMIT 5;
+# 中止/安全:单需求重试 ≤2 次(控制 crawapi/AIGC 消耗);decode 最长 3×20min
+```
+
+## 5. 验收断言(V2 受控行为,对照 V1 基线)
+
+| 维度 | V1 基线(faaf9a1d0ad6) | V2 预期 |
+|---|---|---|
+| 画像缺失内容 | REJECT_CONTENT/rule_blocked | **KEEP_CONTENT_FOR_REVIEW/pending**(M3,配置驱动) |
+| 被拦/待复看 query 翻页 | rule_blocked 仍翻页 2 次 | 非 success 不翻页(M4A) |
+| 被拦内容扩作者/tag | 仍扩作者 2 次(失败 RuntimeError) | REJECT 不扩;KEEP 仅低预算扩作者(M4B);blogger 接口真实可用(M5) |
+| walk_actions.rule_pack_id | NULL / path_stop 错挂 Content 包 | 全部带归属包;path_stop 归 Path 包;raw_payload 带执行事实(M4C/D) |
+| run_events | 9 条,无耗时 | + evt_stage_*(12 阶段含 duration)+ evt_decode_*(M6) |
+| timeline summary | 无 | 七字段可算(web 时间线页可视) |
+
+## 6. 首跑实测结果(run v1_run_e6ba21f7543b,id=45,2026-06-10 20:33 发起)
+
+**结果:跑到 execute_walk 阶段失败(IntegrityError),但失败前已用真实数据证明 V2 核心行为;失败本身是 E2E 抓到的真实新 bug,已修复(见下)。**
+
+实测事实(全部来自生产 DB):
+- query:2 条(q_001 原词「中医养生」+ q_002 LLM 变体)——全库最小规模 ✓
+- 内容:5 条真实抖音视频(5 个不同作者),关键词搜索/画像接口全部真实返回
+- **decode:5/5 全部成功**(真实 AIGC,polling 共 267 次,M6 事件全程可见;首条视频轮询 44+ 次约 4 分钟)
+- **M3 真实生效**:5 条决策 = 3×KEEP_CONTENT_FOR_REVIEW(missing_content_portrait,V1 会全 REJECT)+ 2×REJECT(content_pattern_recall_required)——画像止血与回扣判定在真实数据上按拍板执行 ✓
+- M6 真实生效:stage 事件 + decode 事件实时落 DB,失败阶段被 evt_stage_execute_walk_1_failed 精准定位 ✓
+- 总耗时 1923 秒(32 分钟),decode 占绝对大头
+
+**失败根因(E2E 价值现场兑现)**:KEEP 决策触发 M4B 低预算作者扩展 → M5 修复后的 blogger 接口真实拉回作者作品 → 作者近期作品包含首轮已发现的同一条视频 → `_execute_author_edges` 缺 `existing_content_ids` 去重(查询批次有、作者边漏)→ 撞 DB 唯一索引 `uk_ca_items_run_policy_content`。该 bug 在 V1 不可见(V1 作者边接口损坏从未真实返回作品),M5 修好接口后首次真实回流即暴露。
+修复:walk_engine 作者作品按已发现内容去重(跨作者累积),回归测试 `test_author_works_skip_already_discovered_content`,302 passed。
+
+## 7. 后续跑测梯度(依据首跑数据再定)
+
+1. **同构复测**:78/88/119/129(同 seed)验证稳定性与限流表现
+2. **2-seed 主流需求**:从 104 条中选 1-2 条(4 query 规模)
+3. **依据实测数据的拍板项**(首跑结果出来后定):是否启用 decode 短等待档(300s)、限流 business code 是否需补白名单、是否需要"限量 decode"、收割哪条 run 做新回放标本
+4. 每轮跑测后:harvest 收割 → 06 计划第 12 节状态更新
+
+## 8. 风险与边界
+
+- crawapi 风控/key 失效 → M5 错误分类可见(PLATFORM_RATE_LIMITED vs REQUEST_FAILED),不盲试
+- AIGC 解码慢/失败 → pending 不卡 run;补跑兜底;真实视频解码成功率待首跑数据
+- 费用控制:单日真实跑测 ≤3 个需求;decode 串行天然限速
+- DB 只增不改:跑测只新增 run 记录,不动历史数据

+ 31 - 0
tests/test_walk_engine_author.py

@@ -81,3 +81,34 @@ def test_author_edge_keeps_review_low_budget(tmp_path):
     assert author_actions
     assert author_actions
     assert author_actions[0]["budget_tier"] == "low_budget"
     assert author_actions[0]["budget_tier"] == "low_budget"
     assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True
     assert author_actions[0]["raw_payload"]["rule_pack_execution"]["executed"] is True
+
+
+def test_author_works_skip_already_discovered_content(tmp_path):
+    # 真实 E2E(v1_run_e6ba21f7543b)实证:作者近期作品包含首轮已发现的同一条视频,
+    # 不去重会撞 DB 唯一索引 uk_ca_items_run_policy_content。
+    context = build_initial_walk_context(tmp_path)
+    context["discovered_content_items"][0]["has_more"] = False
+    context["discovered_content_items"][0]["tags"] = []
+    first_round_id = context["discovered_content_items"][0]["platform_content_id"]
+
+    class OverlappingWorksClient(FakeWalkPlatformClient):
+        def fetch_author_works(self, query):
+            self.author_calls.append(dict(query))
+            duplicate = _dup_platform_result(query, first_round_id)
+            fresh = _dup_platform_result(query, "7390000000000000399")
+            return [duplicate, fresh]
+
+    client = OverlappingWorksClient()
+    result = run_bounded_walk(platform_client=client, **context)
+
+    assert client.author_calls
+    content_ids = [item["platform_content_id"] for item in result["discovered_content_items"]]
+    # 重复视频不得二次进入 discovered;新作品正常进入。
+    assert content_ids.count(first_round_id) == 1
+    assert "7390000000000000399" in content_ids
+
+
+def _dup_platform_result(query, platform_content_id):
+    from tests.p6_walk_helpers import _platform_result
+
+    return _platform_result(query, platform_content_id, "作者作品", [])