Ver Fonte

Close platform query failure records

Sam Lee há 3 semanas atrás
pai
commit
800683e418

+ 5 - 2
content_agent/business_modules/run_record/__init__.py

@@ -1,6 +1,9 @@
 from __future__ import annotations
 
-from content_agent.business_modules.run_record.recorder import run
+from content_agent.business_modules.run_record.recorder import (
+    build_platform_query_failure_records,
+    run,
+)
 from content_agent.business_modules.run_record.validation import validate_run
 
-__all__ = ["run", "validate_run"]
+__all__ = ["build_platform_query_failure_records", "run", "validate_run"]

+ 89 - 18
content_agent/business_modules/run_record/recorder.py

@@ -47,7 +47,10 @@ def run(
         query_failures or [],
     )
     run_events = _build_run_events(run_id, policy_run_id, query_failures or [])
+    failed_search_queries = _build_failed_search_queries(search_queries, query_failures or [])
 
+    if failed_search_queries:
+        runtime.append_jsonl(run_id, "search_queries.jsonl", failed_search_queries)
     runtime.append_jsonl(run_id, "walk_actions.jsonl", walk_actions or [])
     runtime.append_jsonl(run_id, "run_events.jsonl", run_events)
     runtime.append_jsonl(run_id, "source_path_records.jsonl", source_path_records)
@@ -59,6 +62,54 @@ def run(
     }
 
 
+def build_platform_query_failure_records(
+    run_id: str,
+    policy_run_id: str,
+    search_queries: list[dict[str, Any]],
+    query_failures: list[dict[str, Any]],
+) -> dict[str, list[dict[str, Any]]]:
+    return {
+        "search_queries": _build_failed_search_queries(search_queries, query_failures),
+        "search_clues": _build_search_clues(
+            run_id,
+            policy_run_id,
+            search_queries,
+            discovered_content_items=[],
+            decisions=[],
+            policy_bundle={},
+            query_failures=query_failures,
+        ),
+        "run_events": _build_query_failure_events(
+            run_id,
+            policy_run_id,
+            query_failures,
+            start_index=1,
+        ),
+    }
+
+
+def _build_failed_search_queries(
+    search_queries: list[dict[str, Any]],
+    query_failures: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    failure_by_query_id = {failure["search_query_id"]: failure for failure in query_failures}
+    failed_queries: list[dict[str, Any]] = []
+    for search_query in search_queries:
+        query_failure = failure_by_query_id.get(search_query["search_query_id"])
+        if not query_failure:
+            continue
+        failed_queries.append(
+            with_raw_payload(
+                {
+                    **search_query,
+                    "search_query_effect_status": "failed",
+                    "query_failure": query_failure,
+                }
+            )
+        )
+    return failed_queries
+
+
 def _build_search_clues(
     run_id: str,
     policy_run_id: str,
@@ -162,28 +213,48 @@ def _build_run_events(
         )
         for index, (event_type, input_ref, output_ref) in enumerate(event_specs, start=1)
     ]
-    for index, failure in enumerate(query_failures, start=len(events) + 1):
-        events.append(
-            with_raw_payload(
-                {
-                    "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
-                    "run_id": run_id,
-                    "policy_run_id": policy_run_id,
-                    "event_id": f"evt_{index:03d}",
-                    "event_type": "platform_query_failed",
-                    "status": "failed",
-                    "input_ref": f"search_queries.jsonl:{failure['search_query_id']}",
-                    "output_ref": "search_clues.jsonl",
-                    "error_code": failure["error_code"],
-                    "message": failure["message"],
-                    "query_failure": failure,
-                    "created_at": created_at,
-                }
-            )
+    events.extend(
+        _build_query_failure_events(
+            run_id,
+            policy_run_id,
+            query_failures,
+            start_index=len(events) + 1,
+            created_at=created_at,
         )
+    )
     return events
 
 
+def _build_query_failure_events(
+    run_id: str,
+    policy_run_id: str,
+    query_failures: list[dict[str, Any]],
+    *,
+    start_index: int,
+    created_at: str | None = None,
+) -> list[dict[str, Any]]:
+    created_at = created_at or datetime.now(timezone.utc).isoformat()
+    return [
+        with_raw_payload(
+            {
+                "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
+                "run_id": run_id,
+                "policy_run_id": policy_run_id,
+                "event_id": f"evt_{index:03d}",
+                "event_type": "platform_query_failed",
+                "status": "failed",
+                "input_ref": f"search_queries.jsonl:{failure['search_query_id']}",
+                "output_ref": "search_clues.jsonl",
+                "error_code": failure["error_code"],
+                "message": failure["message"],
+                "query_failure": failure,
+                "created_at": created_at,
+            }
+        )
+        for index, failure in enumerate(query_failures, start=start_index)
+    ]
+
+
 def _aggregate_query_effect_status(
     effect_counts: Counter[str], aggregations: list[dict[str, Any]]
 ) -> dict[str, str]:

+ 9 - 2
content_agent/integrations/database_runtime.py

@@ -125,6 +125,13 @@ JSON_FILE_PAYLOAD_COLUMNS = {
     "strategy_review.json": "raw_payload",
 }
 
+JSONL_UPSERT_KEYS = {
+    "search_queries.jsonl": ("run_id", "policy_run_id", "search_query_id"),
+    "pattern_recall_evidence.jsonl": ("run_id", "policy_run_id", "recall_evidence_id"),
+    "search_clues.jsonl": ("run_id", "policy_run_id", "clue_id"),
+    "run_events.jsonl": ("run_id", "policy_run_id", "event_id"),
+}
+
 
 @dataclass(frozen=True)
 class ContentSupplyDbConfig:
@@ -229,11 +236,11 @@ class DatabaseRuntimeStore:
             if row.get("run_id") != run_id:
                 raise ValueError(f"{filename} row run_id does not match runtime run_id")
             record = _record_for_jsonl(filename, row)
-            if filename == "pattern_recall_evidence.jsonl":
+            if filename in JSONL_UPSERT_KEYS:
                 self._upsert(
                     table,
                     record,
-                    key_columns=("run_id", "policy_run_id", "recall_evidence_id"),
+                    key_columns=JSONL_UPSERT_KEYS[filename],
                 )
             else:
                 self._insert(table, record)

+ 19 - 6
content_agent/integrations/runtime_files.py

@@ -49,8 +49,12 @@ class LocalRuntimeFileStore:
     def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
         path = self.run_dir(run_id) / filename
         path.parent.mkdir(parents=True, exist_ok=True)
-        if filename == "pattern_recall_evidence.jsonl":
-            rows = _replace_pattern_recall_rows(self.read_jsonl(run_id, filename), rows)
+        if filename in {"pattern_recall_evidence.jsonl", "search_queries.jsonl"}:
+            rows = _replace_keyed_rows(
+                self.read_jsonl(run_id, filename),
+                rows,
+                _jsonl_key_fields(filename),
+            )
             path.write_text(
                 "".join(
                     json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n"
@@ -132,15 +136,24 @@ class LocalRuntimeFileStore:
         return []
 
 
-def _replace_pattern_recall_rows(
+def _replace_keyed_rows(
     existing_rows: list[dict[str, Any]],
     new_rows: list[dict[str, Any]],
+    key_fields: tuple[str, ...],
 ) -> list[dict[str, Any]]:
-    keyed_rows: dict[tuple[Any, Any, Any], dict[str, Any]] = {}
-    order: list[tuple[Any, Any, Any]] = []
+    keyed_rows: dict[tuple[Any, ...], dict[str, Any]] = {}
+    order: list[tuple[Any, ...]] = []
     for row in [*existing_rows, *new_rows]:
-        key = (row.get("run_id"), row.get("policy_run_id"), row.get("recall_evidence_id"))
+        key = tuple(row.get(field) for field in key_fields)
         if key not in keyed_rows:
             order.append(key)
         keyed_rows[key] = row
     return [keyed_rows[key] for key in order]
+
+
+def _jsonl_key_fields(filename: str) -> tuple[str, ...]:
+    if filename == "pattern_recall_evidence.jsonl":
+        return ("run_id", "policy_run_id", "recall_evidence_id")
+    if filename == "search_queries.jsonl":
+        return ("run_id", "policy_run_id", "search_query_id")
+    raise ValueError(f"unsupported keyed JSONL file: {filename}")

+ 34 - 0
content_agent/run_service.py

@@ -8,6 +8,7 @@ from typing import Any
 from uuid import uuid4
 
 from content_agent.constants import RUNTIME_SCHEMA_VERSION
+from content_agent.business_modules import run_record
 from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
 from content_agent.graph import RunDependencies, build_run_graph
 from content_agent.integrations.composite_runtime import CompositeRuntimeStore
@@ -330,6 +331,7 @@ class RunService:
                     "completed_at": _utc_now(),
                 },
             )
+            self._record_platform_query_failure_details(run_id, policy_run_id, error)
             self._append_lifecycle_event(
                 run_id,
                 policy_run_id,
@@ -348,6 +350,31 @@ class RunService:
             # Preserve the original run failure; DB failure here is already reflected by the state.
             return
 
+    def _record_platform_query_failure_details(
+        self,
+        run_id: str,
+        policy_run_id: str,
+        error: ContentAgentError,
+    ) -> None:
+        query_failures = _query_failures_from_error(error)
+        if not query_failures:
+            return
+        try:
+            search_queries = self.runtime.read_jsonl(run_id, "search_queries.jsonl")
+        except FileNotFoundError:
+            return
+        if not search_queries:
+            return
+        records = run_record.build_platform_query_failure_records(
+            run_id,
+            policy_run_id,
+            search_queries,
+            query_failures,
+        )
+        self.runtime.append_jsonl(run_id, "search_queries.jsonl", records["search_queries"])
+        self.runtime.append_jsonl(run_id, "search_clues.jsonl", records["search_clues"])
+        self.runtime.append_jsonl(run_id, "run_events.jsonl", records["run_events"])
+
     def _append_lifecycle_event(
         self,
         run_id: str,
@@ -568,6 +595,13 @@ def _decision_summary(decisions: list[dict[str, Any]]) -> dict[str, Any]:
     }
 
 
+def _query_failures_from_error(error: ContentAgentError) -> list[dict[str, Any]]:
+    query_failures = error.detail.get("query_failures") if isinstance(error.detail, dict) else None
+    if not isinstance(query_failures, list):
+        return []
+    return [failure for failure in query_failures if isinstance(failure, dict)]
+
+
 def _decode_client_from_env(env: dict[str, str]) -> DecodeClient:
     if env.get("CONTENTFIND_API_AIGC_BASE_URL") and (
         env.get("CONTENTFIND_API_AIGC_TOKEN") or env.get("AIGC_TOKEN")

+ 4 - 2
product_documents/抖音游走策略/runtime_v1_records_schema.md

@@ -68,7 +68,7 @@ P6 runtime 文件:
 | `recall_status` | `matched` / `pending` / `failed` / `rejected` / `no_match` |
 | `final_asset_status` | `pooled` / `review_asset` / `stored_author` / `clue_only` |
 
-`search_query_effect_status` 含义:`success` 表示 query 下至少产生一条入池内容;`pending` 表示没有入池但有待复看内容;`failed` 表示无有效候选或普通淘汰;`rule_blocked` 表示候选主要被 hard gate 阻断。
+`search_query_effect_status` 含义:`success` 表示 query 下至少产生一条入池内容;`pending` 表示没有入池但有待复看内容;`failed` 表示无有效候选、平台搜索失败或普通淘汰;`rule_blocked` 表示候选主要被 hard gate 阻断。
 
 `source_context.json` 是 run 级输入事实,不写 `policy_run_id`。从 `pattern_seed_pack.json` 开始,所有策略相关产物都必须写 `policy_run_id`,保证未来同一批输入可以跑多套策略并逐条复盘。
 
@@ -268,6 +268,7 @@ P6 runtime 文件:
 ```jsonl
 {"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","clue_id":"clue_001","search_query_id":"q_001","search_query":"爱国情感","discovery_start_source":"pattern_itemset","previous_discovery_step":"pattern_query","result_count":10,"pooled_content_count":2,"review_content_count":3,"pending_content_count":1,"rejected_content_count":4,"search_query_effect_status":"success","query_aggregation_id":"agg_query_success","walk_next_step":"keep_search_query","raw_payload":{"clue_id":"clue_001","query_aggregation_id":"agg_query_success"}}
 {"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","clue_id":"clue_002","search_query_id":"q_002","search_query":"爱国人物故事","discovery_start_source":"pattern_itemset","previous_discovery_step":"pattern_query","result_count":5,"pooled_content_count":0,"review_content_count":1,"pending_content_count":1,"rejected_content_count":3,"search_query_effect_status":"pending","query_aggregation_id":"agg_query_pending","walk_next_step":"review_later_or_small_budget","raw_payload":{"clue_id":"clue_002","query_aggregation_id":"agg_query_pending"}}
+{"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","clue_id":"clue_003","search_query_id":"q_003","search_query":"平台失败样例","discovery_start_source":"pattern_itemset","previous_discovery_step":"pattern_query","result_count":0,"pooled_content_count":0,"review_content_count":0,"pending_content_count":0,"rejected_content_count":0,"search_query_effect_status":"failed","query_aggregation_id":"platform_query_failure","walk_next_step":"stop_search_query","raw_payload":{"clue_id":"clue_003","query_aggregation_id":"platform_query_failure","query_failure":{"search_query_id":"q_003","status":"failed","error_code":"PLATFORM_REQUEST_FAILED","message":"platform query failed"}}}
 ```
 
 ## 12. `run_events.jsonl`
@@ -275,13 +276,14 @@ P6 runtime 文件:
 用途:流水日志,记录每个关键动作是否成功。
 
 必填:`record_schema_version`、`run_id`、`policy_run_id`、`event_id`、`event_type`、`status`、`input_ref`、`output_ref`、`raw_payload`。
-枚举:`status = success / pending / failed`。规则阻断结果在业务效果字段里写 `rule_blocked`。
+枚举:`status = success / pending / failed`。规则阻断结果在业务效果字段里写 `rule_blocked`。平台单 query 搜索失败时写 `event_type=platform_query_failed`,不代表整次 run 一定失败。
 
 样例:
 
 ```jsonl
 {"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","event_id":"evt_001","event_type":"search_query_generated","status":"success","input_ref":"pattern_seed_pack.json","output_ref":"search_queries.jsonl:q_001","raw_payload":{"event_id":"evt_001"},"created_at":"2026-06-05T10:00:00+08:00"}
 {"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","event_id":"evt_002","event_type":"content_portrait_missing","status":"failed","input_ref":"discovered_content_items.jsonl:c_099","output_ref":"rule_decisions.jsonl:d_003","raw_payload":{"event_id":"evt_002","content_effect_status":"rule_blocked"},"created_at":"2026-06-05T10:04:00+08:00"}
+{"record_schema_version":"runtime_record.v1","run_id":"v1_run_001","policy_run_id":"policy_run_001","event_id":"evt_003","event_type":"platform_query_failed","status":"failed","input_ref":"search_queries.jsonl:q_003","output_ref":"search_clues.jsonl","error_code":"PLATFORM_REQUEST_FAILED","message":"platform query failed","raw_payload":{"event_id":"evt_003","query_failure":{"search_query_id":"q_003","status":"failed","error_code":"PLATFORM_REQUEST_FAILED"}},"created_at":"2026-06-05T10:04:00+08:00"}
 ```
 
 ## 13. `final_output.json`

+ 1 - 1
product_documents/规则包/douyin_rule_packs.v1.json

@@ -1373,7 +1373,7 @@
       "walk_next_step": "stop_search_query",
       "priority": 40,
       "enabled": true,
-      "notes": "query 无有效候选,且不是规则阻断主因时为 failed。"
+      "notes": "query 无有效候选、平台搜索失败,且不是规则阻断主因时为 failed。"
     }
   ],
   "decision_reason_codes": [

+ 153 - 1
scripts/validate_content_agent_db.py

@@ -139,6 +139,18 @@ for table in [
     "content_agent_strategy_reviews",
 ]:
     REQUIRED_COLUMNS_BY_TABLE[table].add("raw_payload")
+REQUIRED_COLUMNS_BY_TABLE["content_agent_runs"].update(
+    {"status", "error_code", "error_message", "error_detail"}
+)
+REQUIRED_COLUMNS_BY_TABLE["content_agent_queries"].update(
+    {"search_query_id", "search_query", "search_query_effect_status"}
+)
+REQUIRED_COLUMNS_BY_TABLE["content_agent_search_clues"].update(
+    {"search_query_id", "search_query_effect_status"}
+)
+REQUIRED_COLUMNS_BY_TABLE["content_agent_run_events"].update(
+    {"event_type", "status", "error_code", "message"}
+)
 REQUIRED_UNIQUE_INDEXES_BY_TABLE = {
     "content_agent_runs": [{"run_id"}],
     "content_agent_source_contexts": [{"run_id"}],
@@ -333,13 +345,19 @@ def main() -> int:
                 }
             result["column_status"] = column_status
             result["index_status"] = index_status
+            if args.audit_run_id:
+                result["audit_runs"] = _audit_query_failures(cur, args.audit_run_id)
 
     schema_ready = _schema_ready(result.get("column_status", {}), result.get("index_status", {}))
     result["schema_ready"] = schema_ready
     print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
     if args.allow_missing_tables:
         return 0
-    if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready:
+    audit_ready = all(
+        audit.get("status") in {"pass", "skipped"}
+        for audit in result.get("audit_runs", [])
+    )
+    if result["missing_tables"] or not result["has_runtime_privileges"] or not schema_ready or not audit_ready:
         return 1
     return 0
 
@@ -368,6 +386,12 @@ def _parse_args() -> argparse.Namespace:
     parser.add_argument("--password", default=None)
     parser.add_argument("--timeout", type=int, default=8)
     parser.add_argument("--run-label-like", default="cfa_mysql_100_20260607_batch%")
+    parser.add_argument(
+        "--audit-run-id",
+        action="append",
+        default=[],
+        help="Optional exact run_id to audit query failure closure. May be passed more than once.",
+    )
     parser.add_argument(
         "--allow-missing-tables",
         action="store_true",
@@ -376,6 +400,134 @@ def _parse_args() -> argparse.Namespace:
     return parser.parse_args()
 
 
+def _audit_query_failures(cur: Any, run_ids: list[str]) -> list[dict[str, Any]]:
+    audits: list[dict[str, Any]] = []
+    for run_id in run_ids:
+        cur.execute(
+            "SELECT `run_id`, `error_code`, `error_detail` "
+            "FROM `content_agent_runs` WHERE `run_id` = %s",
+            (run_id,),
+        )
+        run_row = cur.fetchone()
+        if not run_row:
+            audits.append({"run_id": run_id, "status": "fail", "findings": ["run_not_found"]})
+            continue
+        error_detail = _decode_json(run_row.get("error_detail")) or {}
+        query_failures = error_detail.get("query_failures")
+        if not isinstance(query_failures, list) or not query_failures:
+            audits.append({"run_id": run_id, "status": "skipped", "reason": "no_query_failures"})
+            continue
+        failure_ids = [
+            str(failure.get("search_query_id"))
+            for failure in query_failures
+            if isinstance(failure, dict) and failure.get("search_query_id")
+        ]
+        findings: list[str] = []
+        if not failure_ids:
+            findings.append("query_failures_missing_search_query_id")
+
+        query_rows = _fetch_rows_for_ids(
+            cur,
+            "content_agent_queries",
+            run_id,
+            failure_ids,
+            "search_query_id, search_query_effect_status, raw_payload",
+        )
+        query_by_id = {row["search_query_id"]: row for row in query_rows}
+        for query_id in failure_ids:
+            row = query_by_id.get(query_id)
+            if not row:
+                findings.append(f"query_missing:{query_id}")
+                continue
+            if row.get("search_query_effect_status") != "failed":
+                findings.append(f"query_not_failed:{query_id}")
+            raw_payload = _decode_json(row.get("raw_payload")) or {}
+            if not isinstance(raw_payload.get("query_failure"), dict):
+                findings.append(f"query_failure_payload_missing:{query_id}")
+
+        clue_rows = _fetch_rows_for_ids(
+            cur,
+            "content_agent_search_clues",
+            run_id,
+            failure_ids,
+            "search_query_id, search_query_effect_status, raw_payload",
+        )
+        clue_by_id = {row["search_query_id"]: row for row in clue_rows}
+        for query_id in failure_ids:
+            row = clue_by_id.get(query_id)
+            if not row:
+                findings.append(f"search_clue_missing:{query_id}")
+                continue
+            if row.get("search_query_effect_status") != "failed":
+                findings.append(f"search_clue_not_failed:{query_id}")
+
+        event_refs = [f"search_queries.jsonl:{query_id}" for query_id in failure_ids]
+        event_rows = _fetch_rows_for_values(
+            cur,
+            "content_agent_run_events",
+            run_id,
+            event_refs,
+            "input_ref",
+            "input_ref, event_type, status",
+            extra_where="AND event_type = 'platform_query_failed'",
+        )
+        event_by_ref = {row["input_ref"]: row for row in event_rows}
+        for event_ref in event_refs:
+            row = event_by_ref.get(event_ref)
+            if not row:
+                findings.append(f"platform_query_failed_event_missing:{event_ref}")
+                continue
+            if row.get("status") != "failed":
+                findings.append(f"platform_query_failed_event_status:{event_ref}")
+
+        audits.append(
+            {
+                "run_id": run_id,
+                "query_failure_count": len(failure_ids),
+                "status": "fail" if findings else "pass",
+                "findings": findings,
+            }
+        )
+    return audits
+
+
+def _fetch_rows_for_ids(
+    cur: Any,
+    table: str,
+    run_id: str,
+    ids: list[str],
+    columns: str,
+) -> list[dict[str, Any]]:
+    return _fetch_rows_for_values(cur, table, run_id, ids, "search_query_id", columns)
+
+
+def _fetch_rows_for_values(
+    cur: Any,
+    table: str,
+    run_id: str,
+    values: list[str],
+    column: str,
+    columns: str,
+    *,
+    extra_where: str = "",
+) -> list[dict[str, Any]]:
+    if not values:
+        return []
+    placeholders = ", ".join(["%s"] * len(values))
+    cur.execute(
+        f"SELECT {columns} FROM `{table}` "
+        f"WHERE `run_id` = %s AND `{column}` IN ({placeholders}) {extra_where}",
+        [run_id, *values],
+    )
+    return list(cur.fetchall())
+
+
+def _decode_json(value: Any) -> Any:
+    if value is None or isinstance(value, (dict, list)):
+        return value
+    return json.loads(value)
+
+
 def _load_env_file(path_value: str | None) -> dict[str, str]:
     if not path_value:
         return {}

+ 3 - 3
tech_documents/工程落地/implementation_briefs/P3/P3C_QueryFailure_And_PartialSuccess.md

@@ -76,18 +76,18 @@ query 级状态:
 
 - 成功并有候选:`success` 或后续由规则结果推导。
 - 接口失败:`failed`。
-- 被平台配置或登录态阻断:`blocked`。
+- 被平台配置、登录态或验证码阻断:query effect 写 `failed`,失败原因写入 `query_failure` 摘要;不要输出旧 `search_query_effect_status=blocked`。
 - 空结果:新 V1 应归为 `failed`;历史 `weak_effective` 是 deprecated old status。
 
 run 级状态:
 
 - 全部关键链路成功且无 query failure:`success`。
 - 有 query failure,但至少一个 query 成功产出候选且后续链路跑完:`partial_success`。
-- 所有 query 失败或没有候选可继续:`failed` 或 `blocked`。
+- 所有 query 失败或没有候选可继续:run 级 `failed`;query 级明细写 `failed`。
 
 记录位置:
 
-- query failure 摘要:`content_agent_run_events.raw_payload`。
+- query failure 摘要:`content_agent_run_events.raw_payload`、`content_agent_search_clues.raw_payload`,并同步回写 `content_agent_queries.search_query_effect_status=failed`
 - query effect:`content_agent_queries.search_query_effect_status` 或 `content_agent_search_clues.search_query_effect_status`。
 - run 最终状态:`content_agent_runs.status`。
 - 本地兼容导出:`run_events.jsonl`、`search_clues.jsonl`。

+ 1 - 1
tech_documents/工程落地/implementation_briefs/P8/P8A_StrategyReview_Structure_And_RunMetrics.md

@@ -101,7 +101,7 @@ sub-agent 交叉验证结论:
 
 - `effective_queries`: 来自 `search_clues.jsonl` 中 `search_query_effect_status=success`。
 - `review_queries`: 来自 `pending`。
-- `failed_queries`: 来自 `failed`。
+- `failed_queries`: 来自 `failed`;平台搜索失败的 query 必须保留脱敏 `query_failure` 摘要,用于区分接口 / 账号态失败和真实空召回
 - `rule_blocked_queries`: 来自 `rule_blocked`。
 
 `rule_review`:

+ 147 - 0
tests/test_database_runtime.py

@@ -193,6 +193,45 @@ def test_database_runtime_appends_jsonl_with_raw_payload():
     assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
 
 
+def test_database_runtime_upserts_failed_search_query_status():
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    store.append_jsonl(
+        "run_001",
+        "search_queries.jsonl",
+        [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": "run_001",
+                "policy_run_id": "policy_run_001",
+                "search_query_id": "q_001",
+                "search_query": "接口失败",
+                "search_query_generation_method": "item_single",
+                "search_query_effect_status": "failed",
+                "raw_payload": {
+                    "run_id": "run_001",
+                    "policy_run_id": "policy_run_001",
+                    "search_query_id": "q_001",
+                    "search_query_effect_status": "failed",
+                    "query_failure": {
+                        "status": "failed",
+                        "error_code": "PLATFORM_REQUEST_FAILED",
+                    },
+                },
+            }
+        ],
+    )
+
+    sql, params = connection.statements[-1]
+    values = _insert_values(sql, params)
+    assert "INSERT INTO `content_agent_queries`" in sql
+    assert "ON DUPLICATE KEY UPDATE" in sql
+    assert values["search_query_effect_status"] == "failed"
+    payload = json.loads(values["raw_payload"])
+    assert payload["query_failure"]["error_code"] == "PLATFORM_REQUEST_FAILED"
+
+
 def test_database_runtime_preserves_llm_variant_payload_fields():
     connection = FakeConnection()
     store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
@@ -385,6 +424,86 @@ def test_database_runtime_preserves_p5_search_clue_aggregation_in_raw_payload():
     assert json.loads(values["raw_payload"])["query_aggregation_id"] == "agg_query_rule_blocked"
 
 
+def test_database_runtime_writes_failed_search_clue_and_platform_query_failed_event():
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    query_failure = {
+        "search_query_id": "q_001",
+        "search_query": "接口失败",
+        "search_query_generation_method": "item_single",
+        "status": "failed",
+        "error_code": "PLATFORM_REQUEST_FAILED",
+        "message": "platform query failed",
+    }
+    store.append_jsonl(
+        "run_001",
+        "search_clues.jsonl",
+        [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": "run_001",
+                "policy_run_id": "policy_run_001",
+                "clue_id": "clue_001",
+                "search_query_id": "q_001",
+                "search_query": "接口失败",
+                "discovery_start_source": "pattern_itemset",
+                "previous_discovery_step": "pattern_search_query",
+                "result_count": 0,
+                "pooled_content_count": 0,
+                "review_content_count": 0,
+                "pending_content_count": 0,
+                "rejected_content_count": 0,
+                "search_query_effect_status": "failed",
+                "query_aggregation_id": "platform_query_failure",
+                "walk_next_step": "stop_search_query",
+                "raw_payload": {
+                    "clue_id": "clue_001",
+                    "query_failure": query_failure,
+                },
+            }
+        ],
+    )
+    store.append_jsonl(
+        "run_001",
+        "run_events.jsonl",
+        [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": "run_001",
+                "policy_run_id": "policy_run_001",
+                "event_id": "evt_001",
+                "event_type": "platform_query_failed",
+                "status": "failed",
+                "input_ref": "search_queries.jsonl:q_001",
+                "output_ref": "search_clues.jsonl",
+                "error_code": "PLATFORM_REQUEST_FAILED",
+                "message": "platform query failed",
+                "raw_payload": {
+                    "event_id": "evt_001",
+                    "query_failure": query_failure,
+                },
+            }
+        ],
+    )
+
+    clue_sql, clue_params = connection.statements[-2]
+    clue_values = _insert_values(clue_sql, clue_params)
+    assert "INSERT INTO `content_agent_search_clues`" in clue_sql
+    assert "ON DUPLICATE KEY UPDATE" in clue_sql
+    assert clue_values["search_query_effect_status"] == "failed"
+    assert clue_values["walk_next_step"] == "stop_search_query"
+    assert json.loads(clue_values["raw_payload"])["query_failure"]["search_query_id"] == "q_001"
+
+    event_sql, event_params = connection.statements[-1]
+    event_values = _insert_values(event_sql, event_params)
+    assert "INSERT INTO `content_agent_run_events`" in event_sql
+    assert "ON DUPLICATE KEY UPDATE" in event_sql
+    assert event_values["event_type"] == "platform_query_failed"
+    assert event_values["status"] == "failed"
+    assert event_values["error_code"] == "PLATFORM_REQUEST_FAILED"
+
+
 def test_database_runtime_writes_publish_jobs_db_only_records():
     connection = FakeConnection()
     store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
@@ -658,6 +777,34 @@ def test_database_runtime_update_run_record_ignores_empty_sanitized_updates():
     assert connection.statements == []
 
 
+def test_database_runtime_update_run_record_persists_platform_failure_detail():
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    store.update_run_record(
+        "run_001",
+        {
+            "status": "failed",
+            "error_code": "PLATFORM_REQUEST_FAILED",
+            "error_detail": {
+                "query_failures": [
+                    {
+                        "search_query_id": "q_001",
+                        "status": "failed",
+                        "error_code": "PLATFORM_REQUEST_FAILED",
+                    }
+                ]
+            },
+        },
+    )
+
+    sql, params = connection.statements[-1]
+    assert "UPDATE `content_agent_runs` SET" in sql
+    assert "`error_detail` = %s" in sql
+    assert json.loads(params[2])["query_failures"][0]["search_query_id"] == "q_001"
+    assert params[-1] == "run_001"
+
+
 def test_business_modules_do_not_import_or_name_database_tables():
     root = Path("content_agent/business_modules")
     text = "\n".join(path.read_text(encoding="utf-8") for path in root.rglob("*.py"))

+ 45 - 0
tests/test_p0d_p0g.py

@@ -112,6 +112,46 @@ def test_run_service_partial_platform_failure_records_partial_success(tmp_path):
     assert failed_clue["search_query_effect_status"] == "failed"
 
 
+def test_run_service_all_platform_queries_fail_records_failed_query_details(tmp_path):
+    runtime = _SpyRuntimeStore(tmp_path / "runtime")
+    demand_source = FakeDemandSource(real_source_payload(demand_content_id=123))
+    service = RunService(
+        runtime=runtime,
+        demand_source=demand_source,
+        query_variant_client=FakeQueryVariantClient(),
+    )
+    service._platform_client = lambda platform, platform_mode: _AllFailurePlatformClient()
+
+    state = service.start_run(RunStartRequest(platform_mode="real"))
+
+    assert state["status"] == "failed"
+    assert state["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
+    failed_query_ids = [failure["search_query_id"] for failure in state["error_detail"]["query_failures"]]
+    assert failed_query_ids == ["q_001", "q_002", "q_003", "q_004"]
+    assert runtime.run_updates[-1]["updates"]["status"] == "failed"
+    assert runtime.run_updates[-1]["updates"]["error_detail"]["query_failures"]
+    assert runtime.lifecycle_events[-1]["event_id"] == "lifecycle_failed"
+    assert runtime.lifecycle_events[-1]["raw_payload"]["error_detail"]["query_failures"]
+
+    search_queries = service.read_jsonl(state["run_id"], "search_queries.jsonl")
+    assert {query["search_query_effect_status"] for query in search_queries} == {"failed"}
+    assert all(query["raw_payload"]["query_failure"]["status"] == "failed" for query in search_queries)
+
+    search_clues = service.read_jsonl(state["run_id"], "search_clues.jsonl")
+    assert [clue["search_query_id"] for clue in search_clues] == failed_query_ids
+    assert {clue["search_query_effect_status"] for clue in search_clues} == {"failed"}
+    assert {clue["walk_next_step"] for clue in search_clues} == {"stop_search_query"}
+
+    run_events = service.read_jsonl(state["run_id"], "run_events.jsonl")
+    platform_failures = [
+        event for event in run_events if event["event_type"] == "platform_query_failed"
+    ]
+    assert [event["input_ref"] for event in platform_failures] == [
+        f"search_queries.jsonl:{query_id}" for query_id in failed_query_ids
+    ]
+    assert {event["status"] for event in platform_failures} == {"failed"}
+
+
 def test_run_service_query_generation_failure_records_error_code(tmp_path):
     runtime = _SpyRuntimeStore(tmp_path / "runtime")
     demand_source = FakeDemandSource(real_source_payload(demand_content_id=123))
@@ -452,6 +492,11 @@ class _PartialFailurePlatformClient:
         return self.mock.search(search_query)
 
 
+class _AllFailurePlatformClient:
+    def search(self, search_query: dict[str, Any]) -> list[dict[str, Any]]:
+        raise RuntimeError("platform unavailable")
+
+
 class _DemandCursor:
     def __init__(self, connection: "_DemandConnection") -> None:
         self.connection = connection

+ 14 - 1
tests/test_platform_access.py

@@ -123,6 +123,11 @@ def test_platform_access_fails_run_when_all_queries_fail():
             "search_query_id": "q_001",
             "search_query": "接口失败",
             "search_query_generation_method": "item_single",
+        },
+        {
+            "search_query_id": "q_002",
+            "search_query": "仍然失败",
+            "search_query_generation_method": "llm_variant",
         }
     ]
 
@@ -130,6 +135,14 @@ def test_platform_access_fails_run_when_all_queries_fail():
         platform_access.run(search_queries, AlwaysFailingClient())
     except ContentAgentError as exc:
         assert exc.error_code == ErrorCode.PLATFORM_REQUEST_FAILED
-        assert exc.detail["query_failures"][0]["search_query_id"] == "q_001"
+        assert [failure["search_query_id"] for failure in exc.detail["query_failures"]] == [
+            "q_001",
+            "q_002",
+        ]
+        for failure in exc.detail["query_failures"]:
+            assert failure["status"] == "failed"
+            assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
+            assert failure["message"] == "platform query failed"
+            assert failure["error_detail"]["exception_type"] == "RuntimeError"
     else:
         raise AssertionError("expected platform request failure")

+ 37 - 0
tests/test_runtime_files.py

@@ -114,6 +114,38 @@ def test_runtime_files_are_parseable_and_consistent(tmp_path):
     assert validation["status"] == "pass"
 
 
+def test_all_platform_query_failure_writes_failed_query_runtime_records(tmp_path):
+    service = RunService(
+        runtime_root=tmp_path / "runtime" / "v1",
+        query_variant_client=FakeQueryVariantClient(),
+    )
+    service._platform_client = lambda platform, platform_mode: _AllFailurePlatformClient()
+
+    state = service.start_run(
+        RunStartRequest(platform_mode="real", source=str(REAL_SOURCE_FIXTURE))
+    )
+
+    assert state["status"] == "failed"
+    assert state["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
+    search_queries = service.read_jsonl(state["run_id"], "search_queries.jsonl")
+    search_clues = service.read_jsonl(state["run_id"], "search_clues.jsonl")
+    run_events = service.read_jsonl(state["run_id"], "run_events.jsonl")
+
+    assert len(search_queries) == len(state["error_detail"]["query_failures"])
+    assert {query["search_query_effect_status"] for query in search_queries} == {"failed"}
+    assert all(query["raw_payload"]["query_failure"]["status"] == "failed" for query in search_queries)
+    assert {clue["search_query_effect_status"] for clue in search_clues} == {"failed"}
+    assert {clue["result_count"] for clue in search_clues} == {0}
+    assert {clue["query_aggregation_id"] for clue in search_clues} == {"platform_query_failure"}
+    platform_failures = [
+        event for event in run_events if event["event_type"] == "platform_query_failed"
+    ]
+    assert len(platform_failures) == len(search_queries)
+    assert {event["error_code"] for event in platform_failures} == {
+        ErrorCode.PLATFORM_REQUEST_FAILED.value
+    }
+
+
 def test_runtime_validation_catches_summary_drift(tmp_path):
     service, run_id = _start_mock_run(tmp_path)
 
@@ -213,6 +245,11 @@ def test_runtime_validation_catches_missing_record_schema_version(tmp_path):
     )
 
 
+class _AllFailurePlatformClient:
+    def search(self, search_query: dict) -> list[dict]:
+        raise RuntimeError("platform unavailable")
+
+
 def test_runtime_validation_catches_missing_raw_payload(tmp_path):
     service, run_id = _start_mock_run(tmp_path)