from __future__ import annotations import json from typing import Any, Callable from content_agent.errors import ContentAgentError, ErrorCode from content_agent.integrations.database_runtime import ContentSupplyDbConfig ConnectionFactory = Callable[[], Any] class DemandSourceService: def __init__( self, config: ContentSupplyDbConfig, connection_factory: ConnectionFactory | None = None, ) -> None: self.config = config self._connection_factory = connection_factory or config.connect def get_by_id(self, demand_content_id: int) -> dict[str, Any]: row = self._fetch_one( """ SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data FROM demand_content WHERE id = %s LIMIT 1 """, (demand_content_id,), ) if not row: raise ContentAgentError( ErrorCode.INVALID_SOURCE, "demand_content not found", {"demand_content_id": demand_content_id}, status_code=400, ) return self.to_source_payload(row) def get_by_run_label(self, run_label: str) -> dict[str, Any]: row = self._fetch_one( """ SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data FROM demand_content WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) = %s ORDER BY id ASC LIMIT 1 """, (run_label,), ) if not row: raise ContentAgentError( ErrorCode.INVALID_SOURCE, "demand_content run_label not found", {"run_label": run_label}, status_code=400, ) return self.to_source_payload(row) def get_default_pg_pattern_source(self) -> dict[str, Any]: row = self._fetch_one( """ SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data FROM demand_content WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.evidence_pack.pattern_source_system')) = 'pg_pattern_v2' AND JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.evidence_pack.validation_status')) = 'passed' ORDER BY id ASC LIMIT 1 """, (), ) if not row: raise ContentAgentError( ErrorCode.INVALID_SOURCE, "default pg_pattern_v2 demand_content not found", {"selector": "default_pg_pattern_v2_passed"}, status_code=400, ) return self.to_source_payload(row) def to_source_payload(self, row: dict[str, Any]) -> dict[str, Any]: ext_data = _decode_ext_data(row.get("ext_data")) if not isinstance(ext_data, dict) or not ext_data.get("evidence_pack"): raise ContentAgentError( ErrorCode.INVALID_SOURCE, "demand_content missing ext_data.evidence_pack", {"demand_content_id": row.get("id")}, status_code=400, ) return { "id": row.get("id"), "demand_content_id": str(row.get("id") or ""), "merge_leve2": row.get("merge_leve2"), "name": row.get("name"), "reason": row.get("reason"), "suggestion": row.get("suggestion"), "score": row.get("score"), "dt": row.get("dt"), "ext_data": ext_data, "raw_demand_content": { "id": row.get("id"), "merge_leve2": row.get("merge_leve2"), "name": row.get("name"), "reason": row.get("reason"), "suggestion": row.get("suggestion"), "score": row.get("score"), "dt": row.get("dt"), "ext_data": ext_data, }, } def _fetch_one(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None: with self._connection_factory() as conn: with conn.cursor() as cur: cur.execute(sql, params) return cur.fetchone() def _decode_ext_data(value: Any) -> dict[str, Any] | None: if isinstance(value, dict): return value if isinstance(value, str) and value.strip(): decoded = json.loads(value) if isinstance(decoded, dict): return decoded return None