evaluator.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. from __future__ import annotations
  2. from typing import Any
  3. from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
  4. from content_agent.record_payload import with_raw_payload
  5. HARD_GATE_REASON_CATEGORY = "hard_gate"
  6. SCORE_REASON_CATEGORY = "score_or_data_failed"
  7. REVIEW_REASON_CATEGORY = "review_needed"
  8. SUCCESS_REASON_CATEGORY = "score_pass"
  9. def decide(
  10. run_id: str,
  11. policy_run_id: str,
  12. index: int,
  13. bundle: dict[str, Any],
  14. policy_bundle: dict[str, Any],
  15. ) -> dict[str, Any]:
  16. rule_pack = policy_bundle["rule_pack"]
  17. matched_gates = _evaluate_hard_gates(bundle, rule_pack.get("hard_gates", []))
  18. blocking_gates = [
  19. gate for gate in matched_gates if gate.get("severity") == "fatal" or gate.get("stop_scoring")
  20. ]
  21. triggered_blocking_rules = [gate["gate_id"] for gate in blocking_gates]
  22. if blocking_gates:
  23. primary_gate = _primary_hard_gate(blocking_gates)
  24. effect = _effect_status_for_decision(
  25. policy_bundle,
  26. primary_gate["decision_action"],
  27. HARD_GATE_REASON_CATEGORY,
  28. is_hard_gate=True,
  29. )
  30. return _build_decision(
  31. run_id=run_id,
  32. policy_run_id=policy_run_id,
  33. index=index,
  34. bundle=bundle,
  35. policy_bundle=policy_bundle,
  36. decision_action=primary_gate["decision_action"],
  37. decision_reason_code=primary_gate["decision_reason_code"],
  38. search_query_effect_status=effect["content_effect_status"],
  39. triggered_blocking_rules=triggered_blocking_rules,
  40. scorecard=_scorecard(None, rule_pack.get("scorecard", {}), missing=True),
  41. score=None,
  42. replay_marker={
  43. "matched_gates": [_gate_replay(gate) for gate in matched_gates],
  44. "primary_gate_id": primary_gate["gate_id"],
  45. "primary_reason_code": primary_gate["decision_reason_code"],
  46. "primary_gate_priority": primary_gate.get("priority"),
  47. "effect_mapping_id": effect.get("mapping_id"),
  48. "reason_category": HARD_GATE_REASON_CATEGORY,
  49. },
  50. )
  51. scorecard = _scorecard_total(bundle, rule_pack.get("scorecard", {}))
  52. score = scorecard.get("total_score")
  53. if score is None or _scorecard_all_dimensions_missing(scorecard):
  54. return _missing_score_decision(
  55. run_id,
  56. policy_run_id,
  57. index,
  58. bundle,
  59. policy_bundle,
  60. matched_gates,
  61. scorecard,
  62. )
  63. threshold = _match_threshold(score, rule_pack.get("thresholds", []))
  64. if threshold is None:
  65. raise ValueError(f"no threshold matched score: {score}")
  66. decision_action = threshold["decision_action"]
  67. decision_reason_code = threshold["decision_reason_code"]
  68. reason_category = _reason_category_for_threshold(decision_action, decision_reason_code)
  69. effect = _effect_status_for_decision(
  70. policy_bundle, decision_action, reason_category, is_hard_gate=False
  71. )
  72. return _build_decision(
  73. run_id=run_id,
  74. policy_run_id=policy_run_id,
  75. index=index,
  76. bundle=bundle,
  77. policy_bundle=policy_bundle,
  78. decision_action=decision_action,
  79. decision_reason_code=decision_reason_code,
  80. search_query_effect_status=effect["content_effect_status"],
  81. triggered_blocking_rules=triggered_blocking_rules,
  82. scorecard=scorecard,
  83. score=score,
  84. replay_marker={
  85. "matched_gates": [_gate_replay(gate) for gate in matched_gates],
  86. "matched_threshold": _threshold_label(threshold),
  87. "matched_threshold_priority": threshold.get("priority"),
  88. "effect_mapping_id": effect.get("mapping_id"),
  89. "reason_category": reason_category,
  90. "matched_scoring_rules": scorecard.get("matched_scoring_rules", []),
  91. },
  92. )
  93. def _build_decision(
  94. run_id: str,
  95. policy_run_id: str,
  96. index: int,
  97. bundle: dict[str, Any],
  98. policy_bundle: dict[str, Any],
  99. decision_action: str,
  100. decision_reason_code: str,
  101. search_query_effect_status: str,
  102. triggered_blocking_rules: list[str],
  103. scorecard: dict[str, Any],
  104. score: Any,
  105. replay_marker: dict[str, Any],
  106. ) -> dict[str, Any]:
  107. content = bundle["content"]
  108. decision = {
  109. "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  110. "run_id": run_id,
  111. "policy_run_id": policy_run_id,
  112. "decision_id": f"d_{index:03d}",
  113. "policy_bundle_id": policy_bundle["policy_bundle_id"],
  114. "rule_pack_id": policy_bundle["rule_pack_id"],
  115. "rule_pack_version": policy_bundle["rule_pack_version"],
  116. "strategy_id": policy_bundle["strategy_id"],
  117. "strategy_version": policy_bundle["strategy_version"],
  118. "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
  119. "dispatch_id": policy_bundle.get("dispatch_id"),
  120. "decision_target_type": content["decision_target_type"],
  121. "decision_target_id": content["decision_target_id"],
  122. "triggered_blocking_rules": triggered_blocking_rules,
  123. "scorecard": scorecard,
  124. "score": score,
  125. "decision_action": decision_action,
  126. "decision_reason_code": decision_reason_code,
  127. "search_query_effect_status": search_query_effect_status,
  128. "source_evidence": bundle["source_evidence"],
  129. "decision_input_snapshot_id": bundle["run_context"]["decision_input_snapshot_id"],
  130. "decision_evidence_refs": _evidence_refs(policy_bundle["rule_pack"]),
  131. "decision_replay_data": {
  132. "policy_run_id": policy_run_id,
  133. "policy_bundle_id": policy_bundle["policy_bundle_id"],
  134. "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
  135. "rule_package_id": policy_bundle.get("rule_package_id"),
  136. "rule_pack_id": policy_bundle["rule_pack_id"],
  137. "rule_pack_version": policy_bundle["rule_pack_version"],
  138. "dispatch_id": policy_bundle.get("dispatch_id"),
  139. "runtime_stage": policy_bundle.get("runtime_stage"),
  140. "strategy_version": policy_bundle["strategy_version"],
  141. "runtime_record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  142. "decision_evidence_refs": _evidence_refs(policy_bundle["rule_pack"]),
  143. "missing_dimensions": [
  144. row["key"] for row in scorecard.get("dimensions", []) if row.get("score_missing")
  145. ],
  146. **replay_marker,
  147. },
  148. }
  149. return with_raw_payload(decision)
  150. def _evaluate_hard_gates(bundle: dict[str, Any], gates: list[dict[str, Any]]) -> list[dict[str, Any]]:
  151. return [gate for gate in gates if _condition_matches(bundle, gate.get("when", {}))]
  152. def _primary_hard_gate(gates: list[dict[str, Any]]) -> dict[str, Any]:
  153. return sorted(gates, key=lambda gate: (gate.get("priority", 999), gate.get("gate_id", "")))[0]
  154. def _condition_matches(bundle: dict[str, Any], condition: dict[str, Any]) -> bool:
  155. value = _get_path(bundle, condition.get("field", ""))
  156. op = condition.get("op")
  157. expected = condition.get("value")
  158. if op == "is_empty":
  159. return _is_empty(value)
  160. if op == "in":
  161. return value in (expected or [])
  162. if op == "not_in":
  163. return value not in (expected or [])
  164. if op == "eq":
  165. return value == expected
  166. if op == "gte":
  167. return value is not None and value >= expected
  168. if op == "lt":
  169. return value is not None and value < expected
  170. if op == "lte":
  171. return value is not None and value <= expected
  172. raise ValueError(f"unsupported rule operator: {op}")
  173. def _match_threshold(score: int | float | None, thresholds: list[dict[str, Any]]) -> dict[str, Any] | None:
  174. if score is None:
  175. return None
  176. for threshold in thresholds:
  177. min_score = threshold.get("min_score")
  178. max_score = threshold.get("max_score")
  179. if min_score is not None and score < min_score:
  180. continue
  181. if max_score is not None and score > max_score:
  182. continue
  183. return threshold
  184. return None
  185. def _get_path(data: dict[str, Any], path: str) -> Any:
  186. current: Any = data
  187. for part in path.split("."):
  188. if not part:
  189. return None
  190. if not isinstance(current, dict):
  191. return None
  192. current = current.get(part)
  193. if current is None:
  194. return None
  195. return current
  196. def _is_empty(value: Any) -> bool:
  197. return value is None or value == "" or value == [] or value == {}
  198. def _threshold_label(threshold: dict[str, Any] | None) -> str:
  199. if not threshold:
  200. return "unknown"
  201. min_score = threshold.get("min_score")
  202. max_score = threshold.get("max_score")
  203. if min_score is not None and max_score is not None:
  204. return f"{min_score}<=score<={max_score}"
  205. if min_score is not None:
  206. return f"score>={min_score}"
  207. if max_score is not None:
  208. return f"score<={max_score}"
  209. return "unknown"
  210. def _scorecard(score: Any, config: dict[str, Any], missing: bool = False) -> dict[str, Any]:
  211. return {
  212. "total_score": score,
  213. "dimensions": [
  214. {
  215. "key": dimension.get("key"),
  216. "max_score": dimension.get("max_score"),
  217. "score": None,
  218. "evidence_paths": dimension.get("evidence_paths", []),
  219. }
  220. for dimension in config.get("dimensions", [])
  221. if dimension.get("runtime_status") == "active"
  222. ],
  223. "matched_scoring_rules": [],
  224. "score_missing": missing,
  225. }
  226. def _scorecard_total(bundle: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
  227. dimensions = [
  228. dimension for dimension in config.get("dimensions", []) if dimension.get("runtime_status") == "active"
  229. ]
  230. rules = [rule for rule in config.get("scoring_rules", []) if rule.get("enabled", True)]
  231. if dimensions and not rules:
  232. raise ValueError("active scorecard dimensions require scorecard.scoring_rules")
  233. dimension_rows = []
  234. matched_rules = []
  235. total_score = 0
  236. for dimension in dimensions:
  237. score, rule = _score_dimension(bundle, dimension, rules)
  238. total_score += score
  239. if rule:
  240. matched_rules.append(rule["scoring_rule_id"])
  241. dimension_rows.append(
  242. {
  243. "key": dimension.get("key"),
  244. "max_score": dimension.get("max_score"),
  245. "score": score,
  246. "matched_rule_id": rule.get("scoring_rule_id") if rule else None,
  247. "score_missing": rule is None,
  248. "evidence_paths": dimension.get("evidence_paths", []),
  249. }
  250. )
  251. return {
  252. "total_score": total_score if dimensions else None,
  253. "dimensions": dimension_rows,
  254. "matched_scoring_rules": matched_rules,
  255. "score_missing": False,
  256. }
  257. def _scorecard_has_any_evidence(scorecard: dict[str, Any]) -> bool:
  258. return any(not row.get("score_missing") for row in scorecard.get("dimensions", []))
  259. def _scorecard_all_dimensions_missing(scorecard: dict[str, Any]) -> bool:
  260. return not _scorecard_has_any_evidence(scorecard)
  261. def _score_dimension(
  262. bundle: dict[str, Any], dimension: dict[str, Any], rules: list[dict[str, Any]]
  263. ) -> tuple[int | float, dict[str, Any] | None]:
  264. dimension_rules = sorted(
  265. [rule for rule in rules if rule.get("dimension_key") == dimension.get("key")],
  266. key=lambda rule: (rule.get("priority", 999), rule.get("scoring_rule_id", "")),
  267. )
  268. if not dimension_rules:
  269. raise ValueError(f"active scorecard dimension lacks scoring rules: {dimension.get('key')}")
  270. for rule in dimension_rules:
  271. condition = {
  272. "field": rule.get("field_path", ""),
  273. "op": rule.get("operator"),
  274. "value": rule.get("expected_value"),
  275. }
  276. if _condition_matches(bundle, condition):
  277. return rule.get("score_value", 0), rule
  278. return 0, None
  279. def _missing_score_decision(
  280. run_id: str,
  281. policy_run_id: str,
  282. index: int,
  283. bundle: dict[str, Any],
  284. policy_bundle: dict[str, Any],
  285. matched_gates: list[dict[str, Any]],
  286. scorecard: dict[str, Any],
  287. ) -> dict[str, Any]:
  288. missing_policy = policy_bundle["rule_pack"].get("scorecard", {}).get("score_missing_policy", {})
  289. decision_action = missing_policy.get("decision_action", "REJECT_CONTENT")
  290. decision_reason_code = missing_policy.get("decision_reason_code", "missing_score")
  291. effect = _effect_status_for_decision(
  292. policy_bundle, decision_action, SCORE_REASON_CATEGORY, is_hard_gate=False
  293. )
  294. return _build_decision(
  295. run_id=run_id,
  296. policy_run_id=policy_run_id,
  297. index=index,
  298. bundle=bundle,
  299. policy_bundle=policy_bundle,
  300. decision_action=decision_action,
  301. decision_reason_code=decision_reason_code,
  302. search_query_effect_status=effect["content_effect_status"],
  303. triggered_blocking_rules=[],
  304. scorecard={**scorecard, "score_missing": True},
  305. score=None,
  306. replay_marker={
  307. "matched_gates": [_gate_replay(gate) for gate in matched_gates],
  308. "score_missing_policy": missing_policy,
  309. "effect_mapping_id": effect.get("mapping_id"),
  310. "reason_category": SCORE_REASON_CATEGORY,
  311. },
  312. )
  313. def _effect_status_for_decision(
  314. policy_bundle: dict[str, Any],
  315. decision_action: str,
  316. reason_category: str,
  317. is_hard_gate: bool,
  318. ) -> dict[str, Any]:
  319. candidates = [
  320. mapping
  321. for mapping in policy_bundle.get("effect_status_mapping", [])
  322. if mapping.get("enabled", True)
  323. and mapping.get("target_level") == "content"
  324. and mapping.get("decision_action") == decision_action
  325. and mapping.get("is_hard_gate") is is_hard_gate
  326. ]
  327. exact = [mapping for mapping in candidates if mapping.get("reason_category") == reason_category]
  328. matches = exact or candidates
  329. if not matches:
  330. raise ValueError(f"effect mapping not found for {decision_action}/{reason_category}")
  331. return sorted(matches, key=lambda mapping: (mapping.get("priority", 999), mapping.get("mapping_id", "")))[0]
  332. def _reason_category_for_threshold(decision_action: str, decision_reason_code: str) -> str:
  333. if decision_action == "ADD_TO_CONTENT_POOL":
  334. return SUCCESS_REASON_CATEGORY
  335. if decision_action == "KEEP_CONTENT_FOR_REVIEW":
  336. return REVIEW_REASON_CATEGORY
  337. if decision_reason_code == "missing_score":
  338. return SCORE_REASON_CATEGORY
  339. return SCORE_REASON_CATEGORY
  340. def _gate_replay(gate: dict[str, Any]) -> dict[str, Any]:
  341. return {
  342. "gate_id": gate.get("gate_id"),
  343. "decision_reason_code": gate.get("decision_reason_code"),
  344. "priority": gate.get("priority"),
  345. "severity": gate.get("severity"),
  346. "stop_scoring": gate.get("stop_scoring"),
  347. }
  348. def _evidence_refs(rule_pack: dict[str, Any]) -> list[str]:
  349. refs = ["source_evidence"]
  350. for field in rule_pack.get("input_contract", {}).get("required_fields", []):
  351. if field not in refs:
  352. refs.append(field)
  353. return refs