| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- from __future__ import annotations
- import json
- from typing import Any, Callable
- from content_agent.errors import ContentAgentError, ErrorCode
- from content_agent.integrations.database_runtime import ContentSupplyDbConfig
# Zero-argument callable that opens a connection. DemandSourceService uses the
# returned object as a context manager and calls ``.cursor()`` on what it yields.
- ConnectionFactory = Callable[[], Any]
class DemandSourceService:
    """Look up single ``demand_content`` rows and expose them as source payloads.

    Each getter issues one ``SELECT ... LIMIT 1`` through a connection obtained
    from the connection factory, raises ``ContentAgentError`` with
    ``ErrorCode.INVALID_SOURCE`` (``status_code=400``) when nothing matches,
    and otherwise returns the row converted by :meth:`to_source_payload`.
    """

    def __init__(
        self,
        config: ContentSupplyDbConfig,
        connection_factory: ConnectionFactory | None = None,
    ) -> None:
        """Keep *config* and decide how connections are opened.

        Args:
            config: Database configuration; its ``connect`` attribute is the
                fallback connection factory.
            connection_factory: Optional override. Any falsy value selects
                ``config.connect`` instead.
        """
        self.config = config
        if connection_factory:
            self._connection_factory = connection_factory
        else:
            self._connection_factory = config.connect

    def get_by_id(self, demand_content_id: int) -> dict[str, Any]:
        """Return the source payload for the row whose ``id`` matches.

        Raises:
            ContentAgentError: ``INVALID_SOURCE`` when no such row exists.
        """
        query = """
            SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data
            FROM demand_content
            WHERE id = %s
            LIMIT 1
            """
        found = self._fetch_one(query, (demand_content_id,))
        if found:
            return self.to_source_payload(found)
        raise ContentAgentError(
            ErrorCode.INVALID_SOURCE,
            "demand_content not found",
            {"demand_content_id": demand_content_id},
            status_code=400,
        )

    def get_by_run_label(self, run_label: str) -> dict[str, Any]:
        """Return the payload for the lowest-id row with this ``ext_data.run_label``.

        Raises:
            ContentAgentError: ``INVALID_SOURCE`` when no row carries the label.
        """
        query = """
            SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data
            FROM demand_content
            WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.run_label')) = %s
            ORDER BY id ASC
            LIMIT 1
            """
        found = self._fetch_one(query, (run_label,))
        if found:
            return self.to_source_payload(found)
        raise ContentAgentError(
            ErrorCode.INVALID_SOURCE,
            "demand_content run_label not found",
            {"run_label": run_label},
            status_code=400,
        )

    def get_default_pg_pattern_source(self) -> dict[str, Any]:
        """Return the payload for the lowest-id passed ``pg_pattern_v2`` row.

        The row must have ``evidence_pack.pattern_source_system`` equal to
        ``'pg_pattern_v2'`` and ``evidence_pack.validation_status`` equal to
        ``'passed'`` inside ``ext_data``.

        Raises:
            ContentAgentError: ``INVALID_SOURCE`` when no row qualifies.
        """
        query = """
            SELECT id, merge_leve2, name, reason, suggestion, score, dt, ext_data
            FROM demand_content
            WHERE JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.evidence_pack.pattern_source_system')) = 'pg_pattern_v2'
            AND JSON_UNQUOTE(JSON_EXTRACT(ext_data, '$.evidence_pack.validation_status')) = 'passed'
            ORDER BY id ASC
            LIMIT 1
            """
        found = self._fetch_one(query, ())
        if found:
            return self.to_source_payload(found)
        raise ContentAgentError(
            ErrorCode.INVALID_SOURCE,
            "default pg_pattern_v2 demand_content not found",
            {"selector": "default_pg_pattern_v2_passed"},
            status_code=400,
        )

    def to_source_payload(self, row: dict[str, Any]) -> dict[str, Any]:
        """Convert a fetched row into the source payload dict.

        The selected columns appear at the top level together with a
        stringified ``demand_content_id``, and are repeated under
        ``raw_demand_content``. ``ext_data`` is the decoded mapping in both
        places.

        Raises:
            ContentAgentError: ``INVALID_SOURCE`` when ``ext_data`` does not
                decode to a dict or has no truthy ``evidence_pack``.
        """
        row_id = row.get("id")
        ext_data = _decode_ext_data(row.get("ext_data"))
        usable = isinstance(ext_data, dict) and ext_data.get("evidence_pack")
        if not usable:
            raise ContentAgentError(
                ErrorCode.INVALID_SOURCE,
                "demand_content missing ext_data.evidence_pack",
                {"demand_content_id": row_id},
                status_code=400,
            )

        # Columns shared verbatim by the top level and the raw copy.
        shared = {
            column: row.get(column)
            for column in ("merge_leve2", "name", "reason", "suggestion", "score", "dt")
        }
        shared["ext_data"] = ext_data
        return {
            "id": row_id,
            # A falsy id (None, 0) is rendered as the empty string.
            "demand_content_id": str(row_id or ""),
            **shared,
            "raw_demand_content": {"id": row_id, **shared},
        }

    def _fetch_one(self, sql: str, params: tuple[Any, ...]) -> dict[str, Any] | None:
        """Execute *sql* with *params* and return the cursor's ``fetchone()`` result.

        Both the connection and the cursor are used as context managers and
        are exited before this method returns.
        """
        with self._connection_factory() as connection, connection.cursor() as cursor:
            cursor.execute(sql, params)
            first_row = cursor.fetchone()
        return first_row
- def _decode_ext_data(value: Any) -> dict[str, Any] | None:
- if isinstance(value, dict):
- return value
- if isinstance(value, str) and value.strip():
- decoded = json.loads(value)
- if isinstance(decoded, dict):
- return decoded
- return None
|