| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- from __future__ import annotations
- import hashlib
- from collections import Counter
- from typing import Any
- from content_agent.constants import RUNTIME_SCHEMA_VERSION
- from content_agent.business_modules.run_record.validation import compute_final_output_completeness
- from content_agent.interfaces import RuntimeFileStore
def run(
    run_id: str,
    policy_run_id: str,
    policy_bundle: dict[str, Any],
    discovered_content_items: list[dict[str, Any]],
    content_media_records: list[dict[str, Any]],
    decisions: list[dict[str, Any]],
    source_path_records: list[dict[str, Any]],
    search_clues: list[dict[str, Any]],
    runtime: RuntimeFileStore,
) -> dict[str, Any]:
    """Assemble the run's final output, persist it through ``runtime`` and return it.

    The output bundles policy/walk-strategy metadata from ``policy_bundle``, the
    pooled content assets, author assets, review/reject records, one record per
    decision, the search clues and a summary of decision/effect-status counts.
    Completeness fields (``validation_status`` and the ``run_path_complete`` /
    ``trace_complete`` / ``validation_findings_summary`` summary entries) come from
    ``compute_final_output_completeness``.

    Side effects (in order): writes ``final_output.json`` for ``run_id``, writes the
    publish jobs built from the pooled content assets, then writes the author asset
    rows and author role rows.

    Raises:
        KeyError: when a required key is missing from ``policy_bundle``, a decision,
            a media record, a search clue, or when a discovered item has no decision.
    """
    # A decision that carries no effect status is treated as "failed". The same
    # default is used for the summary counts and the per-decision records so the two
    # cannot disagree (previously the records indexed the key strictly).
    default_effect_status = "failed"

    decision_by_target_id = {decision["decision_target_id"]: decision for decision in decisions}
    media_by_platform_content_id = {
        media["platform_content_id"]: media for media in content_media_records
    }
    paths_by_content_id = _paths_by_content_id(source_path_records)
    content_assets = _build_content_assets(
        policy_run_id,
        discovered_content_items,
        decision_by_target_id,
        media_by_platform_content_id,
        paths_by_content_id,
    )
    review_records = _build_review_records(
        policy_run_id,
        discovered_content_items,
        decision_by_target_id,
        media_by_platform_content_id,
        paths_by_content_id,
    )
    reject_records = _build_reject_records(
        policy_run_id,
        discovered_content_items,
        decision_by_target_id,
    )
    author_assets, author_asset_rows, author_role_rows = _build_author_assets(
        run_id,
        policy_run_id,
        discovered_content_items,
        decision_by_target_id,
        paths_by_content_id,
    )
    action_counts = Counter(decision["decision_action"] for decision in decisions)
    effect_status_counts = Counter(
        decision.get("search_query_effect_status", default_effect_status)
        for decision in decisions
    )
    final_output = {
        "schema_version": RUNTIME_SCHEMA_VERSION,
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "policy": {
            "policy_bundle_id": policy_bundle["policy_bundle_id"],
            "strategy_id": policy_bundle["strategy_id"],
            "strategy_version": policy_bundle["strategy_version"],
            "rule_pack_id": policy_bundle["rule_pack_id"],
            "rule_pack_version": policy_bundle["rule_pack_version"],
            "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
            "strategy_source_ref": policy_bundle["strategy_source_ref"],
            "rule_pack_source_ref": policy_bundle["rule_pack_source_ref"],
        },
        "walk_strategy": {
            "walk_strategy_id": policy_bundle.get("walk_strategy_id"),
            "walk_strategy_version": policy_bundle.get("walk_strategy_version"),
            "walk_strategy_source_ref": policy_bundle.get("walk_strategy_source_ref"),
        },
        "dispatch": policy_bundle.get("dispatch"),
        "dispatch_id": policy_bundle.get("dispatch_id"),
        "runtime_status_contract": policy_bundle.get("runtime_status_contract", {}),
        "content_assets": content_assets,
        "author_assets": author_assets,
        "review_records": review_records,
        "decision_records": [
            {
                "decision_id": decision["decision_id"],
                "policy_run_id": policy_run_id,
                "rule_pack_id": decision["rule_pack_id"],
                "rule_pack_version": decision["rule_pack_version"],
                "strategy_version": decision["strategy_version"],
                "decision_target_id": decision["decision_target_id"],
                "decision_action": decision["decision_action"],
                "decision_reason_code": decision["decision_reason_code"],
                "search_query_effect_status": decision.get(
                    "search_query_effect_status", default_effect_status
                ),
                "decision_replay_data": decision.get("decision_replay_data", {}),
                "source_evidence": decision["source_evidence"],
            }
            for decision in decisions
        ],
        "search_clues": [
            {
                "search_query_id": clue["search_query_id"],
                "policy_run_id": policy_run_id,
                "final_asset_status": "clue_only",
                "search_query_effect_status": clue["search_query_effect_status"],
            }
            for clue in search_clues
        ],
        "reject_records": reject_records,
        "summary": {
            "search_query_count": len(search_clues),
            "pooled_content_count": action_counts["ADD_TO_CONTENT_POOL"],
            "review_content_count": action_counts["KEEP_CONTENT_FOR_REVIEW"],
            # NOTE(review): always 0 here; no decision action maps to "pending"
            # content in this function. Confirm this is intended.
            "pending_content_count": 0,
            "rejected_content_count": action_counts["REJECT_CONTENT"],
            # Only these four statuses are reported; any other status value is
            # counted but not surfaced.
            "effect_status_counts": {
                "success": effect_status_counts["success"],
                "pending": effect_status_counts["pending"],
                "failed": effect_status_counts["failed"],
                "rule_blocked": effect_status_counts["rule_blocked"],
            },
            "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
        },
    }
    # Completeness is computed over the assembled output and then merged back in.
    completeness = compute_final_output_completeness(
        final_output,
        decisions,
        source_path_records,
    )
    final_output["validation_status"] = completeness["validation_status"]
    final_output["summary"]["run_path_complete"] = completeness["run_path_complete"]
    final_output["summary"]["trace_complete"] = completeness["trace_complete"]
    final_output["summary"]["validation_findings_summary"] = completeness["findings_summary"]
    runtime.write_json(run_id, "final_output.json", final_output)
    runtime.write_publish_jobs(
        run_id,
        policy_run_id,
        _build_publish_jobs(run_id, policy_run_id, content_assets),
    )
    runtime.write_author_assets(author_asset_rows)
    runtime.write_author_asset_roles(author_role_rows)
    return final_output
def _build_content_assets(
    policy_run_id: str,
    discovered_content_items: list[dict[str, Any]],
    decision_by_target_id: dict[str, dict[str, Any]],
    media_by_platform_content_id: dict[str, dict[str, Any]],
    paths_by_content_id: dict[str, list[str]],
) -> list[dict[str, Any]]:
    """Return a ``pooled`` content asset for every item decided ADD_TO_CONTENT_POOL.

    Output order follows ``discovered_content_items``. Every item must have a decision
    in ``decision_by_target_id`` (``KeyError`` otherwise). Pooled items must also be
    present in ``paths_by_content_id`` and ``media_by_platform_content_id``.

    The item's source-path id list (the same list object taken from
    ``paths_by_content_id``) is attached at the top level and inside
    ``source_evidence``. Inside ``source_evidence`` it overrides any
    ``source_path_record_ids`` the decision's evidence already carried.
    """
    pooled_assets: list[dict[str, Any]] = []
    for content in discovered_content_items:
        content_id = content["platform_content_id"]
        verdict = decision_by_target_id[content_id]
        if verdict["decision_action"] == "ADD_TO_CONTENT_POOL":
            record_ids = paths_by_content_id[content_id]
            asset = {
                "platform": content["platform"],
                "platform_content_id": content_id,
                "policy_run_id": policy_run_id,
                "content_discovery_id": content["content_discovery_id"],
                "final_asset_status": "pooled",
                "decision_id": verdict["decision_id"],
                "rule_pack_id": verdict["rule_pack_id"],
                "rule_pack_version": verdict["rule_pack_version"],
                "strategy_version": verdict["strategy_version"],
                "source_path_record_ids": record_ids,
                "source_evidence": {
                    **verdict["source_evidence"],
                    "source_path_record_ids": record_ids,
                },
                "content_media_status": (
                    media_by_platform_content_id[content_id]["content_media_status"]
                ),
            }
            pooled_assets.append(asset)
    return pooled_assets
def _build_review_records(
    policy_run_id: str,
    discovered_content_items: list[dict[str, Any]],
    decision_by_target_id: dict[str, dict[str, Any]],
    media_by_platform_content_id: dict[str, dict[str, Any]],
    paths_by_content_id: dict[str, list[str]],
) -> list[dict[str, Any]]:
    """Return a ``pending_review`` record for every item decided KEEP_CONTENT_FOR_REVIEW.

    Output order follows ``discovered_content_items``. Every item needs a decision in
    ``decision_by_target_id`` (``KeyError`` otherwise). Kept items also need entries
    in ``paths_by_content_id`` and ``media_by_platform_content_id``. Each record has
    ``final_asset_status`` ``"review_only"``, carries the decision's reason code, and
    injects the item's source-path ids into a copy of the decision's evidence.
    """
    kept_for_review: list[dict[str, Any]] = []
    for content in discovered_content_items:
        content_id = content["platform_content_id"]
        verdict = decision_by_target_id[content_id]
        if verdict["decision_action"] == "KEEP_CONTENT_FOR_REVIEW":
            record_ids = paths_by_content_id[content_id]
            review_entry = {
                "platform": content["platform"],
                "platform_content_id": content_id,
                "policy_run_id": policy_run_id,
                "content_discovery_id": content["content_discovery_id"],
                "review_status": "pending_review",
                "final_asset_status": "review_only",
                "decision_id": verdict["decision_id"],
                "rule_pack_id": verdict["rule_pack_id"],
                "rule_pack_version": verdict["rule_pack_version"],
                "strategy_version": verdict["strategy_version"],
                "decision_reason_code": verdict["decision_reason_code"],
                "source_path_record_ids": record_ids,
                "source_evidence": {
                    **verdict["source_evidence"],
                    "source_path_record_ids": record_ids,
                },
                "content_media_status": (
                    media_by_platform_content_id[content_id]["content_media_status"]
                ),
            }
            kept_for_review.append(review_entry)
    return kept_for_review
def _build_reject_records(
    policy_run_id: str,
    discovered_content_items: list[dict[str, Any]],
    decision_by_target_id: dict[str, dict[str, Any]],
) -> list[dict[str, Any]]:
    """Return one reject record per item whose decision is REJECT_CONTENT.

    Records follow ``discovered_content_items`` order and reference the item by its
    ``platform_content_id`` (as ``decision_target_id``). Every item must have a
    decision in ``decision_by_target_id`` (``KeyError`` otherwise). The decision's
    ``source_evidence`` is passed through as-is, without copying.
    """
    # Lazily pair each item's content id with its decision, preserving item order.
    decided = (
        (entry["platform_content_id"], decision_by_target_id[entry["platform_content_id"]])
        for entry in discovered_content_items
    )
    return [
        {
            "decision_target_id": target_id,
            "policy_run_id": policy_run_id,
            "main_decision_reason_code": verdict["decision_reason_code"],
            "decision_id": verdict["decision_id"],
            "source_evidence": verdict["source_evidence"],
        }
        for target_id, verdict in decided
        if verdict["decision_action"] == "REJECT_CONTENT"
    ]
def _build_publish_jobs(
    run_id: str,
    policy_run_id: str,
    content_assets: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Create one ``created`` publish job per content asset, in asset order.

    Each job id comes from ``_stable_id`` over the run id, policy run id, content id
    and decision id, so identical inputs yield identical ids. The full asset dict is
    embedded in the request payload, and the response payload starts out empty.
    """

    def job_for(asset: dict[str, Any]) -> dict[str, Any]:
        content_id = asset["platform_content_id"]
        decision_id = asset["decision_id"]
        job_id = _stable_id("publish_job", run_id, policy_run_id, content_id, decision_id)
        payload = {
            "run_id": run_id,
            "policy_run_id": policy_run_id,
            "content_asset": asset,
            "decision_id": decision_id,
            "source_path_record_ids": asset["source_path_record_ids"],
        }
        return {
            "publish_job_id": job_id,
            "platform_content_id": content_id,
            "job_status": "created",
            "trigger_mode": "manual_review",
            "request_payload": payload,
            "response_payload": {},
        }

    return [job_for(asset) for asset in content_assets]
def _build_author_assets(
    run_id: str,
    policy_run_id: str,
    discovered_content_items: list[dict[str, Any]],
    decision_by_target_id: dict[str, dict[str, Any]],
    paths_by_content_id: dict[str, list[str]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """Derive author assets from discovered content, grouped per author.

    Items are grouped by ``(platform, platform_author_id)``. Items with a falsy
    author id are skipped, and a missing platform defaults to ``"douyin"``. Authors
    are visited in sorted key order. An author is kept only when
    ``_author_asset_eligible`` accepts its sample count (all of its items), its
    qualified count (decisions with ADD_TO_CONTENT_POOL) and ratio, and the best
    ``age_50_plus_level`` among its decisions.

    Returns:
        ``(summaries, asset_rows, role_rows)``:
        - summaries: one compact entry per eligible author.
        - asset_rows: one detailed row per eligible author, including the profile
          snapshot and evidence refs.
        - role_rows: one row per (eligible author, role) pair.
    """
    # Group items per (platform, author); items without an author id are ignored.
    by_author: dict[tuple[str, str], list[dict[str, Any]]] = {}
    for item in discovered_content_items:
        author_id = item.get("platform_author_id")
        if not author_id:
            continue
        platform = item.get("platform", "douyin")
        by_author.setdefault((platform, author_id), []).append(item)

    summaries: list[dict[str, Any]] = []
    asset_rows: list[dict[str, Any]] = []
    role_rows: list[dict[str, Any]] = []
    # Sorting by (platform, author_id) makes the output order independent of
    # discovery order.
    for (platform, author_id), items in sorted(by_author.items()):
        # Items without a decision are skipped here, but they still count towards
        # sample_count below, so they lower the qualified ratio.
        decisions = [
            decision_by_target_id[item["platform_content_id"]]
            for item in items
            if item["platform_content_id"] in decision_by_target_id
        ]
        qualified_decisions = [
            decision
            for decision in decisions
            if decision["decision_action"] == "ADD_TO_CONTENT_POOL"
        ]
        sample_count = len(items)
        qualified_count = len(qualified_decisions)
        qualified_ratio = qualified_count / sample_count if sample_count else 0
        age_level = _best_age_level(decisions)
        if not _author_asset_eligible(sample_count, qualified_count, qualified_ratio, age_level):
            continue

        author_asset_id = _stable_id("author_asset", platform, author_id)
        # Union of the source-path ids of all the author's items, deduplicated and sorted.
        source_path_record_ids = sorted(
            {
                path_id
                for item in items
                for path_id in paths_by_content_id.get(item["platform_content_id"], [])
            }
        )
        decision_ids = [decision["decision_id"] for decision in decisions]
        tags = sorted({tag for item in items for tag in item.get("tags", [])})
        # First non-empty display name among the author's items, else "".
        display_name = next(
            (item.get("author_display_name") for item in items if item.get("author_display_name")),
            "",
        )
        # NOTE(review): this was previously written as
        #   "new_discovery" if any(item.get("previous_discovery_step") == "author_work"
        #   for item in items) else "new_discovery"
        # Both branches produced the same value, so the check had no effect and has
        # been removed. If author-work discoveries should get a distinct
        # source_type, set it here — confirm the intended value.
        source_type = "new_discovery"
        evidence_refs = {
            "decision_ids": decision_ids,
            "content_discovery_ids": [item["content_discovery_id"] for item in items],
            "source_path_record_ids": source_path_record_ids,
        }
        profile_snapshot = {
            "sample_count": sample_count,
            "qualified_content_count": qualified_count,
            "qualified_content_ratio": qualified_ratio,
            "age_50_plus_level": age_level,
        }
        summary = {
            "author_asset_id": author_asset_id,
            "platform": platform,
            "platform_author_id": author_id,
            "author_display_name": display_name,
            "asset_status": "active",
            "roles": ["author_asset", "source_seed", "high_50plus_profile"],
            "eligible_as_source": True,
            "source_path_record_ids": source_path_record_ids,
            "decision_ids": decision_ids,
            "evidence_refs": evidence_refs,
        }
        summaries.append(summary)
        asset_rows.append(
            {
                "author_asset_id": author_asset_id,
                "platform": platform,
                "platform_author_id": author_id,
                "author_display_name": display_name,
                "author_profile_url": None,
                "asset_status": "active",
                "source_type": source_type,
                "validation_status": "rule_validated",
                "eligible_as_source": 1,
                "elderly_ratio": None,
                "elderly_tgi": None,
                "content_tags": tags,
                "source_run_id": run_id,
                "source_policy_run_id": policy_run_id,
                "profile_snapshot": profile_snapshot,
                "evidence_refs": evidence_refs,
                "raw_payload": {
                    "final_output_summary": summary,
                    "profile_snapshot": profile_snapshot,
                },
            }
        )
        # One active role row per role listed in the summary.
        role_rows.extend(
            {
                "author_asset_id": author_asset_id,
                "role": role,
                "role_status": "active",
                "role_reason_code": "p7_author_asset_eligible",
                "assigned_by": "system",
                "source_run_id": run_id,
                "raw_payload": {"evidence_refs": evidence_refs},
            }
            for role in summary["roles"]
        )
    return summaries, asset_rows, role_rows
def _author_asset_eligible(
    sample_count: int,
    qualified_count: int,
    qualified_ratio: float,
    age_level: str,
) -> bool:
    """Decide whether an author's stats qualify it as an author asset.

    All of the following must hold, checked in this order with short-circuiting:
    at least 9 sampled items, at least 3 qualified items, a qualified ratio of at
    least 1/3, and an age level of ``"medium"`` or ``"strong"``.
    """
    if sample_count >= 9 and qualified_count >= 3:
        if qualified_ratio >= 1 / 3:
            return age_level in {"medium", "strong"}
    return False
def _best_age_level(decisions: list[dict[str, Any]]) -> str:
    """Return the highest-ranked ``age_50_plus_level`` across ``decisions``.

    Ranking is strong > medium > weak > missing. A decision without the key counts
    as ``"missing"``, and unrecognised values rank the same as ``"missing"``. On a
    tie, the earliest level wins. With no decisions the result is ``"missing"``.
    """
    rank_of = {"missing": 0, "weak": 1, "medium": 2, "strong": 3}
    candidates = [entry.get("age_50_plus_level", "missing") for entry in decisions] or ["missing"]
    scores = [rank_of.get(level, 0) for level in candidates]
    # list.index returns the first occurrence, matching max()'s first-maximal rule.
    return candidates[scores.index(max(scores))]
def _paths_by_content_id(source_path_records: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Map each content node id to the source-path record ids that lead to it.

    For each ``search_query_to_content`` record, the id of the
    ``pattern_to_search_query`` record that produced its search query is appended
    first (when one exists), followed by the record's own id.
    ``decision_to_asset`` records contribute their own id under their target node.
    All other record types are ignored. Ids appear in record order. If several
    pattern records target the same search query, the last one wins.
    """
    pattern_records = [
        record
        for record in source_path_records
        if record["source_path_type"] == "pattern_to_search_query"
    ]
    pattern_path_by_query = dict(
        (record["to_node_id"], record["source_path_record_id"]) for record in pattern_records
    )

    grouped: dict[str, list[str]] = {}
    for record in source_path_records:
        record_type = record["source_path_type"]
        if record_type == "search_query_to_content":
            bucket = grouped.setdefault(record["to_node_id"], [])
            upstream_id = pattern_path_by_query.get(record["from_node_id"])
            if upstream_id:
                bucket.append(upstream_id)
            bucket.append(record["source_path_record_id"])
        elif record_type == "decision_to_asset":
            grouped.setdefault(record["to_node_id"], []).append(record["source_path_record_id"])
    return grouped
def _stable_id(prefix: str, *parts: Any) -> str:
    """Return a deterministic id of the form ``<prefix>_<16 hex chars>``.

    Each part is converted with ``str`` and the parts are joined with ``":"``. The
    suffix is the first 16 hex digits of the SHA-1 of that UTF-8 string. Because of
    the plain ``":"`` join, ``("a:b",)`` and ``("a", "b")`` produce the same id.
    """
    material = ":".join(map(str, parts)).encode("utf-8")
    digest = hashlib.sha1(material).hexdigest()
    return "{}_{}".format(prefix, digest[:16])
|