client.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. """热点内容流程外部 API 客户端。"""
  2. from __future__ import annotations
  3. import json
  4. import socket
  5. import ssl
  6. import urllib.error
  7. import urllib.request
  8. from typing import Any
  9. from app.hot_content.exceptions import HotContentFlowError
  10. def build_url(base_url: str, path: str) -> str:
  11. return f"{base_url.rstrip('/')}/{path.lstrip('/')}"
  12. def render_template(value: Any, variables: dict[str, str]) -> Any:
  13. if isinstance(value, str):
  14. return value.format(**variables)
  15. if isinstance(value, list):
  16. return [render_template(item, variables) for item in value]
  17. if isinstance(value, dict):
  18. return {key: render_template(item, variables) for key, item in value.items()}
  19. return value
  20. class JsonApiClient:
  21. def __init__(self, timeout_seconds: int, verify_ssl: bool):
  22. self.timeout_seconds = timeout_seconds
  23. self.verify_ssl = verify_ssl
  24. def post_json(self, url: str, payload: dict[str, Any]) -> dict[str, Any]:
  25. body_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
  26. req = urllib.request.Request(
  27. url,
  28. data=body_bytes,
  29. headers={"Content-Type": "application/json"},
  30. method="POST",
  31. )
  32. try:
  33. with urllib.request.urlopen(
  34. req,
  35. timeout=self.timeout_seconds,
  36. context=self._https_context(),
  37. ) as resp:
  38. raw = resp.read().decode("utf-8")
  39. except urllib.error.HTTPError as exc:
  40. detail = exc.read().decode("utf-8", errors="replace")
  41. raise HotContentFlowError(f"api http error: {exc.code} {detail}") from exc
  42. except (urllib.error.URLError, TimeoutError, socket.timeout) as exc:
  43. raise HotContentFlowError(f"api timeout/url error: {exc}") from exc
  44. try:
  45. data = json.loads(raw)
  46. except json.JSONDecodeError as exc:
  47. raise HotContentFlowError(f"api invalid json: {exc}") from exc
  48. code = data.get("code")
  49. if code is not None and int(code) != 0:
  50. raise HotContentFlowError(f"api business error: {data}")
  51. return data
  52. def _https_context(self) -> ssl.SSLContext | None:
  53. if self.verify_ssl:
  54. return None
  55. context = ssl.create_default_context()
  56. context.check_hostname = False
  57. context.verify_mode = ssl.CERT_NONE
  58. return context
  59. def _extract_douyin_word_list_items(inner: dict[str, Any]) -> list[dict[str, Any]]:
  60. word_list = inner.get("word_list")
  61. if not isinstance(word_list, list):
  62. return []
  63. result: list[dict[str, Any]] = []
  64. for item in word_list:
  65. if not isinstance(item, dict):
  66. continue
  67. word = str(item.get("word") or "").strip()
  68. if not word:
  69. continue
  70. normalized = dict(item)
  71. normalized["title"] = word
  72. position = item.get("position")
  73. if position is not None:
  74. normalized["rank"] = position
  75. result.append(normalized)
  76. return result
  77. def extract_rank_items(resp: dict[str, Any], source: str) -> list[dict[str, Any]]:
  78. data = resp.get("data") or {}
  79. inner = data.get("data") if isinstance(data, dict) else None
  80. if isinstance(inner, dict):
  81. douyin_items = _extract_douyin_word_list_items(inner)
  82. if douyin_items:
  83. return douyin_items
  84. rows = inner if isinstance(inner, list) else data
  85. if not isinstance(rows, list):
  86. return []
  87. result: list[dict[str, Any]] = []
  88. for row in rows:
  89. if not isinstance(row, dict):
  90. continue
  91. rank_list = row.get("rankList")
  92. if isinstance(rank_list, list):
  93. row_source = str(row.get("source") or "").strip()
  94. if row_source and row_source != source:
  95. continue
  96. result.extend(item for item in rank_list if isinstance(item, dict))
  97. continue
  98. item_source = str(row.get("source") or source).strip()
  99. if item_source == source:
  100. result.append(row)
  101. return result
  102. def extract_keyword_items(resp: dict[str, Any]) -> list[dict[str, Any]]:
  103. data = resp.get("data") or {}
  104. rows = data.get("data") if isinstance(data, dict) else data
  105. if not isinstance(rows, list):
  106. return []
  107. return [item for item in rows if isinstance(item, dict)]
  108. def pick_first_valid_content(items: list[dict[str, Any]]) -> dict[str, Any] | None:
  109. for idx, item in enumerate(items):
  110. content_title = str(item.get("title") or "").strip()
  111. body_text = str(item.get("content") or "").strip()
  112. if not body_text:
  113. body_text = str(item.get("description") or "").strip()
  114. if not content_title or not body_text:
  115. continue
  116. return {
  117. "selected_index": idx,
  118. "content_title": content_title,
  119. "body_text": body_text,
  120. "url": str(item.get("url") or "").strip(),
  121. "content_source": str(item.get("source") or "").strip(),
  122. "raw_json": item,
  123. }
  124. return None
  125. def extract_decode_item_map(resp: dict[str, Any]) -> dict[str, dict[str, Any]]:
  126. rows = resp.get("data") if isinstance(resp, dict) else []
  127. if not isinstance(rows, list):
  128. return {}
  129. result: dict[str, dict[str, Any]] = {}
  130. for row in rows:
  131. if not isinstance(row, dict):
  132. continue
  133. channel_content_id = str(row.get("channelContentId") or "").strip()
  134. if channel_content_id:
  135. result[channel_content_id] = row
  136. return result
  137. def parse_decode_data_content(item: dict[str, Any]) -> dict[str, Any]:
  138. channel_content_id = str(item.get("channelContentId") or "").strip()
  139. raw_data_content = str(item.get("dataContent") or "")
  140. if not raw_data_content.strip():
  141. return {
  142. "channelContentId": channel_content_id,
  143. "status": item.get("status"),
  144. "errorMessage": item.get("errorMessage"),
  145. "html": item.get("html"),
  146. "dataContent": None,
  147. }
  148. try:
  149. parsed = json.loads(raw_data_content)
  150. except json.JSONDecodeError as exc:
  151. raise HotContentFlowError(
  152. f"invalid dataContent json for channelContentId={channel_content_id}: {exc}"
  153. ) from exc
  154. if not isinstance(parsed, dict):
  155. raise HotContentFlowError(
  156. f"dataContent is not json object for channelContentId={channel_content_id}"
  157. )
  158. parsed["html"] = item.get("html")
  159. return parsed