- from __future__ import annotations
- import json
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any
- from content_agent.findings import fail as _fail
- WALK_STRATEGY_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
- RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")
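- # V3 cleanup: the original 13 sections are narrowed to the 3 still consumed by runtime/validation:
- # walk_edge_catalog (validates edge IDs used in walk_graph.json), walk_rule_pack_binding
- # (which rule pack each terminal stage belongs to), and walk_fact_contract (runtime file contract
- # validation). The other 10 sections (legacy budget/stop/retry/trigger rules, etc.) were
- # superseded by walk_graph + walk_policy and were deleted along with their section definitions.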
- REQUIRED_SECTIONS = [
- "walk_edge_catalog",
- "walk_rule_pack_binding",
- "walk_fact_contract",
- ]
- @dataclass(frozen=True)
- class WalkStrategyStore:
- root_dir: Path = Path(".")
- strategy_path: Path = WALK_STRATEGY_PATH
- rule_pack_path: Path = RULE_PACK_PATH
- def load_walk_strategy(self) -> dict[str, Any]:
- from content_agent.integrations import config_store
- path = self.root_dir / self.strategy_path
- strategy, _ = config_store.load_json(path)
- findings = validate_walk_strategy_config(
- strategy,
- root_dir=self.root_dir,
- strategy_path=self.strategy_path,
- rule_pack_path=self.rule_pack_path,
- )
- failures = [finding for finding in findings if finding["level"] == "fail"]
- if failures:
- raise ValueError(f"invalid walk strategy config: {failures}")
- return {
- **strategy,
- "walk_strategy_version": strategy.get("strategy_version"),
- "walk_strategy_source_ref": {
- "file": str(self.strategy_path),
- "strategy_id": strategy.get("strategy_id"),
- "walk_strategy_version": strategy.get("strategy_version"),
- "source_of_truth": strategy.get("source_of_truth"),
- },
- }
- def validate_walk_strategy_config(
- strategy: dict[str, Any],
- *,
- root_dir: Path = Path("."),
- strategy_path: Path = WALK_STRATEGY_PATH,
- rule_pack_path: Path = RULE_PACK_PATH,
- ) -> list[dict[str, Any]]:
- findings: list[dict[str, Any]] = []
- _check_identity(strategy, strategy_path, findings)
- _check_required_sections(strategy, findings)
- if any(finding["level"] == "fail" for finding in findings):
- return findings
- edge_ids = _ids(strategy["walk_edge_catalog"], "edge_id")
- _check_edge_refs(strategy, edge_ids, findings)
- _check_fact_contract(strategy["walk_fact_contract"], findings)
- _check_rule_pack_bindings(
- strategy["walk_rule_pack_binding"],
- root_dir / rule_pack_path,
- findings,
- )
- return findings
- def _check_identity(
- strategy: dict[str, Any],
- strategy_path: Path,
- findings: list[dict[str, Any]],
- ) -> None:
- if strategy.get("strategy_id") != "douyin_walk_strategy_v1":
- _fail(findings, "strategy_id", "strategy_id must be douyin_walk_strategy_v1")
- if strategy.get("strategy_version") != "V1.0":
- _fail(findings, "strategy_version", "walk strategy config version must be V1.0")
- if strategy.get("source_of_truth") != str(strategy_path):
- _fail(findings, "source_of_truth", f"source_of_truth must be {strategy_path}")
- def _check_required_sections(strategy: dict[str, Any], findings: list[dict[str, Any]]) -> None:
- for section in REQUIRED_SECTIONS:
- value = strategy.get(section)
- if not isinstance(value, list) or not value:
- _fail(findings, "section_missing", f"{section} must be a non-empty list")
- def _check_edge_refs(
- strategy: dict[str, Any],
- edge_ids: set[str],
- findings: list[dict[str, Any]],
- ) -> None:
- for section in ["walk_rule_pack_binding"]:
- for row in strategy.get(section, []):
- if row.get("edge_id") not in edge_ids:
- _fail(
- findings,
- "edge_ref",
- f"{section} references unknown edge_id: {row.get('edge_id')}",
- )
- def _check_fact_contract(
- contracts: list[dict[str, Any]], findings: list[dict[str, Any]]
- ) -> None:
- by_file = {contract.get("runtime_file"): contract for contract in contracts}
- walk_actions = by_file.get("walk_actions.jsonl")
- if not walk_actions:
- _fail(findings, "walk_actions_contract", "walk_actions.jsonl contract is required")
- else:
- unique_key = walk_actions.get("unique_key")
- if unique_key != ["run_id", "policy_run_id", "walk_action_id"]:
- _fail(findings, "walk_actions_unique_key", "walk_actions unique key is invalid")
- search_clues = by_file.get("search_clues.jsonl")
- if not search_clues:
- _fail(findings, "search_clues_contract", "search_clues.jsonl contract is required")
- elif search_clues.get("unique_key") != ["run_id", "policy_run_id", "clue_id"]:
- _fail(findings, "search_clues_unique_key", "search_clues unique key must use clue_id")
- def _check_rule_pack_bindings(
- bindings: list[dict[str, Any]],
- rule_pack_path: Path,
- findings: list[dict[str, Any]],
- ) -> None:
- rule_package = json.loads(rule_pack_path.read_text(encoding="utf-8"))
- enabled_packs = {
- (pack.get("rule_pack_id"), pack.get("version"))
- for pack in rule_package.get("rule_packs", [])
- if pack.get("enabled")
- }
- for binding in bindings:
- key = (binding.get("rule_pack_id"), binding.get("rule_pack_version"))
- if key not in enabled_packs:
- _fail(
- findings,
- "rule_pack_binding",
- f"{binding.get('binding_id')} references missing enabled rule pack: {key}",
- )
- def _ids(rows: list[dict[str, Any]], field: str) -> set[str]:
- return {str(row[field]) for row in rows if row.get(field)}
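The findings-accumulation pattern used by `_check_required_sections` can be sketched in isolation as below. This is a minimal sketch, not the module's actual behavior end to end: `fail` is a hypothetical stand-in for `content_agent.findings.fail`, and the record shape (`level`, `code`, `message`) is an assumption, since this file only shows that each finding carries a `"level"` key. The `sample_strategy` dict and its `edge_example` ID are likewise invented for illustration.

```python
from typing import Any

REQUIRED_SECTIONS = ["walk_edge_catalog", "walk_rule_pack_binding", "walk_fact_contract"]


def fail(findings: list[dict[str, Any]], code: str, message: str) -> None:
    # Hypothetical stand-in for content_agent.findings.fail; the real helper's
    # record shape is not shown in this file beyond the "level" key.
    findings.append({"level": "fail", "code": code, "message": message})


def check_required_sections(strategy: dict[str, Any]) -> list[dict[str, Any]]:
    # Mirrors the rule above: every required section must be a non-empty list.
    findings: list[dict[str, Any]] = []
    for section in REQUIRED_SECTIONS:
        value = strategy.get(section)
        if not isinstance(value, list) or not value:
            fail(findings, "section_missing", f"{section} must be a non-empty list")
    return findings


# Illustrative config: one section is present, one is empty, one is missing.
sample_strategy = {
    "walk_edge_catalog": [{"edge_id": "edge_example"}],
    "walk_rule_pack_binding": [],
}

sample_findings = check_required_sections(sample_strategy)
failed_sections = [f["message"] for f in sample_findings if f["level"] == "fail"]
print(failed_sections)
```

Collecting findings instead of raising on the first problem lets a caller report every missing section at once; `load_walk_strategy` then decides whether any `fail`-level finding should become a `ValueError`.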