# learning_review.py
  1. from __future__ import annotations
  2. from collections import Counter
  3. import hashlib
  4. from typing import Any
  5. from content_agent.constants import RUNTIME_SCHEMA_VERSION
  6. from content_agent.interfaces import RuntimeFileStore
  7. from content_agent.record_payload import with_raw_payload
  8. CURRENT_DECISION_ACTIONS = {
  9. "ADD_TO_CONTENT_POOL",
  10. "KEEP_CONTENT_FOR_REVIEW",
  11. "REJECT_CONTENT",
  12. }
  13. CURRENT_QUERY_EFFECT_STATUSES = {"success", "pending", "failed", "rule_blocked"}
  14. def run(
  15. run_id: str,
  16. policy_run_id: str,
  17. runtime: RuntimeFileStore,
  18. review_id: str | None = None,
  19. ) -> dict[str, Any]:
  20. final_output = runtime.read_json(run_id, "final_output.json")
  21. search_clues = _current_contract_search_clues(runtime.read_jsonl(run_id, "search_clues.jsonl"))
  22. decisions = _current_contract_decisions(runtime.read_jsonl(run_id, "rule_decisions.jsonl"))
  23. discovered_content_items = runtime.read_jsonl(run_id, "discovered_content_items.jsonl")
  24. source_path_records = runtime.read_jsonl(run_id, "source_path_records.jsonl")
  25. walk_actions = runtime.read_jsonl(run_id, "walk_actions.jsonl")
  26. performance_rows = runtime.read_performance_feedback(run_id, policy_run_id)
  27. query_review = _build_query_review(search_clues)
  28. rule_review = _build_rule_review(decisions)
  29. performance_feedback = _build_performance_feedback_summary(performance_rows)
  30. productive_paths = _build_productive_paths(source_path_records)
  31. search_clue_assets, search_clue_evidence = _build_search_clue_asset_promotions(
  32. run_id,
  33. policy_run_id,
  34. final_output,
  35. search_clues,
  36. decisions,
  37. discovered_content_items,
  38. source_path_records,
  39. performance_rows,
  40. )
  41. if search_clue_assets:
  42. runtime.write_search_clue_assets(search_clue_assets)
  43. runtime.write_search_clue_asset_evidence(search_clue_evidence)
  44. recommendations = _build_recommendations(
  45. final_output.get("summary", {}),
  46. query_review,
  47. rule_review["top_reject_reasons"],
  48. performance_feedback,
  49. )
  50. suggestions = [
  51. {"suggestion": item["suggested_action"], "basis": item["basis"]}
  52. for item in recommendations
  53. ]
  54. review = {
  55. "schema_version": RUNTIME_SCHEMA_VERSION,
  56. "run_id": run_id,
  57. "policy_run_id": policy_run_id,
  58. "review_id": review_id or f"review_{policy_run_id}",
  59. "review_status": "generated",
  60. "data_window": _build_data_window(run_id, policy_run_id, final_output),
  61. "summary": final_output["summary"],
  62. "metric_summary": _build_metric_summary(final_output, search_clues, decisions),
  63. "query_review": query_review,
  64. "rule_review": rule_review,
  65. "walk_review": _build_walk_review(walk_actions),
  66. "asset_review": _build_asset_review(final_output, search_clue_assets),
  67. "performance_feedback": performance_feedback,
  68. "recommendations": recommendations,
  69. "decision_distribution": rule_review["decision_distribution"],
  70. "effective_search_queries": [
  71. item["search_query"] for item in query_review["effective_queries"]
  72. ],
  73. "weak_search_queries": [
  74. item["search_query"] for item in query_review["review_queries"]
  75. ],
  76. "top_reject_reasons": rule_review["top_reject_reasons"],
  77. "productive_paths": productive_paths,
  78. "suggestions": suggestions,
  79. }
  80. review = with_raw_payload(review)
  81. runtime.write_json(run_id, "strategy_review.json", review)
  82. return review
  83. def _current_contract_search_clues(search_clues: list[dict[str, Any]]) -> list[dict[str, Any]]:
  84. return [
  85. clue
  86. for clue in search_clues
  87. if clue.get("search_query_effect_status") in CURRENT_QUERY_EFFECT_STATUSES
  88. ]
  89. def _current_contract_decisions(decisions: list[dict[str, Any]]) -> list[dict[str, Any]]:
  90. return [
  91. decision
  92. for decision in decisions
  93. if decision.get("decision_action") in CURRENT_DECISION_ACTIONS
  94. and decision.get("search_query_effect_status") in CURRENT_QUERY_EFFECT_STATUSES
  95. ]
  96. def _build_data_window(
  97. run_id: str,
  98. policy_run_id: str,
  99. final_output: dict[str, Any],
  100. ) -> dict[str, Any]:
  101. return {
  102. "scope": "single_run",
  103. "run_id": run_id,
  104. "policy_run_id": policy_run_id,
  105. "source_files": [
  106. "final_output.json",
  107. "search_clues.jsonl",
  108. "rule_decisions.jsonl",
  109. "discovered_content_items.jsonl",
  110. "source_path_records.jsonl",
  111. "walk_actions.jsonl",
  112. ],
  113. "policy_context": _policy_context(final_output),
  114. }
  115. def _policy_context(final_output: dict[str, Any]) -> dict[str, Any]:
  116. policy = final_output.get("policy") or {}
  117. walk_strategy = final_output.get("walk_strategy") or {}
  118. return {
  119. "policy_context_status": "available" if policy or walk_strategy else "missing",
  120. "strategy_version": policy.get("strategy_version"),
  121. "policy_bundle_id": policy.get("policy_bundle_id"),
  122. "rule_pack_id": policy.get("rule_pack_id"),
  123. "rule_pack_version": policy.get("rule_pack_version"),
  124. "walk_strategy_version": walk_strategy.get("walk_strategy_version"),
  125. "policy_bundle_hash": policy.get("policy_bundle_hash"),
  126. }
  127. def _build_metric_summary(
  128. final_output: dict[str, Any],
  129. search_clues: list[dict[str, Any]],
  130. decisions: list[dict[str, Any]],
  131. ) -> dict[str, Any]:
  132. summary = dict(final_output.get("summary") or {})
  133. status_counts = Counter(clue["search_query_effect_status"] for clue in search_clues)
  134. action_counts = Counter(decision["decision_action"] for decision in decisions)
  135. summary.update(
  136. {
  137. "search_query_count": len(search_clues),
  138. "success_query_count": status_counts["success"],
  139. "pending_query_count": status_counts["pending"],
  140. "failed_query_count": status_counts["failed"],
  141. "rule_blocked_query_count": status_counts["rule_blocked"],
  142. "pooled_decision_count": action_counts["ADD_TO_CONTENT_POOL"],
  143. "review_decision_count": action_counts["KEEP_CONTENT_FOR_REVIEW"],
  144. "rejected_decision_count": action_counts["REJECT_CONTENT"],
  145. }
  146. )
  147. return summary
  148. def _build_query_review(search_clues: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
  149. return {
  150. "effective_queries": _queries_with_status(search_clues, "success"),
  151. "review_queries": _queries_with_status(search_clues, "pending"),
  152. "failed_queries": _queries_with_status(search_clues, "failed"),
  153. "rule_blocked_queries": _queries_with_status(search_clues, "rule_blocked"),
  154. }
  155. def _queries_with_status(
  156. search_clues: list[dict[str, Any]],
  157. status: str,
  158. ) -> list[dict[str, Any]]:
  159. return [
  160. {
  161. "clue_id": clue.get("clue_id"),
  162. "search_query_id": clue.get("search_query_id"),
  163. "search_query": clue.get("search_query"),
  164. "search_query_effect_status": clue.get("search_query_effect_status"),
  165. "result_count": clue.get("result_count", 0),
  166. "pooled_content_count": clue.get("pooled_content_count", 0),
  167. "review_content_count": clue.get("review_content_count", 0),
  168. "rejected_content_count": clue.get("rejected_content_count", 0),
  169. }
  170. for clue in search_clues
  171. if clue.get("search_query_effect_status") == status
  172. ]
  173. def _build_rule_review(decisions: list[dict[str, Any]]) -> dict[str, Any]:
  174. reject_reasons = Counter(
  175. decision["decision_reason_code"]
  176. for decision in decisions
  177. if decision["decision_action"] == "REJECT_CONTENT"
  178. )
  179. return {
  180. "decision_distribution": dict(
  181. Counter(decision["decision_action"] for decision in decisions)
  182. ),
  183. "top_reject_reasons": [
  184. {"decision_reason_code": reason, "count": count}
  185. for reason, count in reject_reasons.most_common()
  186. ],
  187. "hard_gate_block_count": sum(
  188. 1
  189. for decision in decisions
  190. if decision.get("search_query_effect_status") == "rule_blocked"
  191. ),
  192. }
  193. def _build_walk_review(walk_actions: list[dict[str, Any]]) -> dict[str, Any]:
  194. status_counts = Counter(action.get("walk_status") for action in walk_actions)
  195. edge_counts = Counter(action.get("edge_id") for action in walk_actions)
  196. return {
  197. "walk_action_count": len(walk_actions),
  198. "walk_status_distribution": dict(status_counts),
  199. "edge_distribution": dict(edge_counts),
  200. }
  201. def _build_asset_review(
  202. final_output: dict[str, Any],
  203. search_clue_assets: list[dict[str, Any]],
  204. ) -> dict[str, Any]:
  205. return {
  206. "content_asset_count": len(final_output.get("content_assets") or []),
  207. "author_asset_count": len(final_output.get("author_assets") or []),
  208. "search_clue_assets": {
  209. "promoted_count": len(search_clue_assets),
  210. "assets": [
  211. {
  212. "search_clue_asset_id": asset["search_clue_asset_id"],
  213. "platform": asset["platform"],
  214. "clue_type": asset["clue_type"],
  215. "normalized_clue_text": asset["normalized_clue_text"],
  216. "can_seed_next_run": asset["can_seed_next_run"],
  217. }
  218. for asset in search_clue_assets
  219. ],
  220. },
  221. }
  222. def _build_search_clue_asset_promotions(
  223. run_id: str,
  224. policy_run_id: str,
  225. final_output: dict[str, Any],
  226. search_clues: list[dict[str, Any]],
  227. decisions: list[dict[str, Any]],
  228. discovered_content_items: list[dict[str, Any]],
  229. source_path_records: list[dict[str, Any]],
  230. performance_rows: list[dict[str, Any]],
  231. ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
  232. content_assets = final_output.get("content_assets") or []
  233. if not content_assets:
  234. return [], []
  235. decisions_by_id = {
  236. decision["decision_id"]: decision
  237. for decision in decisions
  238. if decision.get("decision_action") == "ADD_TO_CONTENT_POOL"
  239. }
  240. source_paths_by_id = {
  241. path["source_path_record_id"]: path
  242. for path in source_path_records
  243. if path.get("source_path_type") == "decision_to_asset"
  244. }
  245. discovered_by_content_id = {
  246. item["platform_content_id"]: item
  247. for item in discovered_content_items
  248. if item.get("platform_content_id")
  249. }
  250. discovered_by_discovery_id = {
  251. item["content_discovery_id"]: item
  252. for item in discovered_content_items
  253. if item.get("content_discovery_id")
  254. }
  255. feedback_refs = [
  256. row["feedback_id"]
  257. for row in performance_rows
  258. if row.get("feedback_id")
  259. ]
  260. assets: list[dict[str, Any]] = []
  261. evidence: list[dict[str, Any]] = []
  262. for clue in search_clues:
  263. if clue.get("search_query_effect_status") != "success":
  264. continue
  265. if int(clue.get("pooled_content_count") or 0) <= 0:
  266. continue
  267. normalized = _normalize_clue_text(clue.get("search_query"))
  268. if not normalized:
  269. continue
  270. matched = _matched_asset_lineage_for_clue(
  271. clue,
  272. content_assets,
  273. decisions_by_id,
  274. source_paths_by_id,
  275. discovered_by_content_id,
  276. discovered_by_discovery_id,
  277. )
  278. if not matched["source_path_record_ids"] or not matched["decision_ids"]:
  279. continue
  280. platform = matched["platform"] or "douyin"
  281. asset_id = _stable_id("search_clue_asset", platform, "search_query", normalized)
  282. asset = {
  283. "search_clue_asset_id": asset_id,
  284. "platform": platform,
  285. "clue_type": "search_query",
  286. "normalized_clue_text": normalized,
  287. "display_clue_text": clue.get("search_query"),
  288. "promotion_status": "promoted",
  289. "reusable_priority": int(clue.get("pooled_content_count") or 0),
  290. "can_seed_next_run": 1,
  291. "first_seen_run_id": run_id,
  292. "first_seen_policy_run_id": policy_run_id,
  293. "summary_metrics": {
  294. "result_count": clue.get("result_count", 0),
  295. "pooled_content_count": clue.get("pooled_content_count", 0),
  296. "review_content_count": clue.get("review_content_count", 0),
  297. "failed_content_count": clue.get("rejected_content_count", 0),
  298. "matched_content_asset_count": len(matched["content_asset_ids"]),
  299. },
  300. }
  301. asset["raw_payload"] = {
  302. **asset,
  303. "promotion_reason": "success_search_clue_with_content_asset_lineage",
  304. }
  305. assets.append(asset)
  306. evidence_row = {
  307. "evidence_id": _stable_id("search_clue_evidence", run_id, policy_run_id, clue["clue_id"]),
  308. "search_clue_asset_id": asset_id,
  309. "run_id": run_id,
  310. "policy_run_id": policy_run_id,
  311. "clue_id": clue["clue_id"],
  312. "search_query_id": clue.get("search_query_id"),
  313. "pooled_content_count": int(clue.get("pooled_content_count") or 0),
  314. "review_content_count": int(clue.get("review_content_count") or 0),
  315. "failed_content_count": int(clue.get("rejected_content_count") or 0),
  316. "source_path_record_ids": matched["source_path_record_ids"],
  317. "decision_ids": matched["decision_ids"],
  318. "performance_feedback_refs": feedback_refs,
  319. }
  320. evidence_row["raw_payload"] = dict(evidence_row)
  321. evidence.append(evidence_row)
  322. return assets, evidence
  323. def _matched_asset_lineage_for_clue(
  324. clue: dict[str, Any],
  325. content_assets: list[dict[str, Any]],
  326. decisions_by_id: dict[str, dict[str, Any]],
  327. source_paths_by_id: dict[str, dict[str, Any]],
  328. discovered_by_content_id: dict[str, dict[str, Any]],
  329. discovered_by_discovery_id: dict[str, dict[str, Any]],
  330. ) -> dict[str, Any]:
  331. clue_query_id = clue.get("search_query_id")
  332. result = {
  333. "platform": None,
  334. "content_asset_ids": [],
  335. "source_path_record_ids": [],
  336. "decision_ids": [],
  337. }
  338. for asset in content_assets:
  339. decision_id = asset.get("decision_id") or next(iter(asset.get("decision_ids", [])), None)
  340. decision = decisions_by_id.get(decision_id)
  341. if not decision:
  342. continue
  343. if clue_query_id not in _asset_query_ids(asset, decision, discovered_by_content_id, discovered_by_discovery_id):
  344. continue
  345. path_ids = _matched_decision_to_asset_path_ids(asset, decision, source_paths_by_id)
  346. if not path_ids:
  347. continue
  348. result["platform"] = result["platform"] or asset.get("platform")
  349. result["content_asset_ids"].append(
  350. asset.get("content_asset_id") or asset.get("platform_content_id")
  351. )
  352. result["source_path_record_ids"].extend(path_ids)
  353. result["decision_ids"].append(decision_id)
  354. return {
  355. **result,
  356. "content_asset_ids": sorted({value for value in result["content_asset_ids"] if value}),
  357. "source_path_record_ids": sorted(set(result["source_path_record_ids"])),
  358. "decision_ids": sorted(set(result["decision_ids"])),
  359. }
  360. def _asset_query_ids(
  361. asset: dict[str, Any],
  362. decision: dict[str, Any],
  363. discovered_by_content_id: dict[str, dict[str, Any]],
  364. discovered_by_discovery_id: dict[str, dict[str, Any]],
  365. ) -> set[str]:
  366. item = (
  367. discovered_by_discovery_id.get(asset.get("content_discovery_id"))
  368. or discovered_by_content_id.get(asset.get("platform_content_id"))
  369. or {}
  370. )
  371. evidence = decision.get("source_evidence") or {}
  372. asset_evidence = asset.get("source_evidence") or {}
  373. values = {
  374. item.get("search_query_id"),
  375. evidence.get("search_query_id"),
  376. asset_evidence.get("search_query_id"),
  377. }
  378. values.update(evidence.get("matched_search_query_ids") or [])
  379. values.update(asset_evidence.get("matched_search_query_ids") or [])
  380. return {value for value in values if value}
  381. def _matched_decision_to_asset_path_ids(
  382. asset: dict[str, Any],
  383. decision: dict[str, Any],
  384. source_paths_by_id: dict[str, dict[str, Any]],
  385. ) -> list[str]:
  386. asset_path_ids = set(asset.get("source_path_record_ids") or [])
  387. asset_node_ids = {
  388. asset.get("content_asset_id"),
  389. asset.get("platform_content_id"),
  390. }
  391. matched = []
  392. for path_id in asset_path_ids:
  393. path = source_paths_by_id.get(path_id)
  394. if not path:
  395. continue
  396. if path.get("decision_id") != decision.get("decision_id"):
  397. continue
  398. if path.get("from_node_id") != decision.get("decision_id"):
  399. continue
  400. if path.get("to_node_id") not in asset_node_ids:
  401. continue
  402. matched.append(path_id)
  403. return matched
  404. def _normalize_clue_text(value: Any) -> str:
  405. if not isinstance(value, str):
  406. return ""
  407. return " ".join(value.strip().lower().split())
  408. def _stable_id(prefix: str, *parts: str) -> str:
  409. digest = hashlib.sha1("|".join(parts).encode("utf-8")).hexdigest()[:16]
  410. return f"{prefix}_{digest}"
  411. def _build_performance_feedback_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
  412. if not rows:
  413. return {
  414. "performance_feedback_status": "missing",
  415. "feedback_source": "none",
  416. "feedback_count": 0,
  417. }
  418. status_counts = Counter(row.get("feedback_status", "available") for row in rows)
  419. return {
  420. "performance_feedback_status": "available",
  421. "feedback_count": len(rows),
  422. "feedback_status_distribution": dict(status_counts),
  423. "average_completion_rate": _average_metric(rows, "completion_rate"),
  424. "average_share_rate": _average_metric(rows, "share_rate"),
  425. "average_watch_seconds": _average_metric(rows, "average_watch_seconds"),
  426. }
  427. def _average_metric(rows: list[dict[str, Any]], key: str) -> float | None:
  428. values = [row.get(key) for row in rows if row.get(key) is not None]
  429. if not values:
  430. return None
  431. return sum(float(value) for value in values) / len(values)
  432. def _build_productive_paths(source_path_records: list[dict[str, Any]]) -> list[dict[str, Any]]:
  433. return [
  434. {
  435. "source_path_record_id": path["source_path_record_id"],
  436. "from": f"{path['from_node_type']}:{path['from_node_id']}",
  437. "to": f"{path['to_node_type']}:{path['to_node_id']}",
  438. }
  439. for path in source_path_records
  440. if path["source_path_type"] == "decision_to_asset"
  441. ]
  442. def _build_recommendations(
  443. summary: dict[str, Any],
  444. query_review: dict[str, list[dict[str, Any]]],
  445. reject_reasons: list[dict[str, Any]],
  446. performance_feedback: dict[str, Any],
  447. ) -> list[dict[str, Any]]:
  448. recommendations: list[dict[str, Any]] = []
  449. if summary.get("pooled_content_count", 0) > 0:
  450. recommendations.append(
  451. _recommendation(
  452. "rec_query_keep_success",
  453. "query_strategy",
  454. "search_query",
  455. "success_queries",
  456. "keep",
  457. "本次已有搜索词产生 ADD_TO_CONTENT_POOL 结果。",
  458. [
  459. f"search_clues.jsonl:{item['clue_id']}"
  460. for item in query_review["effective_queries"]
  461. if item.get("clue_id")
  462. ],
  463. )
  464. )
  465. if query_review["review_queries"]:
  466. recommendations.append(
  467. _recommendation(
  468. "rec_query_review_budget",
  469. "query_strategy",
  470. "search_query",
  471. "pending_queries",
  472. "review_with_small_budget",
  473. "存在只产生 KEEP_CONTENT_FOR_REVIEW 的搜索词。",
  474. [
  475. f"search_clues.jsonl:{item['clue_id']}"
  476. for item in query_review["review_queries"]
  477. if item.get("clue_id")
  478. ],
  479. )
  480. )
  481. if reject_reasons:
  482. reason = reject_reasons[0]["decision_reason_code"]
  483. recommendations.append(
  484. _recommendation(
  485. "rec_rule_fix_reject_reason",
  486. "rule_evidence",
  487. "decision_reason_code",
  488. reason,
  489. "inspect_reject_evidence",
  490. f"最高频淘汰原因是 {reason}。",
  491. [f"rule_decisions.jsonl:{reason}"],
  492. )
  493. )
  494. if performance_feedback["performance_feedback_status"] == "missing":
  495. recommendations.append(
  496. _recommendation(
  497. "rec_performance_feedback_missing",
  498. "performance_feedback",
  499. "run",
  500. "performance_feedback",
  501. "keep_feedback_empty",
  502. "当前没有后验表现反馈,先不据此修改策略。",
  503. [],
  504. )
  505. )
  506. else:
  507. recommendations.append(
  508. _recommendation(
  509. "rec_performance_feedback_review",
  510. "performance_feedback",
  511. "run",
  512. "performance_feedback",
  513. "review_feedback_before_strategy_change",
  514. "已有后验表现反馈,只作为下一轮建议证据,不覆盖本轮规则判断。",
  515. ["performance_feedback"],
  516. )
  517. )
  518. return recommendations
  519. def _recommendation(
  520. recommendation_id: str,
  521. recommendation_type: str,
  522. target_type: str,
  523. target_id: str,
  524. suggested_action: str,
  525. basis: str,
  526. evidence_refs: list[str],
  527. ) -> dict[str, Any]:
  528. return {
  529. "recommendation_id": recommendation_id,
  530. "recommendation_type": recommendation_type,
  531. "target_type": target_type,
  532. "target_id": target_id,
  533. "suggested_action": suggested_action,
  534. "basis": basis,
  535. "evidence_refs": evidence_refs,
  536. "requires_human_approval": True,
  537. }
  538. def _build_suggestions(
  539. summary: dict[str, Any],
  540. search_clues: list[dict[str, Any]],
  541. reject_reasons: Counter[str],
  542. ) -> list[dict[str, str]]:
  543. query_review = _build_query_review(_current_contract_search_clues(search_clues))
  544. recommendations = _build_recommendations(
  545. summary,
  546. query_review,
  547. [
  548. {"decision_reason_code": reason, "count": count}
  549. for reason, count in reject_reasons.most_common()
  550. ],
  551. _build_performance_feedback_summary([]),
  552. )
  553. return [
  554. {"suggestion": item["suggested_action"], "basis": item["basis"]}
  555. for item in recommendations
  556. ]