walk_graph_json.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. """V3 游走配置入口(M4A):读 walk_graph / walk_policy / platform_profiles 并校验。
  2. 镜像 walk_strategy_json.WalkStrategyStore 的 load+validate+raise 结构。
  3. walk_policy 的拍板值可能带 {value, provenance, tbd} 包裹(留痕),load_policy 解包成纯值。
  4. """
  5. from __future__ import annotations
  6. from dataclasses import dataclass
  7. from pathlib import Path
  8. from typing import Any
  9. from content_agent.findings import fail as _fail
  10. WALK_GRAPH_PATH = Path("tech_documents/数据接口与来源/walk_graph.json")
  11. WALK_POLICY_PATH = Path("tech_documents/数据接口与来源/walk_policy.json")
  12. PROFILE_DIR = Path("tech_documents/数据接口与来源/platform_profiles")
  13. EDGE_CATALOG_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
  14. PERMISSION_ACTIONS = ["ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW", "REJECT_CONTENT"]
  15. _PERMISSION_META_KEYS = {"_meaning", "_provenance", "note", "tbd"}
  16. def _unwrap(value: Any) -> Any:
  17. if isinstance(value, dict) and "value" in value:
  18. return value["value"]
  19. return value
  20. def low_budget(budget: int) -> int:
  21. """low_budget 档:减半向下取整,至少 1(2026-06-11 拍板,policy 值 halve_min_1)。"""
  22. return max(1, budget // 2)
  23. def edge_permission(policy: dict[str, Any], decision_action: str | None, edge_id: str) -> str:
  24. """判定结果 → 该出边通行证;无判定/未列入的组合一律 deny(从严)。"""
  25. row = policy["edge_permissions"].get(decision_action or "", {})
  26. return row.get(edge_id, "deny")
  27. def edge_supported(profile: dict[str, Any], edge_id: str) -> bool:
  28. """profile 未列出的边 = 平台无关控制边(终端边),恒 supported。"""
  29. return profile["edges"].get(edge_id, {"status": "supported"})["status"] == "supported"
  30. @dataclass(frozen=True)
  31. class WalkGraphStore:
  32. root_dir: Path = Path(".")
  33. def load_graph(self) -> dict[str, Any]:
  34. from content_agent.integrations import config_store
  35. graph, _ = config_store.load_json(self.root_dir / WALK_GRAPH_PATH)
  36. catalog, _ = config_store.load_json(self.root_dir / EDGE_CATALOG_PATH)
  37. catalog_ids = {row["edge_id"] for row in catalog["walk_edge_catalog"]}
  38. _raise_on_fail(_validate_graph(graph, catalog_ids), "walk graph")
  39. return graph
  40. def load_policy(self) -> dict[str, Any]:
  41. from content_agent.integrations import config_store
  42. raw_policy, _ = config_store.load_json(self.root_dir / WALK_POLICY_PATH)
  43. policy = _unwrap_policy(raw_policy)
  44. edge_ids = {edge["edge_id"] for edge in self.load_graph()["edges"]}
  45. _raise_on_fail(_validate_policy(policy, edge_ids), "walk policy")
  46. return policy
  47. def load_profile(self, platform: str) -> dict[str, Any]:
  48. from content_agent.integrations import config_store
  49. profile, _ = config_store.load_json(self.root_dir / PROFILE_DIR / f"{platform}.json")
  50. edge_ids = {edge["edge_id"] for edge in self.load_graph()["edges"]}
  51. _raise_on_fail(_validate_profile(profile, edge_ids), f"platform profile {platform}")
  52. return profile
  53. def _unwrap_policy(raw: dict[str, Any]) -> dict[str, Any]:
  54. policy = dict(raw)
  55. policy["global"] = {key: _unwrap(value) for key, value in raw["global"].items()}
  56. policy["edge_permissions"] = {
  57. action: {edge: _unwrap(perm) for edge, perm in row.items() if edge not in _PERMISSION_META_KEYS}
  58. for action, row in raw["edge_permissions"].items()
  59. if action in PERMISSION_ACTIONS
  60. }
  61. policy["budget_tiers"] = {key: _unwrap(value) for key, value in raw["budget_tiers"].items()}
  62. policy["edge_budgets_by_id"] = {row["edge_id"]: row for row in raw["edge_budgets"]}
  63. return policy
  64. def _validate_graph(graph: dict[str, Any], catalog_ids: set[str]) -> list[dict[str, Any]]:
  65. findings: list[dict[str, Any]] = []
  66. node_types = {node["node_type"] for node in graph.get("nodes", [])}
  67. for edge in graph.get("edges", []):
  68. if edge.get("edge_id") not in catalog_ids:
  69. _fail(findings, "edge_not_in_catalog", f"{edge.get('edge_id')} not in walk_edge_catalog")
  70. for field in ["from_node", "to_node"]:
  71. if edge.get(field) not in node_types:
  72. _fail(findings, "edge_node_ref", f"{edge.get('edge_id')} unknown {field}: {edge.get(field)}")
  73. return findings
  74. def _validate_policy(policy: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]:
  75. findings: list[dict[str, Any]] = []
  76. for row in policy.get("edge_budgets", []):
  77. if row.get("edge_id") not in edge_ids:
  78. _fail(findings, "budget_edge_ref", f"edge_budgets unknown edge_id: {row.get('edge_id')}")
  79. permission_rows = policy.get("edge_permissions", {})
  80. key_sets = []
  81. for action in PERMISSION_ACTIONS:
  82. if action not in permission_rows:
  83. _fail(findings, "permission_action_missing", f"edge_permissions missing action: {action}")
  84. continue
  85. keys = set(permission_rows[action])
  86. key_sets.append(keys)
  87. for edge_id in keys - edge_ids:
  88. _fail(findings, "permission_edge_ref", f"{action} unknown edge_id: {edge_id}")
  89. if key_sets and any(keys != key_sets[0] for keys in key_sets[1:]):
  90. _fail(findings, "permission_closure", "edge_permissions edge keys differ across actions")
  91. return findings
  92. def _validate_profile(profile: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]:
  93. findings: list[dict[str, Any]] = []
  94. for edge_id, spec in profile.get("edges", {}).items():
  95. if edge_id not in edge_ids:
  96. _fail(findings, "profile_edge_ref", f"profile unknown edge_id: {edge_id}")
  97. if spec.get("status") not in {"supported", "blocked"}:
  98. _fail(findings, "profile_edge_status", f"{edge_id} invalid status: {spec.get('status')}")
  99. return findings
  100. def _raise_on_fail(findings: list[dict[str, Any]], label: str) -> None:
  101. failures = [finding for finding in findings if finding["level"] == "fail"]
  102. if failures:
  103. raise ValueError(f"invalid {label} config: {failures}")