| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609 |
- from __future__ import annotations
- from collections import Counter
- import hashlib
- from typing import Any
- from content_agent.constants import RUNTIME_SCHEMA_VERSION
- from content_agent.interfaces import RuntimeFileStore
- from content_agent.record_payload import with_raw_payload
# Decision actions understood by the current rule-decision contract; decision
# rows carrying any other action are dropped before the review is built.
CURRENT_DECISION_ACTIONS = {"ADD_TO_CONTENT_POOL", "KEEP_CONTENT_FOR_REVIEW", "REJECT_CONTENT"}
# Search-query effect statuses understood by the current contract; search
# clues and decisions with any other status are dropped.
CURRENT_QUERY_EFFECT_STATUSES = {
    "success",
    "pending",
    "failed",
    "rule_blocked",
}
def run(
    run_id: str,
    policy_run_id: str,
    runtime: RuntimeFileStore,
    review_id: str | None = None,
) -> dict[str, Any]:
    """Generate, persist and return the strategy review for one policy run.

    Reads the run artifacts through ``runtime``, promotes qualifying search
    clues to search-clue assets (written through ``runtime`` only when at
    least one qualifies), assembles the review payload, wraps it with
    ``with_raw_payload`` and writes it to ``strategy_review.json``.

    Args:
        run_id: Run whose artifacts are read and whose review is written.
        policy_run_id: Policy run being reviewed; also used to read
            performance feedback.
        runtime: File store used for every read and write.
        review_id: Explicit review id; defaults to ``review_<policy_run_id>``.

    Returns:
        The review dict as returned by ``with_raw_payload``.
    """
    final_output = runtime.read_json(run_id, "final_output.json")
    # Resolve the run summary once so a missing or null summary is treated as
    # empty everywhere (as _build_metric_summary already does), rather than
    # raising only after search clue assets have been persisted below.
    summary = final_output.get("summary") or {}
    search_clues = _current_contract_search_clues(runtime.read_jsonl(run_id, "search_clues.jsonl"))
    decisions = _current_contract_decisions(runtime.read_jsonl(run_id, "rule_decisions.jsonl"))
    discovered_content_items = runtime.read_jsonl(run_id, "discovered_content_items.jsonl")
    source_path_records = runtime.read_jsonl(run_id, "source_path_records.jsonl")
    walk_actions = runtime.read_jsonl(run_id, "walk_actions.jsonl")
    performance_rows = runtime.read_performance_feedback(run_id, policy_run_id)

    query_review = _build_query_review(search_clues)
    rule_review = _build_rule_review(decisions)
    performance_feedback = _build_performance_feedback_summary(performance_rows)
    productive_paths = _build_productive_paths(source_path_records)

    search_clue_assets, search_clue_evidence = _build_search_clue_asset_promotions(
        run_id,
        policy_run_id,
        final_output,
        search_clues,
        decisions,
        discovered_content_items,
        source_path_records,
        performance_rows,
    )
    if search_clue_assets:
        runtime.write_search_clue_assets(search_clue_assets)
        runtime.write_search_clue_asset_evidence(search_clue_evidence)

    recommendations = _build_recommendations(
        summary,
        query_review,
        rule_review["top_reject_reasons"],
        performance_feedback,
    )
    # Legacy flat view of the recommendations, kept for existing consumers.
    suggestions = [
        {"suggestion": item["suggested_action"], "basis": item["basis"]}
        for item in recommendations
    ]
    review = {
        "schema_version": RUNTIME_SCHEMA_VERSION,
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "review_id": review_id or f"review_{policy_run_id}",
        "review_status": "generated",
        "data_window": _build_data_window(run_id, policy_run_id, final_output),
        "summary": summary,
        "metric_summary": _build_metric_summary(final_output, search_clues, decisions),
        "query_review": query_review,
        "rule_review": rule_review,
        "walk_review": _build_walk_review(walk_actions),
        "asset_review": _build_asset_review(final_output, search_clue_assets),
        "performance_feedback": performance_feedback,
        "recommendations": recommendations,
        "decision_distribution": rule_review["decision_distribution"],
        "effective_search_queries": [
            item["search_query"] for item in query_review["effective_queries"]
        ],
        "weak_search_queries": [
            item["search_query"] for item in query_review["review_queries"]
        ],
        "top_reject_reasons": rule_review["top_reject_reasons"],
        "productive_paths": productive_paths,
        "suggestions": suggestions,
    }
    review = with_raw_payload(review)
    runtime.write_json(run_id, "strategy_review.json", review)
    return review
def _current_contract_search_clues(search_clues: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the search clues whose effect status is in the current contract, in input order."""
    allowed = CURRENT_QUERY_EFFECT_STATUSES
    return list(
        filter(lambda clue: clue.get("search_query_effect_status") in allowed, search_clues)
    )
def _current_contract_decisions(decisions: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the decisions whose action and query effect status are both in the current contract."""
    kept: list[dict[str, Any]] = []
    for decision in decisions:
        if decision.get("decision_action") not in CURRENT_DECISION_ACTIONS:
            continue
        if decision.get("search_query_effect_status") not in CURRENT_QUERY_EFFECT_STATUSES:
            continue
        kept.append(decision)
    return kept
def _build_data_window(
    run_id: str,
    policy_run_id: str,
    final_output: dict[str, Any],
) -> dict[str, Any]:
    """Describe the single run, the artifact files and the policy context the review was built from."""
    artifact_files = [
        "final_output.json",
        "search_clues.jsonl",
        "rule_decisions.jsonl",
        "discovered_content_items.jsonl",
        "source_path_records.jsonl",
        "walk_actions.jsonl",
    ]
    window: dict[str, Any] = {
        "scope": "single_run",
        "run_id": run_id,
        "policy_run_id": policy_run_id,
    }
    window["source_files"] = artifact_files
    window["policy_context"] = _policy_context(final_output)
    return window
- def _policy_context(final_output: dict[str, Any]) -> dict[str, Any]:
- policy = final_output.get("policy") or {}
- walk_strategy = final_output.get("walk_strategy") or {}
- return {
- "policy_context_status": "available" if policy or walk_strategy else "missing",
- "strategy_version": policy.get("strategy_version"),
- "policy_bundle_id": policy.get("policy_bundle_id"),
- "rule_pack_id": policy.get("rule_pack_id"),
- "rule_pack_version": policy.get("rule_pack_version"),
- "walk_strategy_version": walk_strategy.get("walk_strategy_version"),
- "policy_bundle_hash": policy.get("policy_bundle_hash"),
- }
- def _build_metric_summary(
- final_output: dict[str, Any],
- search_clues: list[dict[str, Any]],
- decisions: list[dict[str, Any]],
- ) -> dict[str, Any]:
- summary = dict(final_output.get("summary") or {})
- status_counts = Counter(clue["search_query_effect_status"] for clue in search_clues)
- action_counts = Counter(decision["decision_action"] for decision in decisions)
- summary.update(
- {
- "search_query_count": len(search_clues),
- "success_query_count": status_counts["success"],
- "pending_query_count": status_counts["pending"],
- "failed_query_count": status_counts["failed"],
- "rule_blocked_query_count": status_counts["rule_blocked"],
- "pooled_decision_count": action_counts["ADD_TO_CONTENT_POOL"],
- "review_decision_count": action_counts["KEEP_CONTENT_FOR_REVIEW"],
- "rejected_decision_count": action_counts["REJECT_CONTENT"],
- }
- )
- return summary
def _build_query_review(search_clues: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Bucket search clues by effect status into the review's four query lists."""
    buckets = (
        ("effective_queries", "success"),
        ("review_queries", "pending"),
        ("failed_queries", "failed"),
        ("rule_blocked_queries", "rule_blocked"),
    )
    return {
        bucket_name: _queries_with_status(search_clues, status)
        for bucket_name, status in buckets
    }
- def _queries_with_status(
- search_clues: list[dict[str, Any]],
- status: str,
- ) -> list[dict[str, Any]]:
- return [
- {
- "clue_id": clue.get("clue_id"),
- "search_query_id": clue.get("search_query_id"),
- "search_query": clue.get("search_query"),
- "search_query_effect_status": clue.get("search_query_effect_status"),
- "result_count": clue.get("result_count", 0),
- "pooled_content_count": clue.get("pooled_content_count", 0),
- "review_content_count": clue.get("review_content_count", 0),
- "rejected_content_count": clue.get("rejected_content_count", 0),
- }
- for clue in search_clues
- if clue.get("search_query_effect_status") == status
- ]
- def _build_rule_review(decisions: list[dict[str, Any]]) -> dict[str, Any]:
- reject_reasons = Counter(
- decision["decision_reason_code"]
- for decision in decisions
- if decision["decision_action"] == "REJECT_CONTENT"
- )
- return {
- "decision_distribution": dict(
- Counter(decision["decision_action"] for decision in decisions)
- ),
- "top_reject_reasons": [
- {"decision_reason_code": reason, "count": count}
- for reason, count in reject_reasons.most_common()
- ],
- "hard_gate_block_count": sum(
- 1
- for decision in decisions
- if decision.get("search_query_effect_status") == "rule_blocked"
- ),
- }
- def _build_walk_review(walk_actions: list[dict[str, Any]]) -> dict[str, Any]:
- status_counts = Counter(action.get("walk_status") for action in walk_actions)
- edge_counts = Counter(action.get("edge_id") for action in walk_actions)
- return {
- "walk_action_count": len(walk_actions),
- "walk_status_distribution": dict(status_counts),
- "edge_distribution": dict(edge_counts),
- }
- def _build_asset_review(
- final_output: dict[str, Any],
- search_clue_assets: list[dict[str, Any]],
- ) -> dict[str, Any]:
- return {
- "content_asset_count": len(final_output.get("content_assets") or []),
- "author_asset_count": len(final_output.get("author_assets") or []),
- "search_clue_assets": {
- "promoted_count": len(search_clue_assets),
- "assets": [
- {
- "search_clue_asset_id": asset["search_clue_asset_id"],
- "platform": asset["platform"],
- "clue_type": asset["clue_type"],
- "normalized_clue_text": asset["normalized_clue_text"],
- "can_seed_next_run": asset["can_seed_next_run"],
- }
- for asset in search_clue_assets
- ],
- },
- }
def _build_search_clue_asset_promotions(
    run_id: str,
    policy_run_id: str,
    final_output: dict[str, Any],
    search_clues: list[dict[str, Any]],
    decisions: list[dict[str, Any]],
    discovered_content_items: list[dict[str, Any]],
    source_path_records: list[dict[str, Any]],
    performance_rows: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Promote successful search clues that have content-asset lineage to search-clue assets.

    A clue is promoted when all of the following hold:

    - its ``search_query_effect_status`` is ``"success"``;
    - its ``pooled_content_count`` is positive;
    - its ``search_query`` normalises to a non-empty string;
    - ``_matched_asset_lineage_for_clue`` finds at least one source path id
      and one decision id linking a content asset in ``final_output`` back to
      it.

    Args:
        run_id: Recorded on every asset (as first-seen run) and evidence row.
        policy_run_id: Recorded on every asset and evidence row.
        final_output: Run output; only ``content_assets`` is read here.
        search_clues: Candidate clues.
        decisions: Rule decisions; only ``ADD_TO_CONTENT_POOL`` ones are used.
        discovered_content_items: Used to resolve an asset's search query id.
        source_path_records: Only ``decision_to_asset`` records are used.
        performance_rows: Every ``feedback_id`` found is attached to each
            evidence row.

    Returns:
        ``(assets, evidence)`` with one asset row and one evidence row per
        promoted clue. Both lists are empty when ``final_output`` has no
        ``content_assets``.
    """
    content_assets = final_output.get("content_assets") or []
    if not content_assets:
        return [], []
    # Only pooled decisions can justify a promotion.
    decisions_by_id = {
        decision["decision_id"]: decision
        for decision in decisions
        if decision.get("decision_action") == "ADD_TO_CONTENT_POOL"
    }
    # Only decision -> asset edges count as lineage evidence.
    source_paths_by_id = {
        path["source_path_record_id"]: path
        for path in source_path_records
        if path.get("source_path_type") == "decision_to_asset"
    }
    # Two indexes so an asset can be resolved to its discovered item either by
    # platform content id or by discovery id.
    discovered_by_content_id = {
        item["platform_content_id"]: item
        for item in discovered_content_items
        if item.get("platform_content_id")
    }
    discovered_by_discovery_id = {
        item["content_discovery_id"]: item
        for item in discovered_content_items
        if item.get("content_discovery_id")
    }
    # The same list of feedback ids is attached to every evidence row.
    feedback_refs = [
        row["feedback_id"]
        for row in performance_rows
        if row.get("feedback_id")
    ]
    assets: list[dict[str, Any]] = []
    evidence: list[dict[str, Any]] = []
    for clue in search_clues:
        if clue.get("search_query_effect_status") != "success":
            continue
        if int(clue.get("pooled_content_count") or 0) <= 0:
            continue
        normalized = _normalize_clue_text(clue.get("search_query"))
        if not normalized:
            continue
        matched = _matched_asset_lineage_for_clue(
            clue,
            content_assets,
            decisions_by_id,
            source_paths_by_id,
            discovered_by_content_id,
            discovered_by_discovery_id,
        )
        # Skip clues with no traced source path or no pooled decision.
        if not matched["source_path_record_ids"] or not matched["decision_ids"]:
            continue
        # Falls back to "douyin" when no matched asset records a platform.
        platform = matched["platform"] or "douyin"
        # NOTE(review): the id depends only on platform + normalised text, so two
        # clues in one run whose queries normalise identically produce two asset
        # rows sharing a search_clue_asset_id. Confirm that
        # write_search_clue_assets de-duplicates or upserts.
        asset_id = _stable_id("search_clue_asset", platform, "search_query", normalized)
        asset = {
            "search_clue_asset_id": asset_id,
            "platform": platform,
            "clue_type": "search_query",
            "normalized_clue_text": normalized,
            "display_clue_text": clue.get("search_query"),
            "promotion_status": "promoted",
            "reusable_priority": int(clue.get("pooled_content_count") or 0),
            "can_seed_next_run": 1,
            "first_seen_run_id": run_id,
            "first_seen_policy_run_id": policy_run_id,
            "summary_metrics": {
                "result_count": clue.get("result_count", 0),
                "pooled_content_count": clue.get("pooled_content_count", 0),
                "review_content_count": clue.get("review_content_count", 0),
                # Stored as "failed" but sourced from the clue's rejected count.
                "failed_content_count": clue.get("rejected_content_count", 0),
                "matched_content_asset_count": len(matched["content_asset_ids"]),
            },
        }
        # Shallow snapshot: summary_metrics is the same dict object in both.
        asset["raw_payload"] = {
            **asset,
            "promotion_reason": "success_search_clue_with_content_asset_lineage",
        }
        assets.append(asset)
        evidence_row = {
            # One evidence row per (run, policy run, clue).
            "evidence_id": _stable_id("search_clue_evidence", run_id, policy_run_id, clue["clue_id"]),
            "search_clue_asset_id": asset_id,
            "run_id": run_id,
            "policy_run_id": policy_run_id,
            "clue_id": clue["clue_id"],
            "search_query_id": clue.get("search_query_id"),
            "pooled_content_count": int(clue.get("pooled_content_count") or 0),
            "review_content_count": int(clue.get("review_content_count") or 0),
            "failed_content_count": int(clue.get("rejected_content_count") or 0),
            "source_path_record_ids": matched["source_path_record_ids"],
            "decision_ids": matched["decision_ids"],
            "performance_feedback_refs": feedback_refs,
        }
        evidence_row["raw_payload"] = dict(evidence_row)
        evidence.append(evidence_row)
    return assets, evidence
def _matched_asset_lineage_for_clue(
    clue: dict[str, Any],
    content_assets: list[dict[str, Any]],
    decisions_by_id: dict[str, dict[str, Any]],
    source_paths_by_id: dict[str, dict[str, Any]],
    discovered_by_content_id: dict[str, dict[str, Any]],
    discovered_by_discovery_id: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Collect the content assets, source paths and decisions that trace back to ``clue``.

    An asset matches when all three of these hold:

    - its decision id (``decision_id``, else the first of ``decision_ids``)
      resolves in ``decisions_by_id``;
    - the clue's ``search_query_id`` is among the asset's query ids;
    - at least one decision-to-asset source path links that decision to the
      asset.

    Returns:
        A dict with:

        - ``platform``: the platform of the first matched asset that records
          one, else ``None``;
        - ``content_asset_ids``, ``source_path_record_ids`` and
          ``decision_ids``: sorted, de-duplicated lists.
    """
    result: dict[str, Any] = {
        "platform": None,
        "content_asset_ids": [],
        "source_path_record_ids": [],
        "decision_ids": [],
    }
    clue_query_id = clue.get("search_query_id")
    # _asset_query_ids never yields falsy ids, so a clue without a query id
    # cannot match any asset; return the empty lineage straight away.
    if not clue_query_id:
        return result
    for asset in content_assets:
        # ``decision_ids`` may be present but null; treat that as an empty list
        # rather than failing in iter(None).
        decision_id = asset.get("decision_id") or next(iter(asset.get("decision_ids") or []), None)
        decision = decisions_by_id.get(decision_id)
        if not decision:
            continue
        if clue_query_id not in _asset_query_ids(asset, decision, discovered_by_content_id, discovered_by_discovery_id):
            continue
        path_ids = _matched_decision_to_asset_path_ids(asset, decision, source_paths_by_id)
        if not path_ids:
            continue
        result["platform"] = result["platform"] or asset.get("platform")
        result["content_asset_ids"].append(
            asset.get("content_asset_id") or asset.get("platform_content_id")
        )
        result["source_path_record_ids"].extend(path_ids)
        result["decision_ids"].append(decision_id)
    return {
        **result,
        "content_asset_ids": sorted({value for value in result["content_asset_ids"] if value}),
        "source_path_record_ids": sorted(set(result["source_path_record_ids"])),
        "decision_ids": sorted(set(result["decision_ids"])),
    }
- def _asset_query_ids(
- asset: dict[str, Any],
- decision: dict[str, Any],
- discovered_by_content_id: dict[str, dict[str, Any]],
- discovered_by_discovery_id: dict[str, dict[str, Any]],
- ) -> set[str]:
- item = (
- discovered_by_discovery_id.get(asset.get("content_discovery_id"))
- or discovered_by_content_id.get(asset.get("platform_content_id"))
- or {}
- )
- evidence = decision.get("source_evidence") or {}
- asset_evidence = asset.get("source_evidence") or {}
- values = {
- item.get("search_query_id"),
- evidence.get("search_query_id"),
- asset_evidence.get("search_query_id"),
- }
- values.update(evidence.get("matched_search_query_ids") or [])
- values.update(asset_evidence.get("matched_search_query_ids") or [])
- return {value for value in values if value}
- def _matched_decision_to_asset_path_ids(
- asset: dict[str, Any],
- decision: dict[str, Any],
- source_paths_by_id: dict[str, dict[str, Any]],
- ) -> list[str]:
- asset_path_ids = set(asset.get("source_path_record_ids") or [])
- asset_node_ids = {
- asset.get("content_asset_id"),
- asset.get("platform_content_id"),
- }
- matched = []
- for path_id in asset_path_ids:
- path = source_paths_by_id.get(path_id)
- if not path:
- continue
- if path.get("decision_id") != decision.get("decision_id"):
- continue
- if path.get("from_node_id") != decision.get("decision_id"):
- continue
- if path.get("to_node_id") not in asset_node_ids:
- continue
- matched.append(path_id)
- return matched
- def _normalize_clue_text(value: Any) -> str:
- if not isinstance(value, str):
- return ""
- return " ".join(value.strip().lower().split())
- def _stable_id(prefix: str, *parts: str) -> str:
- digest = hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()[:16]
- return f"{prefix}_{digest}"
def _build_performance_feedback_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Summarise performance feedback rows for the review.

    With no rows, only a "missing" marker is returned. Otherwise the summary
    contains:

    - the row count;
    - the ``feedback_status`` distribution (rows lacking the key count as
      ``"available"``);
    - the means of completion rate, share rate and average watch seconds,
      each ``None`` when no row carries that metric.
    """
    if rows:
        statuses = Counter(row.get("feedback_status", "available") for row in rows)
        summary: dict[str, Any] = {
            "performance_feedback_status": "available",
            "feedback_count": len(rows),
            "feedback_status_distribution": dict(statuses),
        }
        averaged = (
            ("average_completion_rate", "completion_rate"),
            ("average_share_rate", "share_rate"),
            ("average_watch_seconds", "average_watch_seconds"),
        )
        for output_key, metric_key in averaged:
            summary[output_key] = _average_metric(rows, metric_key)
        return summary
    return {
        "performance_feedback_status": "missing",
        "feedback_source": "none",
        "feedback_count": 0,
    }
- def _average_metric(rows: list[dict[str, Any]], key: str) -> float | None:
- values = [row.get(key) for row in rows if row.get(key) is not None]
- if not values:
- return None
- return sum(float(value) for value in values) / len(values)
- def _build_productive_paths(source_path_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
- return [
- {
- "source_path_record_id": path["source_path_record_id"],
- "from": f"{path['from_node_type']}:{path['from_node_id']}",
- "to": f"{path['to_node_type']}:{path['to_node_id']}",
- }
- for path in source_path_records
- if path["source_path_type"] == "decision_to_asset"
- ]
def _build_recommendations(
    summary: dict[str, Any],
    query_review: dict[str, list[dict[str, Any]]],
    reject_reasons: list[dict[str, Any]],
    performance_feedback: dict[str, Any],
) -> list[dict[str, Any]]:
    """Derive human-approval recommendations from one run's review signals.

    Emitted in this order:

    - ``rec_query_keep_success`` when the summary reports pooled content;
    - ``rec_query_review_budget`` when any query is pending review;
    - ``rec_rule_fix_reject_reason`` for the first (most frequent) reject
      reason, if any;
    - exactly one performance-feedback recommendation, chosen by whether
      feedback is missing.
    """

    def clue_refs(bucket: str) -> list[str]:
        # Evidence points at search_clues.jsonl rows by clue id.
        return [
            f"search_clues.jsonl:{item['clue_id']}"
            for item in query_review[bucket]
            if item.get("clue_id")
        ]

    recommendations: list[dict[str, Any]] = []
    has_pooled_content = summary.get("pooled_content_count", 0) > 0
    if has_pooled_content:
        keep_success = _recommendation(
            "rec_query_keep_success",
            "query_strategy",
            "search_query",
            "success_queries",
            "keep",
            "本次已有搜索词产生 ADD_TO_CONTENT_POOL 结果。",
            clue_refs("effective_queries"),
        )
        recommendations.append(keep_success)
    if query_review["review_queries"]:
        review_budget = _recommendation(
            "rec_query_review_budget",
            "query_strategy",
            "search_query",
            "pending_queries",
            "review_with_small_budget",
            "存在只产生 KEEP_CONTENT_FOR_REVIEW 的搜索词。",
            clue_refs("review_queries"),
        )
        recommendations.append(review_budget)
    if reject_reasons:
        top_reason = reject_reasons[0]["decision_reason_code"]
        fix_reason = _recommendation(
            "rec_rule_fix_reject_reason",
            "rule_evidence",
            "decision_reason_code",
            top_reason,
            "inspect_reject_evidence",
            f"最高频淘汰原因是 {top_reason}。",
            [f"rule_decisions.jsonl:{top_reason}"],
        )
        recommendations.append(fix_reason)
    feedback_missing = performance_feedback["performance_feedback_status"] == "missing"
    if feedback_missing:
        feedback_rec = _recommendation(
            "rec_performance_feedback_missing",
            "performance_feedback",
            "run",
            "performance_feedback",
            "keep_feedback_empty",
            "当前没有后验表现反馈,先不据此修改策略。",
            [],
        )
    else:
        feedback_rec = _recommendation(
            "rec_performance_feedback_review",
            "performance_feedback",
            "run",
            "performance_feedback",
            "review_feedback_before_strategy_change",
            "已有后验表现反馈,只作为下一轮建议证据,不覆盖本轮规则判断。",
            ["performance_feedback"],
        )
    recommendations.append(feedback_rec)
    return recommendations
- def _recommendation(
- recommendation_id: str,
- recommendation_type: str,
- target_type: str,
- target_id: str,
- suggested_action: str,
- basis: str,
- evidence_refs: list[str],
- ) -> dict[str, Any]:
- return {
- "recommendation_id": recommendation_id,
- "recommendation_type": recommendation_type,
- "target_type": target_type,
- "target_id": target_id,
- "suggested_action": suggested_action,
- "basis": basis,
- "evidence_refs": evidence_refs,
- "requires_human_approval": True,
- }
def _build_suggestions(
    summary: dict[str, Any],
    search_clues: list[dict[str, Any]],
    reject_reasons: Counter[str],
) -> list[dict[str, str]]:
    """Produce flat ``{"suggestion", "basis"}`` pairs from raw clues and a reject-reason counter.

    Clues are first restricted to the current contract. Performance feedback
    is always treated as missing here.
    """
    current_clues = _current_contract_search_clues(search_clues)
    query_review = _build_query_review(current_clues)
    ranked_reasons = [
        {"decision_reason_code": reason, "count": count}
        for reason, count in reject_reasons.most_common()
    ]
    no_feedback = _build_performance_feedback_summary([])
    suggestions: list[dict[str, str]] = []
    for item in _build_recommendations(summary, query_review, ranked_reasons, no_feedback):
        suggestions.append({"suggestion": item["suggested_action"], "basis": item["basis"]})
    return suggestions
|