| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Any
- from content_agent.business_modules import content_discovery, rule_judgment, source_seed
- from content_agent.business_modules.content_discovery import pattern_recall
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
- from content_agent.integrations.policy_json import JsonPolicyBundleStore
- from content_agent.integrations.runtime_files import LocalRuntimeFileStore
- from content_agent.record_payload import with_raw_payload
- from tests.gemini_helpers import FakeGeminiVideoClient
- from tests.p1_helpers import real_source_payload
class FakeWalkPlatformClient:
    """In-memory stand-in for a platform client that logs every call it gets.

    ``search`` and ``fetch_author_works`` each append a shallow copy of the
    incoming query to ``search_calls`` / ``author_calls`` and answer with
    canned results built by ``_platform_result``.
    """

    def __init__(self, fail_next_page: bool = False, tags: list[str] | None = None) -> None:
        """Store the failure switch and tags, and start with empty call logs.

        Args:
            fail_next_page: When true, a ``query_next_page`` search raises
                ``RuntimeError`` instead of returning a result.
            tags: Kept on ``self.tags``; any falsy value (``None`` or an empty
                list) is replaced by ``["#人物故事"]``. The methods of this
                class do not read it.
        """
        self.search_calls: list[dict[str, Any]] = []
        self.author_calls: list[dict[str, Any]] = []
        self.fail_next_page = fail_next_page
        self.tags = tags if tags else ["#人物故事"]

    def search(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Record ``query`` and return the canned result for its generation method.

        Returns:
            One result for ``query_next_page`` or ``tag_query``; an empty list
            for every other ``search_query_generation_method`` value.

        Raises:
            RuntimeError: For ``query_next_page`` when ``fail_next_page`` is set.
        """
        # The call is logged before any failure so tests can inspect it afterwards.
        self.search_calls.append(dict(query))
        generation_method = query.get("search_query_generation_method")

        if generation_method == "query_next_page":
            if self.fail_next_page:
                raise RuntimeError("page unavailable")
            next_page_item = _platform_result(query, "7390000000000000101", "下一页内容", [])
            return [next_page_item]

        if generation_method == "tag_query":
            tag_item = _platform_result(query, "7390000000000000201", "标签内容", [])
            return [tag_item]

        return []

    def fetch_author_works(self, query: dict[str, Any]) -> list[dict[str, Any]]:
        """Record ``query`` and return a single canned author-work result."""
        self.author_calls.append(dict(query))
        author_item = _platform_result(query, "7390000000000000301", "作者作品", [])
        return [author_item]
def build_initial_walk_context(tmp_path: Path, *, tags: list[str] | None = None) -> dict[str, Any]:
    """Run the seed, discovery, recall and judgment steps once and return their outputs.

    A ``LocalRuntimeFileStore`` is created under ``tmp_path / "runtime"`` and
    prepared for the fixed run id ``run_001``. One search query record is
    appended to ``search_queries.jsonl``, a single fake platform result (marked
    as having a further page at cursor ``"10"``) is passed through
    ``content_discovery``, ``pattern_recall`` and ``rule_judgment``, and the
    artifacts are collected into one dict.

    Args:
        tmp_path: Directory under which the runtime store is created.
        tags: Tags for the initial platform result; any falsy value falls back
            to ``["#人物故事"]``.

    Returns:
        Dict holding the ids, runtime store, seed outputs, search queries,
        discovery/recall outputs, rule decisions, policy bundle and a fresh
        ``FakeGeminiVideoClient``.
    """
    run_id = "run_001"
    policy_run_id = "policy_run_001"

    # Runtime store and source seed.
    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
    runtime.prepare_run(run_id)
    seed = source_seed.run(run_id, policy_run_id, real_source_payload(), runtime)

    # The one search query that started the walk, persisted like a real run's.
    query_fields = {
        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "search_query_id": "q_001",
        "search_query": "人物故事",
        "search_query_generation_method": "item_single",
        "discovery_start_source": "pattern_itemset",
        "previous_discovery_step": "search_query_generated",
        "pattern_seed_ref": {"seed_term": "人物故事"},
    }
    search_query = with_raw_payload(query_fields)
    runtime.append_jsonl(run_id, "search_queries.jsonl", [search_query])

    # First-round platform result, flagged as having one more page to fetch.
    first_round_tags = tags or ["#人物故事"]
    initial_result = _platform_result(search_query, "7390000000000000001", "首轮内容", first_round_tags)
    initial_result.update(has_more=True, next_cursor="10")

    discovered = content_discovery.run(
        run_id, policy_run_id, [initial_result], seed["source_context"], runtime
    )
    recalled = pattern_recall.run(
        run_id,
        policy_run_id,
        discovered["discovered_content_items"],
        discovered["content_media_records"],
        discovered["evidence_bundles"],
        seed["source_context"],
        runtime,
        FakeGeminiVideoClient(),
    )

    # NOTE(review): the policy bundle is resolved relative to the current
    # working directory — confirm callers always run from the expected root.
    policy_bundle = JsonPolicyBundleStore(Path(".")).load_policy_bundle("V1")
    decisions = rule_judgment.run(
        run_id, policy_run_id, recalled["evidence_bundles"], policy_bundle, runtime
    )

    context: dict[str, Any] = {
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "runtime": runtime,
        "source_context": seed["source_context"],
        "pattern_seed_pack": seed["pattern_seed_pack"],
        "search_queries": [search_query],
        # Items and evidence come from the recall step; media records come
        # from the discovery step.
        "discovered_content_items": recalled["discovered_content_items"],
        "content_media_records": discovered["content_media_records"],
        "evidence_bundles": recalled["evidence_bundles"],
        "rule_decisions": decisions,
        "policy_bundle": policy_bundle,
        # A new client instance, separate from the one handed to pattern_recall.
        "gemini_video_client": FakeGeminiVideoClient(),
    }
    return context
- def _platform_result(
- query: dict[str, Any],
- platform_content_id: str,
- description: str,
- tags: list[str],
- ) -> dict[str, Any]:
- return {
- "content_discovery_id": f"{query['search_query_id']}_content_{platform_content_id[-3:]}",
- "search_query_id": query["search_query_id"],
- "platform": "douyin",
- "platform_content_id": platform_content_id,
- "platform_content_format": "video",
- "description": description,
- "platform_author_id": "MS4wLjABAAAA001",
- "author_display_name": "作者",
- "statistics": {"digg_count": 9000, "comment_count": 800, "share_count": 700},
- "tags": tags,
- "score": 72,
- "risk_level": "low",
- "availability": "available",
- "discovery_relation": "fake_walk",
- "discovery_start_source": query.get("discovery_start_source", "pattern_itemset"),
- "previous_discovery_step": query.get("previous_discovery_step", "search_query_direct"),
- "content_metadata_source": "fake_platform_search",
- "platform_raw_payload": {"content_id": platform_content_id},
- }
|