"""Load and validate the Douyin walk strategy configuration file."""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from content_agent.findings import fail as _fail

WALK_STRATEGY_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")

# V3 cleanup: the original 13 sections were narrowed down to the 3 that are
# still consumed by the runtime / validation:
#   - walk_edge_catalog: validates the edge IDs used by walk_graph.json
#   - walk_rule_pack_binding: which rule pack a terminal stage belongs to
#   - walk_fact_contract: runtime file contract validation
# The other 10 sections (old budget / stop / retry / trigger rules, etc.) have
# been superseded by walk_graph + walk_policy and were removed with them.
REQUIRED_SECTIONS = [
    "walk_edge_catalog",
    "walk_rule_pack_binding",
    "walk_fact_contract",
]


@dataclass(frozen=True)
class WalkStrategyStore:
    """Locate, load and validate the walk strategy config under ``root_dir``."""

    # Directory that ``strategy_path`` and ``rule_pack_path`` are joined onto.
    root_dir: Path = Path(".")
    # Walk strategy JSON file, relative to ``root_dir``.
    strategy_path: Path = WALK_STRATEGY_PATH
    # Rule pack JSON file, relative to ``root_dir``.
    rule_pack_path: Path = RULE_PACK_PATH

    def load_walk_strategy(self) -> dict[str, Any]:
        """Load the strategy file, validate it, and return it with source metadata.

        Returns:
            A copy of the loaded strategy with two extra keys:
            ``walk_strategy_version`` and ``walk_strategy_source_ref``.

        Raises:
            ValueError: if validation produced any finding whose level is
                ``"fail"``.
        """
        # Imported lazily inside the method, as in the original code
        # (presumably to avoid an import cycle -- TODO confirm).
        from content_agent.integrations import config_store

        path = self.root_dir / self.strategy_path
        # load_json returns a 2-tuple; only the first element is used here.
        strategy, _ = config_store.load_json(path)
        findings = validate_walk_strategy_config(
            strategy,
            root_dir=self.root_dir,
            strategy_path=self.strategy_path,
            rule_pack_path=self.rule_pack_path,
        )
        failures = [finding for finding in findings if finding["level"] == "fail"]
        if failures:
            raise ValueError(f"invalid walk strategy config: {failures}")
        return {
            **strategy,
            "walk_strategy_version": strategy.get("strategy_version"),
            "walk_strategy_source_ref": {
                "file": str(self.strategy_path),
                "strategy_id": strategy.get("strategy_id"),
                "walk_strategy_version": strategy.get("strategy_version"),
                "source_of_truth": strategy.get("source_of_truth"),
            },
        }


def validate_walk_strategy_config(
    strategy: dict[str, Any],
    *,
    root_dir: Path = Path("."),
    strategy_path: Path = WALK_STRATEGY_PATH,
    rule_pack_path: Path = RULE_PACK_PATH,
) -> list[dict[str, Any]]:
    """Validate a loaded walk strategy and return the collected findings.

    Identity and required-section checks run first; if either records a
    failure the remaining checks are skipped, because they index directly
    into the required sections.

    Args:
        strategy: The parsed strategy config.
        root_dir: Directory that ``rule_pack_path`` is resolved against.
        strategy_path: Expected value of the config's ``source_of_truth``.
        rule_pack_path: Rule pack JSON file, relative to ``root_dir``.

    Returns:
        The list of finding dicts (empty when the config is valid).

    Raises:
        OSError: if the rule pack file cannot be read.
        ValueError: if the rule pack file is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    findings: list[dict[str, Any]] = []
    _check_identity(strategy, strategy_path, findings)
    _check_required_sections(strategy, findings)
    if any(finding["level"] == "fail" for finding in findings):
        return findings

    edge_ids = _ids(strategy["walk_edge_catalog"], "edge_id")
    _check_edge_refs(strategy, edge_ids, findings)
    _check_fact_contract(strategy["walk_fact_contract"], findings)
    _check_rule_pack_bindings(
        strategy["walk_rule_pack_binding"],
        root_dir / rule_pack_path,
        findings,
    )
    return findings


def _check_identity(
    strategy: dict[str, Any],
    strategy_path: Path,
    findings: list[dict[str, Any]],
) -> None:
    """Check the strategy's id, version and declared source-of-truth path."""
    if strategy.get("strategy_id") != "douyin_walk_strategy_v1":
        _fail(findings, "strategy_id", "strategy_id must be douyin_walk_strategy_v1")
    if strategy.get("strategy_version") != "V1.0":
        _fail(findings, "strategy_version", "walk strategy config version must be V1.0")
    # str(Path) uses backslashes on Windows, so also accept the forward-slash
    # form; otherwise a config written with "/" separators can never validate
    # there. A tuple (not a set) keeps unhashable JSON values from raising.
    accepted_sources = (str(strategy_path), strategy_path.as_posix())
    if strategy.get("source_of_truth") not in accepted_sources:
        _fail(findings, "source_of_truth", f"source_of_truth must be {strategy_path}")


def _check_required_sections(strategy: dict[str, Any], findings: list[dict[str, Any]]) -> None:
    """Check that every required section is a non-empty list of objects."""
    for section in REQUIRED_SECTIONS:
        value = strategy.get(section)
        if not isinstance(value, list) or not value:
            _fail(findings, "section_missing", f"{section} must be a non-empty list")
        elif not all(isinstance(row, dict) for row in value):
            # The later checks call .get() on every row; report malformed rows
            # as a finding instead of letting them crash with AttributeError.
            _fail(findings, "section_invalid", f"{section} rows must be objects")


def _check_edge_refs(
    strategy: dict[str, Any],
    edge_ids: set[str],
    findings: list[dict[str, Any]],
) -> None:
    """Check that every edge-referencing row points at a catalogued edge ID."""
    # Only one section references edges at present.
    for section in ["walk_rule_pack_binding"]:
        for row in strategy.get(section, []):
            edge_id = row.get("edge_id")
            # _ids() drops falsy IDs and stringifies the rest, so compare the
            # same way; this also keeps unhashable values from raising.
            if not edge_id or str(edge_id) not in edge_ids:
                _fail(
                    findings,
                    "edge_ref",
                    f"{section} references unknown edge_id: {row.get('edge_id')}",
                )


def _check_fact_contract(
    contracts: list[dict[str, Any]], findings: list[dict[str, Any]]
) -> None:
    """Check the contracts for walk_actions.jsonl and search_clues.jsonl.

    Both contracts must be present and must declare the exact expected
    ``unique_key`` list.
    """
    # If several contracts share a runtime_file, the last one wins.
    by_file = {contract.get("runtime_file"): contract for contract in contracts}

    walk_actions = by_file.get("walk_actions.jsonl")
    if not walk_actions:
        _fail(findings, "walk_actions_contract", "walk_actions.jsonl contract is required")
    else:
        unique_key = walk_actions.get("unique_key")
        if unique_key != ["run_id", "policy_run_id", "walk_action_id"]:
            _fail(findings, "walk_actions_unique_key", "walk_actions unique key is invalid")

    search_clues = by_file.get("search_clues.jsonl")
    if not search_clues:
        _fail(findings, "search_clues_contract", "search_clues.jsonl contract is required")
    elif search_clues.get("unique_key") != ["run_id", "policy_run_id", "clue_id"]:
        _fail(findings, "search_clues_unique_key", "search_clues unique key must use clue_id")


def _check_rule_pack_bindings(
    bindings: list[dict[str, Any]],
    rule_pack_path: Path,
    findings: list[dict[str, Any]],
) -> None:
    """Check that every binding names an enabled (rule_pack_id, version) pair.

    Raises:
        OSError: if ``rule_pack_path`` cannot be read.
        ValueError: if the file content is not valid JSON.
    """
    rule_package = json.loads(rule_pack_path.read_text(encoding="utf-8"))
    enabled_packs = {
        (pack.get("rule_pack_id"), pack.get("version"))
        for pack in rule_package.get("rule_packs", [])
        if pack.get("enabled")
    }
    for binding in bindings:
        key = (binding.get("rule_pack_id"), binding.get("rule_pack_version"))
        if key not in enabled_packs:
            _fail(
                findings,
                "rule_pack_binding",
                f"{binding.get('binding_id')} references missing enabled rule pack: {key}",
            )


def _ids(rows: list[dict[str, Any]], field: str) -> set[str]:
    """Return the stringified ``field`` values of ``rows``, skipping falsy ones."""
    return {str(row[field]) for row in rows if row.get(field)}