| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- """V3 游走配置入口(M4A):读 walk_graph / walk_policy / platform_profiles 并校验。
- 镜像 walk_strategy_json.WalkStrategyStore 的 load+validate+raise 结构。
- walk_policy 的拍板值可能带 {value, provenance, tbd} 包裹(留痕),load_policy 解包成纯值。
- """
- from __future__ import annotations
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any
- from content_agent.findings import fail as _fail
# Locations of the walk configuration files. They are relative paths that
# WalkGraphStore joins onto its root_dir before loading.
WALK_GRAPH_PATH = Path("tech_documents/数据接口与来源/walk_graph.json")
WALK_POLICY_PATH = Path("tech_documents/数据接口与来源/walk_policy.json")
# Directory holding one "<platform>.json" profile per platform.
PROFILE_DIR = Path("tech_documents/数据接口与来源/platform_profiles")
# File whose "walk_edge_catalog" rows supply the edge_ids a graph edge may use.
EDGE_CATALOG_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
# Decision actions that must each have a row in the policy's edge_permissions;
# rows for any other action are dropped when the policy is unwrapped.
PERMISSION_ACTIONS = ["ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW", "REJECT_CONTENT"]
# Keys inside a permission row that are not edge ids and are skipped when the
# row is unwrapped.
_PERMISSION_META_KEYS = {"_meaning", "_provenance", "note", "tbd"}
- def _unwrap(value: Any) -> Any:
- if isinstance(value, dict) and "value" in value:
- return value["value"]
- return value
def low_budget(budget: int) -> int:
    """Return the low-budget tier for *budget*.

    The budget is halved with floor division and clamped so the result is
    never below 1 (decided 2026-06-11; policy value ``halve_min_1``).
    """
    halved = budget // 2
    # Clamp to the floor of 1 so a low-budget edge can still be walked once.
    return halved if halved > 1 else 1
def edge_permission(policy: dict[str, Any], decision_action: str | None, edge_id: str) -> str:
    """Map a decision result to the permission of one outgoing edge.

    Looks up ``policy["edge_permissions"][decision_action][edge_id]``. A
    missing decision (``None`` or empty) is looked up under the key ``""``;
    any action/edge combination that is not listed yields ``"deny"`` (strict
    by default).
    """
    permissions = policy["edge_permissions"]
    action_key = decision_action if decision_action else ""
    action_row = permissions.get(action_key, {})
    if edge_id in action_row:
        return action_row[edge_id]
    return "deny"
def edge_supported(profile: dict[str, Any], edge_id: str) -> bool:
    """Tell whether *edge_id* is supported according to *profile*.

    An edge that ``profile["edges"]`` does not list is treated as a
    platform-independent control edge (terminal edge) and is always
    supported. A listed edge is supported only when its ``"status"`` equals
    ``"supported"``.
    """
    listed_edges = profile["edges"]
    if edge_id not in listed_edges:
        return True
    return listed_edges[edge_id]["status"] == "supported"
@dataclass(frozen=True)
class WalkGraphStore:
    """Load-and-validate entry point for the walk graph, policy and profiles.

    Every loader reads JSON through ``config_store.load_json`` (only the first
    element of its returned pair is used), runs the matching validator and
    raises ``ValueError`` through ``_raise_on_fail`` when a fail-level finding
    is reported.
    """

    # Directory that the module-level configuration paths are joined onto.
    root_dir: Path = Path(".")

    def load_graph(self) -> dict[str, Any]:
        """Return the walk graph after checking it against the edge catalog."""
        from content_agent.integrations import config_store

        graph, _ = config_store.load_json(self.root_dir / WALK_GRAPH_PATH)
        catalog, _ = config_store.load_json(self.root_dir / EDGE_CATALOG_PATH)
        known_edge_ids = set()
        for entry in catalog["walk_edge_catalog"]:
            known_edge_ids.add(entry["edge_id"])
        graph_findings = _validate_graph(graph, known_edge_ids)
        _raise_on_fail(graph_findings, "walk graph")
        return graph

    def load_policy(self) -> dict[str, Any]:
        """Return the walk policy with wrapped values unwrapped, validated."""
        from content_agent.integrations import config_store

        wrapped, _ = config_store.load_json(self.root_dir / WALK_POLICY_PATH)
        policy = _unwrap_policy(wrapped)
        policy_findings = _validate_policy(policy, self._graph_edge_ids())
        _raise_on_fail(policy_findings, "walk policy")
        return policy

    def load_profile(self, platform: str) -> dict[str, Any]:
        """Return the validated profile stored as ``<platform>.json``."""
        from content_agent.integrations import config_store

        profile_path = self.root_dir / PROFILE_DIR / f"{platform}.json"
        profile, _ = config_store.load_json(profile_path)
        profile_findings = _validate_profile(profile, self._graph_edge_ids())
        _raise_on_fail(profile_findings, f"platform profile {platform}")
        return profile

    def _graph_edge_ids(self) -> set[str]:
        """Collect the edge ids of the graph, which is loaded and validated anew."""
        return {edge["edge_id"] for edge in self.load_graph()["edges"]}
def _unwrap_policy(raw: dict[str, Any]) -> dict[str, Any]:
    """Build a plain-value copy of the raw walk policy.

    The result is a shallow copy of *raw* in which:

    * ``"global"`` and ``"budget_tiers"`` have every entry passed through
      ``_unwrap``;
    * ``"edge_permissions"`` keeps only the actions named in
      ``PERMISSION_ACTIONS``, and inside each row drops the keys listed in
      ``_PERMISSION_META_KEYS`` before unwrapping the remaining permissions;
    * ``"edge_budgets_by_id"`` is added, indexing the ``"edge_budgets"`` rows
      by their ``"edge_id"``.

    Raises ``KeyError`` when *raw* lacks one of the sections read here.
    """
    policy = dict(raw)

    global_values = {}
    for name, entry in raw["global"].items():
        global_values[name] = _unwrap(entry)
    policy["global"] = global_values

    permissions = {}
    for action, row in raw["edge_permissions"].items():
        if action not in PERMISSION_ACTIONS:
            continue
        unwrapped_row = {}
        for edge, perm in row.items():
            # Annotation keys live next to the edge ids; they are not permissions.
            if edge in _PERMISSION_META_KEYS:
                continue
            unwrapped_row[edge] = _unwrap(perm)
        permissions[action] = unwrapped_row
    policy["edge_permissions"] = permissions

    tier_values = {}
    for name, entry in raw["budget_tiers"].items():
        tier_values[name] = _unwrap(entry)
    policy["budget_tiers"] = tier_values

    budgets_by_id = {}
    for budget_row in raw["edge_budgets"]:
        budgets_by_id[budget_row["edge_id"]] = budget_row
    policy["edge_budgets_by_id"] = budgets_by_id
    return policy
- def _validate_graph(graph: dict[str, Any], catalog_ids: set[str]) -> list[dict[str, Any]]:
- findings: list[dict[str, Any]] = []
- node_types = {node["node_type"] for node in graph.get("nodes", [])}
- for edge in graph.get("edges", []):
- if edge.get("edge_id") not in catalog_ids:
- _fail(findings, "edge_not_in_catalog", f"{edge.get('edge_id')} not in walk_edge_catalog")
- for field in ["from_node", "to_node"]:
- if edge.get(field) not in node_types:
- _fail(findings, "edge_node_ref", f"{edge.get('edge_id')} unknown {field}: {edge.get(field)}")
- return findings
def _validate_policy(policy: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]:
    """Check the unwrapped walk policy against the graph's edge ids.

    Findings are recorded through ``_fail`` when:

    * an ``edge_budgets`` row names an ``edge_id`` that is not in *edge_ids*;
    * ``edge_permissions`` lacks a row for one of ``PERMISSION_ACTIONS``;
    * a permission row contains an edge id that is not in *edge_ids*;
    * the permission rows that are present do not all list the same edge ids.

    Returns the list of findings, which is empty for a valid policy.
    """
    findings: list[dict[str, Any]] = []
    for row in policy.get("edge_budgets", []):
        if row.get("edge_id") not in edge_ids:
            _fail(findings, "budget_edge_ref", f"edge_budgets unknown edge_id: {row.get('edge_id')}")

    permission_rows = policy.get("edge_permissions", {})
    key_sets = []
    for action in PERMISSION_ACTIONS:
        if action not in permission_rows:
            _fail(findings, "permission_action_missing", f"edge_permissions missing action: {action}")
            continue
        keys = set(permission_rows[action])
        key_sets.append(keys)
        # Sort the unknown ids: set iteration order depends on string hashing,
        # which would otherwise make the order of findings (and of the error
        # text built from them) differ from run to run.
        for edge_id in sorted(keys - edge_ids):
            _fail(findings, "permission_edge_ref", f"{action} unknown edge_id: {edge_id}")

    # Closure check: every action present must cover exactly the same edges.
    if key_sets and any(keys != key_sets[0] for keys in key_sets[1:]):
        _fail(findings, "permission_closure", "edge_permissions edge keys differ across actions")
    return findings
- def _validate_profile(profile: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]:
- findings: list[dict[str, Any]] = []
- for edge_id, spec in profile.get("edges", {}).items():
- if edge_id not in edge_ids:
- _fail(findings, "profile_edge_ref", f"profile unknown edge_id: {edge_id}")
- if spec.get("status") not in {"supported", "blocked"}:
- _fail(findings, "profile_edge_status", f"{edge_id} invalid status: {spec.get('status')}")
- return findings
- def _raise_on_fail(findings: list[dict[str, Any]], label: str) -> None:
- failures = [finding for finding in findings if finding["level"] == "fail"]
- if failures:
- raise ValueError(f"invalid {label} config: {failures}")
|