| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- import json
- from pathlib import Path
- from content_agent.integrations.walk_strategy_json import (
- REQUIRED_SECTIONS,
- WalkStrategyStore,
- validate_walk_strategy_config,
- )
- CONFIG_PATH = Path("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")
- def test_walk_strategy_config_loads_only_p6_runtime_source():
- store = WalkStrategyStore(Path("."))
- strategy = store.load_walk_strategy()
- assert strategy["strategy_id"] == "douyin_walk_strategy_v1"
- assert strategy["walk_strategy_version"] == "V1.0"
- assert strategy["walk_strategy_source_ref"]["file"] == str(CONFIG_PATH)
- assert strategy["source_of_truth"] == str(CONFIG_PATH)
- def test_walk_strategy_config_sections_and_references_are_closed():
- strategy = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
- findings = validate_walk_strategy_config(strategy, root_dir=Path("."))
- assert findings == []
- assert [section for section in REQUIRED_SECTIONS if section not in strategy] == []
- def test_walk_strategy_config_keeps_only_consumed_sections():
- # V3 清理受控变化: 13 段收窄到 3 个仍被运行时/校验消费的段;
- # 其余 10 段(老预算/停止/重试/触发规则等)已被 walk_graph+walk_policy 取代并删除。
- strategy = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
- assert set(REQUIRED_SECTIONS) == {
- "walk_edge_catalog",
- "walk_rule_pack_binding",
- "walk_fact_contract",
- }
- assert {key for key in strategy if key.startswith("walk_")} == set(REQUIRED_SECTIONS)
- def test_walk_strategy_config_uses_clue_id_and_real_rule_packs():
- strategy = json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
- facts = {row["runtime_file"]: row for row in strategy["walk_fact_contract"]}
- assert facts["search_clues.jsonl"]["unique_key"] == [
- "run_id",
- "policy_run_id",
- "clue_id",
- ]
- # M4 砍包受控变化:4 future 包及其 7 条 binding 已物理删除,仅剩内容包归属。
- assert {
- (row["rule_pack_id"], row["rule_pack_version"])
- for row in strategy["walk_rule_pack_binding"]
- } == {
- ("douyin_content_discovery_rule_pack_v1", "1.0.0"),
- }
- def test_walk_strategy_binding_is_loaded_from_walk_strategy():
- from content_agent.business_modules.walk_engine import _binding_by_edge_id
- from content_agent.integrations.walk_strategy_json import WalkStrategyStore
- walk_strategy = WalkStrategyStore().load_walk_strategy()
- binding_by_edge = _binding_by_edge_id(walk_strategy)
- # M4 砍包受控变化:仅 decision_to_asset 保留 content binding,其余边走 fallback 戳。
- assert set(binding_by_edge) == {"decision_to_asset"}
- assert binding_by_edge["decision_to_asset"]["rule_pack_id"] == "douyin_content_discovery_rule_pack_v1"
|