"""V3 游走配置入口(M4A):读 walk_graph / walk_policy / platform_profiles 并校验。 镜像 walk_strategy_json.WalkStrategyStore 的 load+validate+raise 结构。 walk_policy 的拍板值可能带 {value, provenance, tbd} 包裹(留痕),load_policy 解包成纯值。 """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any from content_agent.findings import fail as _fail WALK_GRAPH_PATH = Path("tech_documents/数据接口与来源/walk_graph.json") WALK_POLICY_PATH = Path("tech_documents/数据接口与来源/walk_policy.json") PROFILE_DIR = Path("tech_documents/数据接口与来源/platform_profiles") EDGE_CATALOG_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json") PERMISSION_ACTIONS = ["ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW", "REJECT_CONTENT"] _PERMISSION_META_KEYS = {"_meaning", "_provenance", "note", "tbd"} def _unwrap(value: Any) -> Any: if isinstance(value, dict) and "value" in value: return value["value"] return value def low_budget(budget: int) -> int: """low_budget 档:减半向下取整,至少 1(2026-06-11 拍板,policy 值 halve_min_1)。""" return max(1, budget // 2) def edge_permission(policy: dict[str, Any], decision_action: str | None, edge_id: str) -> str: """判定结果 → 该出边通行证;无判定/未列入的组合一律 deny(从严)。""" row = policy["edge_permissions"].get(decision_action or "", {}) return row.get(edge_id, "deny") def edge_supported(profile: dict[str, Any], edge_id: str) -> bool: """profile 未列出的边 = 平台无关控制边(终端边),恒 supported。""" return profile["edges"].get(edge_id, {"status": "supported"})["status"] == "supported" @dataclass(frozen=True) class WalkGraphStore: root_dir: Path = Path(".") def load_graph(self) -> dict[str, Any]: from content_agent.integrations import config_store graph, _ = config_store.load_json(self.root_dir / WALK_GRAPH_PATH) catalog, _ = config_store.load_json(self.root_dir / EDGE_CATALOG_PATH) catalog_ids = {row["edge_id"] for row in catalog["walk_edge_catalog"]} _raise_on_fail(_validate_graph(graph, catalog_ids), "walk graph") return graph def load_policy(self) -> dict[str, Any]: from content_agent.integrations import config_store raw_policy, _ = config_store.load_json(self.root_dir / WALK_POLICY_PATH) policy = _unwrap_policy(raw_policy) edge_ids = {edge["edge_id"] for edge in self.load_graph()["edges"]} _raise_on_fail(_validate_policy(policy, edge_ids), "walk policy") return policy def load_profile(self, platform: str) -> dict[str, Any]: from content_agent.integrations import config_store profile, _ = config_store.load_json(self.root_dir / PROFILE_DIR / f"{platform}.json") edge_ids = {edge["edge_id"] for edge in self.load_graph()["edges"]} _raise_on_fail(_validate_profile(profile, edge_ids), f"platform profile {platform}") return profile def _unwrap_policy(raw: dict[str, Any]) -> dict[str, Any]: policy = dict(raw) policy["global"] = {key: _unwrap(value) for key, value in raw["global"].items()} policy["edge_permissions"] = { action: {edge: _unwrap(perm) for edge, perm in row.items() if edge not in _PERMISSION_META_KEYS} for action, row in raw["edge_permissions"].items() if action in PERMISSION_ACTIONS } policy["budget_tiers"] = {key: _unwrap(value) for key, value in raw["budget_tiers"].items()} policy["edge_budgets_by_id"] = {row["edge_id"]: row for row in raw["edge_budgets"]} return policy def _validate_graph(graph: dict[str, Any], catalog_ids: set[str]) -> list[dict[str, Any]]: findings: list[dict[str, Any]] = [] node_types = {node["node_type"] for node in graph.get("nodes", [])} for edge in graph.get("edges", []): if edge.get("edge_id") not in catalog_ids: _fail(findings, "edge_not_in_catalog", f"{edge.get('edge_id')} not in walk_edge_catalog") for field in ["from_node", "to_node"]: if edge.get(field) not in node_types: _fail(findings, "edge_node_ref", f"{edge.get('edge_id')} unknown {field}: {edge.get(field)}") return findings def _validate_policy(policy: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]: findings: list[dict[str, Any]] = [] for row in policy.get("edge_budgets", []): if row.get("edge_id") not in edge_ids: _fail(findings, "budget_edge_ref", f"edge_budgets unknown edge_id: {row.get('edge_id')}") permission_rows = policy.get("edge_permissions", {}) key_sets = [] for action in PERMISSION_ACTIONS: if action not in permission_rows: _fail(findings, "permission_action_missing", f"edge_permissions missing action: {action}") continue keys = set(permission_rows[action]) key_sets.append(keys) for edge_id in keys - edge_ids: _fail(findings, "permission_edge_ref", f"{action} unknown edge_id: {edge_id}") if key_sets and any(keys != key_sets[0] for keys in key_sets[1:]): _fail(findings, "permission_closure", "edge_permissions edge keys differ across actions") return findings def _validate_profile(profile: dict[str, Any], edge_ids: set[str]) -> list[dict[str, Any]]: findings: list[dict[str, Any]] = [] for edge_id, spec in profile.get("edges", {}).items(): if edge_id not in edge_ids: _fail(findings, "profile_edge_ref", f"profile unknown edge_id: {edge_id}") if spec.get("status") not in {"supported", "blocked"}: _fail(findings, "profile_edge_status", f"{edge_id} invalid status: {spec.get('status')}") return findings def _raise_on_fail(findings: list[dict[str, Any]], label: str) -> None: failures = [finding for finding in findings if finding["level"] == "fail"] if failures: raise ValueError(f"invalid {label} config: {failures}")