| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- from __future__ import annotations
- from typing import Any
- from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
- from content_agent.record_payload import with_raw_payload
# Reason categories written to ``decision_replay_data["reason_category"]`` and
# matched against each effect-status mapping's ``reason_category``.
HARD_GATE_REASON_CATEGORY: str = "hard_gate"  # a blocking hard gate decided the outcome
SCORE_REASON_CATEGORY: str = "score_or_data_failed"  # missing score, or a threshold action other than add/review
REVIEW_REASON_CATEGORY: str = "review_needed"  # threshold action KEEP_CONTENT_FOR_REVIEW
SUCCESS_REASON_CATEGORY: str = "score_pass"  # threshold action ADD_TO_CONTENT_POOL
def decide(
    run_id: str,
    policy_run_id: str,
    index: int,
    bundle: dict[str, Any],
    policy_bundle: dict[str, Any],
) -> dict[str, Any]:
    """Decide what to do with one content bundle under a policy bundle.

    The rule pack is applied in three stages:

    1. Hard gates: every gate whose ``when`` condition matches is recorded. If
       any matched gate has ``severity == "fatal"`` or a truthy ``stop_scoring``,
       the primary blocking gate (lowest priority, then gate_id) supplies the
       action and reason code, and scoring is skipped.
    2. Scoring: active scorecard dimensions are scored. When no total can be
       computed, or every dimension is flagged ``score_missing``, the rule
       pack's score-missing policy decides instead.
    3. Thresholds: the first threshold band containing the score supplies the
       action and reason code.

    Args:
        run_id: Identifier of the current run, copied into the record.
        policy_run_id: Identifier of the policy run, copied into the record.
        index: Position of this bundle, rendered as ``d_{index:03d}``.
        bundle: Decision input; must provide ``content``, ``source_evidence``
            and ``run_context.decision_input_snapshot_id``.
        policy_bundle: Policy configuration including ``rule_pack``.

    Returns:
        The decision record built by ``_build_decision``.

    Raises:
        ValueError: when no threshold band matches the score, or when a helper
            rejects the scorecard / effect-mapping configuration.
    """
    rule_pack = policy_bundle["rule_pack"]
    matched_gates = _evaluate_hard_gates(bundle, rule_pack.get("hard_gates", []))
    blocking_gates = [
        gate
        for gate in matched_gates
        if gate.get("severity") == "fatal" or gate.get("stop_scoring")
    ]
    blocking_gate_ids = [gate["gate_id"] for gate in blocking_gates]
    scorecard_config = rule_pack.get("scorecard", {})

    # Stage 1: a blocking gate short-circuits scoring entirely.
    if blocking_gates:
        primary = _primary_hard_gate(blocking_gates)
        gate_action = primary["decision_action"]
        gate_effect = _effect_status_for_decision(
            policy_bundle,
            gate_action,
            HARD_GATE_REASON_CATEGORY,
            is_hard_gate=True,
        )
        return _build_decision(
            run_id=run_id,
            policy_run_id=policy_run_id,
            index=index,
            bundle=bundle,
            policy_bundle=policy_bundle,
            decision_action=gate_action,
            decision_reason_code=primary["decision_reason_code"],
            search_query_effect_status=gate_effect["content_effect_status"],
            triggered_blocking_rules=blocking_gate_ids,
            scorecard=_scorecard(None, scorecard_config, missing=True),
            score=None,
            replay_marker={
                "matched_gates": [_gate_replay(gate) for gate in matched_gates],
                "primary_gate_id": primary["gate_id"],
                "primary_reason_code": primary["decision_reason_code"],
                "primary_gate_priority": primary.get("priority"),
                "effect_mapping_id": gate_effect.get("mapping_id"),
                "reason_category": HARD_GATE_REASON_CATEGORY,
            },
        )

    # Stage 2: score the bundle; fall back to the score-missing policy when
    # no dimension produced usable evidence.
    scorecard = _scorecard_total(bundle, scorecard_config)
    total = scorecard.get("total_score")
    if total is None or _scorecard_all_dimensions_missing(scorecard):
        return _missing_score_decision(
            run_id,
            policy_run_id,
            index,
            bundle,
            policy_bundle,
            matched_gates,
            scorecard,
        )

    # Stage 3: map the score onto a threshold band.
    band = _match_threshold(total, rule_pack.get("thresholds", []))
    if band is None:
        raise ValueError(f"no threshold matched score: {total}")
    band_action = band["decision_action"]
    band_reason = band["decision_reason_code"]
    category = _reason_category_for_threshold(band_action, band_reason)
    band_effect = _effect_status_for_decision(
        policy_bundle, band_action, category, is_hard_gate=False
    )
    return _build_decision(
        run_id=run_id,
        policy_run_id=policy_run_id,
        index=index,
        bundle=bundle,
        policy_bundle=policy_bundle,
        decision_action=band_action,
        decision_reason_code=band_reason,
        search_query_effect_status=band_effect["content_effect_status"],
        triggered_blocking_rules=blocking_gate_ids,
        scorecard=scorecard,
        score=total,
        replay_marker={
            "matched_gates": [_gate_replay(gate) for gate in matched_gates],
            "matched_threshold": _threshold_label(band),
            "matched_threshold_priority": band.get("priority"),
            "effect_mapping_id": band_effect.get("mapping_id"),
            "reason_category": category,
            "matched_scoring_rules": scorecard.get("matched_scoring_rules", []),
        },
    )
def _build_decision(
    run_id: str,
    policy_run_id: str,
    index: int,
    bundle: dict[str, Any],
    policy_bundle: dict[str, Any],
    decision_action: str,
    decision_reason_code: str,
    search_query_effect_status: str,
    triggered_blocking_rules: list[str],
    scorecard: dict[str, Any],
    score: Any,
    replay_marker: dict[str, Any],
) -> dict[str, Any]:
    """Assemble a runtime decision record and pass it through ``with_raw_payload``.

    Identity fields come from the run arguments and *policy_bundle*, and the
    decision target comes from ``bundle["content"]``. ``decision_replay_data``
    (always the last key) repeats the policy identifiers and lists the keys of
    the scorecard dimensions flagged ``score_missing``. It is then extended
    with *replay_marker*, whose entries override same-named keys.
    """
    target = bundle["content"]
    record: dict[str, Any] = {
        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "run_id": run_id,
        "policy_run_id": policy_run_id,
        "decision_id": f"d_{index:03d}",
        "policy_bundle_id": policy_bundle["policy_bundle_id"],
        "rule_pack_id": policy_bundle["rule_pack_id"],
        "rule_pack_version": policy_bundle["rule_pack_version"],
        "strategy_id": policy_bundle["strategy_id"],
        "strategy_version": policy_bundle["strategy_version"],
        "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
        "dispatch_id": policy_bundle.get("dispatch_id"),
        "decision_target_type": target["decision_target_type"],
        "decision_target_id": target["decision_target_id"],
        "triggered_blocking_rules": triggered_blocking_rules,
        "scorecard": scorecard,
        "score": score,
        "decision_action": decision_action,
        "decision_reason_code": decision_reason_code,
        "search_query_effect_status": search_query_effect_status,
        "source_evidence": bundle["source_evidence"],
        "decision_input_snapshot_id": bundle["run_context"]["decision_input_snapshot_id"],
        "decision_evidence_refs": _evidence_refs(policy_bundle["rule_pack"]),
    }
    unscored_keys = [
        row["key"]
        for row in scorecard.get("dimensions", [])
        if row.get("score_missing")
    ]
    # Attached after the main fields so it stays the final key of the record.
    record["decision_replay_data"] = {
        "policy_run_id": policy_run_id,
        "policy_bundle_id": policy_bundle["policy_bundle_id"],
        "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
        "rule_package_id": policy_bundle.get("rule_package_id"),
        "rule_pack_id": policy_bundle["rule_pack_id"],
        "rule_pack_version": policy_bundle["rule_pack_version"],
        "dispatch_id": policy_bundle.get("dispatch_id"),
        "runtime_stage": policy_bundle.get("runtime_stage"),
        "strategy_version": policy_bundle["strategy_version"],
        "runtime_record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
        "decision_evidence_refs": _evidence_refs(policy_bundle["rule_pack"]),
        "missing_dimensions": unscored_keys,
        **replay_marker,
    }
    return with_raw_payload(record)
- def _evaluate_hard_gates(bundle: dict[str, Any], gates: list[dict[str, Any]]) -> list[dict[str, Any]]:
- return [gate for gate in gates if _condition_matches(bundle, gate.get("when", {}))]
- def _primary_hard_gate(gates: list[dict[str, Any]]) -> dict[str, Any]:
- return sorted(gates, key=lambda gate: (gate.get("priority", 999), gate.get("gate_id", "")))[0]
def _condition_matches(bundle: dict[str, Any], condition: dict[str, Any]) -> bool:
    """Evaluate one rule condition against *bundle*.

    *condition* has the form ``{"field": "dotted.path", "op": name, "value": expected}``.
    The field is resolved with ``_get_path``, so a missing path yields ``None``.

    Supported operators:
        ``is_empty``: the value is None, "", [] or {} (``value`` is ignored).
        ``in`` / ``not_in``: membership in ``value`` (a missing ``value`` acts as []).
        ``eq``: equality with ``value``.
        ``gt`` / ``gte`` / ``lt`` / ``lte``: ordered comparison against ``value``.
            A missing field never matches.

    Raises:
        ValueError: for an unsupported operator.
    """
    value = _get_path(bundle, condition.get("field", ""))
    op = condition.get("op")
    expected = condition.get("value")
    if op == "is_empty":
        return _is_empty(value)
    if op == "in":
        return value in (expected or [])
    if op == "not_in":
        return value not in (expected or [])
    if op == "eq":
        return value == expected
    if op in ("gt", "gte", "lt", "lte"):
        # A missing field can never satisfy a numeric bound.
        if value is None:
            return False
        if op == "gt":
            return value > expected
        if op == "gte":
            return value >= expected
        if op == "lt":
            return value < expected
        return value <= expected
    raise ValueError(f"unsupported rule operator: {op}")
- def _match_threshold(score: int | float | None, thresholds: list[dict[str, Any]]) -> dict[str, Any] | None:
- if score is None:
- return None
- for threshold in thresholds:
- min_score = threshold.get("min_score")
- max_score = threshold.get("max_score")
- if min_score is not None and score < min_score:
- continue
- if max_score is not None and score > max_score:
- continue
- return threshold
- return None
- def _get_path(data: dict[str, Any], path: str) -> Any:
- current: Any = data
- for part in path.split("."):
- if not part:
- return None
- if not isinstance(current, dict):
- return None
- current = current.get(part)
- if current is None:
- return None
- return current
- def _is_empty(value: Any) -> bool:
- return value is None or value == "" or value == [] or value == {}
- def _threshold_label(threshold: dict[str, Any] | None) -> str:
- if not threshold:
- return "unknown"
- min_score = threshold.get("min_score")
- max_score = threshold.get("max_score")
- if min_score is not None and max_score is not None:
- return f"{min_score}<=score<={max_score}"
- if min_score is not None:
- return f"score>={min_score}"
- if max_score is not None:
- return f"score<={max_score}"
- return "unknown"
- def _scorecard(score: Any, config: dict[str, Any], missing: bool = False) -> dict[str, Any]:
- return {
- "total_score": score,
- "dimensions": [
- {
- "key": dimension.get("key"),
- "max_score": dimension.get("max_score"),
- "score": None,
- "evidence_paths": dimension.get("evidence_paths", []),
- }
- for dimension in config.get("dimensions", [])
- if dimension.get("runtime_status") == "active"
- ],
- "matched_scoring_rules": [],
- "score_missing": missing,
- }
- def _scorecard_total(bundle: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
- dimensions = [
- dimension for dimension in config.get("dimensions", []) if dimension.get("runtime_status") == "active"
- ]
- rules = [rule for rule in config.get("scoring_rules", []) if rule.get("enabled", True)]
- if dimensions and not rules:
- raise ValueError("active scorecard dimensions require scorecard.scoring_rules")
- dimension_rows = []
- matched_rules = []
- total_score = 0
- for dimension in dimensions:
- score, rule = _score_dimension(bundle, dimension, rules)
- total_score += score
- if rule:
- matched_rules.append(rule["scoring_rule_id"])
- dimension_rows.append(
- {
- "key": dimension.get("key"),
- "max_score": dimension.get("max_score"),
- "score": score,
- "matched_rule_id": rule.get("scoring_rule_id") if rule else None,
- "score_missing": rule is None,
- "evidence_paths": dimension.get("evidence_paths", []),
- }
- )
- return {
- "total_score": total_score if dimensions else None,
- "dimensions": dimension_rows,
- "matched_scoring_rules": matched_rules,
- "score_missing": False,
- }
- def _scorecard_has_any_evidence(scorecard: dict[str, Any]) -> bool:
- return any(not row.get("score_missing") for row in scorecard.get("dimensions", []))
- def _scorecard_all_dimensions_missing(scorecard: dict[str, Any]) -> bool:
- return not _scorecard_has_any_evidence(scorecard)
- def _score_dimension(
- bundle: dict[str, Any], dimension: dict[str, Any], rules: list[dict[str, Any]]
- ) -> tuple[int | float, dict[str, Any] | None]:
- dimension_rules = sorted(
- [rule for rule in rules if rule.get("dimension_key") == dimension.get("key")],
- key=lambda rule: (rule.get("priority", 999), rule.get("scoring_rule_id", "")),
- )
- if not dimension_rules:
- raise ValueError(f"active scorecard dimension lacks scoring rules: {dimension.get('key')}")
- for rule in dimension_rules:
- condition = {
- "field": rule.get("field_path", ""),
- "op": rule.get("operator"),
- "value": rule.get("expected_value"),
- }
- if _condition_matches(bundle, condition):
- return rule.get("score_value", 0), rule
- return 0, None
def _missing_score_decision(
    run_id: str,
    policy_run_id: str,
    index: int,
    bundle: dict[str, Any],
    policy_bundle: dict[str, Any],
    matched_gates: list[dict[str, Any]],
    scorecard: dict[str, Any],
) -> dict[str, Any]:
    """Build the decision used when no usable score could be computed.

    The action and reason code come from the rule pack's
    ``scorecard.score_missing_policy``, defaulting to ``REJECT_CONTENT`` and
    ``missing_score``. The record has no score and no triggered blocking rules.
    It carries a copy of *scorecard* with ``score_missing`` set to True.
    """
    fallback_policy = (
        policy_bundle["rule_pack"].get("scorecard", {}).get("score_missing_policy", {})
    )
    fallback_action = fallback_policy.get("decision_action", "REJECT_CONTENT")
    fallback_reason = fallback_policy.get("decision_reason_code", "missing_score")
    fallback_effect = _effect_status_for_decision(
        policy_bundle, fallback_action, SCORE_REASON_CATEGORY, is_hard_gate=False
    )
    flagged_scorecard = dict(scorecard, score_missing=True)
    return _build_decision(
        run_id=run_id,
        policy_run_id=policy_run_id,
        index=index,
        bundle=bundle,
        policy_bundle=policy_bundle,
        decision_action=fallback_action,
        decision_reason_code=fallback_reason,
        search_query_effect_status=fallback_effect["content_effect_status"],
        triggered_blocking_rules=[],
        scorecard=flagged_scorecard,
        score=None,
        replay_marker={
            "matched_gates": [_gate_replay(gate) for gate in matched_gates],
            "score_missing_policy": fallback_policy,
            "effect_mapping_id": fallback_effect.get("mapping_id"),
            "reason_category": SCORE_REASON_CATEGORY,
        },
    )
- def _effect_status_for_decision(
- policy_bundle: dict[str, Any],
- decision_action: str,
- reason_category: str,
- is_hard_gate: bool,
- ) -> dict[str, Any]:
- candidates = [
- mapping
- for mapping in policy_bundle.get("effect_status_mapping", [])
- if mapping.get("enabled", True)
- and mapping.get("target_level") == "content"
- and mapping.get("decision_action") == decision_action
- and mapping.get("is_hard_gate") is is_hard_gate
- ]
- exact = [mapping for mapping in candidates if mapping.get("reason_category") == reason_category]
- matches = exact or candidates
- if not matches:
- raise ValueError(f"effect mapping not found for {decision_action}/{reason_category}")
- return sorted(matches, key=lambda mapping: (mapping.get("priority", 999), mapping.get("mapping_id", "")))[0]
def _reason_category_for_threshold(decision_action: str, decision_reason_code: str) -> str:
    """Map a threshold decision action to the reason category used for effect mapping.

    ``ADD_TO_CONTENT_POOL`` maps to ``SUCCESS_REASON_CATEGORY`` and
    ``KEEP_CONTENT_FOR_REVIEW`` maps to ``REVIEW_REASON_CATEGORY``. Every other
    action, whatever its reason code, maps to ``SCORE_REASON_CATEGORY``.

    *decision_reason_code* does not currently affect the result. It is kept so
    the call signature stays unchanged.
    """
    if decision_action == "ADD_TO_CONTENT_POOL":
        return SUCCESS_REASON_CATEGORY
    if decision_action == "KEEP_CONTENT_FOR_REVIEW":
        return REVIEW_REASON_CATEGORY
    return SCORE_REASON_CATEGORY
- def _gate_replay(gate: dict[str, Any]) -> dict[str, Any]:
- return {
- "gate_id": gate.get("gate_id"),
- "decision_reason_code": gate.get("decision_reason_code"),
- "priority": gate.get("priority"),
- "severity": gate.get("severity"),
- "stop_scoring": gate.get("stop_scoring"),
- }
- def _evidence_refs(rule_pack: dict[str, Any]) -> list[str]:
- refs = ["source_evidence"]
- for field in rule_pack.get("input_contract", {}).get("required_fields", []):
- if field not in refs:
- refs.append(field)
- return refs
|