| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- from content_agent.business_modules import result_source_lookup
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore
- from content_agent.run_service import RunService
- from content_agent.schemas import RunStartRequest
- from tests.p1_helpers import FakeQueryVariantClient, REAL_SOURCE_FIXTURE
class CapturingRuntimeStore(LocalRuntimeFileStore):
    """Runtime file store that keeps selected written rows in memory.

    The ``write_author_assets``, ``write_author_asset_roles`` and
    ``write_publish_jobs`` overrides do not call the parent implementation;
    they only accumulate the rows they receive on the instance so a test can
    inspect them afterwards.
    """

    def __init__(self, base_dir):
        """Initialise the parent store at *base_dir* and start empty captures."""
        super().__init__(base_dir)
        # One capture list per overridden writer.
        self.author_assets = []
        self.author_asset_roles = []
        self.publish_jobs = []

    def write_author_assets(self, rows):
        """Record author-asset rows in ``self.author_assets``."""
        self.author_assets += rows

    def write_author_asset_roles(self, rows):
        """Record author-asset-role rows in ``self.author_asset_roles``."""
        self.author_asset_roles += rows

    def write_publish_jobs(self, run_id, policy_run_id, rows):
        """Record publish-job rows; ``run_id`` and ``policy_run_id`` are unused."""
        self.publish_jobs += rows
def test_author_assets_are_written_only_when_sampling_rules_pass(tmp_path):
    """Check that the nine-item, single-author fixture yields one author asset.

    Runs ``result_source_lookup.run`` against the module's fixture builders
    and verifies both the returned ``author_assets`` entry and the rows
    captured by the runtime store.
    """
    expected_roles = ["author_asset", "source_seed", "high_50plus_profile"]

    store = CapturingRuntimeStore(tmp_path / "runtime")
    store.prepare_run("run_001")

    output = result_source_lookup.run(
        "run_001",
        "policy_run_001",
        _policy_bundle(),
        _items(),
        _media_records(),
        _decisions(),
        _source_paths(),
        [],
        store,
    )

    # Returned payload: exactly one asset, for the only author in the fixture.
    assert len(output["author_assets"]) == 1
    asset = output["author_assets"][0]
    assert asset["platform_author_id"] == "author_001"
    assert asset["asset_status"] == "active"
    assert asset["eligible_as_source"] is True
    assert asset["roles"] == expected_roles
    assert len(asset["decision_ids"]) == 9

    # Rows captured by the store's overridden writers.
    first_written = store.author_assets[0]
    assert first_written["source_type"] == "new_discovery"
    assert first_written["validation_status"] == "rule_validated"
    written_roles = {row["role"] for row in store.author_asset_roles}
    assert written_roles == set(expected_roles)
def test_mock_full_run_does_not_create_author_asset_without_enough_samples(tmp_path):
    """Check that a mock-platform run over the real-source fixture writes no author assets.

    Starts a run through ``RunService`` and verifies that ``final_output.json``
    has an empty ``author_assets`` list and that the capturing store received
    no author-asset or author-asset-role rows.
    """
    store = CapturingRuntimeStore(tmp_path / "runtime" / "v1")
    service = RunService(
        runtime=store,
        query_variant_client=FakeQueryVariantClient(),
    )

    request = RunStartRequest(platform_mode="mock", source=str(REAL_SOURCE_FIXTURE))
    state = service.start_run(request)

    output = service.read_json(state["run_id"], "final_output.json")
    assert output["author_assets"] == []
    assert store.author_assets == []
    assert store.author_asset_roles == []
- def _items():
- return [
- {
- "platform": "douyin",
- "platform_content_id": f"content_{index:03d}",
- "content_discovery_id": f"discovery_{index:03d}",
- "search_query_id": "q_001",
- "platform_author_id": "author_001",
- "author_display_name": "作者一号",
- "tags": ["人物故事"],
- "previous_discovery_step": "author_work",
- "discovery_start_source": "pattern_itemset",
- }
- for index in range(1, 10)
- ]
- def _media_records():
- return [
- {
- "platform_content_id": f"content_{index:03d}",
- "content_media_status": "metadata_only",
- }
- for index in range(1, 10)
- ]
- def _decisions():
- decisions = []
- for index in range(1, 10):
- action = "ADD_TO_CONTENT_POOL" if index <= 3 else "KEEP_CONTENT_FOR_REVIEW"
- decisions.append(
- {
- "decision_id": f"d_{index:03d}",
- "decision_target_id": f"content_{index:03d}",
- "decision_action": action,
- "decision_reason_code": "score_pass" if index <= 3 else "review_needed",
- "rule_pack_id": "rule_pack_v1",
- "rule_pack_version": "1.0.0",
- "strategy_version": "V1",
- "search_query_effect_status": "success" if index <= 3 else "pending",
- "source_evidence": {"source_kind": "pattern_itemset"},
- "age_50_plus_level": "medium",
- }
- )
- return decisions
- def _source_paths():
- paths = [
- {
- "source_path_record_id": "path_pattern_q_001",
- "source_path_type": "pattern_to_search_query",
- "from_node_type": "PatternSeed",
- "from_node_id": 581,
- "to_node_type": "SearchQuery",
- "to_node_id": "q_001",
- }
- ]
- for index in range(1, 10):
- content_id = f"content_{index:03d}"
- decision_id = f"d_{index:03d}"
- paths.append(
- {
- "source_path_record_id": f"path_query_content_{index:03d}",
- "source_path_type": "search_query_to_content",
- "from_node_type": "SearchQuery",
- "from_node_id": "q_001",
- "to_node_type": "Content",
- "to_node_id": content_id,
- "decision_id": decision_id,
- }
- )
- if index <= 3:
- paths.append(
- {
- "source_path_record_id": f"path_decision_asset_{index:03d}",
- "source_path_type": "decision_to_asset",
- "from_node_type": "RuleDecision",
- "from_node_id": decision_id,
- "to_node_type": "ContentAsset",
- "to_node_id": content_id,
- "decision_id": decision_id,
- }
- )
- return paths
- def _policy_bundle():
- return {
- "policy_bundle_id": "douyin_policy_bundle_v1",
- "strategy_id": "douyin_content_find_v1",
- "strategy_version": "V1",
- "rule_pack_id": "rule_pack_v1",
- "rule_pack_version": "1.0.0",
- "policy_bundle_hash": "hash_001",
- "strategy_source_ref": {"file": "rule_pack.json"},
- "rule_pack_source_ref": {"file": "rule_pack.json"},
- }
|