"""热点内容流程外部 API 客户端。""" from __future__ import annotations import json import socket import ssl import urllib.error import urllib.request from typing import Any from app.hot_content.exceptions import HotContentFlowError def build_url(base_url: str, path: str) -> str: return f"{base_url.rstrip('/')}/{path.lstrip('/')}" def render_template(value: Any, variables: dict[str, str]) -> Any: if isinstance(value, str): return value.format(**variables) if isinstance(value, list): return [render_template(item, variables) for item in value] if isinstance(value, dict): return {key: render_template(item, variables) for key, item in value.items()} return value class JsonApiClient: def __init__(self, timeout_seconds: int, verify_ssl: bool): self.timeout_seconds = timeout_seconds self.verify_ssl = verify_ssl def post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]: body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( url, data=body_bytes, headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen( req, timeout=self.timeout_seconds, context=self._https_context(), ) as resp: raw = resp.read().decode("utf-8") except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise HotContentFlowError(f"api http error: {exc.code} {detail}") from exc except (urllib.error.URLError, TimeoutError, socket.timeout) as exc: raise HotContentFlowError(f"api timeout/url error: {exc}") from exc try: data = json.loads(raw) except json.JSONDecodeError as exc: raise HotContentFlowError(f"api invalid json: {exc}") from exc code = data.get("code") if code is not None and int(code) != 0: raise HotContentFlowError(f"api business error: {data}") return data def _https_context(self) -> ssl.SSLContext | None: if self.verify_ssl: return None context = ssl.create_default_context() context.check_hostname = False context.verify_mode = ssl.CERT_NONE return context def extract_rank_items(resp: dict[str, Any], source: str) -> list[dict[str, Any]]: data = resp.get("data") or {} rows = data.get("data") if isinstance(data, dict) else data if not isinstance(rows, list): return [] result: list[dict[str, Any]] = [] for row in rows: if not isinstance(row, dict): continue rank_list = row.get("rankList") if isinstance(rank_list, list): row_source = str(row.get("source") or "").strip() if row_source and row_source != source: continue result.extend(item for item in rank_list if isinstance(item, dict)) continue item_source = str(row.get("source") or source).strip() if item_source == source: result.append(row) return result def extract_keyword_items(resp: dict[str, Any]) -> list[dict[str, Any]]: data = resp.get("data") or {} rows = data.get("data") if isinstance(data, dict) else data if not isinstance(rows, list): return [] return [item for item in rows if isinstance(item, dict)] def pick_first_valid_content(items: list[dict[str, Any]]) -> dict[str, Any] | None: for idx, item in enumerate(items): content_title = str(item.get("title") or "").strip() body_text = str(item.get("content") or "").strip() if not body_text: body_text = str(item.get("description") or "").strip() if not content_title or not body_text: continue return { "selected_index": idx, "content_title": content_title, "body_text": body_text, "url": str(item.get("url") or "").strip(), "content_source": str(item.get("source") or "").strip(), "raw_json": item, } return None def extract_decode_item_map(resp: dict[str, Any]) -> dict[str, dict[str, Any]]: rows = resp.get("data") if isinstance(resp, dict) else [] if not isinstance(rows, list): return {} result: dict[str, dict[str, Any]] = {} for row in rows: if not isinstance(row, dict): continue channel_content_id = str(row.get("channelContentId") or "").strip() if channel_content_id: result[channel_content_id] = row return result def parse_decode_data_content(item: dict[str, Any]) -> dict[str, Any]: channel_content_id = str(item.get("channelContentId") or "").strip() raw_data_content = str(item.get("dataContent") or "") if not raw_data_content.strip(): return { "channelContentId": channel_content_id, "status": item.get("status"), "errorMessage": item.get("errorMessage"), "html": item.get("html"), "dataContent": None, } try: parsed = json.loads(raw_data_content) except json.JSONDecodeError as exc: raise HotContentFlowError( f"invalid dataContent json for channelContentId={channel_content_id}: {exc}" ) from exc if not isinstance(parsed, dict): raise HotContentFlowError( f"dataContent is not json object for channelContentId={channel_content_id}" ) parsed["html"] = item.get("html") return parsed