"""Assemble the policy bundle dictionary from the local rule-pack JSON file."""

from __future__ import annotations

from pathlib import Path
from typing import Any

from content_agent.constants import (
    DEFAULT_POLICY_BUNDLE_ID,
    EVIDENCE_BUNDLE_SCHEMA_VERSION,
    RUNTIME_SCHEMA_VERSION,
)
from content_agent.errors import ContentAgentError, ErrorCode
from content_agent.integrations import config_store
from content_agent.integrations.walk_strategy_json import WalkStrategyStore

# Scope used when a caller does not specify one. Shared by every dispatch
# filter in this module so the primary dispatch and the per-entity map can
# never drift apart.
_DEFAULT_PLATFORM = "douyin"
_DEFAULT_RUNTIME_STAGE = "V1.0"


class JsonPolicyBundleStore:
    """Build policy bundles from the rule-pack JSON file below ``root_dir``."""

    def __init__(self, root_dir: Path | str = Path(".")) -> None:
        """Remember the directory that relative product-document paths resolve against."""
        self.root_dir = Path(root_dir)

    def load_policy_bundle(self, strategy_version: str) -> dict[str, Any]:
        """Load the rule package and flatten it into a policy bundle dictionary.

        Args:
            strategy_version: Version the caller expects. It must equal the
                version resolved from the rule package (see
                ``_strategy_version``).

        Returns:
            A dictionary holding the selected dispatch, its rule pack, the
            per-entity dispatch map, schema versions, pass-through sections
            of the rule package, walk-strategy identifiers and source refs.

        Raises:
            FileNotFoundError: The rule-pack JSON file does not exist.
            ValueError: ``strategy_version`` differs from the package's
                version, no enabled dispatch matches, or a dispatch does not
                resolve to exactly one enabled rule pack.
            ContentAgentError: More than one enabled dispatch matches.
            KeyError: The selected dispatch or rule pack lacks one of the
                keys read with subscript access below.
        """
        rule_pack_path = self.root_dir / "product_documents/规则包/douyin_rule_packs.v1.json"
        if not rule_pack_path.exists():
            raise FileNotFoundError(rule_pack_path)

        # load_json is unpacked as (parsed package, raw text); the raw text is
        # what gets hashed.
        rule_package, rule_pack_text = config_store.load_json(rule_pack_path)
        rule_pack_hash = config_store.sha256_text(rule_pack_text)

        actual_strategy_version = _strategy_version(rule_package)
        if strategy_version != actual_strategy_version:
            raise ValueError(f"unknown strategy_version: {strategy_version}")

        dispatch = _select_dispatch(rule_package, actual_strategy_version)
        rule_pack = _find_rule_pack_by_dispatch(rule_package, dispatch)
        rule_pack_by_entity = _build_rule_pack_by_entity(rule_package, actual_strategy_version)
        walk_strategy = WalkStrategyStore(self.root_dir).load_walk_strategy()

        # Key order is kept stable on purpose: consumers may serialize the
        # bundle as-is.
        bundle = {
            "policy_bundle_id": DEFAULT_POLICY_BUNDLE_ID,
            "strategy_version": actual_strategy_version,
            "rule_package_id": rule_package.get("package_id"),
            "rule_package_name": rule_package.get("package_name"),
            "rule_pack": rule_pack,
            "rule_pack_id": rule_pack["rule_pack_id"],
            "rule_pack_version": rule_pack["version"],
            "dispatch": dispatch,
            "dispatch_id": dispatch["dispatch_id"],
            "rule_pack_by_entity": rule_pack_by_entity,
            "runtime_stage": dispatch["runtime_stage"],
            "target_entity": dispatch["target_entity"],
            "content_format": dispatch["content_format"],
            "evidence_bundle_schema_version": EVIDENCE_BUNDLE_SCHEMA_VERSION,
            "runtime_record_schema_version": RUNTIME_SCHEMA_VERSION,
            "shared_contracts": rule_package.get("shared_contracts", {}),
            "source_evidence_policy": rule_package.get("source_evidence_policy", {}),
            "effect_status_mapping": rule_package.get("effect_status_mapping", []),
            "query_effect_aggregation": rule_package.get("query_effect_aggregation", []),
            "runtime_status_contract": rule_package.get("runtime_status_contract", {}),
            "decision_reason_codes": rule_package.get("decision_reason_codes", []),
            "strategy_id": (rule_package.get("strategy_binding") or {}).get(
                "strategy_id", "douyin_content_find_v1"
            ),
            "walk_strategy_id": walk_strategy.get("strategy_id"),
            "walk_strategy_version": walk_strategy.get("walk_strategy_version"),
            "walk_strategy_source_ref": walk_strategy.get("walk_strategy_source_ref"),
            # Both source refs point at the same file and hash; only the
            # "generated_from" tag differs.
            "strategy_source_ref": {
                "file": str(rule_pack_path),
                "updated_at": rule_package.get("updated_at"),
                "content_sha256": rule_pack_hash,
                "generated_from": "p5_rule_pack_only",
            },
            "rule_pack_source_ref": {
                "file": str(rule_pack_path),
                "updated_at": rule_package.get("updated_at"),
                "content_sha256": rule_pack_hash,
                "generated_from": "local_product_json",
            },
            # The bundle hash is the hash of the rule-pack file text.
            "policy_bundle_hash": rule_pack_hash,
        }
        return bundle


def _strategy_version(rule_package: dict[str, Any]) -> str:
    """Return ``strategy_binding.strategy_version``, or ``"V1"`` when missing or falsy."""
    binding_version = (rule_package.get("strategy_binding") or {}).get("strategy_version")
    return binding_version or "V1"


def _dispatch_in_scope(
    dispatch: dict[str, Any],
    strategy_version: str,
    *,
    runtime_stage: str,
    platform: str,
) -> bool:
    """Tell whether ``dispatch`` is enabled and bound to the given scope."""
    return bool(
        dispatch.get("dispatch_enabled")
        and dispatch.get("platform") == platform
        and dispatch.get("runtime_stage") == runtime_stage
        and dispatch.get("strategy_version") == strategy_version
    )


def _select_dispatch(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    target_entity: str = "Content",
    content_format: str = "video",
    runtime_stage: str = _DEFAULT_RUNTIME_STAGE,
    platform: str = _DEFAULT_PLATFORM,
) -> dict[str, Any]:
    """Return the single enabled dispatch matching the requested scope.

    Raises:
        ValueError: No enabled dispatch matches.
        ContentAgentError: Several enabled dispatches match.
    """
    matches = _enabled_dispatches(
        rule_package,
        strategy_version,
        target_entity=target_entity,
        content_format=content_format,
        runtime_stage=runtime_stage,
        platform=platform,
    )
    return _assert_single_enabled_dispatch(
        matches, target_entity=target_entity, content_format=content_format
    )


def _enabled_dispatches(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    target_entity: str,
    content_format: str,
    runtime_stage: str,
    platform: str,
) -> list[dict[str, Any]]:
    """List enabled dispatches matching scope, entity and content format.

    A missing or null ``rule_pack_dispatch`` section is treated as empty so
    the caller reports "dispatch not found" instead of failing on iteration.
    """
    return [
        dispatch
        for dispatch in rule_package.get("rule_pack_dispatch") or []
        if _dispatch_in_scope(
            dispatch, strategy_version, runtime_stage=runtime_stage, platform=platform
        )
        and dispatch.get("target_entity") == target_entity
        and dispatch.get("content_format") == content_format
    ]


def _assert_single_enabled_dispatch(
    matches: list[dict[str, Any]],
    *,
    target_entity: str,
    content_format: str,
) -> dict[str, Any]:
    """Return the only element of ``matches`` or raise.

    Raises:
        ValueError: ``matches`` is empty.
        ContentAgentError: ``matches`` has more than one element; the error
            details carry the conflicting ``rule_pack_id`` values.
    """
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise ValueError(f"dispatch not found for {target_entity}/{content_format}")

    conflict_rule_pack_ids = [dispatch.get("rule_pack_id") for dispatch in matches]
    raise ContentAgentError(
        ErrorCode.CONFIG_RULE_PACK_DISPATCH_CONFLICT,
        f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for "
        f"{target_entity}/{content_format}: {conflict_rule_pack_ids}",
        {
            "target_entity": target_entity,
            "content_format": content_format,
            "conflict_rule_pack_ids": conflict_rule_pack_ids,
        },
    )


def _build_rule_pack_by_entity(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    runtime_stage: str = _DEFAULT_RUNTIME_STAGE,
    platform: str = _DEFAULT_PLATFORM,
) -> dict[str, Any]:
    """Map each ``target_entity`` to its enabled dispatch and rule pack.

    Args:
        rule_package: Parsed rule package.
        strategy_version: Only dispatches bound to this version are kept.
        runtime_stage: Runtime stage filter; defaults to the same stage
            ``_select_dispatch`` uses.
        platform: Platform filter; defaults to the same platform
            ``_select_dispatch`` uses.

    Returns:
        ``{target_entity: {"dispatch": ..., "rule_pack": ...}}``.

    Raises:
        KeyError: An in-scope dispatch has no ``target_entity`` key.
        ValueError: An in-scope dispatch does not resolve to exactly one
            enabled rule pack.
    """
    by_entity: dict[str, Any] = {}
    for dispatch in rule_package.get("rule_pack_dispatch") or []:
        if not _dispatch_in_scope(
            dispatch, strategy_version, runtime_stage=runtime_stage, platform=platform
        ):
            continue
        # NOTE(review): content_format is not part of the key, so a later
        # in-scope dispatch for the same entity replaces an earlier one.
        # Confirm that is intended.
        by_entity[dispatch["target_entity"]] = {
            "dispatch": dispatch,
            "rule_pack": _find_rule_pack_by_dispatch(rule_package, dispatch),
        }
    return by_entity


def _find_rule_pack_by_dispatch(
    rule_package: dict[str, Any], dispatch: dict[str, Any]
) -> dict[str, Any]:
    """Return the one enabled rule pack whose id and version match ``dispatch``.

    Raises:
        ValueError: Zero or several enabled rule packs match.
    """
    matches = [
        rule_pack
        for rule_pack in rule_package.get("rule_packs") or []
        if rule_pack.get("enabled")
        and rule_pack.get("rule_pack_id") == dispatch.get("rule_pack_id")
        and rule_pack.get("version") == dispatch.get("rule_pack_version")
    ]
    if len(matches) != 1:
        raise ValueError(
            f"dispatch {dispatch.get('dispatch_id')} matched {len(matches)} enabled rule packs"
        )
    return matches[0]