client.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. """热点内容流程外部 API 客户端。"""
  2. from __future__ import annotations
  3. import json
  4. import socket
  5. import ssl
  6. import urllib.error
  7. import urllib.request
  8. from typing import Any
  9. from app.hot_content.exceptions import HotContentFlowError
  10. def build_url(base_url: str, path: str) -> str:
  11. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  12. def render_template(value: Any, variables: dict[str, str]) -> Any:
  13. if isinstance(value, str):
  14. return value.format(**variables)
  15. if isinstance(value, list):
  16. return [render_template(item, variables) for item in value]
  17. if isinstance(value, dict):
  18. return {key: render_template(item, variables) for key, item in value.items()}
  19. return value
  20. class JsonApiClient:
  21. def __init__(self, timeout_seconds: int, verify_ssl: bool):
  22. self.timeout_seconds = timeout_seconds
  23. self.verify_ssl = verify_ssl
  24. def post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]:
  25. body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
  26. req = urllib.request.Request(
  27. url,
  28. data=body_bytes,
  29. headers={"Content-Type": "application/json"},
  30. method="POST",
  31. )
  32. try:
  33. with urllib.request.urlopen(
  34. req,
  35. timeout=self.timeout_seconds,
  36. context=self._https_context(),
  37. ) as resp:
  38. raw = resp.read().decode("utf-8")
  39. except urllib.error.HTTPError as exc:
  40. detail = exc.read().decode("utf-8", errors="replace")
  41. raise HotContentFlowError(f"api http error: {exc.code} {detail}") from exc
  42. except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
  43. raise HotContentFlowError(f"api timeout/url error: {exc}") from exc
  44. try:
  45. data = json.loads(raw)
  46. except json.JSONDecodeError as exc:
  47. raise HotContentFlowError(f"api invalid json: {exc}") from exc
  48. code = data.get("code")
  49. if code is not None and int(code) != 0:
  50. raise HotContentFlowError(f"api business error: {data}")
  51. return data
  52. def _https_context(self) -> ssl.SSLContext | None:
  53. if self.verify_ssl:
  54. return None
  55. context = ssl.create_default_context()
  56. context.check_hostname = False
  57. context.verify_mode = ssl.CERT_NONE
  58. return context
  59. def extract_rank_items(resp: dict[str, Any], source: str) -> list[dict[str, Any]]:
  60. data = resp.get("data") or {}
  61. rows = data.get("data") if isinstance(data, dict) else data
  62. if not isinstance(rows, list):
  63. return []
  64. result: list[dict[str, Any]] = []
  65. for row in rows:
  66. if not isinstance(row, dict):
  67. continue
  68. rank_list = row.get("rankList")
  69. if isinstance(rank_list, list):
  70. row_source = str(row.get("source") or "").strip()
  71. if row_source and row_source != source:
  72. continue
  73. result.extend(item for item in rank_list if isinstance(item, dict))
  74. continue
  75. item_source = str(row.get("source") or source).strip()
  76. if item_source == source:
  77. result.append(row)
  78. return result
  79. def extract_keyword_items(resp: dict[str, Any]) -> list[dict[str, Any]]:
  80. data = resp.get("data") or {}
  81. rows = data.get("data") if isinstance(data, dict) else data
  82. if not isinstance(rows, list):
  83. return []
  84. return [item for item in rows if isinstance(item, dict)]
  85. def pick_first_valid_content(items: list[dict[str, Any]]) -> dict[str, Any] | None:
  86. for idx, item in enumerate(items):
  87. content_title = str(item.get("title") or "").strip()
  88. body_text = str(item.get("content") or "").strip()
  89. if not body_text:
  90. body_text = str(item.get("description") or "").strip()
  91. if not content_title or not body_text:
  92. continue
  93. return {
  94. "selected_index": idx,
  95. "content_title": content_title,
  96. "body_text": body_text,
  97. "url": str(item.get("url") or "").strip(),
  98. "content_source": str(item.get("source") or "").strip(),
  99. "raw_json": item,
  100. }
  101. return None
  102. def extract_decode_item_map(resp: dict[str, Any]) -> dict[str, dict[str, Any]]:
  103. rows = resp.get("data") if isinstance(resp, dict) else []
  104. if not isinstance(rows, list):
  105. return {}
  106. result: dict[str, dict[str, Any]] = {}
  107. for row in rows:
  108. if not isinstance(row, dict):
  109. continue
  110. channel_content_id = str(row.get("channelContentId") or "").strip()
  111. if channel_content_id:
  112. result[channel_content_id] = row
  113. return result
  114. def parse_decode_data_content(item: dict[str, Any]) -> dict[str, Any]:
  115. channel_content_id = str(item.get("channelContentId") or "").strip()
  116. raw_data_content = str(item.get("dataContent") or "")
  117. if not raw_data_content.strip():
  118. return {
  119. "channelContentId": channel_content_id,
  120. "status": item.get("status"),
  121. "errorMessage": item.get("errorMessage"),
  122. "html": item.get("html"),
  123. "dataContent": None,
  124. }
  125. try:
  126. parsed = json.loads(raw_data_content)
  127. except json.JSONDecodeError as exc:
  128. raise HotContentFlowError(
  129. f"invalid dataContent json for channelContentId={channel_content_id}: {exc}"
  130. ) from exc
  131. if not isinstance(parsed, dict):
  132. raise HotContentFlowError(
  133. f"dataContent is not json object for channelContentId={channel_content_id}"
  134. )
  135. parsed["html"] = item.get("html")
  136. return parsed