Ver Fonte

feat: add content agent backend skeleton

Sam Lee há 1 semana atrás
pai
commit
92f06b08bc
30 ficheiros alterados com 2258 adições e 0 exclusões
  1. 2 0
      content_agent/__init__.py
  2. 86 0
      content_agent/api.py
  3. 2 0
      content_agent/business_modules/__init__.py
  4. 7 0
      content_agent/business_modules/candidate_evidence/__init__.py
  5. 156 0
      content_agent/business_modules/candidate_evidence/candidate_builder.py
  6. 27 0
      content_agent/business_modules/candidate_evidence/source_evidence.py
  7. 78 0
      content_agent/business_modules/learning_review.py
  8. 25 0
      content_agent/business_modules/platform_access.py
  9. 10 0
      content_agent/business_modules/policy_version.py
  10. 100 0
      content_agent/business_modules/result_source_lookup.py
  11. 20 0
      content_agent/business_modules/rule_judgment/__init__.py
  12. 190 0
      content_agent/business_modules/rule_judgment/evaluator.py
  13. 6 0
      content_agent/business_modules/run_record/__init__.py
  14. 110 0
      content_agent/business_modules/run_record/recorder.py
  15. 326 0
      content_agent/business_modules/run_record/validation.py
  16. 35 0
      content_agent/business_modules/search_intent.py
  17. 20 0
      content_agent/business_modules/source_seed/__init__.py
  18. 116 0
      content_agent/business_modules/source_seed/source_context.py
  19. 84 0
      content_agent/business_modules/walk_strategy.py
  20. 26 0
      content_agent/cli.py
  21. 127 0
      content_agent/graph.py
  22. 2 0
      content_agent/integrations/__init__.py
  23. 320 0
      content_agent/integrations/douyin.py
  24. 72 0
      content_agent/integrations/mock_platform.py
  25. 42 0
      content_agent/integrations/policy_json.py
  26. 73 0
      content_agent/integrations/runtime_files.py
  27. 23 0
      content_agent/interfaces.py
  28. 36 0
      content_agent/models.py
  29. 86 0
      content_agent/run_service.py
  30. 51 0
      content_agent/schemas.py

+ 2 - 0
content_agent/__init__.py

@@ -0,0 +1,2 @@
+"""Content Agent V1 backend package."""
+

+ 86 - 0
content_agent/api.py

@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+from fastapi import FastAPI, HTTPException
+
+from content_agent.run_service import RunService
+from content_agent.schemas import (
+    JsonFileResponse,
+    RecordsResponse,
+    RunStartRequest,
+    RunStartResponse,
+    RunSummaryResponse,
+    ValidationResponse,
+)
+
+
+# Module-level singletons shared by every route handler below: the FastAPI
+# application object and the run service the handlers delegate to.
+app = FastAPI(title="Content Agent V1")
+service = RunService()
+
+
+@app.post("/runs", response_model=RunStartResponse)
+def start_run(request: RunStartRequest) -> RunStartResponse:
+    """Execute a run and report where its artifacts were written.
+
+    Raises:
+        HTTPException: 500 when the returned state's ``status`` is not
+            ``"success"``. The detail is the state's ``errors`` value, or
+            ``["run failed"]`` when the state carries none.
+    """
+    state = service.start_run(request)
+    status = state["status"]
+    if status != "success":
+        failure_detail = state.get("errors", ["run failed"])
+        raise HTTPException(status_code=500, detail=failure_detail)
+
+    run_id = state["trace_id"]
+    response_fields = {
+        "trace_id": run_id,
+        "status": status,
+        "policy_bundle_version": state["policy_bundle_version"],
+        "platform": state["platform"],
+        "platform_mode": state["platform_mode"],
+        "output_dir": str(service.runtime.run_dir(run_id)),
+    }
+    return RunStartResponse(**response_fields)
+
+
+@app.get("/runs/{trace_id}", response_model=RunSummaryResponse)
+def get_run(trace_id: str) -> RunSummaryResponse:
+    """Return the service's summary for a run; 404 when the run directory is missing."""
+    _ensure_run_exists(trace_id)
+    return RunSummaryResponse(**service.get_summary(trace_id))
+
+
+@app.get("/runs/{trace_id}/candidates", response_model=RecordsResponse)
+def get_candidates(trace_id: str) -> RecordsResponse:
+    """Return the records stored in the run's ``candidate_pool.jsonl``."""
+    return _jsonl_response(trace_id, "candidate_pool.jsonl")
+
+
+@app.get("/runs/{trace_id}/rule-decisions", response_model=RecordsResponse)
+def get_rule_decisions(trace_id: str) -> RecordsResponse:
+    """Return the records stored in the run's ``rule_decisions.jsonl``."""
+    return _jsonl_response(trace_id, "rule_decisions.jsonl")
+
+
+@app.get("/runs/{trace_id}/source-edges", response_model=RecordsResponse)
+def get_source_edges(trace_id: str) -> RecordsResponse:
+    """Return the records stored in the run's ``source_edges.jsonl``."""
+    return _jsonl_response(trace_id, "source_edges.jsonl")
+
+
+@app.get("/runs/{trace_id}/final-output", response_model=JsonFileResponse)
+def get_final_output(trace_id: str) -> JsonFileResponse:
+    """Return the contents of the run's ``final_output.json``."""
+    return _json_response(trace_id, "final_output.json")
+
+
+@app.get("/runs/{trace_id}/strategy-review", response_model=JsonFileResponse)
+def get_strategy_review(trace_id: str) -> JsonFileResponse:
+    """Return the service's strategy review for a run; 404 when the run directory is missing."""
+    _ensure_run_exists(trace_id)
+    return JsonFileResponse(trace_id=trace_id, data=service.strategy_review(trace_id))
+
+
+@app.get("/runs/{trace_id}/validation", response_model=ValidationResponse)
+def get_validation(trace_id: str) -> ValidationResponse:
+    """Return the service's validation result for a run; 404 when the run directory is missing."""
+    _ensure_run_exists(trace_id)
+    return ValidationResponse(**service.validate_run(trace_id))
+
+
+def _jsonl_response(trace_id: str, filename: str) -> RecordsResponse:
+    """Wrap the records the service reads from ``filename`` in a ``RecordsResponse``.
+
+    Only the run directory is checked up front (404 when absent); how a
+    missing ``filename`` inside it is handled depends on ``service.read_jsonl``.
+    """
+    _ensure_run_exists(trace_id)
+    return RecordsResponse(trace_id=trace_id, records=service.read_jsonl(trace_id, filename))
+
+
+def _json_response(trace_id: str, filename: str) -> JsonFileResponse:
+    """Wrap the document the service reads from ``filename`` in a ``JsonFileResponse``.
+
+    Only the run directory is checked up front (404 when absent); how a
+    missing ``filename`` inside it is handled depends on ``service.read_json``.
+    """
+    _ensure_run_exists(trace_id)
+    return JsonFileResponse(trace_id=trace_id, data=service.read_json(trace_id, filename))
+
+
+def _ensure_run_exists(trace_id: str) -> None:
+    """Raise a 404 unless ``trace_id`` names an existing run directory.
+
+    ``trace_id`` arrives straight from the URL path, so values that look like
+    path navigation (``.``, ``..``, or anything containing a path separator)
+    are rejected before the filesystem is consulted. Without this, an id such
+    as ``..`` could resolve to a directory that exists but is not a run.
+    NOTE(review): presumably ``run_dir`` joins the id onto a base directory —
+    confirm whether it performs its own validation.
+
+    Raises:
+        HTTPException: 404 when the id is path-like or the directory is absent.
+    """
+    is_path_like = trace_id in {".", ".."} or "/" in trace_id or "\\" in trace_id
+    if is_path_like or not service.runtime.run_dir(trace_id).exists():
+        raise HTTPException(status_code=404, detail=f"run not found: {trace_id}")

+ 2 - 0
content_agent/business_modules/__init__.py

@@ -0,0 +1,2 @@
+"""Business modules for the V1 Content Agent backend."""
+

+ 7 - 0
content_agent/business_modules/candidate_evidence/__init__.py

@@ -0,0 +1,7 @@
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.business_modules.candidate_evidence.candidate_builder import run
+
+__all__ = ["run"]

+ 156 - 0
content_agent/business_modules/candidate_evidence/candidate_builder.py

@@ -0,0 +1,156 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+from content_agent.business_modules.candidate_evidence.source_evidence import (
+    build_source_evidence,
+)
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(
+    trace_id: str,
+    platform_results: list[dict[str, Any]],
+    source_context: dict[str, Any],
+    runtime: RuntimeFileStore,
+) -> dict[str, list[dict[str, Any]]]:
+    """Turn platform search results into candidates, media assets and evidence bundles.
+
+    Candidates and media assets are appended to ``candidate_pool.jsonl`` and
+    ``media_assets.jsonl``; evidence bundles are only returned. All records
+    built in one call share a single UTC ``created_at`` timestamp.
+    """
+    stamp = datetime.now(timezone.utc).isoformat()
+    outputs: dict[str, list[dict[str, Any]]] = {
+        "candidates": [],
+        "media_assets": [],
+        "evidence_bundles": [],
+    }
+
+    for item in platform_results:
+        built_candidate = _build_candidate(trace_id, item, stamp)
+        built_asset = _build_media_asset(trace_id, item, stamp)
+        built_bundle = _build_evidence_bundle(trace_id, built_candidate, item, source_context)
+        outputs["evidence_bundles"].append(built_bundle)
+        outputs["candidates"].append(built_candidate)
+        outputs["media_assets"].append(built_asset)
+
+    # Nothing is persisted until every result has been converted successfully.
+    runtime.append_jsonl(trace_id, "candidate_pool.jsonl", outputs["candidates"])
+    runtime.append_jsonl(trace_id, "media_assets.jsonl", outputs["media_assets"])
+    return outputs
+
+
+def _build_candidate(
+    trace_id: str,
+    result: dict[str, Any],
+    created_at: str,
+) -> dict[str, Any]:
+    """Project one platform result onto the candidate-pool record layout.
+
+    Mandatory fields are copied with ``result[...]`` (a missing one raises
+    ``KeyError``); ``cha_list`` defaults to ``[]``; a fixed set of optional
+    fields is carried over only when present in ``result``.
+    """
+    mandatory = (
+        "candidate_id",
+        "query_id",
+        "aweme_id",
+        "desc",
+        "author_sec_uid",
+        "author_nickname",
+        "statistics",
+    )
+    passthrough = ("text_extra", "create_time", "has_more", "next_cursor", "platform_auth_mode")
+
+    record: dict[str, Any] = {"trace_id": trace_id}
+    for name in mandatory:
+        record[name] = result[name]
+    record["cha_list"] = result.get("cha_list", [])
+    record["origin_source"] = result["origin_source"]
+    record["immediate_source"] = result["immediate_source"]
+    record["created_at"] = created_at
+    record.update({name: result[name] for name in passthrough if name in result})
+    return record
+
+
+def _build_media_asset(
+    trace_id: str,
+    result: dict[str, Any],
+    created_at: str,
+) -> dict[str, Any]:
+    """Build the metadata-only media record for one platform result.
+
+    No media is fetched here: the URL and path fields are always ``None``
+    and ``media_status`` is always ``"metadata_only"``.
+    """
+    asset: dict[str, Any] = {
+        "trace_id": trace_id,
+        "aweme_id": result["aweme_id"],
+        "media_status": "metadata_only",
+        "metadata_source": result.get("metadata_source", "mock_platform_search"),
+    }
+    asset.update(dict.fromkeys(("play_url", "local_path", "oss_url")))
+    asset["created_at"] = created_at
+    return asset
+
+
+def _build_evidence_bundle(
+    trace_id: str,
+    candidate: dict[str, Any],
+    result: dict[str, Any],
+    source_context: dict[str, Any],
+) -> dict[str, Any]:
+    """Assemble the evidence bundle for one candidate.
+
+    The bundle groups the candidate's data into named sections
+    (``source_evidence``, ``entity``, the ``*_signal`` sections,
+    ``walk_context`` and ``trace``). Identity fields come from ``candidate``;
+    scoring, risk and portrait inputs come from the raw platform ``result``,
+    with defaults filled in where ``result`` lacks a key.
+    """
+    source_evidence = build_source_evidence(trace_id, candidate, result, source_context)
+    portrait_signal = _build_portrait_signal(result)
+    return {
+        "source_evidence": source_evidence,
+        "entity": {
+            "entity_type": "Video",
+            "entity_id": candidate["aweme_id"],
+            "aweme_id": candidate["aweme_id"],
+            "candidate_id": candidate["candidate_id"],
+            "query_id": candidate["query_id"],
+            "desc": candidate["desc"],
+            "create_time": result.get("create_time"),
+            # The author id is exposed both nested and flat.
+            "author": {
+                "sec_uid": candidate["author_sec_uid"],
+                "nickname": candidate["author_nickname"],
+            },
+            "author_sec_uid": candidate["author_sec_uid"],
+        },
+        "relevance_signal": {
+            "pattern_recall": result.get("pattern_recall", "candidate_related"),
+            "category_or_element_binding": result.get(
+                "category_or_element_binding", "candidate_related"
+            ),
+            "level": result.get("relevance_level", "related"),
+            # ``score`` stays None when the platform result does not carry one.
+            "score": result.get("score"),
+            "platform_fit": result.get("platform_fit"),
+            "adaptability": result.get("adaptability"),
+        },
+        "interaction_signal": {
+            # Statistics are kept nested and also flattened into this section;
+            # the spread comes second, so a counter literally named
+            # "statistics" would replace the nested copy.
+            "statistics": result["statistics"],
+            **result["statistics"],
+        },
+        # Empty dict when the result reports no portrait data.
+        "portrait_signal": portrait_signal,
+        "account_signal": {
+            "author_sec_uid": candidate["author_sec_uid"],
+            "author": {"sec_uid": candidate["author_sec_uid"]},
+        },
+        "risk_signal": {
+            "risk_level": result.get("risk_level", "unknown"),
+            # NOTE(review): a result without this key is reported as "safe".
+            "age_50_plus_safety": result.get("age_50_plus_safety", "safe"),
+            "availability": result.get("availability", "metadata_only"),
+        },
+        "walk_context": {
+            "origin_source": candidate["origin_source"],
+            "immediate_source": candidate["immediate_source"],
+        },
+        "trace": {
+            "trace_id": trace_id,
+            "input_snapshot_ref": (
+                f"evidence_bundle:{trace_id}:{candidate['candidate_id']}"
+            ),
+        },
+    }
+
+
+def _build_portrait_signal(result: dict[str, Any]) -> dict[str, Any]:
+    """Extract audience-portrait fields from a platform result.
+
+    Returns ``{}`` unless ``result["portrait_available"]`` is truthy.
+    Otherwise returns the 50+ level (``"missing"`` by default) plus whichever
+    optional portrait fields are present; ``age_50_plus_tgi`` is additionally
+    mirrored under ``tgi``.
+    """
+    has_portrait = result.get("portrait_available", False)
+    if has_portrait:
+        signal: dict[str, Any] = {
+            "age_50_plus_level": result.get("age_50_plus_level", "missing"),
+            "portrait_available": True,
+        }
+        optional_fields = ("age_distribution", "age_50_plus_ratio", "age_50_plus_tgi")
+        signal.update((name, result[name]) for name in optional_fields if name in result)
+        if "age_50_plus_tgi" in result:
+            signal["tgi"] = result["age_50_plus_tgi"]
+        return signal
+    return {}

+ 27 - 0
content_agent/business_modules/candidate_evidence/source_evidence.py

@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+import copy
+from typing import Any
+
+
+def build_source_evidence(
+    trace_id: str,
+    candidate: dict[str, Any],
+    result: dict[str, Any],
+    source_context: dict[str, Any],
+) -> dict[str, Any]:
+    """Derive a candidate's source evidence from the run's evidence pack.
+
+    Starts from a deep copy of ``source_context["ext_data"]["evidence_pack"]``
+    (so the shared context is never mutated) and overlays the per-candidate
+    provenance fields; overlay values win over same-named pack keys.
+    ``source_edge_ids`` starts out empty.
+    """
+    evidence = copy.deepcopy(source_context["ext_data"]["evidence_pack"])
+    query_id = candidate["query_id"]
+    overlay = {
+        "trace_id": trace_id,
+        "query_id": query_id,
+        "query": result.get("query"),
+        "generation_type": result.get("generation_type"),
+        "origin_source": candidate["origin_source"],
+        "immediate_source": candidate["immediate_source"],
+        "origin_edge_id": f"query_to_video:{query_id}:{candidate['aweme_id']}",
+        "source_edge_ids": [],
+        "candidate_aweme_id": candidate["aweme_id"],
+        "candidate_relation": result.get("candidate_relation", "derived_from_pattern_demand"),
+    }
+    evidence.update(overlay)
+    return evidence

+ 78 - 0
content_agent/business_modules/learning_review.py

@@ -0,0 +1,78 @@
+from __future__ import annotations
+
+from collections import Counter
+from typing import Any
+
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(trace_id: str, runtime: RuntimeFileStore) -> dict[str, Any]:
+    """Summarise a finished run into a strategy review.
+
+    Reads the run's final output, search clues, rule decisions and source
+    edges from ``runtime`` and reports which queries worked, why candidates
+    were rejected, which edges led to assets, and follow-up suggestions.
+    """
+    final_output = runtime.read_json(trace_id, "final_output.json")
+    search_clues = runtime.read_jsonl(trace_id, "search_clues.jsonl")
+    decisions = runtime.read_jsonl(trace_id, "rule_decisions.jsonl")
+    source_edges = runtime.read_jsonl(trace_id, "source_edges.jsonl")
+
+    reject_reasons: Counter[str] = Counter()
+    for decision in decisions:
+        if decision["final_action"] == "REJECT":
+            reject_reasons[decision["reason_code"]] += 1
+
+    productive_paths = []
+    for edge in source_edges:
+        if edge["edge_type"] != "decision_to_asset":
+            continue
+        productive_paths.append(
+            {
+                "edge_id": edge["edge_id"],
+                "from": f"{edge['from_node_type']}:{edge['from_node_id']}",
+                "to": f"{edge['to_node_type']}:{edge['to_node_id']}",
+            }
+        )
+
+    def queries_with(status: str) -> list[Any]:
+        return [clue["query"] for clue in search_clues if clue["effect_status"] == status]
+
+    summary = final_output["summary"]
+    return {
+        "trace_id": trace_id,
+        "summary": summary,
+        "effective_queries": queries_with("success"),
+        "weak_queries": queries_with("weak_effective"),
+        "top_reject_reasons": [
+            {"reason_code": reason, "count": count}
+            for reason, count in reject_reasons.most_common()
+        ],
+        "productive_paths": productive_paths,
+        "suggestions": _build_suggestions(summary, search_clues, reject_reasons),
+    }
+
+
+
+def _build_suggestions(
+    summary: dict[str, Any],
+    search_clues: list[dict[str, Any]],
+    reject_reasons: Counter[str],
+) -> list[dict[str, str]]:
+    """Produce follow-up suggestions, each with the observation it rests on.
+
+    Up to three entries are emitted, in this order: one when anything was
+    pooled, one when any clue is ``weak_effective``, and one naming the most
+    frequent reject reason when rejections exist.
+    """
+    advice: list[dict[str, str]] = []
+
+    def add(suggestion: str, basis: str) -> None:
+        advice.append({"suggestion": suggestion, "basis": basis})
+
+    if summary["pool_count"] > 0:
+        add(
+            "保留产生入池结果的 query,并在下一轮优先验证相近表达。",
+            "本次已有 query 产生 POOL 结果。",
+        )
+
+    weak = [clue["query"] for clue in search_clues if clue["effect_status"] == "weak_effective"]
+    if len(weak) > 0:
+        add(
+            "弱有效 query 继续小预算观察,不直接升级为稳定搜索线索。",
+            "存在只产生 PENDING 的 query。",
+        )
+
+    if reject_reasons:
+        top_reason = reject_reasons.most_common(1)[0][0]
+        add(
+            "优先检查高频淘汰原因对应的数据补齐策略。",
+            f"最高频淘汰原因是 {top_reason}。",
+        )
+    return advice
+
+

+ 25 - 0
content_agent/business_modules/platform_access.py

@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.interfaces import PlatformSearchClient
+
+
+def run(queries: list[dict[str, Any]], platform_client: PlatformSearchClient) -> list[dict[str, Any]]:
+    """Search the platform for every query and merge the hits.
+
+    Hits are de-duplicated by ``aweme_id`` across all queries, so a video
+    stays attributed to the first query that returned it. Hits without a
+    truthy ``aweme_id`` are never treated as duplicates. Each kept hit is
+    tagged with the originating query's ``query`` and ``generation_type``.
+    """
+    collected: list[dict[str, Any]] = []
+    known_ids: set[str] = set()
+    for query in queries:
+        for hit in platform_client.search(query):
+            video_id = hit.get("aweme_id")
+            already_seen = video_id in known_ids
+            if already_seen:
+                continue
+            if video_id:
+                known_ids.add(video_id)
+            tagged = {**hit, "query": query["query"], "generation_type": query["generation_type"]}
+            collected.append(tagged)
+    return collected

+ 10 - 0
content_agent/business_modules/policy_version.py

@@ -0,0 +1,10 @@
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.interfaces import PolicyBundleStore
+
+
+def run(policy_bundle_version: str, policy_store: PolicyBundleStore) -> dict[str, Any]:
+    """Return the policy bundle that ``policy_store`` loads for the given version."""
+    return policy_store.load_policy_bundle(policy_bundle_version)
+

+ 100 - 0
content_agent/business_modules/result_source_lookup.py

@@ -0,0 +1,100 @@
+from __future__ import annotations
+
+from collections import Counter
+from typing import Any
+
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(
+    trace_id: str,
+    candidates: list[dict[str, Any]],
+    media_assets: list[dict[str, Any]],
+    decisions: list[dict[str, Any]],
+    source_edges: list[dict[str, Any]],
+    search_clues: list[dict[str, Any]],
+    runtime: RuntimeFileStore,
+) -> dict[str, Any]:
+    """Assemble the run's final output document and write ``final_output.json``.
+
+    Pooled candidates become content assets with their source edge ids
+    attached, rejected ones become reject records, and every decision and
+    search clue is listed in condensed form alongside per-action counts.
+
+    Raises:
+        KeyError: when a candidate has no decision, or a pooled candidate has
+            no source edges or media asset.
+    """
+    decisions_index = {item["entity_id"]: item for item in decisions}
+    media_index = {item["aweme_id"]: item for item in media_assets}
+    edge_ids_index = _edges_by_entity_id(source_edges)
+
+    content_assets: list[dict[str, Any]] = []
+    reject_records: list[dict[str, Any]] = []
+    for candidate in candidates:
+        aweme_id = candidate["aweme_id"]
+        decision = decisions_index[aweme_id]
+        action = decision["final_action"]
+        if action == "POOL":
+            pooled_asset = {
+                "aweme_id": aweme_id,
+                "asset_status": "pooled",
+                "decision_id": decision["decision_id"],
+                "source_edge_ids": edge_ids_index[aweme_id],
+                "source_evidence": {
+                    **decision["source_evidence"],
+                    "source_edge_ids": edge_ids_index[aweme_id],
+                },
+                "media_status": media_index[aweme_id]["media_status"],
+            }
+            content_assets.append(pooled_asset)
+        elif action == "REJECT":
+            rejection = {
+                "entity_id": aweme_id,
+                "main_reason_code": decision["reason_code"],
+                "decision_id": decision["decision_id"],
+                "source_evidence": decision["source_evidence"],
+            }
+            reject_records.append(rejection)
+
+    action_counts = Counter(item["final_action"] for item in decisions)
+
+    decision_fields = ("decision_id", "entity_id", "final_action", "reason_code", "source_evidence")
+    decision_records = [{name: item[name] for name in decision_fields} for item in decisions]
+
+    clue_records = []
+    for clue in search_clues:
+        clue_records.append(
+            {
+                "query_id": clue["query_id"],
+                "asset_status": "clue_only",
+                "effect_status": clue["effect_status"],
+            }
+        )
+
+    final_output = {
+        "trace_id": trace_id,
+        "content_assets": content_assets,
+        "author_assets": [],
+        "decision_records": decision_records,
+        "search_clues": clue_records,
+        "reject_records": reject_records,
+        "summary": {
+            "query_count": len(search_clues),
+            "pool_count": action_counts["POOL"],
+            "candidate_count": action_counts["CANDIDATE"],
+            "pending_count": action_counts["PENDING"],
+            "reject_count": action_counts["REJECT"],
+            "trace_complete": True,
+        },
+    }
+    runtime.write_json(trace_id, "final_output.json", final_output)
+    return final_output
+
+
+def _edges_by_entity_id(source_edges: list[dict[str, Any]]) -> dict[str, list[str]]:
+    """Group edge ids by the node each edge points at.
+
+    For a ``query_to_video`` edge the target node first receives the id of
+    the ``pattern_to_query`` edge that produced the query (when one exists),
+    then the edge's own id. ``decision_to_asset`` edges contribute only
+    their own id. Other edge types are ignored.
+    """
+    pattern_edge_for_query = {
+        edge["to_node_id"]: edge["edge_id"]
+        for edge in source_edges
+        if edge["edge_type"] == "pattern_to_query"
+    }
+
+    grouped: dict[str, list[str]] = {}
+    for edge in source_edges:
+        kind = edge["edge_type"]
+        if kind not in ("query_to_video", "decision_to_asset"):
+            continue
+        bucket = grouped.setdefault(edge["to_node_id"], [])
+        if kind == "query_to_video":
+            upstream_id = pattern_edge_for_query.get(edge["from_node_id"])
+            if upstream_id:
+                bucket.append(upstream_id)
+        bucket.append(edge["edge_id"])
+    return grouped

+ 20 - 0
content_agent/business_modules/rule_judgment/__init__.py

@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.business_modules.rule_judgment.evaluator import decide
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(
+    trace_id: str,
+    evidence_bundles: list[dict[str, Any]],
+    policy_bundle: dict[str, Any],
+    runtime: RuntimeFileStore,
+) -> list[dict[str, Any]]:
+    """Evaluate every evidence bundle and append the decisions to ``rule_decisions.jsonl``.
+
+    Bundles are numbered from 1 in input order; that number is what
+    ``decide`` receives as its ``index``.
+    """
+    decisions: list[dict[str, Any]] = []
+    for position, bundle in enumerate(evidence_bundles, start=1):
+        decisions.append(decide(trace_id, position, bundle, policy_bundle))
+    runtime.append_jsonl(trace_id, "rule_decisions.jsonl", decisions)
+    return decisions

+ 190 - 0
content_agent/business_modules/rule_judgment/evaluator.py

@@ -0,0 +1,190 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+# Effect status recorded on a decision for each threshold action. Actions not
+# listed here (REJECT, for example) are reported as "failed" by ``decide``.
+ACTION_EFFECT_STATUS = {
+    "POOL": "success",
+    "CANDIDATE": "weak_effective",
+    "PENDING": "pending",
+}
+
+
+def decide(
+    trace_id: str,
+    index: int,
+    bundle: dict[str, Any],
+    policy_bundle: dict[str, Any],
+) -> dict[str, Any]:
+    """Apply the policy's rule pack to one evidence bundle.
+
+    Hard gates are checked first, in rule-pack order. Every matching gate is
+    recorded; the first one that is ``fatal`` or sets ``stop_scoring`` ends
+    evaluation with that gate's action and an ``effect_status`` of
+    ``"blocked"``. Otherwise the relevance score is matched against the
+    thresholds, and when nothing matches (missing score, or a score outside
+    every band) the fallback threshold decides.
+
+    Args:
+        trace_id: Run identifier copied onto the decision.
+        index: 1-based position of the bundle; becomes part of ``decision_id``.
+        bundle: Evidence bundle for a single entity.
+        policy_bundle: Loaded policy; must contain ``rule_pack``.
+
+    Returns:
+        The decision record built by ``_build_decision``.
+
+    Raises:
+        ValueError: when the fallback is needed but the rule pack defines no
+            thresholds.
+    """
+    rule_pack = policy_bundle["rule_pack"]
+    matched_hard_gates: list[str] = []
+
+    for gate in rule_pack.get("hard_gates", []):
+        if not _condition_matches(bundle, gate.get("when", {})):
+            continue
+        matched_hard_gates.append(gate["gate_id"])
+        if gate.get("severity") == "fatal" or gate.get("stop_scoring"):
+            return _build_decision(
+                trace_id=trace_id,
+                index=index,
+                bundle=bundle,
+                policy_bundle=policy_bundle,
+                final_action=gate["action"],
+                reason_code=gate["reason_code"],
+                effect_status="blocked",
+                matched_hard_gates=matched_hard_gates,
+                replay_marker={"matched_gate": gate["gate_id"]},
+            )
+
+    thresholds = rule_pack.get("thresholds", [])
+    score = _get_path(bundle, "relevance_signal.score")
+    threshold = _match_threshold(score, thresholds)
+    used_fallback = threshold is None
+    if used_fallback:
+        threshold = _fallback_threshold(thresholds)
+
+    final_action = threshold["action"]
+    # The replay marker must say how the threshold was chosen. A score that
+    # fell outside every band did not "match" the fallback band, so it is
+    # reported as a fallback just like a missing score.
+    if used_fallback:
+        replay_marker = {"matched_threshold": "fallback_threshold"}
+    else:
+        replay_marker = {"matched_threshold": _threshold_label(threshold)}
+
+    return _build_decision(
+        trace_id=trace_id,
+        index=index,
+        bundle=bundle,
+        policy_bundle=policy_bundle,
+        final_action=final_action,
+        reason_code=threshold["reason_code"],
+        effect_status=ACTION_EFFECT_STATUS.get(final_action, "failed"),
+        matched_hard_gates=matched_hard_gates,
+        replay_marker=replay_marker,
+    )
+
+
+def _build_decision(
+    trace_id: str,
+    index: int,
+    bundle: dict[str, Any],
+    policy_bundle: dict[str, Any],
+    final_action: str,
+    reason_code: str,
+    effect_status: str,
+    matched_hard_gates: list[str],
+    replay_marker: dict[str, Any],
+) -> dict[str, Any]:
+    """Build the decision record for one evaluated bundle.
+
+    Combines the verdict (``final_action``, ``reason_code``,
+    ``effect_status``) with identifying data from the bundle and the version
+    fields of the policy. ``replay_marker`` is merged into ``replay_fields``
+    last, so its keys win over the version fields on a name clash.
+    """
+    entity = bundle["entity"]
+    score = _get_path(bundle, "relevance_signal.score")
+    # An absent or empty portrait section is treated as "no portrait data".
+    portrait = bundle.get("portrait_signal") or {}
+    return {
+        "trace_id": trace_id,
+        # Zero-padded to at least three digits, e.g. d_001.
+        "decision_id": f"d_{index:03d}",
+        "pack_id": policy_bundle["rule_pack_id"],
+        "entity_type": entity["entity_type"],
+        "entity_id": entity["aweme_id"],
+        "matched_hard_gates": matched_hard_gates,
+        "scorecard": _scorecard(score, policy_bundle["rule_pack"].get("scorecard", {})),
+        "score": score,
+        "age_50_plus_level": portrait.get("age_50_plus_level", "missing"),
+        "final_action": final_action,
+        "reason_code": reason_code,
+        "effect_status": effect_status,
+        "source_evidence": bundle["source_evidence"],
+        "input_snapshot_ref": bundle["trace"]["input_snapshot_ref"],
+        "evidence_refs": _evidence_refs(policy_bundle["rule_pack"]),
+        "replay_fields": {
+            "rule_pack_version": policy_bundle["rule_pack_version"],
+            "policy_bundle_version": policy_bundle["policy_bundle_version"],
+            **replay_marker,
+        },
+    }
+
+
+def _condition_matches(bundle: dict[str, Any], condition: dict[str, Any]) -> bool:
+    """Evaluate one gate condition against the bundle.
+
+    The condition names a dotted ``field``, an ``op`` and, for most
+    operators, a ``value``. Supported operators are ``is_empty``, ``in``,
+    ``not_in``, ``eq`` and ``gte``; any other operator never matches. A field
+    that cannot be resolved is seen as ``None``, which means ``not_in``
+    matches it and ``gte`` does not.
+    """
+    actual = _get_path(bundle, condition.get("field", ""))
+    operator = condition.get("op")
+    target = condition.get("value")
+
+    if operator == "is_empty":
+        return _is_empty(actual)
+    if operator == "in" or operator == "not_in":
+        is_member = actual in (target or [])
+        return is_member if operator == "in" else not is_member
+    if operator == "eq":
+        return actual == target
+    if operator == "gte":
+        return actual is not None and actual >= target
+    return False
+
+
+def _match_threshold(score: int | float | None, thresholds: list[dict[str, Any]]) -> dict[str, Any] | None:
+    """Return the first threshold band whose inclusive bounds contain ``score``.
+
+    A band may omit ``min_score`` and/or ``max_score`` (or set them to
+    ``None``) to leave that side open. Returns ``None`` when ``score`` is
+    ``None`` or no band contains it.
+    """
+    if score is None:
+        return None
+
+    def contains(band: dict[str, Any]) -> bool:
+        lower = band.get("min_score")
+        upper = band.get("max_score")
+        too_low = lower is not None and score < lower
+        # The upper bound is only consulted when the lower bound passed.
+        return not (too_low or (upper is not None and score > upper))
+
+    return next((band for band in thresholds if contains(band)), None)
+
+
+def _fallback_threshold(thresholds: list[dict[str, Any]]) -> dict[str, Any]:
+    """Pick the threshold to use when no band matched.
+
+    Prefers the first band that has a ``max_score`` key but no ``min_score``
+    key (an open-bottom band); otherwise falls back to the last band.
+
+    Raises:
+        ValueError: when ``thresholds`` is empty.
+    """
+    open_bottom = next(
+        (band for band in thresholds if "max_score" in band and "min_score" not in band),
+        None,
+    )
+    if open_bottom is not None:
+        return open_bottom
+    if not thresholds:
+        raise ValueError("rule pack thresholds are empty")
+    return thresholds[-1]
+
+
+def _get_path(data: dict[str, Any], path: str) -> Any:
+    """Resolve a dotted ``path`` inside nested dicts.
+
+    Returns ``None`` when the path is empty or has an empty segment, when a
+    non-dict is reached before the path is exhausted, or when any step is
+    missing or ``None``. Falsy values such as ``0`` are returned as-is.
+    """
+    cursor: Any = data
+    for key in path.split("."):
+        can_descend = bool(key) and isinstance(cursor, dict)
+        cursor = cursor.get(key) if can_descend else None
+        if cursor is None:
+            return None
+    return cursor
+
+
+def _is_empty(value: Any) -> bool:
+    """Return True for ``None`` and for values equal to ``""``, ``[]`` or ``{}``."""
+    if value is None:
+        return True
+    for blank in ("", [], {}):
+        if value == blank:
+            return True
+    return False
+
+
+def _threshold_label(threshold: dict[str, Any] | None) -> str:
+    """Render a threshold band as a readable range expression.
+
+    Produces ``"<min><=score<=<max>"``, ``"score>=<min>"`` or
+    ``"score<=<max>"`` depending on which bounds are set, and ``"unknown"``
+    for a missing/empty threshold or one with no bounds.
+    """
+    if not threshold:
+        return "unknown"
+    lower = threshold.get("min_score")
+    upper = threshold.get("max_score")
+    labels = {
+        (True, True): f"{lower}<=score<={upper}",
+        (True, False): f"score>={lower}",
+        (False, True): f"score<={upper}",
+    }
+    return labels.get((lower is not None, upper is not None), "unknown")
+
+
+def _scorecard(score: Any, config: dict[str, Any]) -> dict[str, Any]:
+    """Build the scorecard section of a decision.
+
+    Only the total score is filled in. Each configured dimension is listed
+    with its key, maximum and evidence paths, but its own ``score`` is
+    always ``None``.
+    """
+    dimensions = []
+    for spec in config.get("dimensions", []):
+        entry = {
+            "key": spec.get("key"),
+            "max_score": spec.get("max_score"),
+            "score": None,
+            "evidence_paths": spec.get("evidence_paths", []),
+        }
+        dimensions.append(entry)
+    return {"total_score": score, "dimensions": dimensions}
+
+
+def _evidence_refs(rule_pack: dict[str, Any]) -> list[str]:
+    """List the evidence sections a decision refers to.
+
+    Always starts with ``"source_evidence"``, followed by the rule pack's
+    ``input_contract.required_fields`` in order, with duplicates dropped.
+    """
+    required = rule_pack.get("input_contract", {}).get("required_fields", [])
+    refs = ["source_evidence"]
+    for name in required:
+        if name in refs:
+            continue
+        refs.append(name)
+    return refs

+ 6 - 0
content_agent/business_modules/run_record/__init__.py

@@ -0,0 +1,6 @@
+from __future__ import annotations
+
+from content_agent.business_modules.run_record.recorder import run
+from content_agent.business_modules.run_record.validation import validate_run
+
+__all__ = ["run", "validate_run"]

+ 110 - 0
content_agent/business_modules/run_record/recorder.py

@@ -0,0 +1,110 @@
+from __future__ import annotations
+
+from collections import Counter, defaultdict
+from datetime import datetime, timezone
+from typing import Any
+
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(
+    trace_id: str,
+    queries: list[dict[str, Any]],
+    candidates: list[dict[str, Any]],
+    decisions: list[dict[str, Any]],
+    source_edge_basis: list[dict[str, Any]],
+    runtime: RuntimeFileStore,
+) -> dict[str, list[dict[str, Any]]]:
+    """Record the run's trace events, source edges and search clues.
+
+    Each edge basis is turned into a source edge by adding ``trace_id``, a
+    sequential ``edge_id`` and ``created_at``. Keys already present in the
+    basis override the generated ``trace_id``/``edge_id``, while
+    ``created_at`` is always set here. All three record sets are appended to
+    their JSONL files and returned.
+    """
+    stamp = datetime.now(timezone.utc).isoformat()
+    source_edges: list[dict[str, Any]] = []
+    for position, basis in enumerate(source_edge_basis, start=1):
+        edge: dict[str, Any] = {"trace_id": trace_id, "edge_id": f"edge_{position:03d}"}
+        edge.update(basis)
+        edge["created_at"] = stamp
+        source_edges.append(edge)
+
+    search_clues = _build_search_clues(trace_id, queries, candidates, decisions)
+    trace_events = _build_trace_events(trace_id)
+
+    outputs = (
+        ("trace_events.jsonl", trace_events),
+        ("source_edges.jsonl", source_edges),
+        ("search_clues.jsonl", search_clues),
+    )
+    for filename, records in outputs:
+        runtime.append_jsonl(trace_id, filename, records)
+
+    return {
+        "trace_events": trace_events,
+        "source_edges": source_edges,
+        "search_clues": search_clues,
+    }
+
+
+def _build_search_clues(
+    trace_id: str,
+    queries: list[dict[str, Any]],
+    candidates: list[dict[str, Any]],
+    decisions: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    """Summarise, per query, how its candidates were judged.
+
+    A query is ``success``/``keep_query`` when any of its candidates was
+    pooled or kept as a candidate, ``weak_effective``/``low_budget_pending``
+    when it only produced pending ones, and ``failed``/``stop_query``
+    otherwise (including when it returned nothing).
+
+    Raises:
+        KeyError: when a candidate of a listed query has no decision.
+    """
+    decisions_index = {item["entity_id"]: item for item in decisions}
+    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
+    for candidate in candidates:
+        grouped[candidate["query_id"]].append(candidate)
+
+    clues: list[dict[str, Any]] = []
+    for position, query in enumerate(queries, start=1):
+        matched = grouped[query["query_id"]]
+        tally = Counter(decisions_index[item["aweme_id"]]["final_action"] for item in matched)
+
+        if tally["POOL"] or tally["CANDIDATE"]:
+            outcome = ("success", "keep_query")
+        elif tally["PENDING"]:
+            outcome = ("weak_effective", "low_budget_pending")
+        else:
+            outcome = ("failed", "stop_query")
+        effect_status, next_action = outcome
+
+        clue = {
+            "trace_id": trace_id,
+            "clue_id": f"clue_{position:03d}",
+            "query_id": query["query_id"],
+            "query": query["query"],
+            "origin_source": query["origin_source"],
+            "immediate_source": query["immediate_source"],
+            "result_count": len(matched),
+            "pool_count": tally["POOL"],
+            "candidate_count": tally["CANDIDATE"],
+            "pending_count": tally["PENDING"],
+            "reject_count": tally["REJECT"],
+            "effect_status": effect_status,
+            "next_action": next_action,
+        }
+        clues.append(clue)
+    return clues
+
+
+def _build_trace_events(trace_id: str) -> list[dict[str, Any]]:
+    """Return the fixed sequence of pipeline trace events for a run.
+
+    The event list is static: every stage is reported with status
+    ``"success"`` and all events share one UTC timestamp taken when this
+    function is called.
+    """
+    stamp = datetime.now(timezone.utc).isoformat()
+    # (event type, input reference, output reference), in pipeline order.
+    pipeline = (
+        ("source_loaded", "source_context.json", "pattern_seed_pack.json"),
+        ("query_generated", "pattern_seed_pack.json", "queries.jsonl"),
+        ("platform_searched", "queries.jsonl", "candidate_pool.jsonl"),
+        ("evidence_built", "candidate_pool.jsonl", "rule_decisions.jsonl"),
+        ("rule_decision_evaluated", "evidence_bundle", "rule_decisions.jsonl"),
+        ("walk_planned", "rule_decisions.jsonl", "source_edges.jsonl"),
+        ("run_recorded", "walk_actions", "trace_events.jsonl"),
+    )
+
+    events: list[dict[str, Any]] = []
+    for position, stage in enumerate(pipeline, start=1):
+        kind, consumed, produced = stage
+        events.append(
+            {
+                "trace_id": trace_id,
+                "event_id": f"evt_{position:03d}",
+                "event_type": kind,
+                "status": "success",
+                "input_ref": consumed,
+                "output_ref": produced,
+                "created_at": stamp,
+            }
+        )
+    return events

+ 326 - 0
content_agent/business_modules/run_record/validation.py

@@ -0,0 +1,326 @@
+from __future__ import annotations
+
+import json
+from collections import Counter
+from typing import Any
+
+from content_agent.integrations.runtime_files import RUNTIME_FILENAMES
+from content_agent.interfaces import RuntimeFileStore
+
+
+# Runtime files that hold one JSON document; _load_files parses these with a
+# single json.loads call instead of line by line.
+JSON_FILES = {"source_context.json", "pattern_seed_pack.json", "final_output.json"}
+# Every other runtime filename, i.e. the line-delimited JSON files.
+JSONL_FILES = set(RUNTIME_FILENAMES) - JSON_FILES
+# The only final_action values a rule decision may carry (see _check_references).
+FINAL_ACTIONS = {"POOL", "CANDIDATE", "PENDING", "REJECT"}
+# Keys that every source_evidence mapping must contain; any key absent from a
+# record is reported by _check_one_source_evidence.
+SOURCE_EVIDENCE_FIELDS = {
+    "source_kind",
+    "pattern_source_system",
+    "case_id_type",
+    "source_post_id",
+    "pattern_execution_id",
+    "mining_config_id",
+    "itemset_ids",
+    "itemset_items",
+    "category_bindings",
+    "element_bindings",
+    "matched_post_ids",
+    "seed_terms",
+    "origin_source",
+    "immediate_source",
+    "origin_edge_id",
+    "trace_id",
+    "candidate_aweme_id",
+    "source_certainty",
+    "validation_status",
+}
+
+
+def validate_run(trace_id: str, runtime: RuntimeFileStore) -> dict[str, Any]:
+    """Validate the runtime files of one run and return a pass/fail report.
+
+    The files are loaded first; if loading already produced a fail-level
+    finding, the consistency checks are skipped and the report is returned
+    with the load findings only.
+    """
+    findings: list[dict[str, Any]] = []
+    data = _load_files(trace_id, runtime, findings)
+    load_failed = any(entry["level"] == "fail" for entry in findings)
+    if not load_failed:
+        _check_trace_ids(trace_id, data, findings)
+        # The remaining checks share one signature and run in this fixed order.
+        for check in (
+            _check_unique_ids,
+            _check_references,
+            _check_source_evidence,
+            _check_source_paths,
+            _check_summary,
+        ):
+            check(data, findings)
+    return _result(trace_id, findings)
+
+
+def _load_files(
+    trace_id: str,
+    runtime: RuntimeFileStore,
+    findings: list[dict[str, Any]],
+) -> dict[str, Any]:
+    """Read and parse every expected runtime file of a run.
+
+    Args:
+        trace_id: Run whose directory is resolved through ``runtime.run_dir``.
+        runtime: Store that knows where the run directory lives.
+        findings: Receives a ``file_missing`` entry for each absent file and a
+            ``json_parse_failed`` entry for each file that cannot be parsed.
+
+    Returns:
+        Mapping of filename to parsed content: one object for the files in
+        ``JSON_FILES``, a list of rows (blank lines skipped) for the others.
+        Files that are missing or fail to parse are left out.
+    """
+    run_dir = runtime.run_dir(trace_id)
+    data: dict[str, Any] = {}
+    for filename in RUNTIME_FILENAMES:
+        path = run_dir / filename
+        if not path.exists():
+            _fail(findings, "file_missing", f"missing runtime file: {filename}")
+            continue
+        text = path.read_text(encoding="utf-8")
+        if filename in JSON_FILES:
+            try:
+                data[filename] = json.loads(text)
+            except json.JSONDecodeError as exc:
+                _fail(findings, "json_parse_failed", f"{filename} cannot parse: {exc}")
+            continue
+        rows = []
+        for line_number, line in enumerate(text.splitlines(), start=1):
+            if not line.strip():
+                continue
+            try:
+                rows.append(json.loads(line))
+            except json.JSONDecodeError as exc:
+                # Name the offending line: the decoder's own position is
+                # relative to that single line, not to the whole file.
+                _fail(
+                    findings,
+                    "json_parse_failed",
+                    f"{filename} cannot parse: line {line_number}: {exc}",
+                )
+                break
+        else:
+            # Only publish the rows when every line parsed.
+            data[filename] = rows
+    return data
+
+
+def _check_trace_ids(trace_id: str, data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Report every loaded dict record whose ``trace_id`` is not ``trace_id``.
+
+    A file holding a single object is treated as a one-row file. One finding
+    is added per mismatching row; non-dict rows are ignored.
+    """
+    for filename, payload in data.items():
+        records = payload if isinstance(payload, list) else [payload]
+        for record in records:
+            if not isinstance(record, dict):
+                continue
+            if record.get("trace_id") == trace_id:
+                continue
+            _fail(findings, "trace_id_mismatch", f"{filename} has mismatched trace_id")
+
+
+def _check_unique_ids(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Report id fields whose truthy values occur more than once in a file.
+
+    At most one finding is added per (file, field) pair, listing all of the
+    repeated values. Falsy values such as a missing id are not counted.
+    """
+    id_fields = (
+        ("queries.jsonl", "query_id"),
+        ("candidate_pool.jsonl", "candidate_id"),
+        ("candidate_pool.jsonl", "aweme_id"),
+        ("rule_decisions.jsonl", "decision_id"),
+        ("rule_decisions.jsonl", "entity_id"),
+        ("source_edges.jsonl", "edge_id"),
+    )
+    for filename, field in id_fields:
+        occurrences = Counter(row.get(field) for row in data.get(filename, []))
+        duplicates = []
+        for value, times in occurrences.items():
+            if times > 1 and value:
+                duplicates.append(value)
+        if duplicates:
+            _fail(findings, "duplicate_id", f"{filename}.{field} duplicates: {duplicates}")
+
+
+def _check_references(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Verify that cross-file id references resolve to known rows.
+
+    Covers candidate -> query, media -> candidate, decision -> candidate,
+    edge -> decision and final_output record -> decision/edge links, and
+    checks that each decision carries a supported ``final_action``. Problems
+    are appended to ``findings``; nothing is returned.
+
+    The id sets are built with ``.get`` so that a row lacking its id field
+    cannot abort validation with ``KeyError``. Missing (``None``) ids are kept
+    out of the sets, so a ``None`` reference is never treated as resolved.
+    """
+
+    def known_ids(rows: list[dict[str, Any]], field: str) -> set[Any]:
+        return {row.get(field) for row in rows} - {None}
+
+    candidates = data.get("candidate_pool.jsonl", [])
+    decisions = data.get("rule_decisions.jsonl", [])
+    edges = data.get("source_edges.jsonl", [])
+    queries = known_ids(data.get("queries.jsonl", []), "query_id")
+    candidate_aweme_ids = known_ids(candidates, "aweme_id")
+    decision_ids = known_ids(decisions, "decision_id")
+    edge_ids = known_ids(edges, "edge_id")
+
+    for candidate in candidates:
+        if candidate.get("query_id") not in queries:
+            _fail(findings, "missing_query_ref", f"candidate has unknown query_id: {candidate.get('query_id')}")
+
+    for media in data.get("media_assets.jsonl", []):
+        if media.get("aweme_id") not in candidate_aweme_ids:
+            _fail(findings, "missing_candidate_ref", f"media has unknown aweme_id: {media.get('aweme_id')}")
+
+    for decision in decisions:
+        if decision.get("entity_id") not in candidate_aweme_ids:
+            _fail(findings, "missing_candidate_ref", f"decision has unknown entity_id: {decision.get('entity_id')}")
+        if decision.get("final_action") not in FINAL_ACTIONS:
+            _fail(findings, "bad_final_action", f"unsupported final_action: {decision.get('final_action')}")
+
+    for edge in edges:
+        decision_id = edge.get("decision_id")
+        # Edges without a decision_id are accepted as-is.
+        if decision_id and decision_id not in decision_ids:
+            _fail(findings, "missing_decision_ref", f"edge has unknown decision_id: {decision_id}")
+
+    final_output = data.get("final_output.json", {})
+    for asset in final_output.get("content_assets", []):
+        if asset.get("decision_id") not in decision_ids:
+            _fail(findings, "missing_decision_ref", f"asset has unknown decision_id: {asset.get('decision_id')}")
+        for edge_id in asset.get("source_edge_ids", []):
+            if edge_id not in edge_ids:
+                _fail(findings, "missing_edge_ref", f"asset has unknown source_edge_id: {edge_id}")
+
+    # The remaining final_output sections only need their decision link checked.
+    decision_linked_sections = (
+        ("reject_records", "reject"),
+        ("author_assets", "author asset"),
+        ("decision_records", "decision record"),
+    )
+    for section, label in decision_linked_sections:
+        for record in final_output.get(section, []):
+            if record.get("decision_id") not in decision_ids:
+                _fail(
+                    findings,
+                    "missing_decision_ref",
+                    f"{label} has unknown decision_id: {record.get('decision_id')}",
+                )
+
+
+def _check_source_evidence(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Check the source evidence carried by decisions and final output records.
+
+    Each record's ``source_evidence`` (an empty mapping when absent) is
+    compared with the evidence pack stored in ``source_context.json``.
+    Afterwards the coverage of ``decision_records`` is checked.
+    """
+    evidence_pack = (
+        data.get("source_context.json", {}).get("ext_data", {}).get("evidence_pack", {})
+    )
+    for decision in data.get("rule_decisions.jsonl", []):
+        _check_one_source_evidence(
+            findings,
+            decision.get("source_evidence") or {},
+            evidence_pack,
+            f"decision {decision.get('decision_id')}",
+        )
+
+    final_output = data.get("final_output.json", {})
+    for asset in final_output.get("content_assets", []):
+        evidence = asset.get("source_evidence") or {}
+        _check_one_source_evidence(findings, evidence, evidence_pack, f"asset {asset.get('aweme_id')}")
+    for record in final_output.get("reject_records", []):
+        evidence = record.get("source_evidence") or {}
+        _check_one_source_evidence(findings, evidence, evidence_pack, f"reject {record.get('entity_id')}")
+    for record in final_output.get("decision_records", []):
+        evidence = record.get("source_evidence") or {}
+        _check_one_source_evidence(
+            findings, evidence, evidence_pack, f"decision record {record.get('decision_id')}"
+        )
+
+    _check_final_decision_coverage(data, findings)
+
+
+def _check_one_source_evidence(
+    findings: list[dict[str, Any]],
+    source_evidence: dict[str, Any],
+    evidence_pack: dict[str, Any],
+    label: str,
+) -> None:
+    """Validate one ``source_evidence`` mapping against the run's evidence pack.
+
+    Reports missing required keys, the absence of both category and element
+    bindings, fields that differ from ``evidence_pack``, and a candidate
+    aweme id that also appears as the source post or among the matched posts.
+    ``label`` names the record in every message.
+    """
+    absent = sorted(SOURCE_EVIDENCE_FIELDS.difference(source_evidence))
+    if absent:
+        _fail(findings, "source_evidence_missing_fields", f"{label} missing {absent}")
+
+    has_binding = source_evidence.get("category_bindings") or source_evidence.get("element_bindings")
+    if not has_binding:
+        _fail(findings, "source_evidence_missing_binding", f"{label} lacks category or element binding")
+
+    # Scalar fields first, then list fields, each compared with the pack.
+    must_match = (
+        "pattern_execution_id",
+        "source_post_id",
+        "pattern_source_system",
+        "case_id_type",
+        "mining_config_id",
+        "source_certainty",
+        "validation_status",
+        "itemset_ids",
+        "itemset_items",
+        "matched_post_ids",
+        "seed_terms",
+    )
+    for field in must_match:
+        if source_evidence.get(field) != evidence_pack.get(field):
+            _fail(findings, "source_evidence_mismatch", f"{label} mismatched {field}")
+
+    candidate_aweme_id = source_evidence.get("candidate_aweme_id")
+    if not candidate_aweme_id:
+        return
+    if candidate_aweme_id == source_evidence.get("source_post_id"):
+        _fail(findings, "source_evidence_aweme_pollution", f"{label} rewrites source_post_id")
+    matched_posts = source_evidence.get("matched_post_ids") or []
+    if candidate_aweme_id in matched_posts:
+        _fail(findings, "source_evidence_aweme_pollution", f"{label} rewrites matched_post_ids")
+
+
+def _check_final_decision_coverage(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Report rule decisions that have no entry in final_output decision_records."""
+    recorded = set()
+    for decision in data.get("rule_decisions.jsonl", []):
+        recorded.add(decision["decision_id"])
+
+    final_output = data.get("final_output.json", {})
+    finalized = {entry.get("decision_id") for entry in final_output.get("decision_records", [])}
+
+    missing = sorted(recorded.difference(finalized))
+    if not missing:
+        return
+    _fail(findings, "final_decision_missing", f"final_output decision_records missing {missing}")
+
+
+# Verify that each decision and each committed content asset can be traced
+# back through a query_to_video edge and a pattern_to_query edge to the
+# pattern_execution_id stored in source_context.json. Problems are appended
+# to `findings` with check_id "source_path_broken".
+def _check_source_paths(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    source_context = data.get("source_context.json", {})
+    pattern_execution_id = source_context.get("ext_data", {}).get("evidence_pack", {}).get("pattern_execution_id")
+    edges = data.get("source_edges.jsonl", [])
+    edge_by_id = {edge["edge_id"]: edge for edge in edges}
+    # Both lookups are keyed by the edge's target node; when several edges of
+    # the same type share a target, the last one in the file wins.
+    pattern_query_edges = {
+        edge["to_node_id"]: edge
+        for edge in edges
+        if edge.get("edge_type") == "pattern_to_query"
+    }
+    query_video_edges = {
+        edge["to_node_id"]: edge
+        for edge in edges
+        if edge.get("edge_type") == "query_to_video"
+    }
+
+    # Decisions only need the path to exist in source_edges.jsonl.
+    for decision in data.get("rule_decisions.jsonl", []):
+        _check_entity_source_path(
+            findings,
+            label=f"decision {decision.get('decision_id')}",
+            entity_id=decision.get("entity_id"),
+            pattern_execution_id=pattern_execution_id,
+            pattern_query_edges=pattern_query_edges,
+            query_video_edges=query_video_edges,
+        )
+
+    # Assets must additionally list both path edges in their own source_edge_ids.
+    for asset in data.get("final_output.json", {}).get("content_assets", []):
+        aweme_id = asset.get("aweme_id")
+        edge_ids = set(asset.get("source_edge_ids", []))
+        query_video = query_video_edges.get(aweme_id)
+        if not query_video or query_video.get("edge_id") not in edge_ids:
+            _fail(findings, "source_path_broken", f"asset lacks query_to_video edge: {aweme_id}")
+            continue
+        pattern_query = pattern_query_edges.get(query_video.get("from_node_id"))
+        if not pattern_query or pattern_query.get("edge_id") not in edge_ids:
+            _fail(findings, "source_path_broken", f"asset lacks pattern_to_query edge: {aweme_id}")
+            continue
+        if pattern_query.get("from_node_id") != pattern_execution_id:
+            _fail(findings, "source_path_broken", f"asset path starts from wrong pattern: {aweme_id}")
+        # Reached only when both path edges were found: every edge id the
+        # asset lists must exist in source_edges.jsonl.
+        for edge_id in edge_ids:
+            if edge_id not in edge_by_id:
+                _fail(findings, "source_path_broken", f"asset edge missing: {edge_id}")
+
+
+def _check_entity_source_path(
+    findings: list[dict[str, Any]],
+    label: str,
+    entity_id: Any,
+    pattern_execution_id: Any,
+    pattern_query_edges: dict[str, dict[str, Any]],
+    query_video_edges: dict[str, dict[str, Any]],
+) -> None:
+    """Check that ``entity_id`` is reachable from the source pattern.
+
+    Follows the ``query_to_video`` edge that targets the entity, then the
+    ``pattern_to_query`` edge that targets that edge's origin, and reports
+    the first broken link as a ``source_path_broken`` finding.
+    """
+    video_edge = query_video_edges.get(entity_id)
+    if video_edge:
+        query_edge = pattern_query_edges.get(video_edge.get("from_node_id"))
+        if query_edge:
+            if query_edge.get("from_node_id") != pattern_execution_id:
+                _fail(findings, "source_path_broken", f"{label} path starts from wrong pattern: {entity_id}")
+        else:
+            _fail(findings, "source_path_broken", f"{label} lacks pattern_to_query edge: {entity_id}")
+    else:
+        _fail(findings, "source_path_broken", f"{label} lacks query_to_video edge: {entity_id}")
+
+
+def _check_summary(data: dict[str, Any], findings: list[dict[str, Any]]) -> None:
+    """Compare decision counts with the final summary and the search clues.
+
+    The number of decisions per ``final_action`` must equal the matching
+    ``summary`` field of ``final_output.json`` and the sum of the matching
+    per-clue counter in ``search_clues.jsonl``.
+
+    All four supported actions are compared against the clue totals even when
+    no decision carries them, so clue counts for an action that has zero
+    decisions are reported too. A clue counter that is missing or ``None`` is
+    counted as 0.
+    """
+    # (final_action, name of the counter field in both the summary and the clues)
+    tracked = (
+        ("POOL", "pool_count"),
+        ("CANDIDATE", "candidate_count"),
+        ("PENDING", "pending_count"),
+        ("REJECT", "reject_count"),
+    )
+    decisions = data.get("rule_decisions.jsonl", [])
+    action_counts = Counter(decision.get("final_action") for decision in decisions)
+    summary = data.get("final_output.json", {}).get("summary", {})
+    for action, field in tracked:
+        value = action_counts[action]
+        if summary.get(field) != value:
+            _fail(findings, "summary_mismatch", f"summary.{field} expected {value}, got {summary.get(field)}")
+
+    clue_counts = Counter()
+    for clue in data.get("search_clues.jsonl", []):
+        for action, field in tracked:
+            clue_counts[action] += clue.get(field) or 0
+
+    tracked_actions = [action for action, _ in tracked]
+    # Unsupported actions found in decisions are still compared; the clues
+    # hold no counter for them, so any such decision yields a mismatch.
+    other_actions = [action for action in action_counts if action not in tracked_actions]
+    for action in [*tracked_actions, *other_actions]:
+        count = action_counts[action]
+        if clue_counts[action] != count:
+            _fail(findings, "search_clue_mismatch", f"search_clues {action} expected {count}, got {clue_counts[action]}")
+
+
+def _result(trace_id: str, findings: list[dict[str, Any]]) -> dict[str, Any]:
+    """Wrap ``findings`` in a report whose status is ``fail`` if any finding failed."""
+    status = "pass"
+    for finding in findings:
+        if finding["level"] == "fail":
+            status = "fail"
+            break
+    return {"trace_id": trace_id, "status": status, "findings": findings}
+
+
+def _fail(findings: list[dict[str, Any]], check_id: str, message: str) -> None:
+    """Append a fail-level finding with the given check id and message."""
+    entry = dict(level="fail", check_id=check_id, message=message)
+    findings.append(entry)

+ 35 - 0
content_agent/business_modules/search_intent.py

@@ -0,0 +1,35 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(trace_id: str, pattern_seed_pack: dict[str, Any], runtime: RuntimeFileStore) -> list[dict[str, Any]]:
+    """Generate the search queries for a run and append them to ``queries.jsonl``.
+
+    Two queries are produced: a fixed item-combo query and a light extension
+    built from a seed term. The extension prefers the second seed term, falls
+    back to the first when only one exists, and uses the bare suffix when no
+    seed terms exist at all, so a short seed list no longer raises
+    ``IndexError``.
+
+    Args:
+        trace_id: Run identifier stamped on every query.
+        pattern_seed_pack: Seed pack; only its ``seed_terms`` list is read.
+        runtime: Store whose ``append_jsonl`` receives the generated rows.
+
+    Returns:
+        The list of query rows that was written.
+    """
+    created_at = datetime.now(timezone.utc).isoformat()
+    seed_terms = list(pattern_seed_pack.get("seed_terms") or [])
+    if len(seed_terms) > 1:
+        extension_term = seed_terms[1]
+    elif seed_terms:
+        extension_term = seed_terms[0]
+    else:
+        extension_term = ""
+    extension_query = f"{extension_term} 警示案例" if extension_term else "警示案例"
+    queries = [
+        {
+            "trace_id": trace_id,
+            "query_id": "q_001",
+            "query": "基层公职人员贪腐案例",
+            "generation_type": "item_combo",
+            "origin_source": "pattern_itemset",
+            "immediate_source": "pattern_query",
+            "effect_status": "pending",
+            "created_at": created_at,
+        },
+        {
+            "trace_id": trace_id,
+            "query_id": "q_002",
+            "query": extension_query,
+            "generation_type": "light_extension",
+            "origin_source": "pattern_itemset",
+            "immediate_source": "pattern_query",
+            "effect_status": "pending",
+            "created_at": created_at,
+        },
+    ]
+    runtime.append_jsonl(trace_id, "queries.jsonl", queries)
+    return queries
+

+ 20 - 0
content_agent/business_modules/source_seed/__init__.py

@@ -0,0 +1,20 @@
+from __future__ import annotations
+
+from typing import Any
+
+from content_agent.business_modules.source_seed.source_context import (
+    build_pattern_seed_pack,
+    load_source_context,
+)
+from content_agent.interfaces import RuntimeFileStore
+
+
+def run(trace_id: str, source: str | None, runtime: RuntimeFileStore) -> dict[str, Any]:
+    """Load the source context, derive its seed pack, and persist both.
+
+    Returns a mapping with the ``source_context`` and ``pattern_seed_pack``
+    that were written through ``runtime.write_json``.
+    """
+    context = load_source_context(trace_id, source)
+    seed_pack = build_pattern_seed_pack(trace_id, context)
+    outputs = (
+        ("source_context.json", context),
+        ("pattern_seed_pack.json", seed_pack),
+    )
+    for filename, payload in outputs:
+        runtime.write_json(trace_id, filename, payload)
+    return dict(source_context=context, pattern_seed_pack=seed_pack)

+ 116 - 0
content_agent/business_modules/source_seed/source_context.py

@@ -0,0 +1,116 @@
+from __future__ import annotations
+
+import copy
+import json
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+
+# Built-in sample itemsets; referenced as the "itemset_items" value of
+# DEFAULT_EVIDENCE_PACK below.
+DEFAULT_ITEMSET_ITEMS = [
+    {
+        "itemset_id": "391369",
+        "items": ["叙事结构", "综合性腐败", "表达手法"],
+        "support": 0.1,
+        "absolute_support": 5,
+    },
+    {
+        "itemset_id": "391371",
+        "items": ["基层公职人员", "贪腐", "警示案例"],
+        "support": 0.08,
+        "absolute_support": 4,
+    },
+]
+
+
+# Built-in evidence pack: the base that normalize_evidence_pack starts from
+# when fill_defaults is true, and the pack embedded by _default_source_context.
+# Both deep-copy it, so this shared module-level value is never mutated there.
+DEFAULT_EVIDENCE_PACK = {
+    "source_kind": "pattern_itemset",
+    "pattern_source_system": "demand_agent_pattern_tree",
+    "case_id_type": "post_id",
+    "source_post_id": "69509310",
+    "pattern_execution_id": "2038",
+    "mining_config_id": "mining_config_mock_v1",
+    "itemset_ids": ["391369", "391371"],
+    "itemset_items": DEFAULT_ITEMSET_ITEMS,
+    "category_bindings": [
+        {"category_id": "cat_001", "category_path": "社会议题 / 腐败警示"}
+    ],
+    "element_bindings": [{"element_id": "element_001", "name": "基层公职人员"}],
+    "matched_post_ids": ["69509310", "69509322"],
+    "seed_terms": ["叙事结构", "综合性腐败", "基层公职人员"],
+    "source_certainty": "mock_verified",
+    "validation_status": "mock_validated",
+}
+
+
+def load_source_context(trace_id: str, source: str | None) -> dict[str, Any]:
+    """Return the source context for a run, stamped with ``trace_id``.
+
+    When ``source`` names an existing file, its JSON content is used and the
+    evidence pack is normalized without default values. Otherwise the built-in
+    default context is used and missing evidence fields are filled from the
+    defaults.
+    """
+    source_path = Path(source) if source else None
+    from_file = source_path is not None and source_path.exists()
+    if from_file:
+        data = json.loads(source_path.read_text(encoding="utf-8"))
+    else:
+        data = _default_source_context(trace_id)
+
+    data["trace_id"] = trace_id
+    ext_data = data.setdefault("ext_data", {})
+    raw_pack = ext_data.get("evidence_pack") or {}
+    ext_data["evidence_pack"] = normalize_evidence_pack(
+        trace_id,
+        raw_pack,
+        fill_defaults=not from_file,
+    )
+    return data
+
+
+def normalize_evidence_pack(
+    trace_id: str,
+    evidence_pack: dict[str, Any],
+    fill_defaults: bool,
+) -> dict[str, Any]:
+    """Return a normalized copy of ``evidence_pack`` bound to ``trace_id``.
+
+    The incoming pack is deep-copied and layered over the default pack (when
+    ``fill_defaults`` is true) or over an empty mapping. A differing incoming
+    ``trace_id`` is kept as ``upstream_trace_id``, and the list-valued fields
+    are always present as lists, with falsy values becoming ``[]``.
+    """
+    if fill_defaults:
+        merged = copy.deepcopy(DEFAULT_EVIDENCE_PACK)
+    else:
+        merged = {}
+    supplied = copy.deepcopy(evidence_pack)
+    merged.update(supplied)
+
+    foreign_trace_id = supplied.get("trace_id")
+    if foreign_trace_id and foreign_trace_id != trace_id:
+        merged["upstream_trace_id"] = foreign_trace_id
+    merged["trace_id"] = trace_id
+
+    list_fields = (
+        "itemset_ids",
+        "itemset_items",
+        "category_bindings",
+        "element_bindings",
+        "matched_post_ids",
+        "seed_terms",
+    )
+    for key in list_fields:
+        merged[key] = list(merged.get(key) or [])
+    return merged
+
+
+def build_pattern_seed_pack(trace_id: str, source_context: dict[str, Any]) -> dict[str, Any]:
+    """Derive the pattern seed pack from a source context's evidence pack.
+
+    The seed terms are the pack's own terms followed by ``贪腐``, with
+    duplicates and empty values removed. ``pattern_execution_id`` is
+    required; the other fields fall back to ``None`` or an empty list.
+    """
+    pack = source_context["ext_data"]["evidence_pack"]
+    raw_terms = [*pack.get("seed_terms", []), "贪腐"]
+    seed_pack: dict[str, Any] = {
+        "trace_id": trace_id,
+        "seed_terms": _unique_terms(raw_terms),
+    }
+    # Assemble the result in the field order expected by consumers.
+    ordered: dict[str, Any] = {
+        "trace_id": seed_pack["trace_id"],
+        "pattern_execution_id": pack["pattern_execution_id"],
+        "mining_config_id": pack.get("mining_config_id"),
+        "source_post_id": pack.get("source_post_id"),
+        "itemsets": pack.get("itemset_items", []),
+        "seed_terms": seed_pack["seed_terms"],
+    }
+    for key in ("category_bindings", "element_bindings", "matched_post_ids"):
+        ordered[key] = pack.get(key, [])
+    return ordered
+
+
+def _default_source_context(trace_id: str) -> dict[str, Any]:
+    """Return the built-in source context used when no source file is given.
+
+    ``dt`` is today's UTC date in ISO format; the evidence pack is a fresh
+    deep copy of the module default.
+    """
+    today = datetime.now(timezone.utc).date().isoformat()
+    context: dict[str, Any] = {
+        "trace_id": trace_id,
+        "demand_content_id": "1",
+        "merge_leve2": "贪污腐败",
+        "name": "警花凌娅贪腐案",
+        "suggestion": None,
+        "score": None,
+    }
+    context["dt"] = today
+    context["ext_data"] = {"evidence_pack": copy.deepcopy(DEFAULT_EVIDENCE_PACK)}
+    return context
+
+
+def _unique_terms(terms: list[str]) -> list[str]:
+    """Return the truthy terms in first-seen order, without duplicates."""
+    seen = set()
+    kept = []
+    for term in terms:
+        if term and term not in seen:
+            seen.add(term)
+            kept.append(term)
+    return kept

+ 84 - 0
content_agent/business_modules/walk_strategy.py

@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+def run(
+    pattern_seed_pack: dict[str, Any],
+    queries: list[dict[str, Any]],
+    candidates: list[dict[str, Any]],
+    decisions: list[dict[str, Any]],
+) -> dict[str, list[dict[str, Any]]]:
+    """Plan the follow-up action per candidate and the basis of the source edges.
+
+    Emits one ``pattern_to_query`` edge per query, then for each candidate a
+    ``query_to_video`` edge and a walk action, plus a ``decision_to_asset``
+    edge when the candidate's decision is ``POOL``. Every candidate must have
+    a decision whose ``entity_id`` equals its ``aweme_id``.
+
+    Returns:
+        ``{"walk_actions": [...], "source_edge_basis": [...]}``.
+    """
+    decisions_by_entity = {entry["entity_id"]: entry for entry in decisions}
+
+    edges: list[dict[str, Any]] = [
+        {
+            "from_node_type": "PatternSeed",
+            "from_node_id": pattern_seed_pack["pattern_execution_id"],
+            "to_node_type": "Query",
+            "to_node_id": query["query_id"],
+            "edge_type": "pattern_to_query",
+            "rule_pack_id": None,
+            "decision_id": None,
+            "origin_source": query["origin_source"],
+            "immediate_source": query["immediate_source"],
+            "origin_edge_id": f"pattern_to_query:{query['query_id']}",
+            "source_evidence_ref": "source_context.json#ext_data.evidence_pack",
+        }
+        for query in queries
+    ]
+
+    actions: list[dict[str, Any]] = []
+    for candidate in candidates:
+        decision = decisions_by_entity[candidate["aweme_id"]]
+        video_edge = {
+            "from_node_type": "Query",
+            "from_node_id": candidate["query_id"],
+            "to_node_type": "Video",
+            "to_node_id": candidate["aweme_id"],
+            "edge_type": "query_to_video",
+            "rule_pack_id": decision["pack_id"],
+            "decision_id": decision["decision_id"],
+            "origin_source": candidate["origin_source"],
+            "immediate_source": candidate["immediate_source"],
+            "origin_edge_id": decision["source_evidence"]["origin_edge_id"],
+            "source_evidence_ref": decision["input_snapshot_ref"],
+        }
+        edges.append(video_edge)
+
+        action = {
+            "entity_id": candidate["aweme_id"],
+            "decision_id": decision["decision_id"],
+            "next_action": _next_action(decision["final_action"]),
+        }
+        actions.append(action)
+
+        if decision["final_action"] != "POOL":
+            continue
+        # Pooled candidates additionally get the edge that commits the asset.
+        asset_edge = {
+            "from_node_type": "RuleDecision",
+            "from_node_id": decision["decision_id"],
+            "to_node_type": "ContentAsset",
+            "to_node_id": candidate["aweme_id"],
+            "edge_type": "decision_to_asset",
+            "rule_pack_id": decision["pack_id"],
+            "decision_id": decision["decision_id"],
+            "origin_source": candidate["origin_source"],
+            "immediate_source": "asset_commit",
+            "origin_edge_id": f"decision_to_asset:{decision['decision_id']}:{candidate['aweme_id']}",
+            "source_evidence_ref": decision["input_snapshot_ref"],
+        }
+        edges.append(asset_edge)
+
+    return {"walk_actions": actions, "source_edge_basis": edges}
+
+
+def _next_action(final_action: str) -> str:
+    """Map a decision's final action to its walk action; unknown values give ``stop``."""
+    follow_ups = (
+        ("POOL", "commit_asset"),
+        ("PENDING", "keep_trace"),
+        ("CANDIDATE", "low_budget_review"),
+    )
+    for action, follow_up in follow_ups:
+        if final_action == action:
+            return follow_up
+    return "stop"

+ 26 - 0
content_agent/cli.py

@@ -0,0 +1,26 @@
+from __future__ import annotations
+
+import argparse
+import json
+
+from content_agent.run_service import RunService
+from content_agent.schemas import RunStartRequest
+
+
+def main() -> None:
+    """Run one mock chain from command-line options and print its summary as JSON."""
+    parser = argparse.ArgumentParser(description="Run the Content Agent V1 mock chain.")
+    options = (
+        ("--trace-id", None),
+        ("--source", None),
+        ("--platform", "douyin"),
+    )
+    for flag, default in options:
+        parser.add_argument(flag, default=default)
+    args = parser.parse_args()
+
+    service = RunService()
+    request = RunStartRequest(trace_id=args.trace_id, source=args.source, platform=args.platform)
+    state = service.start_run(request)
+    summary = service.get_summary(state["trace_id"])
+    print(json.dumps(summary, ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+    main()
+

+ 127 - 0
content_agent/graph.py

@@ -0,0 +1,127 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any
+
+from langgraph.graph import END, START, StateGraph
+
+from content_agent.business_modules import (
+    candidate_evidence,
+    learning_review,
+    platform_access,
+    policy_version,
+    result_source_lookup,
+    rule_judgment,
+    run_record,
+    search_intent,
+    source_seed,
+    walk_strategy,
+)
+from content_agent.interfaces import PlatformSearchClient, PolicyBundleStore, RuntimeFileStore
+from content_agent.models import RunState
+
+
+# Immutable bundle of collaborators handed to build_run_graph; the node
+# functions defined there read these through `deps`.
+@dataclass(frozen=True)
+class RunDependencies:
+    # Passed as the `runtime` argument to the business modules that take one.
+    runtime: RuntimeFileStore
+    # Passed to platform_access.run by the search_platform node.
+    platform_client: PlatformSearchClient
+    # Passed to policy_version.run by the load_policy node.
+    policy_store: PolicyBundleStore
+
+
+def build_run_graph(deps: RunDependencies):
+    """Assemble and compile the linear run pipeline as a LangGraph state graph.
+
+    Each node wraps one business module, reads its inputs from the run state,
+    and returns a partial state update that always carries ``current_step``.
+    The nodes are chained in declaration order from ``START`` to ``END``.
+    """
+
+    def load_source(state: RunState) -> dict[str, Any]:
+        loaded = source_seed.run(state["trace_id"], state.get("source"), deps.runtime)
+        return dict(loaded, current_step="load_source")
+
+    def plan_queries(state: RunState) -> dict[str, Any]:
+        planned = search_intent.run(state["trace_id"], state["pattern_seed_pack"], deps.runtime)
+        return {"queries": planned, "current_step": "plan_queries"}
+
+    def search_platform(state: RunState) -> dict[str, Any]:
+        found = platform_access.run(state["queries"], deps.platform_client)
+        return {"platform_results": found, "current_step": "search_platform"}
+
+    def build_candidates(state: RunState) -> dict[str, Any]:
+        built = candidate_evidence.run(
+            state["trace_id"],
+            state["platform_results"],
+            state["source_context"],
+            deps.runtime,
+        )
+        return dict(built, current_step="build_candidates")
+
+    def load_policy(state: RunState) -> dict[str, Any]:
+        policy = policy_version.run(state["policy_bundle_version"], deps.policy_store)
+        return {"policy_bundle": policy, "current_step": "load_policy"}
+
+    def evaluate_rules(state: RunState) -> dict[str, Any]:
+        judged = rule_judgment.run(
+            state["trace_id"],
+            state["evidence_bundles"],
+            state["policy_bundle"],
+            deps.runtime,
+        )
+        return {"rule_decisions": judged, "current_step": "evaluate_rules"}
+
+    def plan_walk(state: RunState) -> dict[str, Any]:
+        plan = walk_strategy.run(
+            state["pattern_seed_pack"],
+            state["queries"],
+            state["candidates"],
+            state["rule_decisions"],
+        )
+        return dict(plan, current_step="plan_walk")
+
+    def record_run(state: RunState) -> dict[str, Any]:
+        recorded = run_record.run(
+            state["trace_id"],
+            state["queries"],
+            state["candidates"],
+            state["rule_decisions"],
+            state["source_edge_basis"],
+            deps.runtime,
+        )
+        return dict(recorded, current_step="record_run")
+
+    def commit_results(state: RunState) -> dict[str, Any]:
+        committed = result_source_lookup.run(
+            state["trace_id"],
+            state["candidates"],
+            state["media_assets"],
+            state["rule_decisions"],
+            state["source_edges"],
+            state["search_clues"],
+            deps.runtime,
+        )
+        return {"final_output": committed, "current_step": "commit_results"}
+
+    def review_strategy(state: RunState) -> dict[str, Any]:
+        reviewed = learning_review.run(state["trace_id"], deps.runtime)
+        return {"strategy_review": reviewed, "current_step": "review_strategy", "status": "success"}
+
+    # Pipeline order: each step feeds the next one.
+    steps = (
+        ("load_source", load_source),
+        ("plan_queries", plan_queries),
+        ("search_platform", search_platform),
+        ("build_candidates", build_candidates),
+        ("load_policy", load_policy),
+        ("evaluate_rules", evaluate_rules),
+        ("plan_walk", plan_walk),
+        ("record_run", record_run),
+        ("commit_results", commit_results),
+        ("review_strategy", review_strategy),
+    )
+
+    graph = StateGraph(RunState)
+    for step_name, handler in steps:
+        graph.add_node(step_name, handler)
+
+    route = [START, *(step_name for step_name, _ in steps), END]
+    for origin, destination in zip(route, route[1:]):
+        graph.add_edge(origin, destination)
+    return graph.compile()
+

+ 2 - 0
content_agent/integrations/__init__.py

@@ -0,0 +1,2 @@
+"""External integration implementations for the V1 Content Agent backend."""
+

+ 320 - 0
content_agent/integrations/douyin.py

@@ -0,0 +1,320 @@
+from __future__ import annotations
+
+import os
+import re
+from pathlib import Path
+from typing import Any
+from urllib.parse import urljoin
+
+import httpx
+
+
+class CrawapiDouyinClient:
+    """Douyin keyword-search client backed by the crawapi HTTP service.
+
+    ``search`` posts one keyword query, normalizes the returned video items
+    into candidate dicts and merges audience-portrait fields into each one.
+    """
+
+    def __init__(
+        self,
+        base_url: str,
+        keyword_path: str,
+        content_portrait_path: str,
+        timeout_seconds: float = 60.0,
+        default_account_id: str = "",
+        default_content_type: str = "视频",
+        default_sort_type: str = "综合排序",
+        default_publish_time: str = "不限",
+        default_cursor: str = "0",
+        max_results_per_query: int | None = 3,
+        http_client: Any | None = None,
+    ) -> None:
+        """Store endpoints and the default search parameters.
+
+        Args:
+            base_url: Service root; normalized to end with a single ``/``.
+            keyword_path: Keyword-search endpoint path, relative to ``base_url``.
+            content_portrait_path: Portrait endpoint path, relative to ``base_url``.
+            timeout_seconds: Timeout passed to every POST and to the default client.
+            default_account_id: ``account_id`` sent with every keyword search.
+            default_content_type: ``content_type`` sent with every keyword search.
+            default_sort_type: ``sort_type`` sent with every keyword search.
+            default_publish_time: ``publish_time`` sent with every keyword search.
+            default_cursor: ``cursor`` sent with every keyword search.
+            max_results_per_query: Cap on items kept per query; ``None`` or ``0``
+                keeps every returned item.
+            http_client: Object exposing ``post(url, json=, headers=, timeout=)``;
+                an ``httpx.Client`` is created when omitted.
+        """
+        # Trailing "/" on the base and no leading "/" on the paths make urljoin
+        # append the path to the base path instead of replacing it.
+        self.base_url = base_url.rstrip("/") + "/"
+        self.keyword_path = keyword_path.lstrip("/")
+        self.content_portrait_path = content_portrait_path.lstrip("/")
+        self.timeout_seconds = timeout_seconds
+        self.default_account_id = default_account_id
+        self.default_content_type = default_content_type
+        self.default_sort_type = default_sort_type
+        self.default_publish_time = default_publish_time
+        self.default_cursor = default_cursor
+        self.max_results_per_query = max_results_per_query
+        self.http_client = http_client or httpx.Client(timeout=timeout_seconds)
+
+    @classmethod
+    def from_env(cls, env_path: str | Path = ".env") -> "CrawapiDouyinClient":
+        """Build a client from environment variables with a dotenv-file fallback.
+
+        Raises:
+            RuntimeError: If a required variable is set in neither place.
+            ValueError: If the configured timeout is not a valid float.
+        """
+        env = _load_env_file(env_path)
+        return cls(
+            base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
+            keyword_path=_env("CONTENTFIND_DOUYIN_KEYWORD_PATH", env, required=True),
+            content_portrait_path=_env(
+                "CONTENTFIND_DOUYIN_VIDEO_LIKE_PORTRAIT_PATH", env, required=True
+            ),
+            timeout_seconds=float(
+                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
+            ),
+            default_account_id=_env("CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID", env, default=""),
+            default_content_type=_env("CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE", env, default="视频"),
+            default_sort_type=_env("CONTENTFIND_DOUYIN_DEFAULT_SORT_TYPE", env, default="综合排序"),
+            default_publish_time=_env("CONTENTFIND_DOUYIN_DEFAULT_PUBLISH_TIME", env, default="不限"),
+            default_cursor=_env("CONTENTFIND_DOUYIN_DEFAULT_CURSOR", env, default="0"),
+            max_results_per_query=_optional_positive_int(
+                _env("CONTENTFIND_DOUYIN_MAX_RESULTS_PER_QUERY", env, default="3")
+            ),
+        )
+
+    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
+        """Run one keyword search and return normalized, portrait-enriched candidates.
+
+        Args:
+            query: Must provide ``query``, ``query_id`` and ``origin_source``.
+
+        Returns:
+            One candidate dict per kept item, in response order.
+
+        Raises:
+            RuntimeError: If the keyword-search request fails. Portrait failures
+                do not raise; the candidate is marked as having no portrait.
+        """
+        payload = {
+            "keyword": query["query"],
+            "content_type": self.default_content_type,
+            "sort_type": self.default_sort_type,
+            "publish_time": self.default_publish_time,
+            "cursor": self.default_cursor,
+            "account_id": self.default_account_id,
+        }
+        data = self._post_json(self.keyword_path, payload, operation="keyword_search")
+        data_block = data.get("data", {}) if isinstance(data.get("data"), dict) else {}
+        raw_items = data_block.get("data", []) if isinstance(data_block.get("data"), list) else []
+        # Malformed (non-dict) entries cannot be normalized, so drop them up front.
+        items = [item for item in raw_items if isinstance(item, dict)]
+        has_more = bool(data_block.get("has_more", False))
+        next_cursor = str(data_block.get("next_cursor") or "")
+
+        results: list[dict[str, Any]] = []
+        selected_items = items[: self.max_results_per_query] if self.max_results_per_query else items
+        for index, item in enumerate(selected_items, start=1):
+            normalized = self._normalize_video_item(query, item, index, has_more, next_cursor)
+            portrait = self._fetch_content_portrait(normalized["aweme_id"])
+            normalized.update(portrait)
+            results.append(normalized)
+        return results
+
+    def _normalize_video_item(
+        self,
+        query: dict[str, Any],
+        item: dict[str, Any],
+        index: int,
+        has_more: bool,
+        next_cursor: str,
+    ) -> dict[str, Any]:
+        """Map one raw search item onto the candidate dict used downstream.
+
+        ``index`` is the 1-based position within the query and becomes part of
+        ``candidate_id``. Missing text fields become ``""`` and missing counters
+        become ``0``.
+        """
+        author = item.get("author", {}) if isinstance(item.get("author"), dict) else {}
+        statistics = item.get("statistics", {}) if isinstance(item.get("statistics"), dict) else {}
+        aweme_id = str(item.get("aweme_id") or "")
+        return {
+            "candidate_id": f"{query['query_id']}_c_{index:03d}",
+            "query_id": query["query_id"],
+            "aweme_id": aweme_id,
+            "desc": item.get("desc") or item.get("item_title") or "",
+            "author_sec_uid": author.get("sec_uid") or "",
+            "author_nickname": author.get("nickname") or "",
+            "statistics": {
+                "digg_count": int(statistics.get("digg_count") or 0),
+                "comment_count": int(statistics.get("comment_count") or 0),
+                "share_count": int(statistics.get("share_count") or 0),
+                "collect_count": int(statistics.get("collect_count") or 0),
+                "play_count": int(statistics.get("play_count") or 0),
+            },
+            "cha_list": _extract_tags(item),
+            "text_extra": item.get("text_extra") or [],
+            "create_time": item.get("create_time"),
+            "has_more": has_more,
+            "next_cursor": next_cursor,
+            "score": _score_from_statistics(statistics),
+            "risk_level": "unknown",
+            "pattern_recall": "candidate_related",
+            "category_or_element_binding": "candidate_related",
+            "candidate_relation": "derived_from_pattern_demand",
+            "origin_source": query["origin_source"],
+            "immediate_source": "query_direct",
+            "metadata_source": "douyin_keyword_search",
+            "platform_auth_mode": "no_bearer",
+        }
+
+    def _fetch_content_portrait(self, aweme_id: str) -> dict[str, Any]:
+        """Return audience-portrait fields for one video.
+
+        Falls back to ``portrait_available=False`` / ``age_50_plus_level="missing"``
+        when there is no id to look up, the request fails, or the response
+        carries no usable age distribution.
+        """
+        missing = {"portrait_available": False, "age_50_plus_level": "missing"}
+        if not aweme_id:
+            # Nothing to look up, so skip the network round trip entirely.
+            return missing
+        try:
+            data = self._post_json(
+                self.content_portrait_path,
+                {
+                    "content_id": aweme_id,
+                    "need_age": True,
+                    "need_gender": True,
+                    "need_province": True,
+                    "need_city": False,
+                    "need_city_level": False,
+                    "need_phone_brand": False,
+                    "need_phone_price": False,
+                },
+                operation="content_portrait",
+            )
+        except RuntimeError:
+            # Portrait data is optional enrichment; a failed call must not abort the search.
+            return missing
+
+        portrait = _extract_portrait_dimensions(data)
+        age_distribution = _normalize_age_distribution(portrait.get("年龄"))
+        if not age_distribution:
+            return missing
+
+        age_50_ratio = sum(row["percentage"] for row in age_distribution if row["is_50_plus"])
+        # The strongest preference among the 50+ buckets; 0.0 when there is no such bucket.
+        age_50_tgi = max(
+            [row["preference"] for row in age_distribution if row["is_50_plus"]] or [0.0]
+        )
+        return {
+            "portrait_available": True,
+            "age_50_plus_level": _age_level(age_50_ratio, age_50_tgi),
+            "age_distribution": age_distribution,
+            "age_50_plus_ratio": age_50_ratio,
+            "age_50_plus_tgi": age_50_tgi,
+        }
+
+    def _post_json(self, path: str, payload: dict[str, Any], operation: str) -> dict[str, Any]:
+        """POST ``payload`` as JSON to ``path`` and return the decoded JSON object.
+
+        Raises:
+            RuntimeError: On an HTTP error status, a transport error, an
+                undecodable body, or a JSON body that is not an object. The
+                message names ``operation`` and the failure class only.
+        """
+        url = urljoin(self.base_url, path)
+        try:
+            response = self.http_client.post(
+                url,
+                json=payload,
+                headers={"Content-Type": "application/json"},
+                timeout=self.timeout_seconds,
+            )
+            response.raise_for_status()
+            data = response.json()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code if exc.response is not None else "unknown"
+            raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc
+        except httpx.HTTPError as exc:
+            raise RuntimeError(f"crawapi {operation} failed: network_error") from exc
+        except ValueError as exc:
+            raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc
+        if not isinstance(data, dict):
+            raise RuntimeError(f"crawapi {operation} failed: bad_response")
+        return data
+
+
+def _load_env_file(env_path: str | Path) -> dict[str, str]:
+    """Parse ``KEY=VALUE`` lines from a dotenv-style file.
+
+    A missing file yields an empty dict. Blank lines, ``#`` comments and lines
+    without ``=`` are ignored; surrounding whitespace and quote characters are
+    stripped from values.
+    """
+    env_file = Path(env_path)
+    parsed: dict[str, str] = {}
+    if not env_file.exists():
+        return parsed
+    for raw_line in env_file.read_text(encoding="utf-8").splitlines():
+        entry = raw_line.strip()
+        if entry == "" or entry.startswith("#"):
+            continue
+        name, separator, raw_value = entry.partition("=")
+        if not separator:
+            continue
+        parsed[name.strip()] = raw_value.strip().strip('"').strip("'")
+    return parsed
+
+
+def _env(
+    key: str,
+    file_env: dict[str, str],
+    default: str | None = None,
+    required: bool = False,
+) -> str:
+    """Resolve ``key`` from the process environment, then ``file_env``, then ``default``.
+
+    Empty values count as unset at every level. Returns ``""`` when nothing is
+    set, unless ``required`` is true, in which case ``RuntimeError`` is raised.
+    """
+    resolved = os.getenv(key)
+    if not resolved:
+        resolved = file_env.get(key)
+    if not resolved:
+        resolved = default
+    if resolved:
+        return resolved
+    if required:
+        raise RuntimeError(f"missing required env: {key}")
+    return ""
+
+
+def _optional_positive_int(value: str) -> int | None:
+    """Parse ``value`` as an int; return it only when strictly positive, else ``None``."""
+    try:
+        number = int(value)
+    except ValueError:
+        # Unparseable text is treated the same as a non-positive number.
+        number = 0
+    if number <= 0:
+        return None
+    return number
+
+
+def _extract_tags(item: dict[str, Any]) -> list[str]:
+    """Collect de-duplicated, ``#``-prefixed hashtags from a raw video item.
+
+    Tags are read from ``cha_list`` (plain strings, or dicts carrying
+    ``cha_name`` / ``hashtag_name`` / ``name``) and then from the
+    ``hashtag_name`` of ``text_extra`` entries. First-seen order is kept.
+    Every source is normalized the same way, so a name that already starts
+    with ``#`` is never prefixed twice and empty names are skipped.
+    """
+
+    def with_hash(raw_name: Any) -> str | None:
+        # Single normalization point shared by every tag source.
+        name = str(raw_name) if raw_name else ""
+        if not name:
+            return None
+        return name if name.startswith("#") else f"#{name}"
+
+    raw_names: list[Any] = []
+    for tag in item.get("cha_list") or []:
+        if isinstance(tag, str):
+            raw_names.append(tag)
+        elif isinstance(tag, dict):
+            raw_names.append(tag.get("cha_name") or tag.get("hashtag_name") or tag.get("name"))
+    for text in item.get("text_extra") or []:
+        if isinstance(text, dict):
+            raw_names.append(text.get("hashtag_name"))
+
+    tags = [tag for tag in map(with_hash, raw_names) if tag is not None]
+    # dict.fromkeys removes duplicates while preserving insertion order.
+    return list(dict.fromkeys(tags))
+
+
+def _score_from_statistics(statistics: dict[str, Any]) -> int:
+    """Bucket weighted engagement (likes + 3*comments + 4*shares) into a fixed score."""
+    likes, comments, shares = (
+        int(statistics.get(field) or 0)
+        for field in ("digg_count", "comment_count", "share_count")
+    )
+    engagement = likes + 3 * comments + 4 * shares
+    # Tiers are ordered from highest floor to lowest; the first match wins.
+    for floor, score in ((3000, 72), (1000, 62), (300, 55)):
+        if engagement >= floor:
+            return score
+    return 45
+
+
+def _normalize_age_distribution(age_data: Any) -> list[dict[str, Any]]:
+    """Flatten age-bucket data into rows of name / percentage / preference / is_50_plus.
+
+    Accepts either a mapping of label -> metrics dict or a list of metrics
+    dicts that carry their own ``name``. Any other input, and any entry
+    without a usable label, produces no row.
+    """
+    if isinstance(age_data, list):
+        pairs = [(entry.get("name"), entry) for entry in age_data if isinstance(entry, dict)]
+    elif isinstance(age_data, dict):
+        pairs = list(age_data.items())
+    else:
+        pairs = []
+
+    normalized: list[dict[str, Any]] = []
+    for raw_name, raw_metrics in pairs:
+        metrics = raw_metrics if isinstance(raw_metrics, dict) else {}
+        label = str(raw_name or metrics.get("name") or "")
+        if label:
+            normalized.append(
+                {
+                    "name": label,
+                    "percentage": _to_float(metrics.get("percentage")),
+                    "preference": _to_float(metrics.get("preference")),
+                    "is_50_plus": _is_50_plus_label(label),
+                }
+            )
+    return normalized
+
+
+def _extract_portrait_dimensions(data: dict[str, Any]) -> dict[str, Any]:
+    """Locate the portrait dimensions mapping inside a portrait response.
+
+    Searches, in order, ``data.data.data``, ``data.data.portrait``,
+    ``data.data`` and the response itself. In each dict it accepts
+    ``dimensions``, then ``portrait.dimensions``, then the dict itself if it
+    has an ``年龄`` key. Returns ``{}`` when nothing matches.
+    """
+    outer = data.get("data")
+    if not isinstance(outer, dict):
+        outer = {}
+    for scope in (outer.get("data"), outer.get("portrait"), outer, data):
+        if not isinstance(scope, dict):
+            continue
+        direct = scope.get("dimensions")
+        if isinstance(direct, dict):
+            return direct
+        nested = scope.get("portrait")
+        if isinstance(nested, dict):
+            nested_dimensions = nested.get("dimensions")
+            if isinstance(nested_dimensions, dict):
+                return nested_dimensions
+        if "年龄" in scope:
+            return scope
+    return {}
+
+
+def _to_float(value: Any) -> float:
+    """Coerce ``value`` to a float, returning ``0.0`` for ``None`` or unparseable text.
+
+    Numbers pass through unchanged. Text containing ``%`` is divided by 100,
+    so ``"25%"`` becomes ``0.25``.
+    """
+    if value is None:
+        return 0.0
+    if isinstance(value, (int, float)):
+        return float(value)
+    raw = str(value)
+    try:
+        number = float(raw.strip().replace("%", ""))
+    except ValueError:
+        return 0.0
+    if "%" in raw:
+        return number / 100
+    return number
+
+
+def _is_50_plus_label(label: str) -> bool:
+    """Report whether an age-bucket label describes an audience aged 50 or above.
+
+    Known markers (``50+``, ``50以上``, ``50-``, ``老年``) match directly.
+    Otherwise every number in the label must be at least 50; a label without
+    any number does not match.
+    """
+    if any(marker in label for marker in ("50+", "50以上", "50-", "老年")):
+        return True
+    ages = [int(digits) for digits in re.findall(r"\d+", label)]
+    # A range whose lower bound is under 50 fails here because not all ages reach 50.
+    return bool(ages) and all(age >= 50 for age in ages)
+
+
+def _age_level(ratio: float, tgi: float) -> str:
+    """Grade 50+ audience strength as ``strong``, ``medium`` or ``weak``.
+
+    Either signal alone is enough to reach a tier: ``ratio`` >= 0.25 or
+    ``tgi`` >= 130 for strong, ``ratio`` >= 0.1 or ``tgi`` >= 100 for medium.
+    """
+    is_strong = ratio >= 0.25 or tgi >= 130
+    is_medium = ratio >= 0.1 or tgi >= 100
+    if is_strong:
+        return "strong"
+    return "medium" if is_medium else "weak"

+ 72 - 0
content_agent/integrations/mock_platform.py

@@ -0,0 +1,72 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+class MockPlatformClient:
+    """Deterministic platform client used before real Douyin access exists."""
+
+    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
+        """Return canned candidates: two for ``q_001``, one for any other query id."""
+        query_id = query["query_id"]
+        # Trailing fields shared by every fixture; merged last to keep key order.
+        shared_tail = {
+            "pattern_recall": "matched",
+            "category_or_element_binding": "matched",
+            "candidate_relation": "mock_pattern_matched",
+            "origin_source": "pattern_itemset",
+            "immediate_source": "query_direct",
+        }
+
+        if query_id != "q_001":
+            extension = {
+                "candidate_id": "c_002",
+                "query_id": query_id,
+                "aweme_id": "7390000000000000001",
+                "desc": "同主题延展候选案例",
+                "author_sec_uid": "MS4wLjABAAAA002",
+                "author_nickname": "案例观察",
+                "statistics": {"digg_count": 600, "comment_count": 20, "share_count": 70},
+                "cha_list": ["#基层治理"],
+                "score": 55,
+                "age_50_plus_level": "medium",
+                "risk_level": "low",
+                "portrait_available": True,
+            }
+            return [{**extension, **shared_tail}]
+
+        well_formed = {
+            "candidate_id": "c_001",
+            "query_id": query_id,
+            "aweme_id": "7390000000000000000",
+            "desc": "基层干部贪腐警示案例",
+            "author_sec_uid": "MS4wLjABAAAA001",
+            "author_nickname": "警示故事",
+            "statistics": {"digg_count": 1200, "comment_count": 80, "share_count": 300},
+            "cha_list": ["#警示教育"],
+            "score": 72,
+            "age_50_plus_level": "medium",
+            "risk_level": "low",
+            "portrait_available": True,
+        }
+        # Second fixture has no score and no portrait data.
+        low_trust = {
+            "candidate_id": "c_099",
+            "query_id": query_id,
+            "aweme_id": "7390000000000000099",
+            "desc": "缺少画像的低可信候选",
+            "author_sec_uid": "MS4wLjABAAAA099",
+            "author_nickname": "未知作者",
+            "statistics": {"digg_count": 12, "comment_count": 0, "share_count": 1},
+            "cha_list": [],
+            "score": None,
+            "age_50_plus_level": "missing",
+            "risk_level": "unknown",
+            "portrait_available": False,
+        }
+        return [{**well_formed, **shared_tail}, {**low_trust, **shared_tail}]

+ 42 - 0
content_agent/integrations/policy_json.py

@@ -0,0 +1,42 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+
+class JsonPolicyBundleStore:
+    """Policy bundle store that reads rule-pack and strategy JSON files from disk."""
+
+    def __init__(self, root_dir: Path | str = Path(".")) -> None:
+        self.root_dir = Path(root_dir)
+
+    def load_policy_bundle(self, policy_bundle_version: str) -> dict[str, Any]:
+        """Assemble the policy bundle from the rule package and optional strategy file.
+
+        ``policy_bundle_version`` is copied into the result; it does not select
+        which files are read. ``strategy_name`` is added only when the strategy
+        file exists.
+
+        Raises:
+            FileNotFoundError: If the rule-pack file is missing.
+            ValueError: If the package has no enabled Video rule pack.
+        """
+        rule_pack_file = self.root_dir / "product_documents/规则包/douyin_rule_packs.v1.json"
+        strategy_file = self.root_dir / "product_documents/抖音游走策略/douyin_available_walk_strategy.v1.json"
+        if not rule_pack_file.exists():
+            raise FileNotFoundError(rule_pack_file)
+
+        with rule_pack_file.open(encoding="utf-8") as handle:
+            package = json.load(handle)
+        video_pack = _select_video_rule_pack(package)
+
+        bundle: dict[str, Any] = {"policy_bundle_version": policy_bundle_version}
+        bundle["rule_package_id"] = package.get("package_id")
+        bundle["rule_package_name"] = package.get("package_name")
+        bundle["rule_pack"] = video_pack
+        bundle["rule_pack_id"] = video_pack["pack_id"]
+        bundle["rule_pack_version"] = video_pack["version"]
+        bundle["shared_contracts"] = package.get("shared_contracts", {})
+        bundle["source_evidence_policy"] = package.get("source_evidence_policy", {})
+        bundle["strategy_id"] = "douyin_available_walk_strategy_v1"
+
+        if strategy_file.exists():
+            with strategy_file.open(encoding="utf-8") as handle:
+                bundle["strategy_name"] = json.load(handle).get("strategy_name")
+        return bundle
+
+
+def _select_video_rule_pack(rule_package: dict[str, Any]) -> dict[str, Any]:
+    """Return the first enabled rule pack whose ``applies_to.target_entity`` is ``Video``.
+
+    Raises:
+        ValueError: If no rule pack in ``rule_packs`` qualifies.
+    """
+    for pack in rule_package.get("rule_packs", []):
+        if not pack.get("enabled"):
+            continue
+        if pack.get("applies_to", {}).get("target_entity") == "Video":
+            return pack
+    raise ValueError("enabled Video rule pack not found")

+ 73 - 0
content_agent/integrations/runtime_files.py

@@ -0,0 +1,73 @@
+from __future__ import annotations
+
+import json
+import shutil
+from pathlib import Path
+from typing import Any
+
+
+# File names whose presence LocalRuntimeFileStore.file_status reports for each
+# run directory. The ".json" / ".jsonl" suffixes match the store's write_json /
+# append_jsonl helpers.
+RUNTIME_FILENAMES = [
+    "source_context.json",
+    "pattern_seed_pack.json",
+    "queries.jsonl",
+    "candidate_pool.jsonl",
+    "media_assets.jsonl",
+    "rule_decisions.jsonl",
+    "trace_events.jsonl",
+    "source_edges.jsonl",
+    "search_clues.jsonl",
+    "final_output.json",
+]
+
+
+class LocalRuntimeFileStore:
+    """Filesystem-backed store that keeps one directory of JSON artifacts per run.
+
+    Every run lives in ``base_dir / trace_id``. Because ``trace_id`` can be
+    supplied by the caller of the service, ``run_dir`` rejects ids that would
+    resolve outside ``base_dir``; all other methods go through ``run_dir``.
+    """
+
+    def __init__(self, base_dir: Path | str = Path("runtime/v1")) -> None:
+        self.base_dir = Path(base_dir)
+
+    def prepare_run(self, trace_id: str) -> Path:
+        """Create an empty run directory, deleting any previous content for ``trace_id``.
+
+        Raises:
+            ValueError: If ``trace_id`` is not a safe single path component.
+        """
+        path = self.run_dir(trace_id)
+        if path.exists():
+            shutil.rmtree(path)
+        path.mkdir(parents=True, exist_ok=True)
+        return path
+
+    def run_dir(self, trace_id: str) -> Path:
+        """Return the directory for ``trace_id`` without creating it.
+
+        Raises:
+            ValueError: If ``trace_id`` is empty, is ``.`` or ``..``, or contains
+                a path separator or NUL byte. Such ids would make
+                ``prepare_run`` delete a directory other than the run's own.
+        """
+        is_unsafe = (
+            not isinstance(trace_id, str)
+            or trace_id in ("", ".", "..")
+            or any(char in trace_id for char in ("/", "\\", "\x00"))
+        )
+        if is_unsafe:
+            raise ValueError(f"invalid trace_id: {trace_id!r}")
+        return self.base_dir / trace_id
+
+    def write_json(self, trace_id: str, filename: str, data: dict[str, Any]) -> Path:
+        """Write ``data`` as indented UTF-8 JSON, replacing any existing file."""
+        path = self.run_dir(trace_id) / filename
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
+        return path
+
+    def append_jsonl(self, trace_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
+        """Append each row as one compact JSON line; the file is created if needed."""
+        path = self.run_dir(trace_id) / filename
+        path.parent.mkdir(parents=True, exist_ok=True)
+        with path.open("a", encoding="utf-8") as file:
+            file.writelines(
+                json.dumps(row, ensure_ascii=False, separators=(",", ":")) + "\n" for row in rows
+            )
+        return path
+
+    def read_json(self, trace_id: str, filename: str) -> dict[str, Any]:
+        """Load a JSON file from the run directory.
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        path = self.run_dir(trace_id) / filename
+        return json.loads(path.read_text(encoding="utf-8"))
+
+    def read_jsonl(self, trace_id: str, filename: str) -> list[dict[str, Any]]:
+        """Load all non-blank lines of a JSONL file; a missing file yields ``[]``."""
+        path = self.run_dir(trace_id) / filename
+        if not path.exists():
+            return []
+        return [
+            json.loads(line)
+            for line in path.read_text(encoding="utf-8").splitlines()
+            if line.strip()
+        ]
+
+    def file_status(self, trace_id: str) -> dict[str, bool]:
+        """Map every name in ``RUNTIME_FILENAMES`` to whether it exists for the run."""
+        run_dir = self.run_dir(trace_id)
+        return {filename: (run_dir / filename).exists() for filename in RUNTIME_FILENAMES}
+
+    def list_runs(self) -> list[str]:
+        """Return the sorted names of all run directories under ``base_dir``."""
+        if not self.base_dir.exists():
+            return []
+        return sorted(path.name for path in self.base_dir.iterdir() if path.is_dir())
+

+ 23 - 0
content_agent/interfaces.py

@@ -0,0 +1,23 @@
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, Protocol
+
+
+class RuntimeFileStore(Protocol):
+    """Structural interface for per-run artifact storage keyed by ``trace_id``.
+
+    ``content_agent.integrations.runtime_files.LocalRuntimeFileStore`` provides
+    a filesystem implementation of every method declared here.
+    """
+
+    # Set up the storage location for a run and return its path.
+    def prepare_run(self, trace_id: str) -> Path: ...
+    # Path that holds the artifacts of one run.
+    def run_dir(self, trace_id: str) -> Path: ...
+    # Persist one JSON document under ``filename``; returns the written path.
+    def write_json(self, trace_id: str, filename: str, data: dict[str, Any]) -> Path: ...
+    # Append ``rows`` to a JSON-lines file; returns the written path.
+    def append_jsonl(self, trace_id: str, filename: str, rows: list[dict[str, Any]]) -> Path: ...
+    # Load one JSON document.
+    def read_json(self, trace_id: str, filename: str) -> dict[str, Any]: ...
+    # Load every row of a JSON-lines file.
+    def read_jsonl(self, trace_id: str, filename: str) -> list[dict[str, Any]]: ...
+    # Report, per file name, whether that artifact exists for the run.
+    def file_status(self, trace_id: str) -> dict[str, bool]: ...
+
+
+class PlatformSearchClient(Protocol):
+    """Structural interface for platform search backends.
+
+    ``MockPlatformClient`` and ``CrawapiDouyinClient`` both expose this method.
+    """
+
+    # Takes one query dict and returns a list of candidate dicts.
+    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]: ...
+
+
+class PolicyBundleStore(Protocol):
+    """Structural interface for loading a policy bundle.
+
+    ``JsonPolicyBundleStore`` is the JSON-file implementation.
+    """
+
+    # Returns the bundle dict for the requested bundle version label.
+    def load_policy_bundle(self, policy_bundle_version: str) -> dict[str, Any]: ...
+

+ 36 - 0
content_agent/models.py

@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from typing import Any, Literal, TypedDict
+
+
+# Allowed values of RunState["status"].
+RunStatus = Literal["running", "success", "blocked", "failed"]
+# NOTE(review): FinalAction and EffectStatus are not referenced elsewhere in this
+# module; presumably they label rule decisions and strategy effects. Confirm
+# against the business modules.
+FinalAction = Literal["POOL", "CANDIDATE", "PENDING", "REJECT"]
+EffectStatus = Literal["success", "weak_effective", "pending", "blocked", "failed"]
+
+
+class RunState(TypedDict, total=False):
+    """State dict handed to the run graph and returned by ``RunService.start_run``.
+
+    ``total=False`` makes every key optional, so a state may carry only a
+    subset of these keys.
+    """
+
+    # Run identity and progress; RunService.start_run seeds these keys.
+    trace_id: str
+    platform: str
+    platform_mode: str
+    source: str | None
+    policy_bundle_version: str
+    current_step: str
+    status: RunStatus
+    errors: list[str]
+    # Remaining keys are not set by RunService.start_run.
+    # NOTE(review): presumably each is filled by the graph step of the matching name; confirm in graph.py.
+    source_context: dict[str, Any]
+    pattern_seed_pack: dict[str, Any]
+    queries: list[dict[str, Any]]
+    platform_results: list[dict[str, Any]]
+    candidates: list[dict[str, Any]]
+    media_assets: list[dict[str, Any]]
+    evidence_bundles: list[dict[str, Any]]
+    policy_bundle: dict[str, Any]
+    rule_decisions: list[dict[str, Any]]
+    walk_actions: list[dict[str, Any]]
+    source_edge_basis: list[dict[str, Any]]
+    trace_events: list[dict[str, Any]]
+    source_edges: list[dict[str, Any]]
+    search_clues: list[dict[str, Any]]
+    final_output: dict[str, Any]
+    strategy_review: dict[str, Any]
+

+ 86 - 0
content_agent/run_service.py

@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+from pathlib import Path
+from uuid import uuid4
+
+from content_agent.graph import RunDependencies, build_run_graph
+from content_agent.integrations.douyin import CrawapiDouyinClient
+from content_agent.integrations.mock_platform import MockPlatformClient
+from content_agent.integrations.policy_json import JsonPolicyBundleStore
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore
+from content_agent.interfaces import PlatformSearchClient
+from content_agent.models import RunState
+from content_agent.schemas import RunStartRequest
+
+
+class RunService:
+    """Facade that wires runtime storage, policy loading and the run graph together."""
+
+    def __init__(self, runtime_root: Path | str = Path("runtime/v1")) -> None:
+        self.runtime = LocalRuntimeFileStore(runtime_root)
+        self.policy_store = JsonPolicyBundleStore(Path("."))
+
+    def start_run(self, request: RunStartRequest) -> RunState:
+        """Execute one run synchronously and return its final state.
+
+        A trace id is generated when the request has none. The run directory
+        is reset before the graph starts. Any exception raised while the graph
+        executes is converted into a ``failed`` state carrying the message.
+        """
+        trace_id = request.trace_id
+        if not trace_id:
+            trace_id = f"v1_trace_{uuid4().hex[:12]}"
+        self.runtime.prepare_run(trace_id)
+
+        platform_client = self._platform_client(request.platform_mode)
+        run_graph = build_run_graph(
+            RunDependencies(
+                runtime=self.runtime,
+                platform_client=platform_client,
+                policy_store=self.policy_store,
+            )
+        )
+        initial_state: RunState = {
+            "trace_id": trace_id,
+            "platform": request.platform,
+            "platform_mode": request.platform_mode,
+            "source": request.source,
+            "policy_bundle_version": request.policy_bundle_version,
+            "current_step": "start",
+            "status": "running",
+            "errors": [],
+        }
+        try:
+            final_state = run_graph.invoke(initial_state)
+        except Exception as exc:
+            # Report the failure as state instead of propagating it to the caller.
+            return {
+                **initial_state,
+                "current_step": "failed",
+                "status": "failed",
+                "errors": [str(exc)],
+            }
+        return final_state
+
+    def _platform_client(self, platform_mode: str) -> PlatformSearchClient:
+        """Pick the search client for ``platform_mode`` (``mock`` or ``real``)."""
+        if platform_mode not in ("mock", "real"):
+            raise ValueError(f"unsupported platform_mode: {platform_mode}")
+        if platform_mode == "real":
+            return CrawapiDouyinClient.from_env()
+        return MockPlatformClient()
+
+    def get_summary(self, trace_id: str) -> dict:
+        """Summarize a run from what is on disk.
+
+        The presence of ``final_output.json`` decides success; validation only
+        runs when that file exists.
+        """
+        run_dir = self.runtime.run_dir(trace_id)
+        if (run_dir / "final_output.json").exists():
+            status, current_step = "success", "review_strategy"
+            validation_status = self.validate_run(trace_id)["status"]
+        else:
+            status, current_step = "failed", "unknown"
+            validation_status = "fail"
+        return {
+            "trace_id": trace_id,
+            "status": status,
+            "current_step": current_step,
+            "output_dir": str(run_dir),
+            "files": self.runtime.file_status(trace_id),
+            "validation_status": validation_status,
+            "errors": [],
+        }
+
+    def read_jsonl(self, trace_id: str, filename: str) -> list[dict]:
+        """Read a JSON-lines artifact of the run."""
+        return self.runtime.read_jsonl(trace_id, filename)
+
+    def read_json(self, trace_id: str, filename: str) -> dict:
+        """Read a JSON artifact of the run."""
+        return self.runtime.read_json(trace_id, filename)
+
+    def strategy_review(self, trace_id: str) -> dict:
+        """Run the learning review for an existing run."""
+        # Imported at call time, as in the original layout.
+        from content_agent.business_modules.learning_review import run as run_review
+
+        return run_review(trace_id, self.runtime)
+
+    def validate_run(self, trace_id: str) -> dict:
+        """Validate the recorded artifacts of an existing run."""
+        from content_agent.business_modules.run_record import validate_run as validate
+
+        return validate(trace_id, self.runtime)

+ 51 - 0
content_agent/schemas.py

@@ -0,0 +1,51 @@
+from __future__ import annotations
+
+from typing import Any, Literal
+
+from pydantic import BaseModel, Field
+
+
+# Input for RunService.start_run. Every field has a default, so an empty
+# request starts a mock Douyin run with a generated trace id.
+class RunStartRequest(BaseModel):
+    source: str | None = Field(default=None, description="Optional source_context.json path.")
+    platform: str = Field(default="douyin", description="Requested platform.")
+    platform_mode: Literal["mock", "real"] = Field(
+        default="mock",
+        description="mock uses deterministic fixtures; real calls the configured platform API.",
+    )
+    # When omitted, RunService.start_run generates a "v1_trace_<12 hex chars>" id.
+    trace_id: str | None = Field(default=None, description="Optional caller-provided run id.")
+    policy_bundle_version: str = Field(default="douyin_policy_bundle_v1")
+
+
+# Response shape describing a started run.
+# NOTE(review): presumably returned by the start-run endpoint in api.py; confirm there.
+class RunStartResponse(BaseModel):
+    trace_id: str
+    status: str
+    policy_bundle_version: str
+    platform: str
+    platform_mode: str
+    output_dir: str
+
+
+# Field names mirror the keys of the dict built by RunService.get_summary.
+class RunSummaryResponse(BaseModel):
+    trace_id: str
+    status: str
+    current_step: str
+    output_dir: str
+    # Artifact file name -> whether it exists for the run.
+    files: dict[str, bool]
+    validation_status: str
+    errors: list[str] = Field(default_factory=list)
+
+
+# Envelope for a list of record dicts belonging to one run.
+# NOTE(review): presumably wraps RunService.read_jsonl output; confirm in api.py.
+class RecordsResponse(BaseModel):
+    trace_id: str
+    records: list[dict[str, Any]]
+
+
+# Envelope for a single JSON document belonging to one run.
+# NOTE(review): presumably wraps RunService.read_json output; confirm in api.py.
+class JsonFileResponse(BaseModel):
+    trace_id: str
+    data: dict[str, Any]
+
+
+# Envelope for a run validation result: overall status plus individual findings.
+# NOTE(review): presumably wraps RunService.validate_run output; confirm in api.py.
+class ValidationResponse(BaseModel):
+    trace_id: str
+    status: str
+    findings: list[dict[str, Any]]