walk_strategy_json.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. from __future__ import annotations
  2. import json
  3. from dataclasses import dataclass
  4. from pathlib import Path
  5. from typing import Any
  6. from content_agent.findings import fail as _fail
# Default location of the walk strategy config; WalkStrategyStore joins it
# onto its root_dir before loading.
WALK_STRATEGY_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
# Default location of the rule pack config that rule pack bindings are
# validated against.
RULE_PACK_PATH = Path("product_documents/规则包/douyin_rule_packs.v1.json")

# V3 cleanup: the original 13 sections were narrowed down to the 3 that are
# still consumed by the runtime or by validation:
#   - walk_edge_catalog: validity check for the edge IDs used in walk_graph.json
#   - walk_rule_pack_binding: the rule pack each terminal stage belongs to
#   - walk_fact_contract: contract validation for the runtime files
# The other 10 sections (legacy budget / stop / retry / trigger rules, etc.)
# were superseded by walk_graph + walk_policy and removed along with their data.
REQUIRED_SECTIONS = [
    "walk_edge_catalog",
    "walk_rule_pack_binding",
    "walk_fact_contract",
]
  18. @dataclass(frozen=True)
  19. class WalkStrategyStore:
  20. root_dir: Path = Path(".")
  21. strategy_path: Path = WALK_STRATEGY_PATH
  22. rule_pack_path: Path = RULE_PACK_PATH
  23. def load_walk_strategy(self) -> dict[str, Any]:
  24. from content_agent.integrations import config_store
  25. path = self.root_dir / self.strategy_path
  26. strategy, _ = config_store.load_json(path)
  27. findings = validate_walk_strategy_config(
  28. strategy,
  29. root_dir=self.root_dir,
  30. strategy_path=self.strategy_path,
  31. rule_pack_path=self.rule_pack_path,
  32. )
  33. failures = [finding for finding in findings if finding["level"] == "fail"]
  34. if failures:
  35. raise ValueError(f"invalid walk strategy config: {failures}")
  36. return {
  37. **strategy,
  38. "walk_strategy_version": strategy.get("strategy_version"),
  39. "walk_strategy_source_ref": {
  40. "file": str(self.strategy_path),
  41. "strategy_id": strategy.get("strategy_id"),
  42. "walk_strategy_version": strategy.get("strategy_version"),
  43. "source_of_truth": strategy.get("source_of_truth"),
  44. },
  45. }
  46. def validate_walk_strategy_config(
  47. strategy: dict[str, Any],
  48. *,
  49. root_dir: Path = Path("."),
  50. strategy_path: Path = WALK_STRATEGY_PATH,
  51. rule_pack_path: Path = RULE_PACK_PATH,
  52. ) -> list[dict[str, Any]]:
  53. findings: list[dict[str, Any]] = []
  54. _check_identity(strategy, strategy_path, findings)
  55. _check_required_sections(strategy, findings)
  56. if any(finding["level"] == "fail" for finding in findings):
  57. return findings
  58. edge_ids = _ids(strategy["walk_edge_catalog"], "edge_id")
  59. _check_edge_refs(strategy, edge_ids, findings)
  60. _check_fact_contract(strategy["walk_fact_contract"], findings)
  61. _check_rule_pack_bindings(
  62. strategy["walk_rule_pack_binding"],
  63. root_dir / rule_pack_path,
  64. findings,
  65. )
  66. return findings
  67. def _check_identity(
  68. strategy: dict[str, Any],
  69. strategy_path: Path,
  70. findings: list[dict[str, Any]],
  71. ) -> None:
  72. if strategy.get("strategy_id") != "douyin_walk_strategy_v1":
  73. _fail(findings, "strategy_id", "strategy_id must be douyin_walk_strategy_v1")
  74. if strategy.get("strategy_version") != "V1.0":
  75. _fail(findings, "strategy_version", "walk strategy config version must be V1.0")
  76. if strategy.get("source_of_truth") != str(strategy_path):
  77. _fail(findings, "source_of_truth", f"source_of_truth must be {strategy_path}")
  78. def _check_required_sections(strategy: dict[str, Any], findings: list[dict[str, Any]]) -> None:
  79. for section in REQUIRED_SECTIONS:
  80. value = strategy.get(section)
  81. if not isinstance(value, list) or not value:
  82. _fail(findings, "section_missing", f"{section} must be a non-empty list")
  83. def _check_edge_refs(
  84. strategy: dict[str, Any],
  85. edge_ids: set[str],
  86. findings: list[dict[str, Any]],
  87. ) -> None:
  88. for section in ["walk_rule_pack_binding"]:
  89. for row in strategy.get(section, []):
  90. if row.get("edge_id") not in edge_ids:
  91. _fail(
  92. findings,
  93. "edge_ref",
  94. f"{section} references unknown edge_id: {row.get('edge_id')}",
  95. )
  96. def _check_fact_contract(
  97. contracts: list[dict[str, Any]], findings: list[dict[str, Any]]
  98. ) -> None:
  99. by_file = {contract.get("runtime_file"): contract for contract in contracts}
  100. walk_actions = by_file.get("walk_actions.jsonl")
  101. if not walk_actions:
  102. _fail(findings, "walk_actions_contract", "walk_actions.jsonl contract is required")
  103. else:
  104. unique_key = walk_actions.get("unique_key")
  105. if unique_key != ["run_id", "policy_run_id", "walk_action_id"]:
  106. _fail(findings, "walk_actions_unique_key", "walk_actions unique key is invalid")
  107. search_clues = by_file.get("search_clues.jsonl")
  108. if not search_clues:
  109. _fail(findings, "search_clues_contract", "search_clues.jsonl contract is required")
  110. elif search_clues.get("unique_key") != ["run_id", "policy_run_id", "clue_id"]:
  111. _fail(findings, "search_clues_unique_key", "search_clues unique key must use clue_id")
  112. def _check_rule_pack_bindings(
  113. bindings: list[dict[str, Any]],
  114. rule_pack_path: Path,
  115. findings: list[dict[str, Any]],
  116. ) -> None:
  117. rule_package = json.loads(rule_pack_path.read_text(encoding="utf-8"))
  118. enabled_packs = {
  119. (pack.get("rule_pack_id"), pack.get("version"))
  120. for pack in rule_package.get("rule_packs", [])
  121. if pack.get("enabled")
  122. }
  123. for binding in bindings:
  124. key = (binding.get("rule_pack_id"), binding.get("rule_pack_version"))
  125. if key not in enabled_packs:
  126. _fail(
  127. findings,
  128. "rule_pack_binding",
  129. f"{binding.get('binding_id')} references missing enabled rule pack: {key}",
  130. )
  131. def _ids(rows: list[dict[str, Any]], field: str) -> set[str]:
  132. return {str(row[field]) for row in rows if row.get(field)}