service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. """热点内容流程服务。"""
  2. from __future__ import annotations
  3. from datetime import datetime
  4. import time
  5. from typing import Any
  6. from app.hot_content.category_filter import llm_filter_record_content
  7. from app.hot_content.client import (
  8. JsonApiClient,
  9. build_url,
  10. extract_decode_item_map,
  11. extract_keyword_items,
  12. extract_rank_items,
  13. pick_first_valid_content,
  14. render_template,
  15. )
  16. from app.hot_content.config import load_flow_config
  17. from app.hot_content.exceptions import HotContentFlowError
  18. from app.hot_content.repository import HotContentRepository, unique_title_key
  19. from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
  20. from app.hot_content.timezone import SHANGHAI_TZ
  21. from app.hot_content.types import FlowConfig
  22. HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0}
  23. KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0}
  24. class HotContentFlowService:
  25. def __init__(
  26. self,
  27. config: FlowConfig,
  28. repository: HotContentRepository,
  29. api_client: JsonApiClient,
  30. ):
  31. self.config = config
  32. self.repository = repository
  33. self.api_client = api_client
  34. def run(self) -> dict[str, Any]:
  35. hot_titles = self.fetch_and_save_hot_titles()
  36. selected_contents = self.search_and_save_contents(hot_titles)
  37. selected_contents = self.filter_contents_by_category(selected_contents)
  38. decode_resp = self.submit_decode_tasks(selected_contents)
  39. return self.build_summary(hot_titles, selected_contents, decode_resp)
  40. def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
  41. hot_url = build_url(self.config.crawapi_base_url, self.config.hot_rank_path)
  42. saved_titles: list[dict[str, Any]] = []
  43. seen_keys: set[str] = set()
  44. resp = self.api_client.post_json(hot_url, HOT_RANK_PAYLOAD)
  45. for source_config in self.config.sources:
  46. rank_items = extract_rank_items(resp, source_config.source)
  47. for rank_item in rank_items:
  48. title = str(rank_item.get("title") or "").strip()
  49. if not title:
  50. continue
  51. unique_key = unique_title_key(source_config.source, title)
  52. if unique_key in seen_keys:
  53. continue
  54. seen_keys.add(unique_key)
  55. record = self.repository.upsert_record(
  56. source=source_config.source,
  57. title=title,
  58. rank=_to_int_or_none(rank_item.get("rank")),
  59. )
  60. saved_titles.append(
  61. {
  62. "id": record["id"],
  63. "unique_key": record["unique_key"],
  64. "execution_status": record["execution_status"],
  65. "article_title": record["article_title"],
  66. "article_body": record["article_body"],
  67. "article_url": record["article_url"],
  68. "has_decode_request": record["has_decode_request"],
  69. "has_contribution_points": record["has_contribution_points"],
  70. "source": source_config.source,
  71. "title": title,
  72. "rank": _to_int_or_none(rank_item.get("rank")),
  73. "source_config": source_config,
  74. }
  75. )
  76. return saved_titles
  77. def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  78. selected_contents: list[dict[str, Any]] = []
  79. for hot in hot_titles:
  80. execution_status = int(hot.get("execution_status") or 0)
  81. if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get(
  82. "has_contribution_points"
  83. ):
  84. continue
  85. if execution_status == ExecutionStatus.CATEGORY_FILTER_REJECTED:
  86. continue
  87. existing_body = str(hot.get("article_body") or "").strip()
  88. existing_title = str(hot.get("article_title") or "").strip()
  89. if (
  90. execution_status in {
  91. ExecutionStatus.CONTENT_OK,
  92. ExecutionStatus.CATEGORY_FILTER_PASSED,
  93. }
  94. and existing_title
  95. and existing_body
  96. ):
  97. selected_contents.append(
  98. {
  99. "record_id": hot["id"],
  100. "unique_key": hot["unique_key"],
  101. "hot_title": hot["title"],
  102. "status": "ok",
  103. "content_title": existing_title,
  104. "body_text": existing_body,
  105. "url": str(hot.get("article_url") or ""),
  106. "category_filter_passed": (
  107. execution_status == ExecutionStatus.CATEGORY_FILTER_PASSED
  108. ),
  109. }
  110. )
  111. continue
  112. keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path)
  113. payload = render_template(
  114. KEYWORD_PAYLOAD_TEMPLATE,
  115. {
  116. "title": hot["title"],
  117. "source": hot["source"],
  118. "record_id": str(hot["id"]),
  119. "unique_key": str(hot["unique_key"]),
  120. "hot_title_id": str(hot["id"]),
  121. },
  122. )
  123. for attempt in range(1, 4): # max 3 retries
  124. try:
  125. resp = self.api_client.post_json(keyword_url, payload)
  126. selected = pick_first_valid_content(extract_keyword_items(resp))
  127. if selected is None:
  128. self.repository.mark_no_valid_content(
  129. record_id=hot["id"],
  130. reason="no valid content",
  131. )
  132. selected_contents.append(
  133. {
  134. "record_id": hot["id"],
  135. "unique_key": hot["unique_key"],
  136. "hot_title": hot["title"],
  137. "status": "no_valid_content",
  138. }
  139. )
  140. break
  141. self.repository.update_article(
  142. record_id=hot["id"],
  143. article_title=selected["content_title"],
  144. article_body=selected["body_text"],
  145. url=selected["url"],
  146. )
  147. selected_contents.append(
  148. {
  149. "record_id": hot["id"],
  150. "unique_key": hot["unique_key"],
  151. "hot_title": hot["title"],
  152. "status": "ok",
  153. **selected,
  154. }
  155. )
  156. break
  157. except HotContentFlowError as exc:
  158. if attempt < 3:
  159. # 简单退避,避免对不稳定接口造成瞬时压力
  160. time.sleep(attempt)
  161. continue
  162. self.repository.update_status(
  163. record_id=hot["id"],
  164. status=ExecutionStatus.CONTENT_REQUEST_FAILED,
  165. error_message=str(exc),
  166. )
  167. selected_contents.append(
  168. {
  169. "record_id": hot["id"],
  170. "unique_key": hot["unique_key"],
  171. "hot_title": hot["title"],
  172. "status": "content_request_failed",
  173. "error": str(exc),
  174. }
  175. )
  176. return selected_contents
  177. def filter_contents_by_category(
  178. self,
  179. selected_contents: list[dict[str, Any]],
  180. ) -> list[dict[str, Any]]:
  181. filtered: list[dict[str, Any]] = []
  182. category_list = list(self.config.category_filter_categories)
  183. total = sum(1 for item in selected_contents if item.get("status") == "ok")
  184. processed = 0
  185. for item in selected_contents:
  186. if item.get("status") != "ok":
  187. filtered.append(item)
  188. continue
  189. if item.get("category_filter_passed"):
  190. filtered.append(item)
  191. continue
  192. record_id = int(item["record_id"])
  193. record = self.repository.get_record_for_category_filter(record_id)
  194. if not record:
  195. self.repository.mark_no_valid_content(
  196. record_id=record_id,
  197. reason="record not found",
  198. )
  199. filtered.append(
  200. {
  201. "record_id": record_id,
  202. "unique_key": item.get("unique_key"),
  203. "status": "no_valid_content",
  204. "reason": "record not found",
  205. }
  206. )
  207. continue
  208. hot_title = str(record.get("title") or "").strip()
  209. content_title = str(record.get("article_title") or "").strip()
  210. body_text = str(record.get("article_body") or "").strip()
  211. if not hot_title or not content_title or not body_text:
  212. self.repository.mark_no_valid_content(
  213. record_id=record_id,
  214. reason="missing title or body",
  215. )
  216. filtered.append(
  217. {
  218. "record_id": record_id,
  219. "unique_key": item.get("unique_key"),
  220. "status": "no_valid_content",
  221. "reason": "missing title or body",
  222. }
  223. )
  224. continue
  225. processed += 1
  226. result = llm_filter_record_content(
  227. record_id=record_id,
  228. hot_title=hot_title,
  229. content_title=content_title,
  230. body_text=body_text,
  231. category_list=category_list,
  232. model=self.config.category_filter_llm_model,
  233. max_attempts=self.config.category_filter_llm_max_attempts,
  234. retry_sleep_seconds=self.config.category_filter_llm_retry_sleep_seconds,
  235. max_tokens=self.config.category_filter_llm_max_tokens,
  236. body_max_chars=self.config.category_filter_body_max_chars,
  237. )
  238. passed = bool(result.get("passed"))
  239. self.repository.update_category_filter_result(
  240. record_id=record_id,
  241. passed=passed,
  242. result_json=result,
  243. )
  244. if passed:
  245. filtered.append(
  246. {
  247. **item,
  248. "category_filter_passed": True,
  249. "matched_category": result.get("matched_category"),
  250. }
  251. )
  252. else:
  253. filtered.append(
  254. {
  255. "record_id": record_id,
  256. "unique_key": item.get("unique_key"),
  257. "status": "category_rejected",
  258. }
  259. )
  260. if (
  261. processed < total
  262. and self.config.category_filter_item_sleep_seconds > 0
  263. ):
  264. time.sleep(self.config.category_filter_item_sleep_seconds)
  265. return filtered
  266. def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
  267. decode_items: list[dict[str, Any]] = []
  268. request_count = 0
  269. failed_request_count = 0
  270. for item in selected_contents:
  271. if item.get("status") != "ok":
  272. continue
  273. record_id = int(item["record_id"])
  274. channel_content_id = str(item["unique_key"])
  275. title = str(item.get("content_title") or "").strip()
  276. body_text = str(item.get("body_text") or "").strip()
  277. if not title or not body_text:
  278. continue
  279. post = {
  280. "channelContentId": channel_content_id,
  281. "title": title,
  282. "bodyText": body_text,
  283. "contentModal": 3,
  284. }
  285. decode_payload = {
  286. "params": {
  287. "configId": self.config.decode_config_id,
  288. "skipCompleted": False,
  289. "posts": [post],
  290. }
  291. }
  292. request_count += 1
  293. try:
  294. decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload)
  295. except HotContentFlowError as exc:
  296. decode_resp = {"code": -1, "data": [], "msg": str(exc)}
  297. failed_request_count += 1
  298. decode_item_map = extract_decode_item_map(decode_resp)
  299. decode_item = decode_item_map.get(channel_content_id, {})
  300. status = decode_api_status_to_code(
  301. str(decode_item.get("status") or ""),
  302. has_success_response=int(decode_resp.get("code") or 0) == 0,
  303. )
  304. error_message = None
  305. if int(decode_resp.get("code") or 0) != 0:
  306. error_message = str(decode_resp.get("msg") or decode_resp)
  307. self.repository.update_decode_result(
  308. record_id=record_id,
  309. status=status,
  310. request_json=decode_payload,
  311. response_json=decode_item or decode_resp,
  312. error_message=error_message,
  313. )
  314. if decode_item:
  315. decode_items.append(decode_item)
  316. if request_count == 0:
  317. return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
  318. return {
  319. "code": 0 if failed_request_count == 0 else -1,
  320. "data": decode_items,
  321. "request_count": request_count,
  322. "failed_request_count": failed_request_count,
  323. }
  324. def build_summary(
  325. self,
  326. hot_titles: list[dict[str, Any]],
  327. selected_contents: list[dict[str, Any]],
  328. decode_resp: dict[str, Any],
  329. ) -> dict[str, Any]:
  330. decode_items = decode_resp.get("data") if isinstance(decode_resp, dict) else []
  331. decode_items = decode_items if isinstance(decode_items, list) else []
  332. return {
  333. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  334. "source_count": len(self.config.sources),
  335. "hot_title_count": len(hot_titles),
  336. "selected_ok_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  337. "selected_failed_count": sum(
  338. 1 for item in selected_contents if item.get("status") != "ok"
  339. ),
  340. "category_rejected_count": sum(
  341. 1 for item in selected_contents if item.get("status") == "category_rejected"
  342. ),
  343. "no_valid_content_count": sum(
  344. 1 for item in selected_contents if item.get("status") == "no_valid_content"
  345. ),
  346. "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  347. "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None,
  348. "decode_success_count": sum(
  349. 1 for item in decode_items if str(item.get("status") or "") == "SUCCESS"
  350. ),
  351. "decode_pending_count": sum(
  352. 1 for item in decode_items if str(item.get("status") or "") == "PENDING"
  353. ),
  354. "decode_failed_count": sum(
  355. 1 for item in decode_items if str(item.get("status") or "") == "FAILED"
  356. ),
  357. }
  358. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  359. flow_config = config or load_flow_config()
  360. repository = HotContentRepository(flow_config.mysql)
  361. try:
  362. api_client = JsonApiClient(
  363. timeout_seconds=flow_config.request_timeout_seconds,
  364. verify_ssl=flow_config.https_verify_ssl,
  365. )
  366. service = HotContentFlowService(flow_config, repository, api_client)
  367. return service.run()
  368. finally:
  369. repository.close()
  370. def _to_int_or_none(value: Any) -> int | None:
  371. try:
  372. return int(value)
  373. except (TypeError, ValueError):
  374. return None