| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Any
- from content_agent.constants import DEFAULT_POLICY_BUNDLE_ID, EVIDENCE_BUNDLE_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
- from content_agent.errors import ContentAgentError, ErrorCode
- from content_agent.integrations import config_store
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- class JsonPolicyBundleStore:
- def __init__(self, root_dir: Path | str = Path(".")) -> None:
- self.root_dir = Path(root_dir)
- def load_policy_bundle(self, strategy_version: str) -> dict[str, Any]:
- rule_pack_path = self.root_dir / "product_documents/规则包/douyin_rule_packs.v1.json"
- if not rule_pack_path.exists():
- raise FileNotFoundError(rule_pack_path)
- rule_package, rule_pack_text = config_store.load_json(rule_pack_path)
- rule_pack_hash = config_store.sha256_text(rule_pack_text)
- actual_strategy_version = _strategy_version(rule_package)
- if strategy_version != actual_strategy_version:
- raise ValueError(f"unknown strategy_version: {strategy_version}")
- dispatch = _select_dispatch(rule_package, actual_strategy_version)
- rule_pack = _find_rule_pack_by_dispatch(rule_package, dispatch)
- rule_pack_by_entity = _build_rule_pack_by_entity(rule_package, actual_strategy_version)
- walk_strategy = WalkStrategyStore(self.root_dir).load_walk_strategy()
- bundle = {
- "policy_bundle_id": DEFAULT_POLICY_BUNDLE_ID,
- "strategy_version": actual_strategy_version,
- "rule_package_id": rule_package.get("package_id"),
- "rule_package_name": rule_package.get("package_name"),
- "rule_pack": rule_pack,
- "rule_pack_id": rule_pack["rule_pack_id"],
- "rule_pack_version": rule_pack["version"],
- "dispatch": dispatch,
- "dispatch_id": dispatch["dispatch_id"],
- "rule_pack_by_entity": rule_pack_by_entity,
- "runtime_stage": dispatch["runtime_stage"],
- "target_entity": dispatch["target_entity"],
- "content_format": dispatch["content_format"],
- "evidence_bundle_schema_version": EVIDENCE_BUNDLE_SCHEMA_VERSION,
- "runtime_record_schema_version": RUNTIME_SCHEMA_VERSION,
- "shared_contracts": rule_package.get("shared_contracts", {}),
- "source_evidence_policy": rule_package.get("source_evidence_policy", {}),
- "effect_status_mapping": rule_package.get("effect_status_mapping", []),
- "query_effect_aggregation": rule_package.get("query_effect_aggregation", []),
- "runtime_status_contract": rule_package.get("runtime_status_contract", {}),
- "decision_reason_codes": rule_package.get("decision_reason_codes", []),
- "strategy_id": (rule_package.get("strategy_binding") or {}).get(
- "strategy_id", "douyin_content_find_v1"
- ),
- "walk_strategy_id": walk_strategy.get("strategy_id"),
- "walk_strategy_version": walk_strategy.get("walk_strategy_version"),
- "walk_strategy_source_ref": walk_strategy.get("walk_strategy_source_ref"),
- "strategy_source_ref": {
- "file": str(rule_pack_path),
- "updated_at": rule_package.get("updated_at"),
- "content_sha256": rule_pack_hash,
- "generated_from": "p5_rule_pack_only",
- },
- "rule_pack_source_ref": {
- "file": str(rule_pack_path),
- "updated_at": rule_package.get("updated_at"),
- "content_sha256": rule_pack_hash,
- "generated_from": "local_product_json",
- },
- "policy_bundle_hash": rule_pack_hash,
- }
- return bundle
def _strategy_version(rule_package: dict[str, Any]) -> str:
    """Return the strategy version bound to ``rule_package``, or ``"V1"``.

    The fallback applies when ``strategy_binding`` is missing or falsy, and
    when its ``strategy_version`` entry is missing or falsy.
    """
    binding = rule_package.get("strategy_binding")
    if not binding:
        return "V1"
    declared = binding.get("strategy_version")
    if not declared:
        return "V1"
    return declared
def _select_dispatch(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    target_entity: str = "Content",
    content_format: str = "video",
    runtime_stage: str = "V1.0",
    platform: str = "douyin",
) -> dict[str, Any]:
    """Return the one enabled dispatch matching every selector.

    Filtering is delegated to ``_enabled_dispatches``; the "exactly one"
    requirement is enforced by ``_assert_single_enabled_dispatch``, which
    raises when zero or several dispatches match.
    """
    selectors = {
        "target_entity": target_entity,
        "content_format": content_format,
        "runtime_stage": runtime_stage,
        "platform": platform,
    }
    candidates = _enabled_dispatches(rule_package, strategy_version, **selectors)
    return _assert_single_enabled_dispatch(
        candidates,
        target_entity=target_entity,
        content_format=content_format,
    )
def _enabled_dispatches(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    target_entity: str,
    content_format: str,
    runtime_stage: str,
    platform: str,
) -> list[dict[str, Any]]:
    """List the enabled ``rule_pack_dispatch`` entries matching all selectors.

    An entry is kept when its ``dispatch_enabled`` value is truthy and its
    platform, runtime stage, strategy version, target entity and content
    format all equal the given values. File order is preserved; a package
    without a ``rule_pack_dispatch`` key yields an empty list.
    """
    required = (
        ("platform", platform),
        ("runtime_stage", runtime_stage),
        ("strategy_version", strategy_version),
        ("target_entity", target_entity),
        ("content_format", content_format),
    )
    selected: list[dict[str, Any]] = []
    for entry in rule_package.get("rule_pack_dispatch", []):
        if not entry.get("dispatch_enabled"):
            continue
        if all(entry.get(field) == wanted for field, wanted in required):
            selected.append(entry)
    return selected
def _assert_single_enabled_dispatch(
    matches: list[dict[str, Any]],
    *,
    target_entity: str,
    content_format: str,
) -> dict[str, Any]:
    """Return the only element of ``matches`` or raise.

    Raises:
        ValueError: ``matches`` is empty.
        ContentAgentError: More than one dispatch matched; raised with code
            ``CONFIG_RULE_PACK_DISPATCH_CONFLICT`` and the conflicting rule
            pack ids in the details dict.
    """
    count = len(matches)
    if count == 1:
        return matches[0]
    if count == 0:
        raise ValueError(f"dispatch not found for {target_entity}/{content_format}")

    # Several enabled dispatches: report every rule pack id involved.
    conflict_rule_pack_ids = [entry.get("rule_pack_id") for entry in matches]
    details = {
        "target_entity": target_entity,
        "content_format": content_format,
        "conflict_rule_pack_ids": conflict_rule_pack_ids,
    }
    message = (
        f"CONFIG_RULE_PACK_DISPATCH_CONFLICT: multiple enabled dispatches for "
        f"{target_entity}/{content_format}: {conflict_rule_pack_ids}"
    )
    raise ContentAgentError(
        ErrorCode.CONFIG_RULE_PACK_DISPATCH_CONFLICT,
        message,
        details,
    )
def _build_rule_pack_by_entity(
    rule_package: dict[str, Any],
    strategy_version: str,
    *,
    runtime_stage: str = "V1.0",
    platform: str = "douyin",
) -> dict[str, Any]:
    """Map each target entity to its enabled dispatch and resolved rule pack.

    Args:
        rule_package: Parsed rule package; its ``rule_pack_dispatch`` list is
            scanned (a missing key is treated as an empty list).
        strategy_version: Only dispatches with this strategy version are used.
        runtime_stage: Runtime stage a dispatch must declare. Defaults to
            ``"V1.0"``, the value that was previously hard-coded.
        platform: Platform a dispatch must declare. Defaults to ``"douyin"``,
            the value that was previously hard-coded.

    Returns:
        ``{target_entity: {"dispatch": ..., "rule_pack": ...}}``. Content
        format is not part of the filter, so when several enabled dispatches
        share a target entity the last one in file order wins.

    Raises:
        KeyError: A matching dispatch has no ``target_entity`` key.
        ValueError: Propagated from ``_find_rule_pack_by_dispatch`` when a
            dispatch does not resolve to exactly one enabled rule pack.
    """
    by_entity: dict[str, Any] = {}
    for dispatch in rule_package.get("rule_pack_dispatch", []):
        if not (
            dispatch.get("dispatch_enabled")
            and dispatch.get("platform") == platform
            and dispatch.get("runtime_stage") == runtime_stage
            and dispatch.get("strategy_version") == strategy_version
        ):
            continue
        by_entity[dispatch["target_entity"]] = {
            "dispatch": dispatch,
            "rule_pack": _find_rule_pack_by_dispatch(rule_package, dispatch),
        }
    return by_entity
def _find_rule_pack_by_dispatch(rule_package: dict[str, Any], dispatch: dict[str, Any]) -> dict[str, Any]:
    """Resolve ``dispatch`` to the single enabled rule pack it references.

    A rule pack matches when its ``enabled`` value is truthy, its
    ``rule_pack_id`` equals the dispatch's ``rule_pack_id`` and its
    ``version`` equals the dispatch's ``rule_pack_version``.

    Raises:
        ValueError: Zero or more than one rule pack matched.
    """
    matches = []
    for candidate in rule_package.get("rule_packs", []):
        is_target = (
            candidate.get("enabled")
            and candidate.get("rule_pack_id") == dispatch.get("rule_pack_id")
            and candidate.get("version") == dispatch.get("rule_pack_version")
        )
        if is_target:
            matches.append(candidate)

    if len(matches) == 1:
        return matches[0]
    raise ValueError(
        f"dispatch {dispatch.get('dispatch_id')} matched {len(matches)} enabled rule packs"
    )
|