|
|
@@ -1,9 +1,11 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
+import hashlib
|
|
|
from collections import Counter
|
|
|
from typing import Any
|
|
|
|
|
|
from content_agent.constants import RUNTIME_SCHEMA_VERSION
|
|
|
+from content_agent.business_modules.run_record.validation import compute_final_output_completeness
|
|
|
from content_agent.interfaces import RuntimeFileStore
|
|
|
|
|
|
|
|
|
@@ -23,45 +25,32 @@ def run(
|
|
|
media["platform_content_id"]: media for media in content_media_records
|
|
|
}
|
|
|
paths_by_content_id = _paths_by_content_id(source_path_records)
|
|
|
-
|
|
|
- content_assets: list[dict[str, Any]] = []
|
|
|
- reject_records: list[dict[str, Any]] = []
|
|
|
- for item in discovered_content_items:
|
|
|
- decision = decision_by_target_id[item["platform_content_id"]]
|
|
|
- if decision["decision_action"] == "ADD_TO_CONTENT_POOL":
|
|
|
- content_assets.append(
|
|
|
- {
|
|
|
- "platform": item["platform"],
|
|
|
- "platform_content_id": item["platform_content_id"],
|
|
|
- "policy_run_id": policy_run_id,
|
|
|
- "content_discovery_id": item["content_discovery_id"],
|
|
|
- "final_asset_status": "pooled",
|
|
|
- "decision_id": decision["decision_id"],
|
|
|
- "rule_pack_id": decision["rule_pack_id"],
|
|
|
- "rule_pack_version": decision["rule_pack_version"],
|
|
|
- "strategy_version": decision["strategy_version"],
|
|
|
- "source_path_record_ids": paths_by_content_id[item["platform_content_id"]],
|
|
|
- "source_evidence": {
|
|
|
- **decision["source_evidence"],
|
|
|
- "source_path_record_ids": paths_by_content_id[
|
|
|
- item["platform_content_id"]
|
|
|
- ],
|
|
|
- },
|
|
|
- "content_media_status": media_by_platform_content_id[
|
|
|
- item["platform_content_id"]
|
|
|
- ]["content_media_status"],
|
|
|
- }
|
|
|
- )
|
|
|
- if decision["decision_action"] == "REJECT_CONTENT":
|
|
|
- reject_records.append(
|
|
|
- {
|
|
|
- "decision_target_id": item["platform_content_id"],
|
|
|
- "policy_run_id": policy_run_id,
|
|
|
- "main_decision_reason_code": decision["decision_reason_code"],
|
|
|
- "decision_id": decision["decision_id"],
|
|
|
- "source_evidence": decision["source_evidence"],
|
|
|
- }
|
|
|
- )
|
|
|
+ content_assets = _build_content_assets(
|
|
|
+ policy_run_id,
|
|
|
+ discovered_content_items,
|
|
|
+ decision_by_target_id,
|
|
|
+ media_by_platform_content_id,
|
|
|
+ paths_by_content_id,
|
|
|
+ )
|
|
|
+ review_records = _build_review_records(
|
|
|
+ policy_run_id,
|
|
|
+ discovered_content_items,
|
|
|
+ decision_by_target_id,
|
|
|
+ media_by_platform_content_id,
|
|
|
+ paths_by_content_id,
|
|
|
+ )
|
|
|
+ reject_records = _build_reject_records(
|
|
|
+ policy_run_id,
|
|
|
+ discovered_content_items,
|
|
|
+ decision_by_target_id,
|
|
|
+ )
|
|
|
+ author_assets, author_asset_rows, author_role_rows = _build_author_assets(
|
|
|
+ run_id,
|
|
|
+ policy_run_id,
|
|
|
+ discovered_content_items,
|
|
|
+ decision_by_target_id,
|
|
|
+ paths_by_content_id,
|
|
|
+ )
|
|
|
|
|
|
action_counts = Counter(decision["decision_action"] for decision in decisions)
|
|
|
effect_status_counts = Counter(
|
|
|
@@ -71,19 +60,27 @@ def run(
|
|
|
"schema_version": RUNTIME_SCHEMA_VERSION,
|
|
|
"run_id": run_id,
|
|
|
"policy_run_id": policy_run_id,
|
|
|
- "policy_bundle_id": policy_bundle["policy_bundle_id"],
|
|
|
- "strategy_id": policy_bundle["strategy_id"],
|
|
|
- "strategy_version": policy_bundle["strategy_version"],
|
|
|
- "rule_pack_id": policy_bundle["rule_pack_id"],
|
|
|
- "rule_pack_version": policy_bundle["rule_pack_version"],
|
|
|
- "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
|
|
|
+ "policy": {
|
|
|
+ "policy_bundle_id": policy_bundle["policy_bundle_id"],
|
|
|
+ "strategy_id": policy_bundle["strategy_id"],
|
|
|
+ "strategy_version": policy_bundle["strategy_version"],
|
|
|
+ "rule_pack_id": policy_bundle["rule_pack_id"],
|
|
|
+ "rule_pack_version": policy_bundle["rule_pack_version"],
|
|
|
+ "policy_bundle_hash": policy_bundle["policy_bundle_hash"],
|
|
|
+ "strategy_source_ref": policy_bundle["strategy_source_ref"],
|
|
|
+ "rule_pack_source_ref": policy_bundle["rule_pack_source_ref"],
|
|
|
+ },
|
|
|
+ "walk_strategy": {
|
|
|
+ "walk_strategy_id": policy_bundle.get("walk_strategy_id"),
|
|
|
+ "walk_strategy_version": policy_bundle.get("walk_strategy_version"),
|
|
|
+ "walk_strategy_source_ref": policy_bundle.get("walk_strategy_source_ref"),
|
|
|
+ },
|
|
|
"dispatch": policy_bundle.get("dispatch"),
|
|
|
"dispatch_id": policy_bundle.get("dispatch_id"),
|
|
|
"runtime_status_contract": policy_bundle.get("runtime_status_contract", {}),
|
|
|
- "strategy_source_ref": policy_bundle["strategy_source_ref"],
|
|
|
- "rule_pack_source_ref": policy_bundle["rule_pack_source_ref"],
|
|
|
"content_assets": content_assets,
|
|
|
- "author_assets": [],
|
|
|
+ "author_assets": author_assets,
|
|
|
+ "review_records": review_records,
|
|
|
"decision_records": [
|
|
|
{
|
|
|
"decision_id": decision["decision_id"],
|
|
|
@@ -123,13 +120,295 @@ def run(
|
|
|
"rule_blocked": effect_status_counts["rule_blocked"],
|
|
|
},
|
|
|
"policy_bundle_hash": policy_bundle["policy_bundle_hash"],
|
|
|
- "run_path_complete": True,
|
|
|
},
|
|
|
}
|
|
|
+ completeness = compute_final_output_completeness(
|
|
|
+ final_output,
|
|
|
+ decisions,
|
|
|
+ source_path_records,
|
|
|
+ )
|
|
|
+ final_output["validation_status"] = completeness["validation_status"]
|
|
|
+ final_output["summary"]["run_path_complete"] = completeness["run_path_complete"]
|
|
|
+ final_output["summary"]["trace_complete"] = completeness["trace_complete"]
|
|
|
+ final_output["summary"]["validation_findings_summary"] = completeness["findings_summary"]
|
|
|
runtime.write_json(run_id, "final_output.json", final_output)
|
|
|
+ runtime.write_publish_jobs(
|
|
|
+ run_id,
|
|
|
+ policy_run_id,
|
|
|
+ _build_publish_jobs(run_id, policy_run_id, content_assets),
|
|
|
+ )
|
|
|
+ runtime.write_author_assets(author_asset_rows)
|
|
|
+ runtime.write_author_asset_roles(author_role_rows)
|
|
|
return final_output
|
|
|
|
|
|
|
|
|
+def _build_content_assets(
|
|
|
+ policy_run_id: str,
|
|
|
+ discovered_content_items: list[dict[str, Any]],
|
|
|
+ decision_by_target_id: dict[str, dict[str, Any]],
|
|
|
+ media_by_platform_content_id: dict[str, dict[str, Any]],
|
|
|
+ paths_by_content_id: dict[str, list[str]],
|
|
|
+) -> list[dict[str, Any]]:
|
|
|
+ content_assets: list[dict[str, Any]] = []
|
|
|
+ for item in discovered_content_items:
|
|
|
+ platform_content_id = item["platform_content_id"]
|
|
|
+ decision = decision_by_target_id[platform_content_id]
|
|
|
+ if decision["decision_action"] != "ADD_TO_CONTENT_POOL":
|
|
|
+ continue
|
|
|
+ path_ids = paths_by_content_id[platform_content_id]
|
|
|
+ content_assets.append(
|
|
|
+ {
|
|
|
+ "platform": item["platform"],
|
|
|
+ "platform_content_id": platform_content_id,
|
|
|
+ "policy_run_id": policy_run_id,
|
|
|
+ "content_discovery_id": item["content_discovery_id"],
|
|
|
+ "final_asset_status": "pooled",
|
|
|
+ "decision_id": decision["decision_id"],
|
|
|
+ "rule_pack_id": decision["rule_pack_id"],
|
|
|
+ "rule_pack_version": decision["rule_pack_version"],
|
|
|
+ "strategy_version": decision["strategy_version"],
|
|
|
+ "source_path_record_ids": path_ids,
|
|
|
+ "source_evidence": {
|
|
|
+ **decision["source_evidence"],
|
|
|
+ "source_path_record_ids": path_ids,
|
|
|
+ },
|
|
|
+ "content_media_status": media_by_platform_content_id[
|
|
|
+ platform_content_id
|
|
|
+ ]["content_media_status"],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return content_assets
|
|
|
+
|
|
|
+
|
|
|
+def _build_review_records(
|
|
|
+ policy_run_id: str,
|
|
|
+ discovered_content_items: list[dict[str, Any]],
|
|
|
+ decision_by_target_id: dict[str, dict[str, Any]],
|
|
|
+ media_by_platform_content_id: dict[str, dict[str, Any]],
|
|
|
+ paths_by_content_id: dict[str, list[str]],
|
|
|
+) -> list[dict[str, Any]]:
|
|
|
+ review_records: list[dict[str, Any]] = []
|
|
|
+ for item in discovered_content_items:
|
|
|
+ platform_content_id = item["platform_content_id"]
|
|
|
+ decision = decision_by_target_id[platform_content_id]
|
|
|
+ if decision["decision_action"] != "KEEP_CONTENT_FOR_REVIEW":
|
|
|
+ continue
|
|
|
+ path_ids = paths_by_content_id[platform_content_id]
|
|
|
+ review_records.append(
|
|
|
+ {
|
|
|
+ "platform": item["platform"],
|
|
|
+ "platform_content_id": platform_content_id,
|
|
|
+ "policy_run_id": policy_run_id,
|
|
|
+ "content_discovery_id": item["content_discovery_id"],
|
|
|
+ "review_status": "pending_review",
|
|
|
+ "final_asset_status": "review_only",
|
|
|
+ "decision_id": decision["decision_id"],
|
|
|
+ "rule_pack_id": decision["rule_pack_id"],
|
|
|
+ "rule_pack_version": decision["rule_pack_version"],
|
|
|
+ "strategy_version": decision["strategy_version"],
|
|
|
+ "decision_reason_code": decision["decision_reason_code"],
|
|
|
+ "source_path_record_ids": path_ids,
|
|
|
+ "source_evidence": {
|
|
|
+ **decision["source_evidence"],
|
|
|
+ "source_path_record_ids": path_ids,
|
|
|
+ },
|
|
|
+ "content_media_status": media_by_platform_content_id[
|
|
|
+ platform_content_id
|
|
|
+ ]["content_media_status"],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return review_records
|
|
|
+
|
|
|
+
|
|
|
+def _build_reject_records(
|
|
|
+ policy_run_id: str,
|
|
|
+ discovered_content_items: list[dict[str, Any]],
|
|
|
+ decision_by_target_id: dict[str, dict[str, Any]],
|
|
|
+) -> list[dict[str, Any]]:
|
|
|
+ reject_records: list[dict[str, Any]] = []
|
|
|
+ for item in discovered_content_items:
|
|
|
+ platform_content_id = item["platform_content_id"]
|
|
|
+ decision = decision_by_target_id[platform_content_id]
|
|
|
+ if decision["decision_action"] != "REJECT_CONTENT":
|
|
|
+ continue
|
|
|
+ reject_records.append(
|
|
|
+ {
|
|
|
+ "decision_target_id": platform_content_id,
|
|
|
+ "policy_run_id": policy_run_id,
|
|
|
+ "main_decision_reason_code": decision["decision_reason_code"],
|
|
|
+ "decision_id": decision["decision_id"],
|
|
|
+ "source_evidence": decision["source_evidence"],
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return reject_records
|
|
|
+
|
|
|
+
|
|
|
+def _build_publish_jobs(
|
|
|
+ run_id: str,
|
|
|
+ policy_run_id: str,
|
|
|
+ content_assets: list[dict[str, Any]],
|
|
|
+) -> list[dict[str, Any]]:
|
|
|
+ jobs: list[dict[str, Any]] = []
|
|
|
+ for asset in content_assets:
|
|
|
+ publish_job_id = _stable_id(
|
|
|
+ "publish_job",
|
|
|
+ run_id,
|
|
|
+ policy_run_id,
|
|
|
+ asset["platform_content_id"],
|
|
|
+ asset["decision_id"],
|
|
|
+ )
|
|
|
+ jobs.append(
|
|
|
+ {
|
|
|
+ "publish_job_id": publish_job_id,
|
|
|
+ "platform_content_id": asset["platform_content_id"],
|
|
|
+ "job_status": "created",
|
|
|
+ "trigger_mode": "manual_review",
|
|
|
+ "request_payload": {
|
|
|
+ "run_id": run_id,
|
|
|
+ "policy_run_id": policy_run_id,
|
|
|
+ "content_asset": asset,
|
|
|
+ "decision_id": asset["decision_id"],
|
|
|
+ "source_path_record_ids": asset["source_path_record_ids"],
|
|
|
+ },
|
|
|
+ "response_payload": {},
|
|
|
+ }
|
|
|
+ )
|
|
|
+ return jobs
|
|
|
+
|
|
|
+
|
|
|
+def _build_author_assets(
|
|
|
+ run_id: str,
|
|
|
+ policy_run_id: str,
|
|
|
+ discovered_content_items: list[dict[str, Any]],
|
|
|
+ decision_by_target_id: dict[str, dict[str, Any]],
|
|
|
+ paths_by_content_id: dict[str, list[str]],
|
|
|
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
|
|
|
+ by_author: dict[tuple[str, str], list[dict[str, Any]]] = {}
|
|
|
+ for item in discovered_content_items:
|
|
|
+ author_id = item.get("platform_author_id")
|
|
|
+ if not author_id:
|
|
|
+ continue
|
|
|
+ platform = item.get("platform", "douyin")
|
|
|
+ by_author.setdefault((platform, author_id), []).append(item)
|
|
|
+
|
|
|
+ summaries: list[dict[str, Any]] = []
|
|
|
+ asset_rows: list[dict[str, Any]] = []
|
|
|
+ role_rows: list[dict[str, Any]] = []
|
|
|
+ for (platform, author_id), items in sorted(by_author.items()):
|
|
|
+ decisions = [
|
|
|
+ decision_by_target_id[item["platform_content_id"]]
|
|
|
+ for item in items
|
|
|
+ if item["platform_content_id"] in decision_by_target_id
|
|
|
+ ]
|
|
|
+ qualified_decisions = [
|
|
|
+ decision for decision in decisions if decision["decision_action"] == "ADD_TO_CONTENT_POOL"
|
|
|
+ ]
|
|
|
+ sample_count = len(items)
|
|
|
+ qualified_count = len(qualified_decisions)
|
|
|
+ qualified_ratio = qualified_count / sample_count if sample_count else 0
|
|
|
+ age_level = _best_age_level(decisions)
|
|
|
+ if not _author_asset_eligible(sample_count, qualified_count, qualified_ratio, age_level):
|
|
|
+ continue
|
|
|
+
|
|
|
+ author_asset_id = _stable_id("author_asset", platform, author_id)
|
|
|
+ source_path_record_ids = sorted(
|
|
|
+ {
|
|
|
+ path_id
|
|
|
+ for item in items
|
|
|
+ for path_id in paths_by_content_id.get(item["platform_content_id"], [])
|
|
|
+ }
|
|
|
+ )
|
|
|
+ decision_ids = [decision["decision_id"] for decision in decisions]
|
|
|
+ tags = sorted({tag for item in items for tag in item.get("tags", [])})
|
|
|
+ display_name = next((item.get("author_display_name") for item in items if item.get("author_display_name")), "")
|
|
|
+ source_type = (
|
|
|
+ "new_discovery"
|
|
|
+ if any(item.get("previous_discovery_step") == "author_work" for item in items)
|
|
|
+ else "new_discovery"
|
|
|
+ )
|
|
|
+ evidence_refs = {
|
|
|
+ "decision_ids": decision_ids,
|
|
|
+ "content_discovery_ids": [item["content_discovery_id"] for item in items],
|
|
|
+ "source_path_record_ids": source_path_record_ids,
|
|
|
+ }
|
|
|
+ profile_snapshot = {
|
|
|
+ "sample_count": sample_count,
|
|
|
+ "qualified_content_count": qualified_count,
|
|
|
+ "qualified_content_ratio": qualified_ratio,
|
|
|
+ "age_50_plus_level": age_level,
|
|
|
+ }
|
|
|
+ summary = {
|
|
|
+ "author_asset_id": author_asset_id,
|
|
|
+ "platform": platform,
|
|
|
+ "platform_author_id": author_id,
|
|
|
+ "author_display_name": display_name,
|
|
|
+ "asset_status": "active",
|
|
|
+ "roles": ["author_asset", "source_seed", "high_50plus_profile"],
|
|
|
+ "eligible_as_source": True,
|
|
|
+ "source_path_record_ids": source_path_record_ids,
|
|
|
+ "decision_ids": decision_ids,
|
|
|
+ "evidence_refs": evidence_refs,
|
|
|
+ }
|
|
|
+ summaries.append(summary)
|
|
|
+ asset_rows.append(
|
|
|
+ {
|
|
|
+ "author_asset_id": author_asset_id,
|
|
|
+ "platform": platform,
|
|
|
+ "platform_author_id": author_id,
|
|
|
+ "author_display_name": display_name,
|
|
|
+ "author_profile_url": None,
|
|
|
+ "asset_status": "active",
|
|
|
+ "source_type": source_type,
|
|
|
+ "validation_status": "rule_validated",
|
|
|
+ "eligible_as_source": 1,
|
|
|
+ "elderly_ratio": None,
|
|
|
+ "elderly_tgi": None,
|
|
|
+ "content_tags": tags,
|
|
|
+ "source_run_id": run_id,
|
|
|
+ "source_policy_run_id": policy_run_id,
|
|
|
+ "profile_snapshot": profile_snapshot,
|
|
|
+ "evidence_refs": evidence_refs,
|
|
|
+ "raw_payload": {
|
|
|
+ "final_output_summary": summary,
|
|
|
+ "profile_snapshot": profile_snapshot,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ )
|
|
|
+ role_rows.extend(
|
|
|
+ {
|
|
|
+ "author_asset_id": author_asset_id,
|
|
|
+ "role": role,
|
|
|
+ "role_status": "active",
|
|
|
+ "role_reason_code": "p7_author_asset_eligible",
|
|
|
+ "assigned_by": "system",
|
|
|
+ "source_run_id": run_id,
|
|
|
+ "raw_payload": {"evidence_refs": evidence_refs},
|
|
|
+ }
|
|
|
+ for role in summary["roles"]
|
|
|
+ )
|
|
|
+ return summaries, asset_rows, role_rows
|
|
|
+
|
|
|
+
|
|
|
+def _author_asset_eligible(
|
|
|
+ sample_count: int,
|
|
|
+ qualified_count: int,
|
|
|
+ qualified_ratio: float,
|
|
|
+ age_level: str,
|
|
|
+) -> bool:
|
|
|
+ return (
|
|
|
+ sample_count >= 9
|
|
|
+ and qualified_count >= 3
|
|
|
+ and qualified_ratio >= 1 / 3
|
|
|
+ and age_level in {"medium", "strong"}
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+def _best_age_level(decisions: list[dict[str, Any]]) -> str:
|
|
|
+ priority = {"strong": 3, "medium": 2, "weak": 1, "missing": 0}
|
|
|
+ levels = [decision.get("age_50_plus_level", "missing") for decision in decisions]
|
|
|
+ return max(levels or ["missing"], key=lambda level: priority.get(level, 0))
|
|
|
+
|
|
|
+
|
|
|
def _paths_by_content_id(source_path_records: list[dict[str, Any]]) -> dict[str, list[str]]:
|
|
|
pattern_path_records = [
|
|
|
path for path in source_path_records if path["source_path_type"] == "pattern_to_search_query"
|
|
|
@@ -148,3 +427,8 @@ def _paths_by_content_id(source_path_records: list[dict[str, Any]]) -> dict[str,
|
|
|
elif path["source_path_type"] == "decision_to_asset":
|
|
|
result.setdefault(path["to_node_id"], []).append(path["source_path_record_id"])
|
|
|
return result
|
|
|
+
|
|
|
+
|
|
|
+def _stable_id(prefix: str, *parts: Any) -> str:
|
|
|
+ raw = ":".join(str(part) for part in parts)
|
|
|
+ return f"{prefix}_{hashlib.sha1(raw.encode('utf-8')).hexdigest()[:16]}"
|