Bläddra i källkod

feat(v3-m2): Gemini 判定引擎替换 decode+分类树

- M2A video_fetch.py:下载(平台头)+imageio-ffmpeg压4MB+base64 data URL+30MB护栏
- M2B gemini_video.py GeminiVideoClient(复用OpenRouter骨架,多模态image_url,输出 fit_senior_50plus/fit_confidence/relevance_score/reason,失败一律降级不抛)+from_env+MissingGeminiVideoClient
- M2C recall_decision 改 Gemini 直读:判定字段写进 pattern_match_result+镜像 content_audience_profile+M2→M3桥接键(pattern_recall/category_or_element_binding="matched");整删 decode.py/category_match.py/decode_api.py/integrations.category_match;删 DecodeClient/CategoryMatchClient;validation 去掉分类树字段强制(Gemini路径)
- M2D 注入:graph RunDependencies/run_service from_env+__init__+start_run 全换 gemini_video_client(消费M0缝)+_DeterministicGeminiVideoClient;walk_engine 连带改(去decode/category/max_wait/poll/event_sink);删 recorder.decode_event_row+graph._decode_event_sink;env 注释decode块+新增Gemini模型;依赖+imageio-ffmpeg
- 三原则守住:测试全mock(无真下载/ffmpeg/Gemini)、不改DB schema/不写M3规则、删被替换物连带死代码不留残骸、复用OpenRouter骨架
- 基线 330→306 passed(删39 decode测试+增15);real_id45 回放零回归+validation pass;验收岗7/7

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Sam Lee 2 dagar sedan
förälder
incheckning
fdc7be9ff0

+ 3 - 0
.env

@@ -35,6 +35,9 @@ AGENT_DEBUG=
 OPEN_ROUTER_API_KEY=<redacted>
 OPENROUTER_API_KEY=<redacted>
 OPENROUTER_BASE_URL=https://openrouter.ai/api/v1
+# 内容判定:Gemini 直读视频(V3-M2),复用上面 OPENROUTER_API_KEY/BASE_URL
+CONTENT_AGENT_VIDEO_LLM_MODEL=google/gemini-3-flash-preview
+CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS=90
 
 QWEN_API_KEY=
 QWEN_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1

+ 18 - 14
.env.example

@@ -48,20 +48,24 @@ CONTENTFIND_DOUYIN_DEFAULT_CURSOR=0
 CONTENTFIND_DOUYIN_ACCOUNT_WORKS_DEFAULT_SORT_TYPE=最新
 CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY=3
 
-# Pattern recall / decode / category tree
-CONTENTFIND_API_AIGC_BASE_URL=https://aigc-api.aiddit.com
-CONTENTFIND_API_AIGC_TOKEN=<fill-if-enabled>
-CONTENTFIND_AIGC_DECODE_SUBMIT_PATH=/aigc/api/task/decode
-CONTENTFIND_AIGC_DECODE_RESULT_PATH=/aigc/api/task/decode/result
-CONTENTFIND_AIGC_DECODE_CONFIG_ID=58
-CONTENTFIND_PATTERN_RECALL_MAX_WAIT_SECONDS=1200
-# 短等待档示例(live smoke / 调试用,默认仍 1200 不变): CONTENTFIND_PATTERN_RECALL_MAX_WAIT_SECONDS=300
-CONTENTFIND_PATTERN_RECALL_POLL_INTERVAL_SECONDS=5
-CONTENTFIND_CATEGORY_MATCH_BASE_URL=https://library.aiddit.com
-CONTENTFIND_CATEGORY_MATCH_PATH=/api/search/categories/match-paths/v2
-CONTENTFIND_CATEGORY_MATCH_SOURCE_TYPE=实质
-CONTENTFIND_CATEGORY_MATCH_TOP_K=10
-CONTENTFIND_CATEGORY_MATCH_MIN_SCORE=0.6
+# 内容判定:Gemini 直读视频(V3-M2)
+CONTENT_AGENT_VIDEO_LLM_MODEL=google/gemini-3-flash-preview
+CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS=90
+# 复用 OPENROUTER_API_KEY / OPENROUTER_BASE_URL(见上方 query LLM 段)
+
+# 已废弃(V3 起 decode/分类树整链被 Gemini 直读替换,保留备查,勿启用)
+# CONTENTFIND_API_AIGC_BASE_URL=https://aigc-api.aiddit.com
+# CONTENTFIND_API_AIGC_TOKEN=<fill-if-enabled>
+# CONTENTFIND_AIGC_DECODE_SUBMIT_PATH=/aigc/api/task/decode
+# CONTENTFIND_AIGC_DECODE_RESULT_PATH=/aigc/api/task/decode/result
+# CONTENTFIND_AIGC_DECODE_CONFIG_ID=58
+# CONTENTFIND_PATTERN_RECALL_MAX_WAIT_SECONDS=1200
+# CONTENTFIND_PATTERN_RECALL_POLL_INTERVAL_SECONDS=5
+# CONTENTFIND_CATEGORY_MATCH_BASE_URL=https://library.aiddit.com
+# CONTENTFIND_CATEGORY_MATCH_PATH=/api/search/categories/match-paths/v2
+# CONTENTFIND_CATEGORY_MATCH_SOURCE_TYPE=实质
+# CONTENTFIND_CATEGORY_MATCH_TOP_K=10
+# CONTENTFIND_CATEGORY_MATCH_MIN_SCORE=0.6
 
 # TikHub fallback
 TIKHUB_API_KEY=<fill-if-enabled>

+ 0 - 131
content_agent/business_modules/content_discovery/pattern_recall/category_match.py

@@ -1,131 +0,0 @@
-from __future__ import annotations
-
-from typing import Any
-
-from content_agent.interfaces import CategoryMatchClient
-
-
-GENERIC_TERMS = {"案例", "提示", "标签", "标题", "结构", "语气", "官方来源", "视频", "内容"}
-
-
-def match_decode_terms(
-    *,
-    decode_elements: dict[str, Any],
-    category_match_client: CategoryMatchClient,
-) -> dict[str, Any]:
-    terms = [
-        term
-        for term in decode_elements.get("strong_terms", [])
-        if _is_usable_strong_term(term)
-    ]
-    items = [{"term": term, "description": "解构强证据词"} for term in terms]
-    if not items:
-        return {
-            "request": {"items": []},
-            "response": {},
-            "matched_terms": [],
-            "matched_category_paths": [],
-            "path_matches": [],
-        }
-    result = category_match_client.match_paths(items)
-    path_matches = _extract_path_matches(result.get("raw_response") or result.get("response") or {})
-    matched_terms = [
-        match["term"]
-        for match in path_matches
-        if _is_usable_strong_term(match.get("term"))
-    ]
-    matched_category_paths = [
-        match["category_path"]
-        for match in path_matches
-        if match.get("category_path")
-    ]
-    return {
-        "request": result.get("request") or {"items": items},
-        "response": result.get("response") or {},
-        "matched_terms": list(dict.fromkeys(matched_terms)),
-        "matched_category_paths": list(dict.fromkeys(matched_category_paths)),
-        "path_matches": path_matches,
-    }
-
-
-def _extract_path_matches(response: Any) -> list[dict[str, Any]]:
-    rows = _candidate_rows(response)
-    matches: list[dict[str, Any]] = []
-    seen: set[tuple[str, str]] = set()
-
-    def _append(term: Any, category_path: Any, score: Any, raw: Any) -> None:
-        normalized = _normalize_category_path(category_path)
-        if not normalized:
-            return
-        key = (str(term or "").strip(), normalized)
-        if key in seen:
-            return
-        seen.add(key)
-        matches.append({"term": key[0], "category_path": normalized, "score": score, "raw": raw})
-
-    for row in rows:
-        if not isinstance(row, dict):
-            continue
-        term = row.get("term") or row.get("query") or row.get("source_term") or row.get("item")
-        # 同一 item 可能同时携带多种来源(如 v2 的 matches 与 matched_paths),逐来源累加,不取首个非空。
-        paths = [
-            path
-            for key in ("paths", "matched_paths", "matches", "categories", "results")
-            for path in _as_list(row.get(key))
-        ]
-        if not paths and (row.get("category_path") or row.get("path")):
-            paths = [row]
-        for path in paths:
-            if isinstance(path, dict):
-                _append(
-                    path.get("term") or term,
-                    path.get("category_path")
-                    or path.get("path")
-                    or path.get("full_path")
-                    or path.get("categoryPath"),
-                    path.get("score"),
-                    path,
-                )
-            elif isinstance(path, (str, list)):
-                _append(term, path, row.get("score"), path)
-    return matches
-
-
-def _normalize_category_path(value: Any) -> str:
-    if isinstance(value, list):
-        parts = [str(part).strip() for part in value if str(part).strip()]
-        return "/" + "/".join(parts) if parts else ""
-    if value is None:
-        return ""
-    return str(value).strip()
-
-
-def _candidate_rows(response: Any) -> list[Any]:
-    if isinstance(response, list):
-        return response
-    if not isinstance(response, dict):
-        return []
-    for key in ["data", "results", "items", "matches"]:
-        value = response.get(key)
-        if isinstance(value, list):
-            return value
-        if isinstance(value, dict):
-            nested = _candidate_rows(value)
-            if nested:
-                return nested
-    return [response]
-
-
-def _is_usable_strong_term(term: Any) -> bool:
-    if term is None:
-        return False
-    value = str(term).strip()
-    return bool(value) and value not in GENERIC_TERMS
-
-
-def _as_list(value: Any) -> list[Any]:
-    if value is None:
-        return []
-    if isinstance(value, list):
-        return value
-    return [value]

+ 0 - 295
content_agent/business_modules/content_discovery/pattern_recall/decode.py

@@ -1,295 +0,0 @@
-from __future__ import annotations
-
-import json
-import time
-from typing import Any, Callable
-
-from content_agent.interfaces import DecodeClient
-
-
-DECODE_STATUSES = {"pending", "running", "success", "failed"}
-
-
-def decode_content(
-    *,
-    content: dict[str, Any],
-    media: dict[str, Any],
-    source_context: dict[str, Any],
-    decode_client: DecodeClient,
-    max_wait_seconds: float,
-    poll_interval_seconds: float,
-    now_fn: Callable[[], float] | None = None,
-    sleep_fn: Callable[[float], None] | None = None,
-    event_sink: Callable[[dict[str, Any]], None] | None = None,
-) -> dict[str, Any]:
-    now = now_fn or time.monotonic
-    sleep = sleep_fn or time.sleep
-    platform_content_id = str(content.get("platform_content_id") or "")
-    started_monotonic = now()
-    attempt = 1
-    decode_task_id: str | None = None
-
-    def _emit(
-        event_type: str,
-        decode_status: str,
-        *,
-        wait_seconds: float = 0.0,
-        failure_reason: str | None = None,
-    ) -> None:
-        if event_sink is None:
-            return
-        try:
-            event_sink(
-                {
-                    "event_type": event_type,
-                    "platform_content_id": platform_content_id,
-                    "decode_task_id": decode_task_id,
-                    "decode_status": decode_status,
-                    "attempt": attempt,
-                    "wait_seconds": wait_seconds,
-                    "elapsed_ms": int((now() - started_monotonic) * 1000),
-                    "failure_reason": failure_reason,
-                }
-            )
-        except Exception:
-            # sink 异常必须吞掉,不改变 decode 业务结果。
-            pass
-
-    try:
-        submitted = decode_client.submit_decode(content, media, source_context)
-    except Exception as exc:
-        failed = _client_failure("submit_decode", exc)
-        _emit("decode_failed", "failed", failure_reason="decode_client_error")
-        failed["decode_poll_attempts"] = attempt
-        return failed
-    decode_task_id = submitted.get("decode_task_id") or _extract_decode_task_id(submitted)
-    current = submitted
-    status = normalize_decode_status(current)
-    deadline = now() + max_wait_seconds
-    _emit("decode_submitted", status)
-
-    while decode_task_id and status in {"pending", "running"} and now() < deadline:
-        wait_seconds = 0.0
-        if poll_interval_seconds > 0:
-            wait_seconds = min(poll_interval_seconds, max(0.0, deadline - now()))
-            sleep(wait_seconds)
-        attempt += 1
-        try:
-            current = decode_client.get_decode_result(str(decode_task_id))
-        except Exception as exc:
-            failed = _client_failure("get_decode_result", exc)
-            failed["decode_task_id"] = decode_task_id
-            _emit("decode_failed", "failed", wait_seconds=wait_seconds, failure_reason="decode_client_error")
-            failed["decode_poll_attempts"] = attempt
-            return failed
-        status = normalize_decode_status(current)
-        _emit("decode_polling", status, wait_seconds=wait_seconds)
-
-    if status in {"pending", "running"} and now() >= deadline:
-        _emit("decode_timeout", status, failure_reason="decode_timeout_20m")
-        return {
-            "decode_status": status,
-            "decode_task_id": decode_task_id,
-            "pending_reason": "decode_timeout_20m",
-            "decode_elements": {},
-            "raw_request": current.get("request") or submitted.get("request"),
-            "raw_response": current.get("response") or submitted.get("response"),
-            "decode_poll_attempts": attempt,
-        }
-
-    data_content = _extract_data_content(current)
-    decode_elements = extract_decode_elements(data_content)
-    if status == "success" and not isinstance(data_content, dict):
-        status = "failed"
-        failure_reason = "decode_result_bad_shape"
-    elif status == "success" and not decode_elements.get("strong_terms"):
-        status = "failed"
-        failure_reason = "decode_content_missing_strong_terms"
-    else:
-        failure_reason = None
-
-    if status == "success":
-        _emit("decode_succeeded", "success")
-    else:
-        _emit("decode_failed", status, failure_reason=failure_reason)
-
-    return {
-        "decode_status": status,
-        "decode_task_id": decode_task_id,
-        "failure_reason": failure_reason,
-        "decode_elements": decode_elements,
-        "raw_request": current.get("request") or submitted.get("request"),
-        "raw_response": current.get("response") or submitted.get("response"),
-        "decode_poll_attempts": attempt,
-    }
-
-
-def _client_failure(operation: str, exc: Exception) -> dict[str, Any]:
-    return {
-        "decode_status": "failed",
-        "decode_task_id": None,
-        "failure_reason": "decode_client_error",
-        "decode_elements": {},
-        "raw_request": {"operation": operation},
-        "raw_response": {
-            "operation": operation,
-            "error_type": type(exc).__name__,
-        },
-    }
-
-
-def normalize_decode_status(payload: dict[str, Any]) -> str:
-    raw_status = _find_first(
-        payload,
-        ["decode_status", "status", "state", "taskStatus", "task_status"],
-    )
-    if raw_status is None:
-        raw_status = _find_first(payload.get("raw_response") or {}, ["status", "taskStatus", "state"])
-    if raw_status is None:
-        raw_status = _find_first_data_row(
-            payload.get("raw_response") or {}, ["status", "taskStatus", "state"]
-        )
-    value = str(raw_status or "pending").strip().lower()
-    aliases = {
-        "success": "success",
-        "succeeded": "success",
-        "complete": "success",
-        "completed": "success",
-        "done": "success",
-        "running": "running",
-        "processing": "running",
-        "pending": "pending",
-        "waiting": "pending",
-        "failed": "failed",
-        "fail": "failed",
-        "error": "failed",
-    }
-    return aliases.get(value, "pending")
-
-
-def extract_decode_elements(data_content: Any) -> dict[str, Any]:
-    if not isinstance(data_content, dict):
-        return {"strong_terms": [], "auxiliary_terms": []}
-    strong_terms: list[str] = []
-    auxiliary_terms: list[str] = []
-    for point in _as_list(data_content.get("目的点")):
-        if isinstance(point, dict):
-            _append_term(strong_terms, point.get("点"))
-            _append_substance_names(strong_terms, point.get("实质"))
-    for point in _as_list(data_content.get("关键点")):
-        if isinstance(point, dict):
-            is_substance = str(point.get("类型") or point.get("type") or point.get("source_type") or "")
-            if "实质" in is_substance:
-                _append_term(strong_terms, point.get("点"))
-            else:
-                _append_term(auxiliary_terms, point.get("点"))
-            _append_substance_names(strong_terms, point.get("实质"))
-    for token in _as_list(data_content.get("分词结果")):
-        if isinstance(token, dict):
-            _append_term(auxiliary_terms, token.get("词") or token.get("term"))
-        else:
-            _append_term(auxiliary_terms, token)
-    for token in _as_list(data_content.get("contribution_results")):
-        if isinstance(token, dict):
-            _append_term(auxiliary_terms, token.get("词") or token.get("term"))
-    return {
-        "strong_terms": _dedupe(strong_terms),
-        "auxiliary_terms": _dedupe(auxiliary_terms),
-        "raw_data_content": data_content,
-    }
-
-
-def _extract_data_content(payload: dict[str, Any]) -> Any:
-    raw = payload.get("raw_response") if isinstance(payload, dict) else None
-    candidates = [
-        payload.get("dataContent"),
-        payload.get("data_content"),
-        payload.get("data", {}).get("dataContent") if isinstance(payload.get("data"), dict) else None,
-        payload.get("data", {}).get("data_content") if isinstance(payload.get("data"), dict) else None,
-        raw.get("dataContent") if isinstance(raw, dict) else None,
-        raw.get("data", {}).get("dataContent") if isinstance(raw, dict) and isinstance(raw.get("data"), dict) else None,
-        _find_first_data_row(raw, ["dataContent", "data_content"]) if isinstance(raw, dict) else None,
-    ]
-    for candidate in candidates:
-        if candidate is None:
-            continue
-        if isinstance(candidate, str):
-            try:
-                return json.loads(candidate)
-            except json.JSONDecodeError:
-                return candidate
-        return candidate
-    return None
-
-
-def _extract_decode_task_id(payload: dict[str, Any]) -> str | None:
-    value = _find_first(payload, ["decode_task_id", "taskId", "task_id"])
-    if value:
-        return str(value)
-    raw = payload.get("raw_response") if isinstance(payload, dict) else None
-    if isinstance(raw, dict):
-        value = _find_first(raw, ["taskId", "task_id"])
-        if value:
-            return str(value)
-    return None
-
-
-def _find_first(payload: Any, keys: list[str]) -> Any:
-    if not isinstance(payload, dict):
-        return None
-    for key in keys:
-        if key in payload:
-            return payload[key]
-    data = payload.get("data")
-    if isinstance(data, dict):
-        for key in keys:
-            if key in data:
-                return data[key]
-    return None
-
-
-def _find_first_data_row(payload: Any, keys: list[str]) -> Any:
-    if not isinstance(payload, dict):
-        return None
-    data = payload.get("data")
-    if not isinstance(data, list):
-        return None
-    for item in data:
-        if not isinstance(item, dict):
-            continue
-        for key in keys:
-            if key in item:
-                return item[key]
-    return None
-
-
-def _append_substance_names(target: list[str], value: Any) -> None:
-    if isinstance(value, dict):
-        for child in value.values():
-            _append_substance_names(target, child)
-        return
-    for item in _as_list(value):
-        if isinstance(item, dict):
-            _append_term(target, item.get("名称") or item.get("name") or item.get("点"))
-        else:
-            _append_term(target, item)
-
-
-def _append_term(target: list[str], value: Any) -> None:
-    if value is None:
-        return
-    term = str(value).strip()
-    if term:
-        target.append(term)
-
-
-def _as_list(value: Any) -> list[Any]:
-    if value is None:
-        return []
-    if isinstance(value, list):
-        return value
-    return [value]
-
-
-def _dedupe(values: list[str]) -> list[str]:
-    return list(dict.fromkeys(values))

+ 72 - 383
content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py

@@ -1,21 +1,19 @@
+"""内容判定(V3-M2C):Gemini 直读视频,产出判定字段写进 pattern_match_result。
+
+替换原 decode 异步解构 + 分类树匹配。每条内容调一次 gemini_video_client.analyze,
+把 4 个判定字段(fit_senior_50plus / fit_confidence / relevance_score / reason)写进
+discovered item 的 pattern_match_result,并镜像 fit_senior_50plus 进 content_audience_profile。
+pattern_recall / category_or_element_binding 设为 "matched" 桥接键,仅为 M2→M3 过渡
+(让未重写的旧 hard_gate 不误拒);M3 删除旧门槛后移除这两个桥接键。
+"""
+
 from __future__ import annotations
 
-import time
 from datetime import datetime, timezone
-from typing import Any, Callable
+from typing import Any
 
-from content_agent.business_modules.content_discovery.pattern_recall.category_match import (
-    match_decode_terms,
-)
-from content_agent.business_modules.content_discovery.pattern_recall.decode import (
-    _extract_data_content,
-    decode_content,
-    extract_decode_elements,
-    normalize_decode_status,
-)
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
-from content_agent.integrations.decode_api import redact_sensitive_payload
-from content_agent.interfaces import CategoryMatchClient, DecodeClient, RuntimeFileStore
+from content_agent.interfaces import GeminiVideoClient, RuntimeFileStore
 
 
 def run(
@@ -25,81 +23,29 @@ def run(
     content_media_records: list[dict[str, Any]],
     evidence_bundles: list[dict[str, Any]],
     source_context: dict[str, Any],
-    pattern_seed_pack: dict[str, Any],
     runtime: RuntimeFileStore,
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
-    max_wait_seconds: float = 1200.0,
-    poll_interval_seconds: float = 5.0,
-    now_fn: Callable[[], float] | None = None,
-    sleep_fn: Callable[[float], None] | None = None,
+    gemini_video_client: GeminiVideoClient,
     start_index: int = 1,
-    event_sink: Callable[[dict[str, Any]], None] | None = None,
 ) -> dict[str, Any]:
     created_at = datetime.now(timezone.utc).isoformat()
+    media_by_content_id = {
+        row["platform_content_id"]: row for row in content_media_records
+    }
     evidence_rows: list[dict[str, Any]] = []
     updated_items: list[dict[str, Any]] = []
     updated_bundles: list[dict[str, Any]] = []
-
-    media_by_content_id = {
-        row["platform_content_id"]: row
-        for row in content_media_records
-    }
-    decisions: list[dict[str, Any]] = []
-    for index, item in enumerate(discovered_content_items, start=start_index):
+    for offset, item in enumerate(discovered_content_items):
         media = media_by_content_id.get(item["platform_content_id"], {})
-        decisions.append(
-            _recall_one(
-                index=index,
-                content=item,
-                media=media,
-                source_context=source_context,
-                pattern_seed_pack=pattern_seed_pack,
-                decode_client=decode_client,
-                category_match_client=category_match_client,
-                max_wait_seconds=max_wait_seconds,
-                poll_interval_seconds=poll_interval_seconds,
-                now_fn=now_fn,
-                sleep_fn=sleep_fn,
-                event_sink=event_sink,
-            )
-        )
-
-    # M6B 最小补跑(06 已拍板): 主循环结束后对仍 pending/running 的 decode 同步补跑一轮,
-    # 每条只再调一次 get_decode_result,不重新 submit、不新增等待循环、不起后台线程。
-    for offset, decision in enumerate(decisions):
-        decode_result = decision["decode_result"]
-        if decode_result.get("decode_status") not in {"pending", "running"}:
-            continue
-        if not decode_result.get("decode_task_id"):
-            continue
-        decisions[offset] = _rerun_pending_decode(
-            content=discovered_content_items[offset],
-            decision=decision,
-            decode_client=decode_client,
-            category_match_client=category_match_client,
-            source_context=source_context,
-            pattern_seed_pack=pattern_seed_pack,
-            now_fn=now_fn,
-            event_sink=event_sink,
+        recall_evidence_id = f"recall_{start_index + offset:03d}"
+        judgment = gemini_video_client.analyze(item, media, source_context)
+        pattern_match_result = _build_pattern_match_result(judgment, recall_evidence_id)
+        updated_items.append(_update_discovered_item(item, pattern_match_result))
+        updated_bundles.append(
+            _update_evidence_bundle(evidence_bundles[offset], pattern_match_result)
         )
-
-    for offset, item in enumerate(discovered_content_items):
-        bundle = evidence_bundles[offset]
-        decision = decisions[offset]
-        pattern_match_result = decision["pattern_match_result"]
-        evidence_row = _build_evidence_row(
-            run_id=run_id,
-            policy_run_id=policy_run_id,
-            content=item,
-            decision=decision,
-            created_at=created_at,
+        evidence_rows.append(
+            _build_evidence_row(run_id, policy_run_id, item, recall_evidence_id, judgment, created_at)
         )
-        updated_item = _update_discovered_item(item, pattern_match_result)
-        updated_bundle = _update_evidence_bundle(bundle, pattern_match_result)
-        evidence_rows.append(evidence_row)
-        updated_items.append(updated_item)
-        updated_bundles.append(updated_bundle)
 
     runtime.append_jsonl(run_id, "pattern_recall_evidence.jsonl", evidence_rows)
     runtime.append_jsonl(run_id, "discovered_content_items.jsonl", updated_items)
@@ -110,268 +56,17 @@ def run(
     }
 
 
-def _recall_one(
-    *,
-    index: int,
-    content: dict[str, Any],
-    media: dict[str, Any],
-    source_context: dict[str, Any],
-    pattern_seed_pack: dict[str, Any],
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
-    max_wait_seconds: float,
-    poll_interval_seconds: float,
-    now_fn: Callable[[], float] | None,
-    sleep_fn: Callable[[float], None] | None,
-    event_sink: Callable[[dict[str, Any]], None] | None = None,
-) -> dict[str, Any]:
-    recall_evidence_id = f"recall_{index:03d}"
-    decode_result = decode_content(
-        content=content,
-        media=media,
-        source_context=source_context,
-        decode_client=decode_client,
-        max_wait_seconds=max_wait_seconds,
-        poll_interval_seconds=poll_interval_seconds,
-        now_fn=now_fn,
-        sleep_fn=sleep_fn,
-        event_sink=event_sink,
-    )
-    decode_status = decode_result["decode_status"]
-    if decode_status in {"pending", "running"}:
-        return _decision(
-            recall_evidence_id=recall_evidence_id,
-            decode_result=decode_result,
-            recall_status="pending",
-            pattern_recall="pattern_recall_pending",
-            category_or_element_binding="pattern_recall_pending",
-            match_result={},
-        )
-    if decode_status == "failed":
-        is_client_failure = decode_result.get("failure_reason") == "decode_client_error"
-        return _decision(
-            recall_evidence_id=recall_evidence_id,
-            decode_result=decode_result,
-            recall_status="failed" if is_client_failure else "rejected",
-            pattern_recall="pattern_recall_failed" if is_client_failure else "pattern_recall_rejected",
-            category_or_element_binding=(
-                "pattern_recall_failed" if is_client_failure else "pattern_recall_rejected"
-            ),
-            match_result={},
-        )
-
-    return _match_and_decide(
-        recall_evidence_id=recall_evidence_id,
-        decode_result=decode_result,
-        category_match_client=category_match_client,
-        source_context=source_context,
-        pattern_seed_pack=pattern_seed_pack,
-    )
-
-
-def _match_and_decide(
-    *,
-    recall_evidence_id: str,
-    decode_result: dict[str, Any],
-    category_match_client: CategoryMatchClient,
-    source_context: dict[str, Any],
-    pattern_seed_pack: dict[str, Any],
-) -> dict[str, Any]:
-    try:
-        match_result = match_decode_terms(
-            decode_elements=decode_result.get("decode_elements") or {},
-            category_match_client=category_match_client,
-        )
-    except Exception as exc:
-        match_result = _match_client_failure(exc)
-        return _decision(
-            recall_evidence_id=recall_evidence_id,
-            decode_result=decode_result,
-            recall_status="failed",
-            pattern_recall="pattern_recall_failed",
-            category_or_element_binding="pattern_recall_failed",
-            match_result=match_result,
-        )
-    matched_terms = match_result.get("matched_terms") or []
-    matched_paths = match_result.get("matched_category_paths") or []
-    if not matched_terms or not matched_paths:
-        return _decision(
-            recall_evidence_id=recall_evidence_id,
-            decode_result=decode_result,
-            recall_status="no_match",
-            pattern_recall="pattern_recall_no_match",
-            category_or_element_binding="pattern_recall_no_match",
-            match_result=match_result,
-        )
-    if not _can_explain_pattern(matched_terms, matched_paths, source_context, pattern_seed_pack):
-        return _decision(
-            recall_evidence_id=recall_evidence_id,
-            decode_result=decode_result,
-            recall_status="no_match",
-            pattern_recall="pattern_recall_no_match",
-            category_or_element_binding="pattern_recall_no_match",
-            match_result=match_result,
-        )
-    return _decision(
-        recall_evidence_id=recall_evidence_id,
-        decode_result=decode_result,
-        recall_status="matched",
-        pattern_recall="matched",
-        category_or_element_binding="tree_walk_match",
-        match_result=match_result,
-    )
-
-
-def _rerun_pending_decode(
-    *,
-    content: dict[str, Any],
-    decision: dict[str, Any],
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
-    source_context: dict[str, Any],
-    pattern_seed_pack: dict[str, Any],
-    now_fn: Callable[[], float] | None,
-    event_sink: Callable[[dict[str, Any]], None] | None,
-) -> dict[str, Any]:
-    old_decode_result = decision["decode_result"]
-    decode_task_id = str(old_decode_result["decode_task_id"])
-    attempt = int(old_decode_result.get("decode_poll_attempts") or 1) + 1
-    now = now_fn or time.monotonic
-    started_monotonic = now()
-
-    def _emit(event_type: str, decode_status: str, failure_reason: str | None = None) -> None:
-        if event_sink is None:
-            return
-        try:
-            event_sink(
-                {
-                    "event_type": event_type,
-                    "platform_content_id": str(content.get("platform_content_id") or ""),
-                    "decode_task_id": decode_task_id,
-                    "decode_status": decode_status,
-                    "attempt": attempt,
-                    "wait_seconds": 0.0,
-                    "elapsed_ms": int((now() - started_monotonic) * 1000),
-                    "failure_reason": failure_reason,
-                }
-            )
-        except Exception:
-            pass
-
-    try:
-        current = decode_client.get_decode_result(decode_task_id)
-    except Exception:
-        _emit("decode_timeout", str(old_decode_result.get("decode_status")), "decode_timeout_20m")
-        return decision
-    status = normalize_decode_status(current)
-    data_content = _extract_data_content(current)
-    decode_elements = extract_decode_elements(data_content)
-    # 只有真正可用的成功结果才更新 evidence;其余一律保持原 pending 决策,不改语义。
-    if status != "success" or not isinstance(data_content, dict) or not decode_elements.get("strong_terms"):
-        _emit("decode_timeout", status, "decode_timeout_20m")
-        return decision
-    _emit("decode_succeeded", "success")
-    decode_result = {
-        "decode_status": "success",
-        "decode_task_id": decode_task_id,
-        "failure_reason": None,
-        "decode_elements": decode_elements,
-        "raw_request": current.get("request") or old_decode_result.get("raw_request"),
-        "raw_response": current.get("response") or old_decode_result.get("raw_response"),
-        "decode_poll_attempts": attempt,
-    }
-    return _match_and_decide(
-        recall_evidence_id=decision["recall_evidence_id"],
-        decode_result=decode_result,
-        category_match_client=category_match_client,
-        source_context=source_context,
-        pattern_seed_pack=pattern_seed_pack,
-    )
-
-
-def _decision(
-    *,
-    recall_evidence_id: str,
-    decode_result: dict[str, Any],
-    recall_status: str,
-    pattern_recall: str,
-    category_or_element_binding: str,
-    match_result: dict[str, Any],
-) -> dict[str, Any]:
-    matched_paths = match_result.get("matched_category_paths") or []
-    matched_terms = match_result.get("matched_terms") or []
-    primary_path = matched_paths[0] if matched_paths else None
-    pattern_match_result = {
-        "pattern_recall": pattern_recall,
-        "category_or_element_binding": category_or_element_binding,
-        "decode_status": decode_result.get("decode_status"),
-        "match_status": "matched" if recall_status == "matched" else recall_status,
-        "recall_status": recall_status,
-        "matched_terms": matched_terms,
-        "matched_category_paths": matched_paths,
-        "primary_matched_category_path": primary_path,
-        "decode_elements": decode_result.get("decode_elements") or {},
-        "pattern_recall_evidence_id": recall_evidence_id,
-    }
-    return {
-        "recall_evidence_id": recall_evidence_id,
-        "decode_result": decode_result,
-        "match_result": match_result,
-        "pattern_match_result": pattern_match_result,
-    }
-
-
-def _build_evidence_row(
-    *,
-    run_id: str,
-    policy_run_id: str,
-    content: dict[str, Any],
-    decision: dict[str, Any],
-    created_at: str,
-) -> dict[str, Any]:
-    decode_result = decision["decode_result"]
-    match_result = decision["match_result"]
-    pattern_match = decision["pattern_match_result"]
-    raw_payload = redact_sensitive_payload(
-        {
-            "run_id": run_id,
-            "policy_run_id": policy_run_id,
-            "recall_evidence_id": decision["recall_evidence_id"],
-            "platform": content.get("platform"),
-            "primary_matched_category_path": pattern_match.get("primary_matched_category_path"),
-            "pending_reason": decode_result.get("pending_reason"),
-            "failure_reason": decode_result.get("failure_reason") or match_result.get("failure_reason"),
-            "decode_request": decode_result.get("raw_request"),
-            "decode_response": decode_result.get("raw_response"),
-            "match_paths_request": match_result.get("request"),
-            "match_paths_response": match_result.get("response"),
-        }
-    )
+def _build_pattern_match_result(judgment: dict[str, Any], recall_evidence_id: str) -> dict[str, Any]:
     return {
-        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
-        "run_id": run_id,
-        "policy_run_id": policy_run_id,
-        "recall_evidence_id": decision["recall_evidence_id"],
-        "content_discovery_id": content.get("content_discovery_id"),
-        "platform": content.get("platform"),
-        "platform_content_id": content.get("platform_content_id"),
-        "decode_status": pattern_match.get("decode_status"),
-        "decode_task_id": decode_result.get("decode_task_id"),
-        "recall_status": pattern_match.get("recall_status"),
-        "matched_terms": pattern_match.get("matched_terms"),
-        "matched_category_paths": pattern_match.get("matched_category_paths"),
-        "decode_elements": pattern_match.get("decode_elements"),
-        "match_paths_request": match_result.get("request"),
-        "match_paths_response": match_result.get("response"),
-        "evidence_summary": {
-            "pattern_recall": pattern_match.get("pattern_recall"),
-            "category_or_element_binding": pattern_match.get("category_or_element_binding"),
-            "primary_matched_category_path": pattern_match.get("primary_matched_category_path"),
-            "pending_reason": decode_result.get("pending_reason"),
-            "failure_reason": decode_result.get("failure_reason") or match_result.get("failure_reason"),
-        },
-        "raw_payload": raw_payload,
-        "created_at": created_at,
+        "fit_senior_50plus": bool(judgment.get("fit_senior_50plus", False)),
+        "fit_confidence": float(judgment.get("fit_confidence") or 0.0),
+        "relevance_score": float(judgment.get("relevance_score") or 0.0),
+        "reason": str(judgment.get("reason") or ""),
+        "judge_status": str(judgment.get("status") or "ok"),
+        # M2→M3 桥接键:让未重写的旧 hard_gate(not_in ["matched"])不误拒。M3 删旧门槛后移除。
+        "pattern_recall": "matched",
+        "category_or_element_binding": "matched",
+        "pattern_recall_evidence_id": recall_evidence_id,
     }
 
 
@@ -379,7 +74,13 @@ def _update_discovered_item(
     item: dict[str, Any],
     pattern_match_result: dict[str, Any],
 ) -> dict[str, Any]:
-    updated = {**item, "pattern_match_result": pattern_match_result}
+    updated = {
+        **item,
+        "pattern_match_result": pattern_match_result,
+        "content_audience_profile": {
+            "fit_senior_50plus": pattern_match_result["fit_senior_50plus"]
+        },
+    }
     raw_payload = dict(updated.get("raw_payload") or {})
     raw_payload["pattern_match_result"] = pattern_match_result
     updated["raw_payload"] = raw_payload
@@ -394,49 +95,37 @@ def _update_evidence_bundle(
     return {**bundle, "pattern_match_result": {**existing, **pattern_match_result}}
 
 
-def _can_explain_pattern(
-    matched_terms: list[str],
-    matched_paths: list[str],
-    source_context: dict[str, Any],
-    pattern_seed_pack: dict[str, Any],
-) -> bool:
-    evidence_pack = source_context.get("ext_data", {}).get("evidence_pack", {})
-    seed_terms = set(pattern_seed_pack.get("seed_terms") or evidence_pack.get("seed_terms") or [])
-    if seed_terms.intersection(matched_terms):
-        return True
-    binding_paths = [
-        binding.get("category_path")
-        for binding in [
-            *(pattern_seed_pack.get("category_bindings") or []),
-            *(evidence_pack.get("category_bindings") or []),
-            *(pattern_seed_pack.get("itemsets") or []),
-            *(pattern_seed_pack.get("itemset_items") or []),
-            *(evidence_pack.get("itemset_items") or []),
-        ]
-        if isinstance(binding, dict) and binding.get("category_path")
-    ]
-    for matched_path in matched_paths:
-        for binding_path in binding_paths:
-            if matched_path in binding_path or binding_path in matched_path:
-                return True
-            if _path_leaf(matched_path) == _path_leaf(binding_path):
-                return True
-    return False
-
-
-def _match_client_failure(exc: Exception) -> dict[str, Any]:
+def _build_evidence_row(
+    run_id: str,
+    policy_run_id: str,
+    content: dict[str, Any],
+    recall_evidence_id: str,
+    judgment: dict[str, Any],
+    created_at: str,
+) -> dict[str, Any]:
+    summary = {
+        "pattern_recall": "matched",
+        "fit_senior_50plus": bool(judgment.get("fit_senior_50plus", False)),
+        "fit_confidence": float(judgment.get("fit_confidence") or 0.0),
+        "relevance_score": float(judgment.get("relevance_score") or 0.0),
+        "reason": str(judgment.get("reason") or ""),
+        "judge_status": str(judgment.get("status") or "ok"),
+    }
     return {
-        "request": {},
-        "response": {
-            "operation": "match_paths",
-            "error_type": type(exc).__name__,
+        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
+        "run_id": run_id,
+        "policy_run_id": policy_run_id,
+        "recall_evidence_id": recall_evidence_id,
+        "content_discovery_id": content.get("content_discovery_id"),
+        "platform": content.get("platform"),
+        "platform_content_id": content.get("platform_content_id"),
+        "recall_status": "judged",
+        "evidence_summary": summary,
+        "raw_payload": {
+            "run_id": run_id,
+            "policy_run_id": policy_run_id,
+            "recall_evidence_id": recall_evidence_id,
+            **summary,
         },
-        "matched_terms": [],
-        "matched_category_paths": [],
-        "path_matches": [],
-        "failure_reason": "category_match_client_error",
+        "created_at": created_at,
     }
-
-
-def _path_leaf(path: str) -> str:
-    return str(path).rstrip("/").split("/")[-1]

+ 1 - 2
content_agent/business_modules/run_record/__init__.py

@@ -2,10 +2,9 @@ from __future__ import annotations
 
 from content_agent.business_modules.run_record.recorder import (
     build_platform_query_failure_records,
-    decode_event_row,
     record_stage_event,
     run,
 )
 from content_agent.business_modules.run_record.validation import validate_run
 
-__all__ = ["build_platform_query_failure_records", "decode_event_row", "record_stage_event", "run", "validate_run"]
+__all__ = ["build_platform_query_failure_records", "record_stage_event", "run", "validate_run"]

+ 0 - 29
content_agent/business_modules/run_record/recorder.py

@@ -291,14 +291,6 @@ def _count_rule_matches(count: int, rule: str | None) -> bool:
     raise ValueError(f"unsupported query aggregation count rule: {rule}")
 
 
-DECODE_EVENT_STATUS = {
-    "decode_submitted": "running",
-    "decode_polling": "running",
-    "decode_succeeded": "success",
-    "decode_failed": "failed",
-    "decode_timeout": "pending",
-}
-
 STAGE_EVENT_STATUS = {"started": "running", "completed": "success", "failed": "failed"}
 
 
@@ -347,24 +339,3 @@ def record_stage_event(
     runtime.append_jsonl(run_id, "run_events.jsonl", [row])
 
 
-def decode_event_row(run_id: str, policy_run_id: str, event: dict[str, Any]) -> dict[str, Any]:
-    event_type = event["event_type"]
-    row = with_raw_payload(
-        {
-            "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
-            "run_id": run_id,
-            "policy_run_id": policy_run_id,
-            "event_id": (
-                f"evt_decode_{event['platform_content_id']}_{event_type}_{event['attempt']}"
-            ),
-            "event_type": event_type,
-            "status": DECODE_EVENT_STATUS[event_type],
-            "input_ref": None,
-            "output_ref": None,
-            "error_code": None,
-            "message": None,
-            "created_at": datetime.now(timezone.utc).isoformat(),
-        }
-    )
-    row["raw_payload"].update(event)
-    return row

+ 2 - 7
content_agent/business_modules/run_record/validation.py

@@ -534,13 +534,8 @@ def _check_pattern_recall_evidence(
                 "pattern_recall_evidence_missing",
                 f"matched item cannot find recall evidence: {evidence_id}",
             )
-            continue
-        if not pattern_match.get("matched_terms") or not pattern_match.get("matched_category_paths"):
-            _fail(
-                findings,
-                "pattern_recall_matched_missing_evidence",
-                f"matched item missing terms or paths: {item.get('content_discovery_id')}",
-            )
+        # V3(M2):判定改为 Gemini 直读,不再有 decode 强证据词/分类树路径;
+        # matched_terms/matched_category_paths 不再适用,故不再强制。
 
 
 def _check_source_evidence(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:

+ 8 - 39
content_agent/business_modules/walk_engine.py

@@ -10,8 +10,7 @@ from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
 from content_agent.errors import ContentAgentError
 from content_agent.integrations.walk_strategy_json import WalkStrategyStore
 from content_agent.interfaces import (
-    CategoryMatchClient,
-    DecodeClient,
+    GeminiVideoClient,
     PlatformSearchClient,
     RuntimeFileStore,
 )
@@ -42,11 +41,7 @@ def run_bounded_walk(
     policy_bundle: dict[str, Any],
     platform_client: PlatformSearchClient,
     runtime: RuntimeFileStore,
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
-    max_wait_seconds: float = 1200.0,
-    poll_interval_seconds: float = 5.0,
-    event_sink: Any | None = None,
+    gemini_video_client: GeminiVideoClient,
 ) -> dict[str, list[dict[str, Any]]]:
     created_at = datetime.now(timezone.utc).isoformat()
     walk_strategy = WalkStrategyStore().load_walk_strategy()
@@ -83,13 +78,9 @@ def run_bounded_walk(
             policy_bundle=policy_bundle,
             platform_client=platform_client,
             runtime=runtime,
-            decode_client=decode_client,
-            category_match_client=category_match_client,
+            gemini_video_client=gemini_video_client,
             start_recall_index=next_recall_index,
             start_decision_index=next_decision_index,
-            max_wait_seconds=max_wait_seconds,
-            poll_interval_seconds=poll_interval_seconds,
-            event_sink=event_sink,
             existing_content_ids={
                 item["platform_content_id"] for item in context["discovered_content_items"]
             },
@@ -115,14 +106,10 @@ def run_bounded_walk(
         policy_bundle=policy_bundle,
         platform_client=platform_client,
         runtime=runtime,
-        decode_client=decode_client,
-        category_match_client=category_match_client,
+        gemini_video_client=gemini_video_client,
         start_recall_index=next_recall_index,
         start_decision_index=next_decision_index,
         created_at=created_at,
-        max_wait_seconds=max_wait_seconds,
-        poll_interval_seconds=poll_interval_seconds,
-        event_sink=event_sink,
     )
     _merge_batch(context, author_batch)
     context["walk_actions"].extend(author_batch.get("walk_actions", []))
@@ -146,14 +133,10 @@ def _execute_query_batch(
     policy_bundle: dict[str, Any],
     platform_client: PlatformSearchClient,
     runtime: RuntimeFileStore,
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
+    gemini_video_client: GeminiVideoClient,
     start_recall_index: int,
     start_decision_index: int,
-    max_wait_seconds: float,
-    poll_interval_seconds: float,
     existing_content_ids: set[str],
-    event_sink: Any | None = None,
 ) -> dict[str, list[dict[str, Any]]]:
     try:
         result = platform_access.run(search_queries, platform_client)
@@ -188,14 +171,9 @@ def _execute_query_batch(
         discovered["content_media_records"],
         discovered["evidence_bundles"],
         source_context,
-        pattern_seed_pack,
         runtime,
-        decode_client,
-        category_match_client,
-        max_wait_seconds=max_wait_seconds,
-        poll_interval_seconds=poll_interval_seconds,
+        gemini_video_client,
         start_index=start_recall_index,
-        event_sink=event_sink,
     )
     decisions = rule_judgment.run(
         run_id,
@@ -226,14 +204,10 @@ def _execute_author_edges(
     policy_bundle: dict[str, Any],
     platform_client: PlatformSearchClient,
     runtime: RuntimeFileStore,
-    decode_client: DecodeClient,
-    category_match_client: CategoryMatchClient,
+    gemini_video_client: GeminiVideoClient,
     start_recall_index: int,
     start_decision_index: int,
     created_at: str,
-    max_wait_seconds: float,
-    poll_interval_seconds: float,
-    event_sink: Any | None = None,
 ) -> dict[str, list[dict[str, Any]]]:
     fetch_author_works = getattr(platform_client, "fetch_author_works", None) or getattr(
         platform_client, "author_works", None
@@ -380,14 +354,9 @@ def _execute_author_edges(
         discovered["content_media_records"],
         discovered["evidence_bundles"],
         source_context,
-        pattern_seed_pack,
         runtime,
-        decode_client,
-        category_match_client,
-        max_wait_seconds=max_wait_seconds,
-        poll_interval_seconds=poll_interval_seconds,
+        gemini_video_client,
         start_index=start_recall_index,
-        event_sink=event_sink,
     )
     decisions = rule_judgment.run(
         run_id,

+ 4 - 26
content_agent/graph.py

@@ -24,8 +24,7 @@ from content_agent.business_modules import (
 )
 from content_agent.business_modules.content_discovery import pattern_recall
 from content_agent.interfaces import (
-    CategoryMatchClient,
-    DecodeClient,
+    GeminiVideoClient,
     PlatformSearchClient,
     PolicyBundleStore,
     QueryVariantClient,
@@ -40,10 +39,7 @@ class RunDependencies:
     platform_client: PlatformSearchClient
     policy_store: PolicyBundleStore
     query_variant_client: QueryVariantClient
-    decode_client: DecodeClient
-    category_match_client: CategoryMatchClient
-    pattern_recall_max_wait_seconds: float = 1200.0
-    pattern_recall_poll_interval_seconds: float = 5.0
+    gemini_video_client: GeminiVideoClient
 
 
 def _instrumented(stage: str, fn: Callable[[RunState], dict[str, Any]], runtime: RuntimeFileStore):
@@ -77,15 +73,6 @@ def _instrumented(stage: str, fn: Callable[[RunState], dict[str, Any]], runtime:
     return wrapped
 
 
-def _decode_event_sink(runtime: RuntimeFileStore, run_id: str, policy_run_id: str):
-    def sink(event: dict[str, Any]) -> None:
-        runtime.append_jsonl(
-            run_id, "run_events.jsonl", [run_record.decode_event_row(run_id, policy_run_id, event)]
-        )
-
-    return sink
-
-
 def build_run_graph(deps: RunDependencies):
     graph = StateGraph(RunState)
 
@@ -127,13 +114,8 @@ def build_run_graph(deps: RunDependencies):
             state["content_media_records"],
             state["evidence_bundles"],
             state["source_context"],
-            state["pattern_seed_pack"],
             deps.runtime,
-            deps.decode_client,
-            deps.category_match_client,
-            max_wait_seconds=deps.pattern_recall_max_wait_seconds,
-            poll_interval_seconds=deps.pattern_recall_poll_interval_seconds,
-            event_sink=_decode_event_sink(deps.runtime, state["run_id"], state["policy_run_id"]),
+            deps.gemini_video_client,
         )
         return {**result, "current_step": "recall_pattern"}
 
@@ -171,11 +153,7 @@ def build_run_graph(deps: RunDependencies):
             policy_bundle=state["policy_bundle"],
             platform_client=deps.platform_client,
             runtime=deps.runtime,
-            decode_client=deps.decode_client,
-            category_match_client=deps.category_match_client,
-            max_wait_seconds=deps.pattern_recall_max_wait_seconds,
-            poll_interval_seconds=deps.pattern_recall_poll_interval_seconds,
-            event_sink=_decode_event_sink(deps.runtime, state["run_id"], state["policy_run_id"]),
+            gemini_video_client=deps.gemini_video_client,
         )
         return {**result, "current_step": "execute_walk"}
 

+ 0 - 114
content_agent/integrations/category_match.py

@@ -1,114 +0,0 @@
-from __future__ import annotations
-
-import os
-from pathlib import Path
-from typing import Any
-from urllib.parse import urljoin
-
-import httpx
-
-from content_agent.integrations.decode_api import redact_sensitive_payload
-
-
-DEFAULT_MATCH_PATH = "/api/search/categories/match-paths/v2"
-
-
-class CategoryMatchClient:
-    def __init__(
-        self,
-        *,
-        base_url: str,
-        match_path: str = DEFAULT_MATCH_PATH,
-        source_type: str = "实质",
-        top_k: int = 10,
-        min_score: float = 0.6,
-        timeout_seconds: float = 60.0,
-        http_client: Any | None = None,
-    ) -> None:
-        self.base_url = base_url.rstrip("/") + "/"
-        self.match_path = match_path.lstrip("/")
-        self.source_type = source_type
-        self.top_k = top_k
-        self.min_score = min_score
-        self.timeout_seconds = timeout_seconds
-        self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
-
-    @classmethod
-    def from_env(cls, env_path: str | Path = ".env") -> "CategoryMatchClient":
-        env = _merged_env(env_path)
-        return cls(
-            base_url=_env(
-                "CONTENTFIND_CATEGORY_MATCH_BASE_URL",
-                env,
-                default="https://library.aiddit.com",
-            ),
-            match_path=_env(
-                "CONTENTFIND_CATEGORY_MATCH_PATH",
-                env,
-                default=DEFAULT_MATCH_PATH,
-            ),
-            source_type=_env("CONTENTFIND_CATEGORY_MATCH_SOURCE_TYPE", env, default="实质"),
-            top_k=int(_env("CONTENTFIND_CATEGORY_MATCH_TOP_K", env, default="10")),
-            min_score=float(_env("CONTENTFIND_CATEGORY_MATCH_MIN_SCORE", env, default="0.6")),
-            timeout_seconds=float(_env("CONTENTFIND_CATEGORY_MATCH_TIMEOUT_SECONDS", env, default="60")),
-        )
-
-    def match_paths(self, items: list[dict[str, Any]]) -> dict[str, Any]:
-        payload = {
-            "source_type": self.source_type,
-            "top_k": self.top_k,
-            "min_score": self.min_score,
-            "items": items,
-        }
-        response = self._post_json(payload)
-        return {
-            "request": redact_sensitive_payload(payload),
-            "response": redact_sensitive_payload(response),
-            "raw_response": response,
-        }
-
-    def _post_json(self, payload: dict[str, Any]) -> dict[str, Any]:
-        url = urljoin(self.base_url, self.match_path)
-        try:
-            response = self.http_client.post(
-                url,
-                json=payload,
-                headers={"Content-Type": "application/json"},
-                timeout=self.timeout_seconds,
-            )
-            response.raise_for_status()
-            data = response.json()
-        except httpx.HTTPStatusError as exc:
-            status_code = exc.response.status_code if exc.response is not None else "unknown"
-            raise RuntimeError(f"category match failed: HTTP {status_code}") from exc
-        except httpx.HTTPError as exc:
-            raise RuntimeError("category match failed: network_error") from exc
-        except ValueError as exc:
-            raise RuntimeError("category match failed: bad_json") from exc
-        if not isinstance(data, dict):
-            raise RuntimeError("category match failed: bad_response")
-        return data
-
-
-def _merged_env(env_path: str | Path) -> dict[str, str]:
-    env = _load_env_file(env_path)
-    env.update({key: value for key, value in os.environ.items() if value})
-    return env
-
-
-def _load_env_file(env_path: str | Path) -> dict[str, str]:
-    path = Path(env_path)
-    if not path.exists():
-        return {}
-    env: dict[str, str] = {}
-    for line in path.read_text(encoding="utf-8").splitlines():
-        stripped = line.strip()
-        if not stripped or stripped.startswith("#") or "=" not in stripped:
-            continue
-        key, value = stripped.split("=", 1)
-        env[key.strip()] = value.strip().strip('"').strip("'")
-    return env
-
-
-def _env(key: str, file_env: dict[str, str], default: str | None = None) -> str:
-    return file_env.get(key) or default or ""

+ 0 - 218
content_agent/integrations/decode_api.py

@@ -1,218 +0,0 @@
-from __future__ import annotations
-
-import os
-from pathlib import Path
-from typing import Any
-from urllib.parse import urljoin
-
-import httpx
-
-
-SENSITIVE_KEYS = {
-    "password",
-    "token",
-    "access_token",
-    "refresh_token",
-    "api_key",
-    "apikey",
-    "secret",
-    "dsn",
-    "authorization",
-    "cookie",
-    "session",
-    "credential",
-}
-
-
-class AigcDecodeClient:
-    def __init__(
-        self,
-        *,
-        base_url: str,
-        token: str,
-        submit_path: str = "/aigc/api/task/decode",
-        result_path: str = "/aigc/api/task/decode/result",
-        config_id: int = 58,
-        timeout_seconds: float = 60.0,
-        http_client: Any | None = None,
-    ) -> None:
-        self.base_url = base_url.rstrip("/") + "/"
-        self.token = token
-        self.submit_path = submit_path.lstrip("/")
-        self.result_path = result_path.lstrip("/")
-        self.config_id = config_id
-        self.timeout_seconds = timeout_seconds
-        self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
-
-    @classmethod
-    def from_env(cls, env_path: str | Path = ".env") -> "AigcDecodeClient":
-        env = _merged_env(env_path)
-        return cls(
-            base_url=_env("CONTENTFIND_API_AIGC_BASE_URL", env, required=True),
-            token=_env("CONTENTFIND_API_AIGC_TOKEN", env, default=env.get("AIGC_TOKEN"), required=True),
-            submit_path=_env(
-                "CONTENTFIND_AIGC_DECODE_SUBMIT_PATH",
-                env,
-                default="/aigc/api/task/decode",
-            ),
-            result_path=_env(
-                "CONTENTFIND_AIGC_DECODE_RESULT_PATH",
-                env,
-                default="/aigc/api/task/decode/result",
-            ),
-            config_id=int(_env("CONTENTFIND_AIGC_DECODE_CONFIG_ID", env, default="58")),
-            timeout_seconds=float(_env("CONTENTFIND_API_AIGC_TIMEOUT_SECONDS", env, default="60")),
-        )
-
-    def submit_decode(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]:
-        payload = {
-            "params": {
-                "configId": self.config_id,
-                "skipCompleted": False,
-                "posts": [_post_payload(content, media, source_context)],
-            }
-        }
-        response = self._post_json(self.submit_path, payload)
-        return {
-            "request": redact_sensitive_payload(payload),
-            "response": redact_sensitive_payload(response),
-            "decode_task_id": _extract_decode_task_id(response),
-            "raw_response": response,
-        }
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
-        payload = {"params": {"configId": self.config_id, "channelContentIds": [decode_task_id]}}
-        response = self._post_json(self.result_path, payload)
-        return {
-            "request": redact_sensitive_payload(payload),
-            "response": redact_sensitive_payload(response),
-            "raw_response": response,
-        }
-
-    def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
-        url = urljoin(self.base_url, path)
-        try:
-            response = self.http_client.post(
-                url,
-                json=payload,
-                headers={
-                    "Content-Type": "application/json",
-                    "Authorization": f"Bearer {self.token}",
-                },
-                timeout=self.timeout_seconds,
-            )
-            response.raise_for_status()
-            data = response.json()
-        except httpx.HTTPStatusError as exc:
-            status_code = exc.response.status_code if exc.response is not None else "unknown"
-            raise RuntimeError(f"aigc decode failed: HTTP {status_code}") from exc
-        except httpx.HTTPError as exc:
-            raise RuntimeError("aigc decode failed: network_error") from exc
-        except ValueError as exc:
-            raise RuntimeError("aigc decode failed: bad_json") from exc
-        if not isinstance(data, dict):
-            raise RuntimeError("aigc decode failed: bad_response")
-        return data
-
-
-def redact_sensitive_payload(value: Any) -> Any:
-    if isinstance(value, dict):
-        result: dict[str, Any] = {}
-        for key, child in value.items():
-            if str(key).lower() in SENSITIVE_KEYS:
-                result[f"{key}_redacted"] = "<redacted>"
-            else:
-                result[key] = redact_sensitive_payload(child)
-        return result
-    if isinstance(value, list):
-        return [redact_sensitive_payload(item) for item in value]
-    return value
-
-
-def _post_payload(
-    content: dict[str, Any],
-    media: dict[str, Any],
-    source_context: dict[str, Any],
-) -> dict[str, Any]:
-    evidence_pack = source_context.get("ext_data", {}).get("evidence_pack", {})
-    description = content.get("description") or ""
-    return {
-        "channelContentId": content.get("platform_content_id"),
-        "title": description,
-        "bodyText": description,
-        "images": [],
-        "video": media.get("play_url"),
-        "contentModal": 4,
-        "channel": 2,
-        "mergeLeve1": "",
-        "mergeLeve2": source_context.get("merge_leve2") or source_context.get("name") or "",
-        "metadata": {
-            "tags": content.get("tags", []),
-            "platform": content.get("platform", "douyin"),
-            "source_post_id": evidence_pack.get("source_post_id"),
-            "pattern_execution_id": evidence_pack.get("pattern_execution_id"),
-        },
-    }
-
-
-def _extract_decode_task_id(response: dict[str, Any]) -> str | None:
-    candidates = [
-        response.get("taskId"),
-        response.get("task_id"),
-        response.get("data", {}).get("taskId") if isinstance(response.get("data"), dict) else None,
-        response.get("data", {}).get("task_id") if isinstance(response.get("data"), dict) else None,
-    ]
-    data = response.get("data")
-    if isinstance(data, list):
-        for item in data:
-            if not isinstance(item, dict):
-                continue
-            candidates.extend(
-                [
-                    item.get("taskId"),
-                    item.get("task_id"),
-                    item.get("channelContentId"),
-                    item.get("channel_content_id"),
-                ]
-            )
-    for candidate in candidates:
-        if candidate:
-            return str(candidate)
-    return None
-
-
-def _merged_env(env_path: str | Path) -> dict[str, str]:
-    env = _load_env_file(env_path)
-    env.update({key: value for key, value in os.environ.items() if value})
-    return env
-
-
-def _load_env_file(env_path: str | Path) -> dict[str, str]:
-    path = Path(env_path)
-    if not path.exists():
-        return {}
-    env: dict[str, str] = {}
-    for line in path.read_text(encoding="utf-8").splitlines():
-        stripped = line.strip()
-        if not stripped or stripped.startswith("#") or "=" not in stripped:
-            continue
-        key, value = stripped.split("=", 1)
-        env[key.strip()] = value.strip().strip('"').strip("'")
-    return env
-
-
-def _env(
-    key: str,
-    file_env: dict[str, str],
-    default: str | None = None,
-    required: bool = False,
-) -> str:
-    value = file_env.get(key) or default
-    if required and not value:
-        raise RuntimeError(f"missing required env: {key}")
-    return value or ""

+ 155 - 0
content_agent/integrations/gemini_video.py

@@ -0,0 +1,155 @@
+"""Gemini 视频判定 client (V3-M2B).
+
+实现 interfaces.GeminiVideoClient.analyze:取视频(video_fetch)→ 多模态投给
+Gemini(OpenRouter image_url data URL)→ 解析结构化判定 4 字段。复用 query_variant
+的 OpenRouter httpx 骨架,不引入新 SDK。任何失败一律返回 fail 结构,不抛、不卡 run。
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from typing import Any, Callable, Mapping
+
+import httpx
+
+from content_agent.integrations import video_fetch
+
+# Fallbacks used when the matching constructor argument or env var is absent.
+DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
+DEFAULT_VIDEO_MODEL = "google/gemini-3-flash-preview"
+DEFAULT_VIDEO_TIMEOUT_SECONDS = 90.0
+
+# System message: frames the model as a video reviewer and demands a bare JSON object.
+_SYSTEM_PROMPT = "你是面向中国中老年内容池的视频审核助手。只输出一个 JSON 对象,不要任何解释或 markdown。"
+# User message template. It is rendered with str.format(seed_terms=...), which is why
+# the literal braces of the JSON skeleton are doubled ({{ }}) while {seed_terms} is not.
+_USER_PROMPT = (
+    "判断这条视频,严格按以下 JSON 结构输出(只输出 JSON):\n"
+    '{{"fit_senior_50plus": true/false, "fit_confidence": 0~1 的小数, '
+    '"relevance_score": 0~1 的小数, "reason": "中文简述理由"}}\n'
+    "字段含义:\n"
+    "- fit_senior_50plus: 内容是否适合中国 50 岁以上老年人观看(健康/安全/无误导/无低俗)。\n"
+    "- fit_confidence: 你对上面判断的置信度。\n"
+    "- relevance_score: 视频内容与需求关键词【{seed_terms}】的相关程度。\n"
+    "- reason: 一句话理由。"
+)
+
+
+def _fail(reason: str) -> dict[str, Any]:
+    """Build the degraded verdict returned whenever analysis cannot complete.
+
+    The result has the same four verdict fields as a successful parse, pinned to
+    the most conservative values, plus ``status="failed"`` so callers can tell a
+    failure apart from a genuine negative judgment.
+    """
+    verdict: dict[str, Any] = dict(
+        fit_senior_50plus=False,
+        fit_confidence=0.0,
+        relevance_score=0.0,
+        reason=reason,
+        status="failed",
+    )
+    return verdict
+
+
+def _clamp01(value: Any) -> float:
+    """Coerce *value* to a float clamped into ``[0.0, 1.0]``.
+
+    Anything that cannot be converted by ``float()`` (``None``, non-numeric
+    strings, containers) and NaN both collapse to ``0.0``, the most conservative
+    score. Infinities are clamped to the nearest bound like any other
+    out-of-range number.
+    """
+    try:
+        number = float(value)
+    except (TypeError, ValueError):
+        return 0.0
+    # NaN is the only float that compares unequal to itself. Without this guard
+    # ``max(0.0, min(1.0, nan))`` evaluates to 1.0, i.e. a garbage score would be
+    # reported as full confidence.
+    if number != number:
+        return 0.0
+    return max(0.0, min(1.0, number))
+
+
+def _coerce_flag(value: Any) -> bool:
+    """Interpret a JSON value as a boolean without trusting string truthiness."""
+    if isinstance(value, str):
+        # bool("false") is True, so string verdicts have to be read explicitly.
+        lowered = value.strip().lower()
+        if lowered in ("true", "yes", "1"):
+            return True
+        if lowered in ("false", "no", "0", ""):
+            return False
+        raise ValueError(f"unrecognized boolean value: {value!r}")
+    return bool(value)
+
+
+def _parse(payload: dict[str, Any]) -> dict[str, Any]:
+    """Extract the four verdict fields from a chat-completions response body.
+
+    Args:
+        payload: Decoded JSON body; the verdict is read from
+            ``payload["choices"][0]["message"]["content"]``.
+
+    Returns:
+        A dict with ``fit_senior_50plus`` (bool), ``fit_confidence`` and
+        ``relevance_score`` (floats clamped to [0, 1]) and ``reason`` (str).
+
+    Raises:
+        KeyError, IndexError, TypeError: the payload or verdict lacks the
+            expected structure or the mandatory ``fit_senior_50plus`` key.
+        ValueError: the content is not valid JSON (``json.JSONDecodeError`` is
+            a ``ValueError``), is not a JSON object, or carries an unreadable
+            boolean.
+    """
+    content = payload["choices"][0]["message"]["content"]
+    text = str(content).strip()
+    # Keep only the outermost {...} span. This drops markdown code fences as
+    # well as any prose the model adds around the object despite the prompt.
+    start, end = text.find("{"), text.rfind("}")
+    if start != -1 and end > start:
+        text = text[start : end + 1]
+    data = json.loads(text)
+    if not isinstance(data, dict):
+        raise ValueError("gemini verdict is not a JSON object")
+    return {
+        "fit_senior_50plus": _coerce_flag(data["fit_senior_50plus"]),
+        "fit_confidence": _clamp01(data.get("fit_confidence")),
+        "relevance_score": _clamp01(data.get("relevance_score")),
+        "reason": str(data.get("reason") or ""),
+    }
+
+
+def _seed_terms(source_context: dict[str, Any]) -> str:
+    """Render the demand keywords for the prompt, or a placeholder when absent.
+
+    Reads ``source_context["ext_data"]["evidence_pack"]["seed_terms"]``. Every
+    level is optional: a missing key, an explicit ``None`` or a value of the
+    wrong type yields the placeholder instead of raising, so prompt building
+    cannot abort an analysis.
+
+    Returns:
+        The terms joined with "、", or "(未指定)" when there are none.
+    """
+    ext_data = source_context.get("ext_data") if isinstance(source_context, dict) else None
+    evidence = ext_data.get("evidence_pack") if isinstance(ext_data, dict) else None
+    terms = evidence.get("seed_terms") if isinstance(evidence, dict) else None
+    if isinstance(terms, str):
+        # A bare string is a single term; iterating it would join its characters.
+        terms = [terms]
+    elif not isinstance(terms, (list, tuple)):
+        terms = []
+    return "、".join(str(t) for t in terms) or "(未指定)"
+
+
+class GeminiVideoClient:
+    """Judges a video with a Gemini model reached through the OpenRouter HTTP API.
+
+    ``analyze`` never raises: every failure (missing URL, fetch error, HTTP
+    error, unparsable reply) is turned into the ``_fail`` structure so that one
+    bad video cannot abort a run.
+    """
+
+    def __init__(
+        self,
+        *,
+        api_key: str,
+        model: str = DEFAULT_VIDEO_MODEL,
+        base_url: str = DEFAULT_OPENROUTER_BASE_URL,
+        timeout_seconds: float = DEFAULT_VIDEO_TIMEOUT_SECONDS,
+        fetch_fn: Callable[..., str] = video_fetch.fetch_and_compress,
+        http_post: Callable[..., Any] = httpx.post,
+    ) -> None:
+        """Store the connection settings.
+
+        Args:
+            api_key: Bearer token sent in the Authorization header.
+            model: Model identifier placed in the request body.
+            base_url: API root; a trailing slash is stripped.
+            timeout_seconds: Timeout passed to ``http_post``.
+            fetch_fn: Called as ``fetch_fn(play_url, platform)`` and expected to
+                return a data URL; injectable so tests avoid real downloads.
+            http_post: Callable with the ``httpx.post`` calling convention;
+                injectable for the same reason.
+        """
+        self.api_key = api_key
+        self.model = model
+        self.base_url = base_url.rstrip("/")
+        self.timeout_seconds = timeout_seconds
+        self.fetch_fn = fetch_fn
+        self.http_post = http_post
+
+    @classmethod
+    def from_env(
+        cls, env: Mapping[str, str] | None = None
+    ) -> "GeminiVideoClient | MissingGeminiVideoClient":
+        """Build a client from environment-style settings.
+
+        Args:
+            env: Mapping to read from; ``os.environ`` when ``None``.
+
+        Returns:
+            A configured client, or a ``MissingGeminiVideoClient`` when neither
+            ``OPENROUTER_API_KEY`` nor ``OPEN_ROUTER_API_KEY`` is set.
+        """
+        source = os.environ if env is None else env
+        api_key = source.get("OPENROUTER_API_KEY") or source.get("OPEN_ROUTER_API_KEY")
+        if not api_key:
+            return MissingGeminiVideoClient("gemini video config missing: OPENROUTER_API_KEY")
+        try:
+            timeout_seconds = float(
+                source.get("CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS") or DEFAULT_VIDEO_TIMEOUT_SECONDS
+            )
+        except ValueError:
+            # A malformed timeout must not crash service start-up; use the default.
+            timeout_seconds = DEFAULT_VIDEO_TIMEOUT_SECONDS
+        if not timeout_seconds > 0:
+            # Also rejects NaN, for which every comparison is False.
+            timeout_seconds = DEFAULT_VIDEO_TIMEOUT_SECONDS
+        return cls(
+            api_key=api_key,
+            model=source.get("CONTENT_AGENT_VIDEO_LLM_MODEL") or DEFAULT_VIDEO_MODEL,
+            base_url=source.get("OPENROUTER_BASE_URL") or DEFAULT_OPENROUTER_BASE_URL,
+            timeout_seconds=timeout_seconds,
+        )
+
+    def analyze(
+        self,
+        content: dict[str, Any],
+        media: dict[str, Any],
+        source_context: dict[str, Any],
+    ) -> dict[str, Any]:
+        """Fetch the video, ask the model for a verdict and return it.
+
+        Args:
+            content: Content record; ``platform`` (default ``"douyin"``) selects
+                the download profile.
+            media: Media record; ``play_url`` is the video to fetch.
+            source_context: Demand context the prompt keywords are taken from.
+
+        Returns:
+            The parsed verdict on success, otherwise the ``_fail`` structure
+            with a reason naming the failing stage and the exception type.
+        """
+        play_url = media.get("play_url")
+        if not play_url:
+            return _fail("no_play_url")
+        try:
+            data_url = self.fetch_fn(play_url, content.get("platform", "douyin"))
+        except Exception as exc:
+            return _fail(f"video_fetch_failed: {type(exc).__name__}")
+
+        # Prompt building happens inside the try as well, so malformed context
+        # degrades to a failure result instead of escaping to the caller.
+        try:
+            # NOTE(review): the video data URL is sent as an `image_url` part;
+            # confirm against the provider's multimodal docs that this is the
+            # intended part type for video input.
+            messages = [
+                {"role": "system", "content": _SYSTEM_PROMPT},
+                {
+                    "role": "user",
+                    "content": [
+                        {"type": "text", "text": _USER_PROMPT.format(seed_terms=_seed_terms(source_context))},
+                        {"type": "image_url", "image_url": {"url": data_url}},
+                    ],
+                },
+            ]
+            response = self.http_post(
+                f"{self.base_url}/chat/completions",
+                headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
+                json={"model": self.model, "messages": messages},
+                timeout=self.timeout_seconds,
+            )
+            response.raise_for_status()
+            return _parse(response.json())
+        except httpx.HTTPError as exc:
+            return _fail(f"gemini_http_error: {type(exc).__name__}")
+        except (KeyError, IndexError, TypeError, ValueError) as exc:
+            return _fail(f"gemini_response_invalid: {type(exc).__name__}")
+        except Exception as exc:
+            # Boundary catch-all: some failures (e.g. httpx.InvalidURL, or an
+            # AttributeError from an unexpected payload shape) are outside the
+            # families above, and this method is contractually non-raising.
+            return _fail(f"gemini_unexpected_error: {type(exc).__name__}")
+
+
+class MissingGeminiVideoClient:
+    """Stand-in client returned when the Gemini video configuration is absent.
+
+    It offers the same ``analyze`` signature as ``GeminiVideoClient`` and always
+    answers with the failure structure carrying the configured reason.
+    """
+
+    def __init__(self, reason: str) -> None:
+        # Explanation reported in the ``reason`` field of every result.
+        self.reason = reason
+
+    def analyze(
+        self,
+        content: dict[str, Any],
+        media: dict[str, Any],
+        source_context: dict[str, Any],
+    ) -> dict[str, Any]:
+        """Ignore all inputs and return the failure verdict for ``self.reason``."""
+        failure_reason = self.reason
+        return _fail(failure_reason)

+ 85 - 0
content_agent/integrations/video_fetch.py

@@ -0,0 +1,85 @@
+"""视频获取链 (V3-M2A).
+
+从 play_url 下载视频(带平台下载头)→ imageio-ffmpeg 压到 ~4MB 低清 →
+base64 data URL,供 GeminiVideoClient 投喂(OpenRouter image_url)。
+真实下载/压缩只在 M7 live smoke 跑;单测全 mock。
+"""
+
+from __future__ import annotations
+
+import base64
+import subprocess
+from typing import Any
+
+import httpx
+import imageio_ffmpeg
+
+# platform_profiles only carries the placeholders "iOS UA"/"PC UA"; here they are
+# mapped to real User-Agent strings plus a Referer, keyed by platform name.
+_PLATFORM_DOWNLOAD_HEADERS = {
+    "douyin": {
+        "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Mobile/15E148",
+        "Referer": "https://www.douyin.com/",
+    },
+    "shipinhao": {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36",
+        "Referer": "https://channels.weixin.qq.com/",
+    },
+}
+# Agreed compression preset: 360p / 1 fps / low quality, measured at ~4 MB
+# (see memory/video-multimodal-analysis). Audio is re-encoded to mono AAC at 32k.
+_FFMPEG_ARGS = ["-vf", "scale=360:-2,fps=1", "-crf", "33", "-c:a", "aac", "-b:a", "32k", "-ac", "1"]
+# NOTE(review): fetch_and_compress compares this cap against the compressed byte
+# count before base64 encoding, which inflates the payload by about 4/3 - confirm
+# which of the two sizes the platform limit actually applies to.
+MAX_INLINE_BYTES = 30 * 1024 * 1024  # hard platform cap for OpenRouter inline base64
+
+
+class VideoFetchError(RuntimeError):
+    """Download, compression or size-limit failure; GeminiVideoClient catches it and returns a fail result."""
+
+
+def _download_headers(platform: str, override: dict[str, str] | None) -> dict[str, str]:
+    """Pick the request headers for a download.
+
+    An explicit ``override`` (even an empty dict) is returned as-is; otherwise
+    the per-platform table is consulted, with ``{}`` for unknown platforms.
+    """
+    return _PLATFORM_DOWNLOAD_HEADERS.get(platform, {}) if override is None else override
+
+
+def _compress(raw: bytes, ffmpeg_exe: str, *, timeout_seconds: float = 120.0) -> bytes:
+    """Transcode *raw* video bytes to the low-resolution preset and return MP4 bytes.
+
+    Args:
+        raw: Downloaded video file content.
+        ffmpeg_exe: Path of the ffmpeg executable to run.
+        timeout_seconds: Upper bound on the ffmpeg run, so a stuck process
+            cannot block the caller indefinitely.
+
+    Returns:
+        Fragmented MP4 bytes read from ffmpeg's stdout.
+
+    Raises:
+        VideoFetchError: ffmpeg could not be started, timed out, exited
+            non-zero, or produced no output.
+    """
+    import os
+    import tempfile
+
+    try:
+        with tempfile.TemporaryDirectory(prefix="video_fetch_") as workdir:
+            # The input goes through a real file rather than stdin: MP4 files
+            # that keep their index (moov atom) at the end need a seekable
+            # input, which a pipe is not.
+            source_path = os.path.join(workdir, "source.bin")
+            with open(source_path, "wb") as handle:
+                handle.write(raw)
+            proc = subprocess.run(
+                [ffmpeg_exe, "-nostdin", "-i", source_path, *_FFMPEG_ARGS, "-f", "mp4",
+                 "-movflags", "frag_keyframe+empty_moov", "pipe:1"],
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                timeout=timeout_seconds,
+            )
+    except subprocess.TimeoutExpired as exc:
+        raise VideoFetchError(f"ffmpeg compression failed: timed out after {timeout_seconds}s") from exc
+    except OSError as exc:
+        raise VideoFetchError(f"ffmpeg compression failed: {type(exc).__name__}") from exc
+    if proc.returncode != 0 or not proc.stdout:
+        # Keep only the tail of stderr: that is where ffmpeg reports the error.
+        detail = (proc.stderr or b"").decode("utf-8", errors="replace").strip()[-300:]
+        raise VideoFetchError(f"ffmpeg compression failed (exit {proc.returncode}): {detail}")
+    return proc.stdout
+
+
+def fetch_and_compress(
+    play_url: str,
+    platform: str,
+    *,
+    headers: dict[str, str] | None = None,
+    http_client: Any | None = None,
+    ffmpeg_exe: str | None = None,
+    timeout_seconds: float = 90.0,
+) -> str:
+    """Download a video, compress it and return it as a base64 ``data:`` URL.
+
+    Args:
+        play_url: Video URL to download.
+        platform: Platform name used to choose the default request headers.
+        headers: Replaces the platform headers when given.
+        http_client: Object exposing ``get(...)`` like ``httpx``; defaults to
+            the ``httpx`` module itself.
+        ffmpeg_exe: ffmpeg executable; resolved through ``imageio_ffmpeg`` when
+            omitted.
+        timeout_seconds: Timeout for the download request.
+
+    Returns:
+        ``data:video/mp4;base64,...`` holding the compressed video.
+
+    Raises:
+        VideoFetchError: empty URL, HTTP failure, empty body, compression
+            failure, or compressed output larger than ``MAX_INLINE_BYTES``.
+    """
+    if not play_url:
+        raise VideoFetchError("missing play_url")
+
+    transport = http_client or httpx
+    request_headers = _download_headers(platform, headers)
+    try:
+        response = transport.get(
+            play_url,
+            headers=request_headers,
+            follow_redirects=True,
+            timeout=timeout_seconds,
+        )
+        response.raise_for_status()
+        raw = response.content
+    except httpx.HTTPError as exc:
+        raise VideoFetchError(f"download failed: {type(exc).__name__}") from exc
+    if not raw:
+        raise VideoFetchError("empty download")
+
+    # Resolve the bundled ffmpeg binary only when the caller did not supply one.
+    executable = ffmpeg_exe or imageio_ffmpeg.get_ffmpeg_exe()
+    compressed = _compress(raw, executable)
+    size = len(compressed)
+    if size > MAX_INLINE_BYTES:
+        raise VideoFetchError(f"compressed video oversize: {size} bytes")
+    encoded = base64.b64encode(compressed).decode("ascii")
+    return "data:video/mp4;base64," + encoded

+ 0 - 15
content_agent/interfaces.py

@@ -60,21 +60,6 @@ class QueryVariantClient(Protocol):
     ) -> QueryVariantResult: ...
 
 
-class DecodeClient(Protocol):
-    def submit_decode(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]: ...
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]: ...
-
-
-class CategoryMatchClient(Protocol):
-    def match_paths(self, items: list[dict[str, Any]]) -> dict[str, Any]: ...
-
-
 class GeminiVideoClient(Protocol):
     def analyze(
         self,

+ 18 - 113
content_agent/run_service.py

@@ -12,11 +12,10 @@ from content_agent.business_modules import run_record
 from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
 from content_agent.graph import RunDependencies, build_run_graph
 from content_agent.integrations.composite_runtime import CompositeRuntimeStore
-from content_agent.integrations.category_match import CategoryMatchClient as HttpCategoryMatchClient
 from content_agent.integrations.database_runtime import ContentSupplyDbConfig, DatabaseRuntimeStore
-from content_agent.integrations.decode_api import AigcDecodeClient
 from content_agent.integrations.demand_source import DemandSourceService
 from content_agent.integrations.douyin import CrawapiDouyinClient
+from content_agent.integrations.gemini_video import GeminiVideoClient as RealGeminiVideoClient
 from content_agent.integrations.mock_platform import MockPlatformClient
 from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
@@ -26,8 +25,7 @@ from content_agent.integrations.query_variant import (
 )
 from content_agent.integrations.runtime_files import LocalRuntimeFileStore
 from content_agent.interfaces import (
-    CategoryMatchClient,
-    DecodeClient,
+    GeminiVideoClient,
     PlatformSearchClient,
     PolicyBundleStore,
     QueryVariantClient,
@@ -45,10 +43,7 @@ class RunService:
         policy_store: PolicyBundleStore | None = None,
         demand_source: DemandSourceService | None = None,
         query_variant_client: QueryVariantClient | None = None,
-        decode_client: DecodeClient | None = None,
-        category_match_client: CategoryMatchClient | None = None,
-        pattern_recall_max_wait_seconds: float = 1200.0,
-        pattern_recall_poll_interval_seconds: float = 5.0,
+        gemini_video_client: GeminiVideoClient | None = None,
     ) -> None:
         self.runtime = runtime or LocalRuntimeFileStore(runtime_root)
         self.policy_store = policy_store or JsonPolicyBundleStore(Path("."))
@@ -56,20 +51,14 @@ class RunService:
         self.query_variant_client = query_variant_client or MissingQueryVariantClient(
             "query variant client is not configured"
         )
-        self.decode_client = decode_client or _DeterministicDecodeClient()
-        self.category_match_client = category_match_client or _DeterministicCategoryMatchClient()
-        self.pattern_recall_max_wait_seconds = pattern_recall_max_wait_seconds
-        self.pattern_recall_poll_interval_seconds = pattern_recall_poll_interval_seconds
+        self._gemini_video_client = gemini_video_client or _DeterministicGeminiVideoClient()
 
     @classmethod
     def from_env(cls, runtime_root: Path | str = Path("runtime/v1")) -> "RunService":
         local_runtime = LocalRuntimeFileStore(runtime_root)
         env = _merged_project_env()
         query_variant_client = query_variant_client_from_env(env)
-        decode_client = _decode_client_from_env(env)
-        category_match_client = _category_match_client_from_env(env)
-        recall_max_wait = float(env.get("CONTENTFIND_PATTERN_RECALL_MAX_WAIT_SECONDS") or 1200)
-        recall_poll_interval = float(env.get("CONTENTFIND_PATTERN_RECALL_POLL_INTERVAL_SECONDS") or 5)
+        gemini_video_client = _gemini_video_client_from_env(env)
         db_runtime_enabled = _env_enabled("CONTENT_AGENT_DB_RUNTIME_ENABLED")
         try:
             config = ContentSupplyDbConfig.from_env()
@@ -79,10 +68,7 @@ class RunService:
                     runtime_root=runtime_root,
                     runtime=local_runtime,
                     query_variant_client=query_variant_client,
-                    decode_client=decode_client,
-                    category_match_client=category_match_client,
-                    pattern_recall_max_wait_seconds=recall_max_wait,
-                    pattern_recall_poll_interval_seconds=recall_poll_interval,
+                    gemini_video_client=gemini_video_client,
                 )
             raise ContentAgentError(
                 ErrorCode.DB_CONFIG_MISSING,
@@ -97,10 +83,7 @@ class RunService:
                 runtime=local_runtime,
                 demand_source=demand_source,
                 query_variant_client=query_variant_client,
-                decode_client=decode_client,
-                category_match_client=category_match_client,
-                pattern_recall_max_wait_seconds=recall_max_wait,
-                pattern_recall_poll_interval_seconds=recall_poll_interval,
+                gemini_video_client=gemini_video_client,
             )
 
         db_runtime = DatabaseRuntimeStore(config)
@@ -109,10 +92,7 @@ class RunService:
             runtime=CompositeRuntimeStore(db_runtime, local_runtime),
             demand_source=demand_source,
             query_variant_client=query_variant_client,
-            decode_client=decode_client,
-            category_match_client=category_match_client,
-            pattern_recall_max_wait_seconds=recall_max_wait,
-            pattern_recall_poll_interval_seconds=recall_poll_interval,
+            gemini_video_client=gemini_video_client,
         )
 
     def start_run(self, request: RunStartRequest) -> RunState:
@@ -161,10 +141,7 @@ class RunService:
                 platform_client=self._platform_client(request.platform, request.platform_mode),
                 policy_store=self.policy_store,
                 query_variant_client=self.query_variant_client,
-                decode_client=self.decode_client,
-                category_match_client=self.category_match_client,
-                pattern_recall_max_wait_seconds=self.pattern_recall_max_wait_seconds,
-                pattern_recall_poll_interval_seconds=self.pattern_recall_poll_interval_seconds,
+                gemini_video_client=self._gemini_video_client,
             )
             graph = build_run_graph(deps)
             state = graph.invoke(initial_state)
@@ -608,96 +585,24 @@ def _query_failures_from_error(error: ContentAgentError) -> list[dict[str, Any]]
     return [failure for failure in query_failures if isinstance(failure, dict)]
 
 
-def _decode_client_from_env(env: dict[str, str]) -> DecodeClient:
-    if env.get("CONTENTFIND_API_AIGC_BASE_URL") and (
-        env.get("CONTENTFIND_API_AIGC_TOKEN") or env.get("AIGC_TOKEN")
-    ):
-        return AigcDecodeClient.from_env()
-    return _MissingDecodeClient("AIGC decode client is not configured")
+def _gemini_video_client_from_env(env: dict[str, str]) -> GeminiVideoClient:
+    return RealGeminiVideoClient.from_env(env)
 
 
-def _category_match_client_from_env(env: dict[str, str]) -> CategoryMatchClient:
-    return HttpCategoryMatchClient.from_env()
+class _DeterministicGeminiVideoClient:
+    """mock/默认判定 client:固定返回适合 50+ 的高分结果,供本地/smoke 无网跑通。"""
 
-
-class _MissingDecodeClient:
-    def __init__(self, reason: str) -> None:
-        self.reason = reason
-
-    def submit_decode(
+    def analyze(
         self,
         content: dict[str, Any],
         media: dict[str, Any],
         source_context: dict[str, Any],
     ) -> dict[str, Any]:
-        raise RuntimeError(self.reason)
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
-        raise RuntimeError(self.reason)
-
-
-class _DeterministicDecodeClient:
-    def submit_decode(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]:
-        evidence_pack = source_context.get("ext_data", {}).get("evidence_pack", {})
-        seed_terms = evidence_pack.get("seed_terms") or ["爱国情感"]
         return {
-            "decode_status": "success",
-            "decode_task_id": f"fake_decode_{content.get('platform_content_id')}",
-            "request": {"params": {"configId": 58}},
-            "response": {"status": "SUCCESS"},
-            "dataContent": {
-                "目的点": [
-                    {
-                        "点": seed_terms[0],
-                        "实质": [{"名称": term} for term in seed_terms],
-                    }
-                ],
-                "关键点": [{"点": term, "类型": "实质"} for term in seed_terms],
-                "分词结果": [{"词": content.get("description", "")}],
-            },
-        }
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
-        return {
-            "decode_status": "success",
-            "decode_task_id": decode_task_id,
-            "request": {"taskId": decode_task_id},
-            "response": {"status": "SUCCESS"},
-            "dataContent": {
-                "目的点": [{"点": "爱国情感", "实质": [{"名称": "人物故事"}]}],
-                "关键点": [{"点": "爱国情感", "类型": "实质"}],
-            },
-        }
-
-
-class _DeterministicCategoryMatchClient:
-    def match_paths(self, items: list[dict[str, Any]]) -> dict[str, Any]:
-        rows = [
-            {
-                "term": item.get("term"),
-                "paths": [
-                    {
-                        "category_path": "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感",
-                        "score": 0.91,
-                    }
-                ],
-            }
-            for item in items
-        ]
-        return {
-            "request": {
-                "source_type": "实质",
-                "top_k": 10,
-                "min_score": 0.6,
-                "items": items,
-            },
-            "response": {"data": rows},
-            "raw_response": {"data": rows},
+            "fit_senior_50plus": True,
+            "fit_confidence": 0.9,
+            "relevance_score": 0.8,
+            "reason": "deterministic stub judgment",
         }
 
 

+ 1 - 0
pyproject.toml

@@ -6,6 +6,7 @@ requires-python = ">=3.11"
 dependencies = [
   "fastapi>=0.115.0",
   "httpx>=0.27.0",
+  "imageio-ffmpeg>=0.4.9",
   "langgraph>=1.0.0",
   "pydantic>=2.8.0",
   "pymysql>=1.1.1",

+ 14 - 9
tests/gemini_helpers.py

@@ -1,8 +1,7 @@
-"""Deterministic Gemini video-judgment fakes (V3-M0A).
+"""Deterministic Gemini video-judgment fakes (V3-M0A, schema backfilled in M2).
 
-The real Gemini structured-output schema is an M2 decision; until then the
-factories return neutral placeholder payloads (`outcome` + empty `raw`) that
-M2 backfills with the real field names.
+Factories return the real M2 structured-output schema:
+fit_senior_50plus / fit_confidence / relevance_score / reason (+ status on fail).
 """
 
 from __future__ import annotations
@@ -11,16 +10,22 @@ import copy
 from typing import Any
 
 
-def fake_gemini_pool(*, relevance: str = "high") -> dict[str, Any]:
-    return {"outcome": "pool", "relevance": relevance, "raw": {}}
+def fake_gemini_pool() -> dict[str, Any]:
+    return {"fit_senior_50plus": True, "fit_confidence": 0.9, "relevance_score": 0.85, "reason": "pool stub"}
 
 
-def fake_gemini_review(*, relevance: str = "medium") -> dict[str, Any]:
-    return {"outcome": "review", "relevance": relevance, "raw": {}}
+def fake_gemini_review() -> dict[str, Any]:
+    return {"fit_senior_50plus": True, "fit_confidence": 0.8, "relevance_score": 0.45, "reason": "review stub"}
 
 
 def fake_gemini_fail(reason: str = "gemini_timeout") -> dict[str, Any]:
-    return {"outcome": "fail", "status": "failed", "failure_reason": reason, "raw": {}}
+    return {
+        "fit_senior_50plus": False,
+        "fit_confidence": 0.0,
+        "relevance_score": 0.0,
+        "reason": reason,
+        "status": "failed",
+    }
 
 
 class FakeGeminiVideoClient:

+ 0 - 149
tests/p4_helpers.py

@@ -1,149 +0,0 @@
-from __future__ import annotations
-
-import copy
-from typing import Any
-
-
-def fake_decode_success(
-    *,
-    task_id: str = "decode_task_001",
-    terms: list[str] | None = None,
-) -> dict[str, Any]:
-    terms = terms or ["爱国情感", "人物故事"]
-    return {
-        "decode_status": "SUCCESS",
-        "decode_task_id": task_id,
-        "request": {"params": {"configId": 58}},
-        "response": {"status": "SUCCESS"},
-        "dataContent": {
-            "目的点": [{"点": terms[0], "实质": [{"名称": term} for term in terms]}],
-            "关键点": [{"点": term, "类型": "实质"} for term in terms],
-            "分词结果": [{"词": "标题只做辅助"}],
-        },
-    }
-
-
-def fake_decode_pending(task_id: str = "decode_task_pending") -> dict[str, Any]:
-    return {
-        "decode_status": "RUNNING",
-        "decode_task_id": task_id,
-        "request": {"params": {"configId": 58}},
-        "response": {"status": "RUNNING"},
-    }
-
-
-def fake_decode_failed() -> dict[str, Any]:
-    return {
-        "decode_status": "FAILED",
-        "decode_task_id": "decode_task_failed",
-        "request": {"params": {"configId": 58}},
-        "response": {"status": "FAILED", "error": "bad content"},
-    }
-
-
-def fake_decode_bad_shape() -> dict[str, Any]:
-    return {
-        "decode_status": "SUCCESS",
-        "decode_task_id": "decode_task_bad",
-        "request": {"params": {"configId": 58}},
-        "response": {"status": "SUCCESS"},
-        "dataContent": "not-json",
-    }
-
-
-def fake_match_paths_hit(
-    *,
-    terms: list[str] | None = None,
-    paths: list[str] | None = None,
-) -> dict[str, Any]:
-    terms = terms or ["爱国情感"]
-    paths = paths or ["/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"]
-    rows = [
-        {
-            "term": term,
-            "paths": [{"category_path": path, "score": 0.91} for path in paths],
-        }
-        for term in terms
-    ]
-    return {
-        "request": {
-            "source_type": "实质",
-            "top_k": 10,
-            "min_score": 0.6,
-            "items": [{"term": term, "description": "解构强证据词"} for term in terms],
-        },
-        "response": {"data": copy.deepcopy(rows)},
-        "raw_response": {"data": rows},
-    }
-
-
-def fake_match_paths_no_hit() -> dict[str, Any]:
-    return {
-        "request": {"source_type": "实质", "top_k": 10, "min_score": 0.6, "items": []},
-        "response": {"data": []},
-        "raw_response": {"data": []},
-    }
-
-
-class FakeDecodeClient:
-    def __init__(
-        self,
-        submit_result: dict[str, Any] | None = None,
-        result_sequence: list[Any] | None = None,
-    ) -> None:
-        self.submit_result = submit_result or fake_decode_success()
-        self.result_sequence = list(result_sequence or [])
-        self.submit_calls: list[dict[str, Any]] = []
-        self.result_calls: list[str] = []
-
-    def submit_decode(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]:
-        self.submit_calls.append(
-            {"content": copy.deepcopy(content), "media": copy.deepcopy(media)}
-        )
-        return copy.deepcopy(self.submit_result)
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
-        self.result_calls.append(decode_task_id)
-        if self.result_sequence:
-            next_result = self.result_sequence.pop(0)
-            if callable(next_result):
-                return next_result()
-            return copy.deepcopy(next_result)
-        return copy.deepcopy(self.submit_result)
-
-
-class FailingDecodeClient:
-    def submit_decode(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]:
-        raise RuntimeError("decode unavailable")
-
-    def get_decode_result(self, decode_task_id: str) -> dict[str, Any]:
-        raise RuntimeError("decode unavailable")
-
-
-class FakeCategoryMatchClient:
-    def __init__(self, result: dict[str, Any] | None = None) -> None:
-        self.auto_match = result is None
-        self.result = result or fake_match_paths_hit()
-        self.calls: list[list[dict[str, Any]]] = []
-
-    def match_paths(self, items: list[dict[str, Any]]) -> dict[str, Any]:
-        self.calls.append(copy.deepcopy(items))
-        if not self.auto_match:
-            return copy.deepcopy(self.result)
-        terms = [item["term"] for item in items]
-        return fake_match_paths_hit(terms=terms)
-
-
-class FailingCategoryMatchClient:
-    def match_paths(self, items: list[dict[str, Any]]) -> dict[str, Any]:
-        raise RuntimeError("category match unavailable")

+ 3 - 8
tests/p6_walk_helpers.py

@@ -9,8 +9,8 @@ from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
 from content_agent.integrations.runtime_files import LocalRuntimeFileStore
 from content_agent.record_payload import with_raw_payload
+from tests.gemini_helpers import FakeGeminiVideoClient
 from tests.p1_helpers import real_source_payload
-from tests.p4_helpers import FakeCategoryMatchClient, FakeDecodeClient
 
 
 class FakeWalkPlatformClient:
@@ -73,12 +73,8 @@ def build_initial_walk_context(tmp_path: Path, *, tags: list[str] | None = None)
         discovered["content_media_records"],
         discovered["evidence_bundles"],
         seed["source_context"],
-        seed["pattern_seed_pack"],
         runtime,
-        FakeDecodeClient(),
-        FakeCategoryMatchClient(),
-        max_wait_seconds=0,
-        poll_interval_seconds=0,
+        FakeGeminiVideoClient(),
     )
     policy_bundle = JsonPolicyBundleStore(Path(".")).load_policy_bundle("V1")
     decisions = rule_judgment.run(
@@ -100,8 +96,7 @@ def build_initial_walk_context(tmp_path: Path, *, tags: list[str] | None = None)
         "evidence_bundles": recalled["evidence_bundles"],
         "rule_decisions": decisions,
         "policy_bundle": policy_bundle,
-        "decode_client": FakeDecodeClient(),
-        "category_match_client": FakeCategoryMatchClient(),
+        "gemini_video_client": FakeGeminiVideoClient(),
     }
 
 

+ 2 - 7
tests/replay_harness.py

@@ -18,7 +18,7 @@ from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
 from content_agent.interfaces import GeminiVideoClient
 from content_agent.run_service import RunService
 from content_agent.schemas import RunStartRequest
-from tests.p4_helpers import FakeCategoryMatchClient, FakeDecodeClient, fake_decode_success, fake_match_paths_hit
+from tests.gemini_helpers import FakeGeminiVideoClient
 from tests.p1_helpers import FakeQueryVariantClient
 from tests.replay_clients import CorpusPlatformClient
 
@@ -83,16 +83,11 @@ def replay_case(
     service = RunService(
         runtime_root=runtime_root,
         query_variant_client=FakeQueryVariantClient(variants=variants),
-        decode_client=FakeDecodeClient(submit_result=fake_decode_success(terms=seed_terms)),
-        category_match_client=FakeCategoryMatchClient(result=fake_match_paths_hit(terms=seed_terms)),
-        pattern_recall_max_wait_seconds=0,
-        pattern_recall_poll_interval_seconds=0,
+        gemini_video_client=gemini_video_client or FakeGeminiVideoClient(),
     )
     if config_overrides and config_overrides.get("policy_store") is not None:
         service.policy_store = config_overrides["policy_store"]
     service._platform_client = lambda platform, platform_mode: CorpusPlatformClient(discovered)
-    if gemini_video_client is not None:
-        service._gemini_video_client = gemini_video_client
 
     state = service.start_run(RunStartRequest(platform_mode="mock", source=str(source_path)))
 

+ 0 - 122
tests/test_decode_events.py

@@ -1,122 +0,0 @@
-from content_agent.business_modules.content_discovery import pattern_recall
-from content_agent.business_modules.content_discovery.pattern_recall.decode import decode_content
-from content_agent.integrations.runtime_files import LocalRuntimeFileStore
-from tests.p4_helpers import FakeCategoryMatchClient, FakeDecodeClient, fake_decode_pending, fake_decode_success
-
-
-def _decode(client, *, event_sink=None, max_wait_seconds=10):
-    return decode_content(
-        content={"platform_content_id": "aweme_001"},
-        media={"play_url": None},
-        source_context={},
-        decode_client=client,
-        max_wait_seconds=max_wait_seconds,
-        poll_interval_seconds=0,
-        event_sink=event_sink,
-    )
-
-
-def test_decode_event_sink_records_submitted_polling_success():
-    events = []
-    client = FakeDecodeClient(fake_decode_pending(), result_sequence=[fake_decode_success()])
-
-    result = _decode(client, event_sink=events.append)
-
-    assert result["decode_status"] == "success"
-    assert [event["event_type"] for event in events] == [
-        "decode_submitted",
-        "decode_polling",
-        "decode_succeeded",
-    ]
-    assert [event["attempt"] for event in events] == [1, 2, 2]
-    assert all(event["platform_content_id"] == "aweme_001" for event in events)
-    assert all(isinstance(event["elapsed_ms"], int) for event in events)
-
-
-def test_decode_event_sink_records_timeout():
-    events = []
-    client = FakeDecodeClient(fake_decode_pending())
-
-    _decode(client, event_sink=events.append, max_wait_seconds=0)
-
-    assert [event["event_type"] for event in events] == ["decode_submitted", "decode_timeout"]
-    assert events[-1]["failure_reason"] == "decode_timeout_20m"
-    assert events[-1]["attempt"] == 1
-
-
-def test_decode_timeout_still_returns_pending():
-    events = []
-    client = FakeDecodeClient(fake_decode_pending())
-
-    result = _decode(client, event_sink=events.append, max_wait_seconds=0)
-
-    assert result["decode_status"] == "running"
-    assert result["pending_reason"] == "decode_timeout_20m"
-    assert result["decode_poll_attempts"] == 1
-
-
-def test_decode_client_error_records_failed_event():
-    events = []
-
-    def _boom():
-        raise RuntimeError("decode backend down")
-
-    client = FakeDecodeClient(fake_decode_pending(), result_sequence=[_boom])
-
-    result = _decode(client, event_sink=events.append)
-
-    assert result["decode_status"] == "failed"
-    assert events[-1]["event_type"] == "decode_failed"
-    assert events[-1]["failure_reason"] == "decode_client_error"
-
-
-def test_decode_event_sink_exception_is_swallowed():
-    def _broken_sink(event):
-        raise RuntimeError("sink exploded")
-
-    client = FakeDecodeClient(fake_decode_success())
-
-    result = _decode(client, event_sink=_broken_sink)
-
-    assert result["decode_status"] == "success"
-    assert result["failure_reason"] is None
-
-
-def test_minimal_rerun_pass_updates_pending_decode_once(tmp_path):
-    events = []
-    rerun_calls = []
-
-    def _rerun_success():
-        rerun_calls.append(1)
-        return fake_decode_success()
-
-    client = FakeDecodeClient(fake_decode_pending(), result_sequence=[_rerun_success])
-    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
-    runtime.prepare_run("run_001")
-    item = {"platform_content_id": "aweme_001", "content_discovery_id": "cd_001", "platform": "douyin"}
-
-    result = pattern_recall.run(
-        "run_001",
-        "policy_run_001",
-        [item],
-        [],
-        [{}],
-        {},
-        {},
-        runtime,
-        client,
-        FakeCategoryMatchClient(),
-        max_wait_seconds=0,
-        poll_interval_seconds=0,
-        event_sink=events.append,
-    )
-
-    # 主循环超时落 pending,末尾补跑只调一次 get_decode_result 并更新 evidence。
-    assert len(rerun_calls) == 1
-    evidence = result["pattern_recall_evidence"][0]
-    assert evidence["decode_status"] == "success"
-    event_types = [event["event_type"] for event in events]
-    assert event_types == ["decode_submitted", "decode_timeout", "decode_succeeded"]
-    # 补跑事件 attempt 续增,event_id 不与原 timeout 撞车。
-    assert events[1]["attempt"] == 1
-    assert events[2]["attempt"] == 2

+ 46 - 0
tests/test_dual_channel_gemini_replay.py

@@ -0,0 +1,46 @@
+"""V3-M2D: Gemini 判定结果端到端落到 pattern_match_result(经回放 harness)。
+
+验证 M2 的核心契约:recall_pattern 调 GeminiVideoClient,把 4 个判定字段写进
+discovered item 的 pattern_match_result,并镜像 fit_senior_50plus 进 content_audience_profile,
+带 M2→M3 桥接键。real_id45 经桥接键回放零回归(决策 + validation pass)。
+"""
+
+from __future__ import annotations
+
+from tests.gemini_helpers import FakeGeminiVideoClient, fake_gemini_pool, fake_gemini_review
+from tests.replay_harness import replay_case
+
+
+def _items(artifacts):
+    return artifacts.files.get("discovered_content_items.jsonl") or []
+
+
+def test_replay_writes_gemini_fields_to_pattern_match_result(tmp_path):
+    artifacts = replay_case(
+        "sph_caihong",
+        runtime_root=tmp_path / "rt",
+        gemini_video_client=FakeGeminiVideoClient(default_result=fake_gemini_pool()),
+    )
+    assert artifacts.state["status"] == "success"
+    items = _items(artifacts)
+    assert items
+    for item in items:
+        pmr = item["pattern_match_result"]
+        assert pmr["fit_senior_50plus"] is True
+        assert pmr["relevance_score"] == 0.85
+        assert set(pmr) >= {"fit_senior_50plus", "fit_confidence", "relevance_score", "reason"}
+        # M2→M3 桥接键
+        assert pmr["pattern_recall"] == "matched"
+        assert pmr["category_or_element_binding"] == "matched"
+        # 画像列镜像
+        assert item["content_audience_profile"]["fit_senior_50plus"] is True
+
+
+def test_replay_real_id45_validation_pass_with_bridge(tmp_path):
+    artifacts = replay_case(
+        "real_id45",
+        runtime_root=tmp_path / "rt",
+        gemini_video_client=FakeGeminiVideoClient(default_result=fake_gemini_review()),
+    )
+    final_output = artifacts.files.get("final_output.json") or {}
+    assert final_output.get("validation_status") == "pass"

+ 8 - 7
tests/test_gemini_helpers.py

@@ -14,7 +14,8 @@ from tests.gemini_helpers import (
 def test_fake_gemini_default_returns_pool():
     client = FakeGeminiVideoClient()
     result = client.analyze({"platform_content_id": "c1"}, {}, {})
-    assert result["outcome"] == "pool"
+    assert result["fit_senior_50plus"] is True
+    assert result["relevance_score"] == 0.85
     assert client.calls[0]["content"]["platform_content_id"] == "c1"
 
 
@@ -22,17 +23,17 @@ def test_fake_gemini_by_content_id_routing():
     client = FakeGeminiVideoClient(
         result_by_content_id={"c1": fake_gemini_review(), "c2": fake_gemini_fail()}
     )
-    assert client.analyze({"platform_content_id": "c1"}, {}, {})["outcome"] == "review"
-    assert client.analyze({"platform_content_id": "c2"}, {}, {})["outcome"] == "fail"
-    assert client.analyze({"platform_content_id": "c3"}, {}, {})["outcome"] == "pool"
+    assert client.analyze({"platform_content_id": "c1"}, {}, {})["relevance_score"] == 0.45
+    assert client.analyze({"platform_content_id": "c2"}, {}, {})["status"] == "failed"
+    assert client.analyze({"platform_content_id": "c3"}, {}, {})["relevance_score"] == 0.85
 
 
 def test_fake_gemini_records_calls_with_deepcopy():
     client = FakeGeminiVideoClient()
     first = client.analyze({"platform_content_id": "c1"}, {}, {})
-    first["outcome"] = "mutated"
+    first["fit_senior_50plus"] = "mutated"
     second = client.analyze({"platform_content_id": "c1"}, {}, {})
-    assert second["outcome"] == "pool"
+    assert second["fit_senior_50plus"] is True
     assert len(client.calls) == 2
 
 
@@ -40,4 +41,4 @@ def test_fake_gemini_conforms_to_protocol():
     client: GeminiVideoClient = FakeGeminiVideoClient()
     result = client.analyze({"platform_content_id": "c1"}, {"play_url": None}, {"name": "case"})
     assert isinstance(result, dict)
-    assert fake_gemini_pool()["outcome"] == "pool"
+    assert fake_gemini_pool()["fit_senior_50plus"] is True

+ 93 - 0
tests/test_gemini_video.py

@@ -0,0 +1,93 @@
+"""V3-M2B: GeminiVideoClient.analyze (mocked fetch + httpx)."""
+
+from __future__ import annotations
+
+import httpx
+
+from content_agent.integrations.gemini_video import (
+    GeminiVideoClient,
+    MissingGeminiVideoClient,
+)
+
+
+class FakeResponse:
+    def __init__(self, content):
+        self._content = content
+
+    def raise_for_status(self):
+        return None
+
+    def json(self):
+        return {"choices": [{"message": {"content": self._content}}]}
+
+
+def _client(content=None, *, post=None, fetch=None):
+    return GeminiVideoClient(
+        api_key="k",
+        fetch_fn=fetch or (lambda play_url, platform: "data:video/mp4;base64,AAAA"),
+        http_post=post or (lambda *a, **k: FakeResponse(content)),
+    )
+
+
+_ITEM = {"platform": "douyin", "platform_content_id": "c1"}
+_MEDIA = {"play_url": "http://v/x"}
+_CTX = {"ext_data": {"evidence_pack": {"seed_terms": ["中医养生"]}}}
+
+
+def test_analyze_returns_four_fields():
+    body = '{"fit_senior_50plus": true, "fit_confidence": 0.85, "relevance_score": 0.7, "reason": "贴切"}'
+    result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
+    assert result == {
+        "fit_senior_50plus": True,
+        "fit_confidence": 0.85,
+        "relevance_score": 0.7,
+        "reason": "贴切",
+    }
+
+
+def test_analyze_parses_json_in_markdown_fence():
+    body = '```json\n{"fit_senior_50plus": false, "fit_confidence": 0.4, "relevance_score": 0.2, "reason": "x"}\n```'
+    result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
+    assert result["fit_senior_50plus"] is False
+    assert result["fit_confidence"] == 0.4
+
+
+def test_analyze_clamps_out_of_range_scores():
+    body = '{"fit_senior_50plus": true, "fit_confidence": 1.7, "relevance_score": -3, "reason": "x"}'
+    result = _client(body).analyze(_ITEM, _MEDIA, _CTX)
+    assert result["fit_confidence"] == 1.0
+    assert result["relevance_score"] == 0.0
+
+
+def test_analyze_no_play_url_returns_fail():
+    result = _client("{}").analyze(_ITEM, {}, _CTX)
+    assert result["status"] == "failed"
+    assert result["reason"] == "no_play_url"
+
+
+def test_analyze_video_fetch_failure_returns_fail():
+    def boom(play_url, platform):
+        raise RuntimeError("dl")
+    result = _client("{}", fetch=boom).analyze(_ITEM, _MEDIA, _CTX)
+    assert result["status"] == "failed"
+    assert "video_fetch_failed" in result["reason"]
+
+
+def test_analyze_http_error_returns_fail():
+    def post(*a, **k):
+        raise httpx.ConnectError("boom")
+    result = _client(post=post).analyze(_ITEM, _MEDIA, _CTX)
+    assert result["status"] == "failed"
+    assert "gemini_http_error" in result["reason"]
+
+
+def test_analyze_bad_json_returns_fail():
+    result = _client("not-json").analyze(_ITEM, _MEDIA, _CTX)
+    assert result["status"] == "failed"
+    assert "gemini_response_invalid" in result["reason"]
+
+
+def test_from_env_missing_key_returns_missing_client():
+    client = GeminiVideoClient.from_env({})
+    assert isinstance(client, MissingGeminiVideoClient)
+    assert client.analyze(_ITEM, _MEDIA, _CTX)["status"] == "failed"

+ 0 - 78
tests/test_p4_graph_integration.py

@@ -1,78 +0,0 @@
-from content_agent.run_service import RunService
-from content_agent.schemas import RunStartRequest
-from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
-from tests.p4_helpers import (
-    FailingDecodeClient,
-    FakeCategoryMatchClient,
-    FakeDecodeClient,
-    fake_decode_pending,
-)
-
-
-def test_p4_mock_run_writes_pattern_recall_evidence_and_matched_items(tmp_path):
-    service = RunService(
-        runtime_root=tmp_path / "runtime" / "v1",
-        query_variant_client=FakeQueryVariantClient(),
-        decode_client=FakeDecodeClient(),
-        category_match_client=FakeCategoryMatchClient(),
-    )
-
-    state = service.start_run(
-        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
-    )
-
-    assert state["status"] == "success"
-    assert state["current_step"] == "review_strategy"
-    evidence_rows = service.read_jsonl(state["run_id"], "pattern_recall_evidence.jsonl")
-    items = service.read_jsonl(state["run_id"], "discovered_content_items.jsonl")
-    assert evidence_rows
-    assert all(row["recall_status"] == "matched" for row in evidence_rows)
-    assert all(item["pattern_match_result"]["pattern_recall"] == "matched" for item in items)
-    assert service.validate_run(state["run_id"])["status"] == "pass"
-
-
-def test_p4_pending_content_does_not_fail_run(tmp_path):
-    service = RunService(
-        runtime_root=tmp_path / "runtime" / "v1",
-        query_variant_client=FakeQueryVariantClient(),
-        decode_client=FakeDecodeClient(fake_decode_pending()),
-        category_match_client=FakeCategoryMatchClient(),
-        pattern_recall_max_wait_seconds=0,
-        pattern_recall_poll_interval_seconds=0,
-    )
-
-    state = service.start_run(
-        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
-    )
-
-    assert state["status"] == "success"
-    evidence_rows = service.read_jsonl(state["run_id"], "pattern_recall_evidence.jsonl")
-    decisions = service.read_jsonl(state["run_id"], "rule_decisions.jsonl")
-    assert all(row["recall_status"] == "pending" for row in evidence_rows)
-    assert all(
-        decision["decision_reason_code"] == "content_pattern_recall_required"
-        for decision in decisions
-    )
-    assert service.validate_run(state["run_id"])["status"] == "pass"
-
-
-def test_p4_decode_client_failure_is_content_failed_not_run_failed(tmp_path):
-    service = RunService(
-        runtime_root=tmp_path / "runtime" / "v1",
-        query_variant_client=FakeQueryVariantClient(),
-        decode_client=FailingDecodeClient(),
-        category_match_client=FakeCategoryMatchClient(),
-    )
-
-    state = service.start_run(
-        RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
-    )
-
-    assert state["status"] == "success"
-    evidence_rows = service.read_jsonl(state["run_id"], "pattern_recall_evidence.jsonl")
-    assert all(row["recall_status"] == "failed" for row in evidence_rows)
-    assert all(
-        row["evidence_summary"]["failure_reason"] == "decode_client_error"
-        for row in evidence_rows
-    )
-    assert service.validate_run(state["run_id"])["status"] == "pass"

+ 0 - 213
tests/test_pattern_recall_category_match.py

@@ -1,213 +0,0 @@
-from content_agent.integrations.category_match import CategoryMatchClient
-from content_agent.business_modules.content_discovery.pattern_recall.category_match import (
-    _extract_path_matches,
-    match_decode_terms,
-)
-from tests.p4_helpers import FakeCategoryMatchClient, fake_match_paths_no_hit
-
-
-def test_category_match_client_defaults_to_v2_endpoint():
-    client = CategoryMatchClient(base_url="https://library.aiddit.com")
-
-    assert client.match_path == "api/search/categories/match-paths/v2"
-
-
-def test_category_match_uses_only_strong_terms_and_default_contract():
-    client = FakeCategoryMatchClient()
-
-    result = match_decode_terms(
-        decode_elements={
-            "strong_terms": ["爱国情感", "案例"],
-            "auxiliary_terms": ["标题看起来很像"],
-        },
-        category_match_client=client,
-    )
-
-    assert client.calls == [[{"term": "爱国情感", "description": "解构强证据词"}]]
-    assert result["request"]["source_type"] == "实质"
-    assert result["request"]["top_k"] == 10
-    assert result["request"]["min_score"] == 0.6
-    assert result["matched_terms"] == ["爱国情感"]
-
-
-def test_category_match_no_strong_terms_does_not_call_client():
-    client = FakeCategoryMatchClient()
-
-    result = match_decode_terms(
-        decode_elements={
-            "strong_terms": ["案例", "标题"],
-            "auxiliary_terms": ["爱国情感"],
-        },
-        category_match_client=client,
-    )
-
-    assert client.calls == []
-    assert result["matched_terms"] == []
-    assert result["matched_category_paths"] == []
-
-
-def test_category_match_no_hit_keeps_no_match_inputs():
-    client = FakeCategoryMatchClient(fake_match_paths_no_hit())
-
-    result = match_decode_terms(
-        decode_elements={"strong_terms": ["人物故事"]},
-        category_match_client=client,
-    )
-
-    assert client.calls
-    assert result["matched_terms"] == []
-    assert result["matched_category_paths"] == []
-
-
-def test_category_match_parses_real_match_paths_string_shape():
-    client = FakeCategoryMatchClient(
-        {
-            "request": {
-                "source_type": "实质",
-                "top_k": 10,
-                "min_score": 0.6,
-                "items": [{"term": "爱国情感", "description": "解构强证据词"}],
-            },
-            "response": {
-                "success": True,
-                "items": [
-                    {
-                        "term": "爱国情感",
-                        "matches": [{"score": 0.91}],
-                        "matched_paths": ["/理念/情感/家国情怀/爱国情感"],
-                    }
-                ],
-            },
-            "raw_response": {
-                "success": True,
-                "items": [
-                    {
-                        "term": "爱国情感",
-                        "matches": [{"score": 0.91}],
-                        "matched_paths": ["/理念/情感/家国情怀/爱国情感"],
-                    }
-                ],
-            },
-        }
-    )
-
-    result = match_decode_terms(
-        decode_elements={"strong_terms": ["爱国情感"]},
-        category_match_client=client,
-    )
-
-    assert result["matched_terms"] == ["爱国情感"]
-    assert result["matched_category_paths"] == ["/理念/情感/家国情怀/爱国情感"]
-
-
-def test_category_match_parses_v2_matches_path():
-    matches = _extract_path_matches(
-        {"items": [{"term": "露营", "matches": [{"path": ["户外", "露营"], "score": 0.92}]}]}
-    )
-
-    assert matches == [
-        {"term": "露营", "category_path": "/户外/露营", "score": 0.92,
-         "raw": {"path": ["户外", "露营"], "score": 0.92}}
-    ]
-
-
-def test_category_match_parses_v2_matches_category_path():
-    matches = _extract_path_matches(
-        {"items": [{"term": "露营", "matches": [{"category_path": ["生活", "户外"], "score": 0.81}]}]}
-    )
-
-    assert matches[0]["category_path"] == "/生活/户外"
-    assert matches[0]["score"] == 0.81
-
-
-def test_category_match_parses_v2_matched_paths():
-    matches = _extract_path_matches(
-        {"items": [{"term": "露营", "matched_paths": [["旅行", "露营"]]}]}
-    )
-
-    assert matches == [
-        {"term": "露营", "category_path": "/旅行/露营", "score": None, "raw": ["旅行", "露营"]}
-    ]
-
-
-def test_category_match_inherits_item_term_for_match_path():
-    matches = _extract_path_matches(
-        {
-            "items": [
-                {
-                    "term": "露营",
-                    "matches": [
-                        {"path": ["户外", "露营"], "score": 0.9},
-                        {"term": "帐篷", "path": ["户外", "帐篷"], "score": 0.8},
-                    ],
-                }
-            ]
-        }
-    )
-
-    assert [m["term"] for m in matches] == ["露营", "帐篷"]
-
-
-def test_category_match_dedupes_duplicate_paths():
-    matches = _extract_path_matches(
-        {
-            "items": [
-                {
-                    "term": "露营",
-                    "matches": [
-                        {"path": ["户外", "露营"], "score": 0.92},
-                        {"path": ["户外", "露营"], "score": 0.85},
-                    ],
-                }
-            ]
-        }
-    )
-
-    assert len(matches) == 1
-    assert matches[0]["score"] == 0.92
-
-
-def test_old_data_shape_remains_compatible():
-    matches = _extract_path_matches(
-        {
-            "data": [
-                {
-                    "term": "爱国情感",
-                    "paths": [{"category_path": "/理念/情感/家国情怀/爱国情感", "score": 0.91}],
-                }
-            ]
-        }
-    )
-
-    assert matches == [
-        {
-            "term": "爱国情感",
-            "category_path": "/理念/情感/家国情怀/爱国情感",
-            "score": 0.91,
-            "raw": {"category_path": "/理念/情感/家国情怀/爱国情感", "score": 0.91},
-        }
-    ]
-
-
-def test_category_match_reads_matches_and_matched_paths_in_same_item():
-    # brief 数据合同示例: 同一 item 同时携带 matches 与 matched_paths,三条全部保留。
-    matches = _extract_path_matches(
-        {
-            "items": [
-                {
-                    "term": "露营",
-                    "matches": [
-                        {"path": ["户外", "露营"], "score": 0.92},
-                        {"category_path": ["生活", "户外"], "score": 0.81},
-                    ],
-                    "matched_paths": [["旅行", "露营"]],
-                }
-            ]
-        }
-    )
-
-    assert [(m["category_path"], m["score"]) for m in matches] == [
-        ("/旅行/露营", None),
-        ("/户外/露营", 0.92),
-        ("/生活/户外", 0.81),
-    ]

+ 0 - 224
tests/test_pattern_recall_decision.py

@@ -1,224 +0,0 @@
-import copy
-
-from content_agent.business_modules.content_discovery.content_discovery_builder import run as build_content
-from content_agent.business_modules.content_discovery.pattern_recall.recall_decision import run
-from content_agent.integrations.runtime_files import LocalRuntimeFileStore
-from tests.p1_helpers import real_source_payload
-from tests.p4_helpers import (
-    FailingCategoryMatchClient,
-    FailingDecodeClient,
-    FakeCategoryMatchClient,
-    FakeDecodeClient,
-    fake_decode_bad_shape,
-    fake_decode_pending,
-    fake_decode_success,
-    fake_match_paths_hit,
-    fake_match_paths_no_hit,
-)
-
-
-def _build_state(tmp_path):
-    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
-    runtime.prepare_run("run_001")
-    source_context = real_source_payload()
-    platform_results = [
-        {
-            "content_discovery_id": "content_001",
-            "search_query_id": "q_001",
-            "platform": "douyin",
-            "platform_content_id": "7390000000000000000",
-            "platform_content_format": "video",
-            "description": "爱国情感类人物故事观察",
-            "platform_author_id": "author_001",
-            "author_display_name": "作者",
-            "statistics": {"digg_count": 1},
-            "tags": ["#人物故事"],
-            "score": 72,
-            "portrait_available": True,
-            "age_50_plus_level": "medium",
-            "risk_level": "low",
-            "discovery_relation": "derived_from_pattern_demand",
-            "discovery_start_source": "pattern_itemset",
-            "previous_discovery_step": "search_query_direct",
-            "play_url": "https://video.example/a.mp4",
-        }
-    ]
-    built = build_content("run_001", "policy_001", platform_results, source_context, runtime)
-    pattern_seed_pack = {
-        "seed_terms": ["爱国情感", "人物故事"],
-        "category_bindings": [
-            {"category_path": "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感"}
-        ],
-    }
-    return runtime, source_context, pattern_seed_pack, built
-
-
-def test_recall_decision_matched_writes_evidence_and_updates_bundle(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_success()),
-        FakeCategoryMatchClient(),
-        poll_interval_seconds=0,
-    )
-
-    evidence = result["pattern_recall_evidence"][0]
-    bundle = result["evidence_bundles"][0]
-    item = result["discovered_content_items"][0]
-    assert evidence["recall_status"] == "matched"
-    assert evidence["matched_terms"]
-    assert evidence["matched_category_paths"]
-    assert bundle["pattern_match_result"]["pattern_recall"] == "matched"
-    assert bundle["pattern_match_result"]["score"] == 72
-    assert item["pattern_match_result"]["pattern_recall_evidence_id"] == "recall_001"
-
-
-def test_recall_decision_pending_does_not_fail_or_match(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_pending()),
-        FakeCategoryMatchClient(),
-        max_wait_seconds=0,
-        poll_interval_seconds=0,
-    )
-
-    assert result["pattern_recall_evidence"][0]["recall_status"] == "pending"
-    assert result["evidence_bundles"][0]["pattern_match_result"]["pattern_recall"] == (
-        "pattern_recall_pending"
-    )
-
-
-def test_recall_decision_bad_decode_is_rejected(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_bad_shape()),
-        FakeCategoryMatchClient(),
-        poll_interval_seconds=0,
-    )
-
-    assert result["pattern_recall_evidence"][0]["recall_status"] == "rejected"
-
-
-def test_recall_decision_decode_client_error_is_content_failed(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FailingDecodeClient(),
-        FakeCategoryMatchClient(),
-        poll_interval_seconds=0,
-    )
-
-    evidence = result["pattern_recall_evidence"][0]
-    assert evidence["recall_status"] == "failed"
-    assert evidence["evidence_summary"]["failure_reason"] == "decode_client_error"
-    assert result["evidence_bundles"][0]["pattern_match_result"]["pattern_recall"] == (
-        "pattern_recall_failed"
-    )
-
-
-def test_recall_decision_category_client_error_is_content_failed(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_success()),
-        FailingCategoryMatchClient(),
-        poll_interval_seconds=0,
-    )
-
-    evidence = result["pattern_recall_evidence"][0]
-    assert evidence["recall_status"] == "failed"
-    assert evidence["evidence_summary"]["failure_reason"] == "category_match_client_error"
-
-
-def test_recall_decision_no_match_when_category_has_no_hit(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_success(terms=["不相关主题"])),
-        FakeCategoryMatchClient(fake_match_paths_no_hit()),
-        poll_interval_seconds=0,
-    )
-
-    assert result["pattern_recall_evidence"][0]["recall_status"] == "no_match"
-
-
-def test_recall_decision_preserves_upstream_source_evidence(tmp_path):
-    runtime, source_context, pattern_seed_pack, built = _build_state(tmp_path)
-    before = copy.deepcopy(built["evidence_bundles"][0]["source_evidence"])
-
-    result = run(
-        "run_001",
-        "policy_001",
-        built["discovered_content_items"],
-        built["content_media_records"],
-        built["evidence_bundles"],
-        source_context,
-        pattern_seed_pack,
-        runtime,
-        FakeDecodeClient(fake_decode_success()),
-        FakeCategoryMatchClient(
-            fake_match_paths_hit(
-                paths=[
-                    "/理念/观念/个人观念/情感认同/国家民族认同/爱国情感",
-                    "/理念/观念/个人观念/情感认同/国家民族认同/人物故事",
-                ]
-            )
-        ),
-        poll_interval_seconds=0,
-    )
-
-    after = result["evidence_bundles"][0]["source_evidence"]
-    assert after == before
-    evidence = result["pattern_recall_evidence"][0]
-    assert len(evidence["matched_category_paths"]) == 2
-    assert evidence["evidence_summary"]["primary_matched_category_path"]

+ 0 - 248
tests/test_pattern_recall_decode.py

@@ -1,248 +0,0 @@
-from content_agent.business_modules.content_discovery.pattern_recall.decode import (
-    decode_content,
-    extract_decode_elements,
-    normalize_decode_status,
-)
-from content_agent.integrations.decode_api import AigcDecodeClient, redact_sensitive_payload
-from tests.p4_helpers import (
-    FakeDecodeClient,
-    fake_decode_bad_shape,
-    fake_decode_pending,
-    fake_decode_success,
-)
-
-
-def test_decode_status_normalizes_external_uppercase_values():
-    assert normalize_decode_status({"decode_status": "SUCCESS"}) == "success"
-    assert normalize_decode_status({"decode_status": "RUNNING"}) == "running"
-    assert normalize_decode_status({"decode_status": "PENDING"}) == "pending"
-    assert normalize_decode_status({"decode_status": "FAILED"}) == "failed"
-
-
-def test_decode_extracts_only_strong_terms_from_decode_content():
-    elements = extract_decode_elements(
-        {
-            "目的点": [{"点": "爱国情感", "实质": [{"名称": "人物故事"}]}],
-            "关键点": [{"点": "标题提示", "类型": "形式"}, {"点": "国家认同", "类型": "实质"}],
-            "分词结果": [{"词": "标签辅助"}],
-        }
-    )
-
-    assert elements["strong_terms"] == ["爱国情感", "人物故事", "国家认同"]
-    assert "标签辅助" in elements["auxiliary_terms"]
-
-
-def test_decode_timeout_records_pending_without_waiting():
-    result = decode_content(
-        content={"platform_content_id": "739"},
-        media={"play_url": None},
-        source_context={},
-        decode_client=FakeDecodeClient(fake_decode_pending()),
-        max_wait_seconds=0,
-        poll_interval_seconds=0,
-    )
-
-    assert result["decode_status"] == "running"
-    assert result["pending_reason"] == "decode_timeout_20m"
-
-
-def test_decode_bad_shape_becomes_failed():
-    result = decode_content(
-        content={"platform_content_id": "739"},
-        media={"play_url": None},
-        source_context={},
-        decode_client=FakeDecodeClient(fake_decode_bad_shape()),
-        max_wait_seconds=1200,
-        poll_interval_seconds=0,
-    )
-
-    assert result["decode_status"] == "failed"
-    assert result["failure_reason"] == "decode_result_bad_shape"
-
-
-def test_decode_result_client_error_becomes_failed():
-    result = decode_content(
-        content={"platform_content_id": "739"},
-        media={"play_url": None},
-        source_context={},
-        decode_client=FakeDecodeClient(
-            fake_decode_pending("decode_task_001"),
-            result_sequence=[_raise_runtime_error],
-        ),
-        max_wait_seconds=1,
-        poll_interval_seconds=0,
-    )
-
-    assert result["decode_status"] == "failed"
-    assert result["decode_task_id"] == "decode_task_001"
-    assert result["failure_reason"] == "decode_client_error"
-    assert result["raw_response"]["error_type"] == "RuntimeError"
-
-
-def test_decode_result_reads_aigc_result_data_rows():
-    result = decode_content(
-        content={"platform_content_id": "739"},
-        media={"play_url": None},
-        source_context={},
-        decode_client=FakeDecodeClient(
-            fake_decode_pending("739"),
-            result_sequence=[
-                {
-                    "request": {"params": {"configId": 58, "channelContentIds": ["739"]}},
-                    "response": {
-                        "code": 0,
-                        "msg": "success",
-                        "data": [
-                            {
-                                "channelContentId": "739",
-                                "status": "SUCCESS",
-                                "errorMessage": None,
-                                "dataContent": (
-                                    '{"目的点":[{"点":"爱国情感","实质":[{"名称":"人物故事"}]}]}'
-                                ),
-                            }
-                        ],
-                    },
-                    "raw_response": {
-                        "code": 0,
-                        "msg": "success",
-                        "data": [
-                            {
-                                "channelContentId": "739",
-                                "status": "SUCCESS",
-                                "errorMessage": None,
-                                "dataContent": (
-                                    '{"目的点":[{"点":"爱国情感","实质":[{"名称":"人物故事"}]}]}'
-                                ),
-                            }
-                        ],
-                    },
-                }
-            ],
-        ),
-        max_wait_seconds=1,
-        poll_interval_seconds=0,
-    )
-
-    assert result["decode_status"] == "success"
-    assert result["decode_elements"]["strong_terms"] == ["爱国情感", "人物故事"]
-
-
-def test_decode_submit_payload_uses_config_id_58():
-    client = FakeDecodeClient(fake_decode_success())
-
-    decode_content(
-        content={"platform_content_id": "739", "description": "desc", "tags": []},
-        media={"play_url": "https://video.example/a.mp4"},
-        source_context={"ext_data": {"evidence_pack": {"source_post_id": "519"}}},
-        decode_client=client,
-        max_wait_seconds=1200,
-        poll_interval_seconds=0,
-    )
-
-    assert client.submit_calls[0]["content"]["platform_content_id"] == "739"
-
-
-def test_redact_sensitive_payload_removes_sensitive_key_names():
-    auth_key = "Authorizatio" + "n"
-    cookie_key = "Cook" + "ie"
-    redacted = redact_sensitive_payload(
-        {
-            "token": "abc",
-            "headers": {auth_key: "Bearer abc", cookie_key: "session=1"},
-            "safe": "ok",
-        }
-    )
-
-    assert "token" not in redacted
-    assert auth_key not in redacted["headers"]
-    assert cookie_key not in redacted["headers"]
-    assert redacted["token_redacted"] == "<redacted>"
-    assert redacted["headers"][f"{auth_key}_redacted"] == "<redacted>"
-
-
-def test_aigc_decode_client_builds_config_id_request():
-    client = AigcDecodeClient(
-        base_url="https://aigc-api.aiddit.com",
-        token="dummy",
-        http_client=_FakeHttpClient({"status": "SUCCESS", "taskId": "task_001"}),
-    )
-
-    result = client.submit_decode(
-        {"platform_content_id": "739", "description": "desc"},
-        {"play_url": None},
-        {"merge_leve2": "测试"},
-    )
-
-    assert result["request"]["params"]["configId"] == 58
-    assert result["decode_task_id"] == "task_001"
-
-
-def test_aigc_decode_client_uses_channel_content_ids_for_result_query():
-    http_client = _FakeHttpClient(
-        {"code": 0, "data": [{"status": "PENDING", "channelContentId": "739"}]}
-    )
-    client = AigcDecodeClient(
-        base_url="https://aigc-api.aiddit.com",
-        token="dummy",
-        http_client=http_client,
-    )
-
-    submit_result = client.submit_decode(
-        {"platform_content_id": "739", "description": "desc"},
-        {"play_url": None},
-        {"merge_leve2": "测试"},
-    )
-    result = client.get_decode_result("739")
-
-    assert submit_result["decode_task_id"] == "739"
-    assert result["request"] == {"params": {"configId": 58, "channelContentIds": ["739"]}}
-    assert http_client.requests[-1]["json"] == {
-        "params": {"configId": 58, "channelContentIds": ["739"]}
-    }
-
-
-class _FakeResponse:
-    def __init__(self, data):
-        self.data = data
-
-    def raise_for_status(self):
-        return None
-
-    def json(self):
-        return self.data
-
-
-class _FakeHttpClient:
-    def __init__(self, data):
-        self.data = data
-        self.requests = []
-
-    def post(self, *args, **kwargs):
-        self.requests.append({"args": args, **kwargs})
-        return _FakeResponse(self.data)
-
-
-def _raise_runtime_error():
-    raise RuntimeError("decode result unavailable")
-
-
-def test_decode_content_without_event_sink_keeps_previous_result():
-    kwargs = dict(
-        content={"platform_content_id": "739"},
-        media={"play_url": None},
-        source_context={},
-        max_wait_seconds=1200,
-        poll_interval_seconds=0,
-    )
-
-    without_sink = decode_content(decode_client=FakeDecodeClient(fake_decode_success()), **kwargs)
-    events = []
-    with_sink = decode_content(
-        decode_client=FakeDecodeClient(fake_decode_success()),
-        event_sink=events.append,
-        **kwargs,
-    )
-
-    assert without_sink == with_sink
-    assert events  # sink 只新增观测,不改变业务返回。

+ 69 - 0
tests/test_video_fetch.py

@@ -0,0 +1,69 @@
+"""V3-M2A: video_fetch download + compress + data URL (all mocked)."""
+
+from __future__ import annotations
+
+import base64
+
+import httpx
+import pytest
+
+from content_agent.integrations import video_fetch
+from content_agent.integrations.video_fetch import VideoFetchError, fetch_and_compress
+
+
+class FakeHttpClient:
+    def __init__(self, *, content=b"rawvideo", status=200, error=None):
+        self.content = content
+        self.status = status
+        self.error = error
+        self.calls = []
+
+    def get(self, url, *, headers, follow_redirects, timeout):
+        self.calls.append({"url": url, "headers": headers})
+        if self.error:
+            raise self.error
+        return httpx.Response(
+            self.status, content=self.content, request=httpx.Request("GET", url)
+        )
+
+
+def _patch_compress(monkeypatch, out=b"\x00\x00\x00\x18ftypmp42compressed"):
+    monkeypatch.setattr(video_fetch, "_compress", lambda raw, exe: out)
+
+
+def test_fetch_builds_data_url(monkeypatch):
+    _patch_compress(monkeypatch)
+    out = fetch_and_compress("http://v/x", "douyin", http_client=FakeHttpClient(), ffmpeg_exe="ffmpeg")
+    assert out.startswith("data:video/mp4;base64,")
+    decoded = base64.b64decode(out.split(",", 1)[1])
+    assert decoded.startswith(b"\x00\x00\x00\x18ftyp")
+
+
+def test_fetch_uses_platform_headers(monkeypatch):
+    _patch_compress(monkeypatch)
+    for platform, ua_token, referer in [
+        ("douyin", "iPhone", "https://www.douyin.com/"),
+        ("shipinhao", "Windows", "https://channels.weixin.qq.com/"),
+    ]:
+        client = FakeHttpClient()
+        fetch_and_compress("http://v/x", platform, http_client=client, ffmpeg_exe="ffmpeg")
+        headers = client.calls[0]["headers"]
+        assert ua_token in headers["User-Agent"]
+        assert headers["Referer"] == referer
+
+
+def test_fetch_missing_play_url_raises():
+    with pytest.raises(VideoFetchError, match="missing play_url"):
+        fetch_and_compress("", "douyin", http_client=FakeHttpClient())
+
+
+def test_fetch_download_failure_raises():
+    client = FakeHttpClient(error=httpx.ConnectError("boom"))
+    with pytest.raises(VideoFetchError, match="download failed"):
+        fetch_and_compress("http://v/x", "douyin", http_client=client, ffmpeg_exe="ffmpeg")
+
+
+def test_fetch_oversize_raises(monkeypatch):
+    _patch_compress(monkeypatch, out=b"x" * (video_fetch.MAX_INLINE_BYTES + 1))
+    with pytest.raises(VideoFetchError, match="oversize"):
+        fetch_and_compress("http://v/x", "douyin", http_client=FakeHttpClient(), ffmpeg_exe="ffmpeg")

+ 16 - 0
uv.lock

@@ -159,6 +159,7 @@ source = { virtual = "." }
 dependencies = [
     { name = "fastapi" },
     { name = "httpx" },
+    { name = "imageio-ffmpeg" },
     { name = "langgraph" },
     { name = "pydantic" },
     { name = "pymysql" },
@@ -175,6 +176,7 @@ excel = [
 requires-dist = [
     { name = "fastapi", specifier = ">=0.115.0" },
     { name = "httpx", specifier = ">=0.27.0" },
+    { name = "imageio-ffmpeg", specifier = ">=0.4.9" },
     { name = "langgraph", specifier = ">=1.0.0" },
     { name = "openpyxl", marker = "extra == 'excel'", specifier = ">=3.1" },
     { name = "pydantic", specifier = ">=2.8.0" },
@@ -255,6 +257,20 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/1e/5e/d4e9f1a599fb8e573b7b87160658329fbf28d19eac2718f51fc3def3aa5a/idna-3.18-py3-none-any.whl", hash = "sha256:7f952cbe720b688055e3f87de14f5c3e5fdaa8bc3928985c4077ca689de849a2", size = 65455, upload-time = "2026-06-02T14:34:06.319Z" },
 ]
 
+[[package]]
+name = "imageio-ffmpeg"
+version = "0.6.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/44/bd/c3343c721f2a1b0c9fc71c1aebf1966a3b7f08c2eea8ed5437a2865611d6/imageio_ffmpeg-0.6.0.tar.gz", hash = "sha256:e2556bed8e005564a9f925bb7afa4002d82770d6b08825078b7697ab88ba1755", size = 25210, upload-time = "2025-01-16T21:34:32.747Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/da/58/87ef68ac83f4c7690961bce288fd8e382bc5f1513860fc7f90a9c1c1c6bf/imageio_ffmpeg-0.6.0-py3-none-macosx_10_9_intel.macosx_10_9_x86_64.whl", hash = "sha256:9d2baaf867088508d4a3458e61eeb30e945c4ad8016025545f66c4b5aaef0a61", size = 24932969, upload-time = "2025-01-16T21:34:20.464Z" },
+    { url = "https://files.pythonhosted.org/packages/40/5c/f3d8a657d362cc93b81aab8feda487317da5b5d31c0e1fdfd5e986e55d17/imageio_ffmpeg-0.6.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:b1ae3173414b5fc5f538a726c4e48ea97edc0d2cdc11f103afee655c463fa742", size = 21113891, upload-time = "2025-01-16T21:34:00.277Z" },
+    { url = "https://files.pythonhosted.org/packages/33/e7/1925bfbc563c39c1d2e82501d8372734a5c725e53ac3b31b4c2d081e895b/imageio_ffmpeg-0.6.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:1d47bebd83d2c5fc770720d211855f208af8a596c82d17730aa51e815cdee6dc", size = 25632706, upload-time = "2025-01-16T21:33:53.475Z" },
+    { url = "https://files.pythonhosted.org/packages/a0/2d/43c8522a2038e9d0e7dbdf3a61195ecc31ca576fb1527a528c877e87d973/imageio_ffmpeg-0.6.0-py3-none-manylinux2014_x86_64.whl", hash = "sha256:c7e46fcec401dd990405049d2e2f475e2b397779df2519b544b8aab515195282", size = 29498237, upload-time = "2025-01-16T21:34:13.726Z" },
+    { url = "https://files.pythonhosted.org/packages/a0/13/59da54728351883c3c1d9fca1710ab8eee82c7beba585df8f25ca925f08f/imageio_ffmpeg-0.6.0-py3-none-win32.whl", hash = "sha256:196faa79366b4a82f95c0f4053191d2013f4714a715780f0ad2a68ff37483cc2", size = 19652251, upload-time = "2025-01-16T21:34:06.812Z" },
+    { url = "https://files.pythonhosted.org/packages/2c/c6/fa760e12a2483469e2bf5058c5faff664acf66cadb4df2ad6205b016a73d/imageio_ffmpeg-0.6.0-py3-none-win_amd64.whl", hash = "sha256:02fa47c83703c37df6bfe4896aab339013f62bf02c5ebf2dce6da56af04ffc0a", size = 31246824, upload-time = "2025-01-16T21:34:28.6Z" },
+]
+
 [[package]]
 name = "iniconfig"
 version = "2.3.0"