| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- """热点内容流程外部 API 客户端。"""
- from __future__ import annotations
- import json
- import socket
- import ssl
- import urllib.error
- import urllib.request
- from typing import Any
- from app.hot_content.exceptions import HotContentFlowError
- def build_url(base_url: str, path: str) -> str:
- return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
- def render_template(value: Any, variables: dict[str, str]) -> Any:
- if isinstance(value, str):
- return value.format(**variables)
- if isinstance(value, list):
- return [render_template(item, variables) for item in value]
- if isinstance(value, dict):
- return {key: render_template(item, variables) for key, item in value.items()}
- return value
- class JsonApiClient:
- def __init__(self, timeout_seconds: int, verify_ssl: bool):
- self.timeout_seconds = timeout_seconds
- self.verify_ssl = verify_ssl
- def post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]:
- body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
- req = urllib.request.Request(
- url,
- data=body_bytes,
- headers={"Content-Type": "application/json"},
- method="POST",
- )
- try:
- with urllib.request.urlopen(
- req,
- timeout=self.timeout_seconds,
- context=self._https_context(),
- ) as resp:
- raw = resp.read().decode("utf-8")
- except urllib.error.HTTPError as exc:
- detail = exc.read().decode("utf-8", errors="replace")
- raise HotContentFlowError(f"api http error: {exc.code} {detail}") from exc
- except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
- raise HotContentFlowError(f"api timeout/url error: {exc}") from exc
- try:
- data = json.loads(raw)
- except json.JSONDecodeError as exc:
- raise HotContentFlowError(f"api invalid json: {exc}") from exc
- code = data.get("code")
- if code is not None and int(code) != 0:
- raise HotContentFlowError(f"api business error: {data}")
- return data
- def _https_context(self) -> ssl.SSLContext | None:
- if self.verify_ssl:
- return None
- context = ssl.create_default_context()
- context.check_hostname = False
- context.verify_mode = ssl.CERT_NONE
- return context
- def extract_rank_items(resp: dict[str, Any], source: str) -> list[dict[str, Any]]:
- data = resp.get("data") or {}
- rows = data.get("data") if isinstance(data, dict) else data
- if not isinstance(rows, list):
- return []
- result: list[dict[str, Any]] = []
- for row in rows:
- if not isinstance(row, dict):
- continue
- rank_list = row.get("rankList")
- if isinstance(rank_list, list):
- row_source = str(row.get("source") or "").strip()
- if row_source and row_source != source:
- continue
- result.extend(item for item in rank_list if isinstance(item, dict))
- continue
- item_source = str(row.get("source") or source).strip()
- if item_source == source:
- result.append(row)
- return result
- def extract_keyword_items(resp: dict[str, Any]) -> list[dict[str, Any]]:
- data = resp.get("data") or {}
- rows = data.get("data") if isinstance(data, dict) else data
- if not isinstance(rows, list):
- return []
- return [item for item in rows if isinstance(item, dict)]
- def pick_first_valid_content(items: list[dict[str, Any]]) -> dict[str, Any] | None:
- for idx, item in enumerate(items):
- content_title = str(item.get("title") or "").strip()
- body_text = str(item.get("content") or "").strip()
- if not body_text:
- body_text = str(item.get("description") or "").strip()
- if not content_title or not body_text:
- continue
- return {
- "selected_index": idx,
- "content_title": content_title,
- "body_text": body_text,
- "url": str(item.get("url") or "").strip(),
- "content_source": str(item.get("source") or "").strip(),
- "raw_json": item,
- }
- return None
- def extract_decode_item_map(resp: dict[str, Any]) -> dict[str, dict[str, Any]]:
- rows = resp.get("data") if isinstance(resp, dict) else []
- if not isinstance(rows, list):
- return {}
- result: dict[str, dict[str, Any]] = {}
- for row in rows:
- if not isinstance(row, dict):
- continue
- channel_content_id = str(row.get("channelContentId") or "").strip()
- if channel_content_id:
- result[channel_content_id] = row
- return result
- def parse_decode_data_content(item: dict[str, Any]) -> dict[str, Any]:
- channel_content_id = str(item.get("channelContentId") or "").strip()
- raw_data_content = str(item.get("dataContent") or "")
- if not raw_data_content.strip():
- return {
- "channelContentId": channel_content_id,
- "status": item.get("status"),
- "errorMessage": item.get("errorMessage"),
- "html": item.get("html"),
- "dataContent": None,
- }
- try:
- parsed = json.loads(raw_data_content)
- except json.JSONDecodeError as exc:
- raise HotContentFlowError(
- f"invalid dataContent json for channelContentId={channel_content_id}: {exc}"
- ) from exc
- if not isinstance(parsed, dict):
- raise HotContentFlowError(
- f"dataContent is not json object for channelContentId={channel_content_id}"
- )
- parsed["html"] = item.get("html")
- return parsed
|