# service.py (12 KB)
  1. """热点内容流程服务。"""
  2. from __future__ import annotations
  3. from datetime import datetime
  4. import time
  5. from typing import Any
  6. from app.hot_content.client import (
  7. JsonApiClient,
  8. build_url,
  9. extract_decode_item_map,
  10. extract_keyword_items,
  11. extract_rank_items,
  12. pick_first_valid_content,
  13. render_template,
  14. )
  15. from app.hot_content.config import load_flow_config
  16. from app.hot_content.exceptions import HotContentFlowError
  17. from app.hot_content.repository import HotContentRepository, unique_title_key
  18. from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
  19. from app.hot_content.timezone import SHANGHAI_TZ
  20. from app.hot_content.types import FlowConfig
  21. HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0}
  22. KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0}
  23. class HotContentFlowService:
  24. def __init__(
  25. self,
  26. config: FlowConfig,
  27. repository: HotContentRepository,
  28. api_client: JsonApiClient,
  29. ):
  30. self.config = config
  31. self.repository = repository
  32. self.api_client = api_client
  33. def run(self) -> dict[str, Any]:
  34. hot_titles = self.fetch_and_save_hot_titles()
  35. selected_contents = self.search_and_save_contents(hot_titles)
  36. decode_resp = self.submit_decode_tasks(selected_contents)
  37. return self.build_summary(hot_titles, selected_contents, decode_resp)
  38. def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
  39. hot_url = build_url(self.config.crawapi_base_url, self.config.hot_rank_path)
  40. saved_titles: list[dict[str, Any]] = []
  41. seen_keys: set[str] = set()
  42. resp = self.api_client.post_json(hot_url, HOT_RANK_PAYLOAD)
  43. for source_config in self.config.sources:
  44. rank_items = extract_rank_items(resp, source_config.source)
  45. for rank_item in rank_items[: source_config.count]:
  46. title = str(rank_item.get("title") or "").strip()
  47. if not title:
  48. continue
  49. unique_key = unique_title_key(source_config.source, title)
  50. if unique_key in seen_keys:
  51. continue
  52. seen_keys.add(unique_key)
  53. record = self.repository.upsert_record(
  54. source=source_config.source,
  55. title=title,
  56. rank=_to_int_or_none(rank_item.get("rank")),
  57. )
  58. saved_titles.append(
  59. {
  60. "id": record["id"],
  61. "unique_key": record["unique_key"],
  62. "execution_status": record["execution_status"],
  63. "article_title": record["article_title"],
  64. "article_body": record["article_body"],
  65. "article_url": record["article_url"],
  66. "has_decode_request": record["has_decode_request"],
  67. "has_contribution_points": record["has_contribution_points"],
  68. "source": source_config.source,
  69. "title": title,
  70. "rank": _to_int_or_none(rank_item.get("rank")),
  71. "source_config": source_config,
  72. }
  73. )
  74. return saved_titles
  75. def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  76. selected_contents: list[dict[str, Any]] = []
  77. for hot in hot_titles:
  78. execution_status = int(hot.get("execution_status") or 0)
  79. if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get(
  80. "has_contribution_points"
  81. ):
  82. continue
  83. existing_body = str(hot.get("article_body") or "").strip()
  84. existing_title = str(hot.get("article_title") or "").strip()
  85. if execution_status == ExecutionStatus.CONTENT_OK and existing_title and existing_body:
  86. selected_contents.append(
  87. {
  88. "record_id": hot["id"],
  89. "unique_key": hot["unique_key"],
  90. "hot_title": hot["title"],
  91. "status": "ok",
  92. "content_title": existing_title,
  93. "body_text": existing_body,
  94. "url": str(hot.get("article_url") or ""),
  95. }
  96. )
  97. continue
  98. keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path)
  99. payload = render_template(
  100. KEYWORD_PAYLOAD_TEMPLATE,
  101. {
  102. "title": hot["title"],
  103. "source": hot["source"],
  104. "record_id": str(hot["id"]),
  105. "unique_key": str(hot["unique_key"]),
  106. "hot_title_id": str(hot["id"]),
  107. },
  108. )
  109. for attempt in range(1, 4): # max 3 retries
  110. try:
  111. resp = self.api_client.post_json(keyword_url, payload)
  112. selected = pick_first_valid_content(extract_keyword_items(resp))
  113. if selected is None:
  114. self.repository.update_status(
  115. record_id=hot["id"],
  116. status=ExecutionStatus.NO_VALID_CONTENT,
  117. )
  118. selected_contents.append(
  119. {
  120. "record_id": hot["id"],
  121. "unique_key": hot["unique_key"],
  122. "hot_title": hot["title"],
  123. "status": "no_valid_content",
  124. }
  125. )
  126. break
  127. self.repository.update_article(
  128. record_id=hot["id"],
  129. article_title=selected["content_title"],
  130. article_body=selected["body_text"],
  131. url=selected["url"],
  132. )
  133. selected_contents.append(
  134. {
  135. "record_id": hot["id"],
  136. "unique_key": hot["unique_key"],
  137. "hot_title": hot["title"],
  138. "status": "ok",
  139. **selected,
  140. }
  141. )
  142. break
  143. except HotContentFlowError as exc:
  144. if attempt < 3:
  145. # 简单退避,避免对不稳定接口造成瞬时压力
  146. time.sleep(attempt)
  147. continue
  148. self.repository.update_status(
  149. record_id=hot["id"],
  150. status=ExecutionStatus.CONTENT_REQUEST_FAILED,
  151. error_message=str(exc),
  152. )
  153. selected_contents.append(
  154. {
  155. "record_id": hot["id"],
  156. "unique_key": hot["unique_key"],
  157. "hot_title": hot["title"],
  158. "status": "content_request_failed",
  159. "error": str(exc),
  160. }
  161. )
  162. return selected_contents
  163. def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
  164. decode_items: list[dict[str, Any]] = []
  165. request_count = 0
  166. failed_request_count = 0
  167. for item in selected_contents:
  168. if item.get("status") != "ok":
  169. continue
  170. record_id = int(item["record_id"])
  171. channel_content_id = str(item["unique_key"])
  172. title = str(item.get("content_title") or "").strip()
  173. body_text = str(item.get("body_text") or "").strip()
  174. if not title or not body_text:
  175. continue
  176. post = {
  177. "channelContentId": channel_content_id,
  178. "title": title,
  179. "bodyText": body_text,
  180. "contentModal": 3,
  181. }
  182. decode_payload = {
  183. "params": {
  184. "configId": self.config.decode_config_id,
  185. "skipCompleted": False,
  186. "posts": [post],
  187. }
  188. }
  189. request_count += 1
  190. try:
  191. decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload)
  192. except HotContentFlowError as exc:
  193. decode_resp = {"code": -1, "data": [], "msg": str(exc)}
  194. failed_request_count += 1
  195. decode_item_map = extract_decode_item_map(decode_resp)
  196. decode_item = decode_item_map.get(channel_content_id, {})
  197. status = decode_api_status_to_code(
  198. str(decode_item.get("status") or ""),
  199. has_success_response=int(decode_resp.get("code") or 0) == 0,
  200. )
  201. error_message = None
  202. if int(decode_resp.get("code") or 0) != 0:
  203. error_message = str(decode_resp.get("msg") or decode_resp)
  204. self.repository.update_decode_result(
  205. record_id=record_id,
  206. status=status,
  207. request_json=decode_payload,
  208. response_json=decode_item or decode_resp,
  209. error_message=error_message,
  210. )
  211. if decode_item:
  212. decode_items.append(decode_item)
  213. if request_count == 0:
  214. return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
  215. return {
  216. "code": 0 if failed_request_count == 0 else -1,
  217. "data": decode_items,
  218. "request_count": request_count,
  219. "failed_request_count": failed_request_count,
  220. }
  221. def build_summary(
  222. self,
  223. hot_titles: list[dict[str, Any]],
  224. selected_contents: list[dict[str, Any]],
  225. decode_resp: dict[str, Any],
  226. ) -> dict[str, Any]:
  227. decode_items = decode_resp.get("data") if isinstance(decode_resp, dict) else []
  228. decode_items = decode_items if isinstance(decode_items, list) else []
  229. return {
  230. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  231. "source_count": len(self.config.sources),
  232. "hot_title_count": len(hot_titles),
  233. "selected_ok_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  234. "selected_failed_count": sum(
  235. 1 for item in selected_contents if item.get("status") != "ok"
  236. ),
  237. "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  238. "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None,
  239. "decode_success_count": sum(
  240. 1 for item in decode_items if str(item.get("status") or "") == "SUCCESS"
  241. ),
  242. "decode_pending_count": sum(
  243. 1 for item in decode_items if str(item.get("status") or "") == "PENDING"
  244. ),
  245. "decode_failed_count": sum(
  246. 1 for item in decode_items if str(item.get("status") or "") == "FAILED"
  247. ),
  248. }
  249. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  250. flow_config = config or load_flow_config()
  251. repository = HotContentRepository(flow_config.mysql)
  252. try:
  253. api_client = JsonApiClient(
  254. timeout_seconds=flow_config.request_timeout_seconds,
  255. verify_ssl=flow_config.https_verify_ssl,
  256. )
  257. service = HotContentFlowService(flow_config, repository, api_client)
  258. return service.run()
  259. finally:
  260. repository.close()
  261. def _to_int_or_none(value: Any) -> int | None:
  262. try:
  263. return int(value)
  264. except (TypeError, ValueError):
  265. return None