service.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. """热点内容流程服务。"""
  2. from __future__ import annotations
  3. from datetime import datetime
  4. import json
  5. import time
  6. from typing import Any
  7. from app.hot_content.category_filter import llm_filter_record_content
  8. from app.hot_content.client import (
  9. JsonApiClient,
  10. build_url,
  11. extract_decode_item_map,
  12. extract_keyword_items,
  13. extract_rank_items,
  14. pick_first_valid_content,
  15. render_template,
  16. )
  17. from app.hot_content.config import load_flow_config
  18. from app.hot_content.exceptions import HotContentFlowError
  19. from app.hot_content.repository import HotContentRepository, unique_title_key
  20. from app.hot_content.status import ExecutionStatus, decode_api_status_to_code
  21. from app.hot_content.timezone import SHANGHAI_TZ
  22. from app.hot_content.types import FlowConfig
  23. HOT_RANK_PAYLOAD = {"sort_type": "最热", "category": "news", "cursor": 0}
  24. KEYWORD_PAYLOAD_TEMPLATE = {"keyword": "{title}", "cursor": 0}
  25. class HotContentFlowService:
  26. def __init__(
  27. self,
  28. config: FlowConfig,
  29. repository: HotContentRepository,
  30. api_client: JsonApiClient,
  31. ):
  32. self.config = config
  33. self.repository = repository
  34. self.api_client = api_client
  35. def run(self) -> dict[str, Any]:
  36. hot_titles = self.fetch_and_save_hot_titles()
  37. selected_contents = self.search_and_save_contents(hot_titles)
  38. selected_contents = self.filter_contents_by_category(selected_contents)
  39. decode_resp = self.submit_decode_tasks(selected_contents)
  40. return self.build_summary(hot_titles, selected_contents, decode_resp)
  41. def fetch_and_save_hot_titles(self) -> list[dict[str, Any]]:
  42. saved_titles: list[dict[str, Any]] = []
  43. seen_keys: set[str] = set()
  44. response_cache: dict[str, dict[str, Any]] = {}
  45. for source_config in self.config.sources:
  46. hot_rank_base_url = (
  47. source_config.hot_rank_base_url or self.config.crawapi_base_url
  48. )
  49. hot_rank_path = source_config.hot_rank_path or self.config.hot_rank_path
  50. hot_rank_payload = (
  51. source_config.hot_rank_payload
  52. if source_config.hot_rank_payload is not None
  53. else HOT_RANK_PAYLOAD
  54. )
  55. cache_key = json.dumps(
  56. {
  57. "base_url": hot_rank_base_url,
  58. "path": hot_rank_path,
  59. "payload": hot_rank_payload,
  60. },
  61. ensure_ascii=False,
  62. sort_keys=True,
  63. )
  64. if cache_key not in response_cache:
  65. hot_url = build_url(hot_rank_base_url, hot_rank_path)
  66. response_cache[cache_key] = self.api_client.post_json(
  67. hot_url,
  68. hot_rank_payload,
  69. )
  70. resp = response_cache[cache_key]
  71. rank_items = extract_rank_items(resp, source_config.source)
  72. for rank_item in rank_items:
  73. title = str(rank_item.get("title") or rank_item.get("name") or "").strip()
  74. if not title:
  75. continue
  76. unique_key = unique_title_key(source_config.source, title)
  77. if unique_key in seen_keys:
  78. continue
  79. seen_keys.add(unique_key)
  80. record = self.repository.upsert_record(
  81. source=source_config.source,
  82. title=title,
  83. rank=_to_int_or_none(rank_item.get("rank")),
  84. )
  85. saved_titles.append(
  86. {
  87. "id": record["id"],
  88. "unique_key": record["unique_key"],
  89. "execution_status": record["execution_status"],
  90. "article_title": record["article_title"],
  91. "article_body": record["article_body"],
  92. "article_url": record["article_url"],
  93. "has_decode_request": record["has_decode_request"],
  94. "has_contribution_points": record["has_contribution_points"],
  95. "source": source_config.source,
  96. "title": title,
  97. "rank": _to_int_or_none(rank_item.get("rank")),
  98. "source_config": source_config,
  99. }
  100. )
  101. return saved_titles
  102. def search_and_save_contents(self, hot_titles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  103. selected_contents: list[dict[str, Any]] = []
  104. for hot in hot_titles:
  105. execution_status = int(hot.get("execution_status") or 0)
  106. if execution_status >= ExecutionStatus.DECODE_SUBMITTED or hot.get(
  107. "has_contribution_points"
  108. ):
  109. continue
  110. if execution_status == ExecutionStatus.CATEGORY_FILTER_REJECTED:
  111. continue
  112. existing_body = str(hot.get("article_body") or "").strip()
  113. existing_title = str(hot.get("article_title") or "").strip()
  114. if (
  115. execution_status in {
  116. ExecutionStatus.CONTENT_OK,
  117. ExecutionStatus.CATEGORY_FILTER_PASSED,
  118. }
  119. and existing_title
  120. and existing_body
  121. ):
  122. selected_contents.append(
  123. {
  124. "record_id": hot["id"],
  125. "unique_key": hot["unique_key"],
  126. "hot_title": hot["title"],
  127. "status": "ok",
  128. "content_title": existing_title,
  129. "body_text": existing_body,
  130. "url": str(hot.get("article_url") or ""),
  131. "category_filter_passed": (
  132. execution_status == ExecutionStatus.CATEGORY_FILTER_PASSED
  133. ),
  134. }
  135. )
  136. continue
  137. keyword_url = build_url(self.config.crawapi_base_url, self.config.keyword_search_path)
  138. payload = render_template(
  139. KEYWORD_PAYLOAD_TEMPLATE,
  140. {
  141. "title": hot["title"],
  142. "source": hot["source"],
  143. "record_id": str(hot["id"]),
  144. "unique_key": str(hot["unique_key"]),
  145. "hot_title_id": str(hot["id"]),
  146. },
  147. )
  148. for attempt in range(1, 4): # max 3 retries
  149. try:
  150. resp = self.api_client.post_json(keyword_url, payload)
  151. selected = pick_first_valid_content(extract_keyword_items(resp))
  152. if selected is None:
  153. self.repository.mark_no_valid_content(
  154. record_id=hot["id"],
  155. reason="no valid content",
  156. )
  157. selected_contents.append(
  158. {
  159. "record_id": hot["id"],
  160. "unique_key": hot["unique_key"],
  161. "hot_title": hot["title"],
  162. "status": "no_valid_content",
  163. }
  164. )
  165. break
  166. self.repository.update_article(
  167. record_id=hot["id"],
  168. article_title=selected["content_title"],
  169. article_body=selected["body_text"],
  170. url=selected["url"],
  171. )
  172. selected_contents.append(
  173. {
  174. "record_id": hot["id"],
  175. "unique_key": hot["unique_key"],
  176. "hot_title": hot["title"],
  177. "status": "ok",
  178. **selected,
  179. }
  180. )
  181. break
  182. except HotContentFlowError as exc:
  183. if attempt < 3:
  184. # 简单退避,避免对不稳定接口造成瞬时压力
  185. time.sleep(attempt)
  186. continue
  187. self.repository.update_status(
  188. record_id=hot["id"],
  189. status=ExecutionStatus.CONTENT_REQUEST_FAILED,
  190. error_message=str(exc),
  191. )
  192. selected_contents.append(
  193. {
  194. "record_id": hot["id"],
  195. "unique_key": hot["unique_key"],
  196. "hot_title": hot["title"],
  197. "status": "content_request_failed",
  198. "error": str(exc),
  199. }
  200. )
  201. return selected_contents
  202. def filter_contents_by_category(
  203. self,
  204. selected_contents: list[dict[str, Any]],
  205. ) -> list[dict[str, Any]]:
  206. filtered: list[dict[str, Any]] = []
  207. category_list = list(self.config.category_filter_categories)
  208. total = sum(1 for item in selected_contents if item.get("status") == "ok")
  209. processed = 0
  210. for item in selected_contents:
  211. if item.get("status") != "ok":
  212. filtered.append(item)
  213. continue
  214. if item.get("category_filter_passed"):
  215. filtered.append(item)
  216. continue
  217. record_id = int(item["record_id"])
  218. record = self.repository.get_record_for_category_filter(record_id)
  219. if not record:
  220. self.repository.mark_no_valid_content(
  221. record_id=record_id,
  222. reason="record not found",
  223. )
  224. filtered.append(
  225. {
  226. "record_id": record_id,
  227. "unique_key": item.get("unique_key"),
  228. "status": "no_valid_content",
  229. "reason": "record not found",
  230. }
  231. )
  232. continue
  233. hot_title = str(record.get("title") or "").strip()
  234. content_title = str(record.get("article_title") or "").strip()
  235. body_text = str(record.get("article_body") or "").strip()
  236. if not hot_title or not content_title or not body_text:
  237. self.repository.mark_no_valid_content(
  238. record_id=record_id,
  239. reason="missing title or body",
  240. )
  241. filtered.append(
  242. {
  243. "record_id": record_id,
  244. "unique_key": item.get("unique_key"),
  245. "status": "no_valid_content",
  246. "reason": "missing title or body",
  247. }
  248. )
  249. continue
  250. processed += 1
  251. result = llm_filter_record_content(
  252. record_id=record_id,
  253. hot_title=hot_title,
  254. content_title=content_title,
  255. body_text=body_text,
  256. category_list=category_list,
  257. model=self.config.category_filter_llm_model,
  258. max_attempts=self.config.category_filter_llm_max_attempts,
  259. retry_sleep_seconds=self.config.category_filter_llm_retry_sleep_seconds,
  260. max_tokens=self.config.category_filter_llm_max_tokens,
  261. body_max_chars=self.config.category_filter_body_max_chars,
  262. )
  263. passed = bool(result.get("passed"))
  264. self.repository.update_category_filter_result(
  265. record_id=record_id,
  266. passed=passed,
  267. result_json=result,
  268. )
  269. if passed:
  270. filtered.append(
  271. {
  272. **item,
  273. "category_filter_passed": True,
  274. "matched_category": result.get("matched_category"),
  275. }
  276. )
  277. else:
  278. filtered.append(
  279. {
  280. "record_id": record_id,
  281. "unique_key": item.get("unique_key"),
  282. "status": "category_rejected",
  283. }
  284. )
  285. if (
  286. processed < total
  287. and self.config.category_filter_item_sleep_seconds > 0
  288. ):
  289. time.sleep(self.config.category_filter_item_sleep_seconds)
  290. return filtered
  291. def submit_decode_tasks(self, selected_contents: list[dict[str, Any]]) -> dict[str, Any]:
  292. decode_items: list[dict[str, Any]] = []
  293. request_count = 0
  294. failed_request_count = 0
  295. for item in selected_contents:
  296. if item.get("status") != "ok":
  297. continue
  298. record_id = int(item["record_id"])
  299. channel_content_id = str(item["unique_key"])
  300. title = str(item.get("content_title") or "").strip()
  301. body_text = str(item.get("body_text") or "").strip()
  302. if not title or not body_text:
  303. continue
  304. post = {
  305. "channelContentId": channel_content_id,
  306. "title": title,
  307. "bodyText": body_text,
  308. "contentModal": 3,
  309. }
  310. decode_payload = {
  311. "params": {
  312. "configId": self.config.decode_config_id,
  313. "skipCompleted": False,
  314. "posts": [post],
  315. }
  316. }
  317. request_count += 1
  318. try:
  319. decode_resp = self.api_client.post_json(self.config.decode_api_url, decode_payload)
  320. except HotContentFlowError as exc:
  321. decode_resp = {"code": -1, "data": [], "msg": str(exc)}
  322. failed_request_count += 1
  323. decode_item_map = extract_decode_item_map(decode_resp)
  324. decode_item = decode_item_map.get(channel_content_id, {})
  325. status = decode_api_status_to_code(
  326. str(decode_item.get("status") or ""),
  327. has_success_response=int(decode_resp.get("code") or 0) == 0,
  328. )
  329. error_message = None
  330. if int(decode_resp.get("code") or 0) != 0:
  331. error_message = str(decode_resp.get("msg") or decode_resp)
  332. self.repository.update_decode_result(
  333. record_id=record_id,
  334. status=status,
  335. request_json=decode_payload,
  336. response_json=decode_item or decode_resp,
  337. error_message=error_message,
  338. )
  339. if decode_item:
  340. decode_items.append(decode_item)
  341. if request_count == 0:
  342. return {"code": 0, "data": [], "msg": "no_valid_posts", "request_count": 0}
  343. return {
  344. "code": 0 if failed_request_count == 0 else -1,
  345. "data": decode_items,
  346. "request_count": request_count,
  347. "failed_request_count": failed_request_count,
  348. }
  349. def build_summary(
  350. self,
  351. hot_titles: list[dict[str, Any]],
  352. selected_contents: list[dict[str, Any]],
  353. decode_resp: dict[str, Any],
  354. ) -> dict[str, Any]:
  355. decode_items = decode_resp.get("data") if isinstance(decode_resp, dict) else []
  356. decode_items = decode_items if isinstance(decode_items, list) else []
  357. return {
  358. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  359. "source_count": len(self.config.sources),
  360. "hot_title_count": len(hot_titles),
  361. "selected_ok_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  362. "selected_failed_count": sum(
  363. 1 for item in selected_contents if item.get("status") != "ok"
  364. ),
  365. "category_rejected_count": sum(
  366. 1 for item in selected_contents if item.get("status") == "category_rejected"
  367. ),
  368. "no_valid_content_count": sum(
  369. 1 for item in selected_contents if item.get("status") == "no_valid_content"
  370. ),
  371. "decode_post_count": sum(1 for item in selected_contents if item.get("status") == "ok"),
  372. "decode_api_code": decode_resp.get("code") if isinstance(decode_resp, dict) else None,
  373. "decode_success_count": sum(
  374. 1 for item in decode_items if str(item.get("status") or "") == "SUCCESS"
  375. ),
  376. "decode_pending_count": sum(
  377. 1 for item in decode_items if str(item.get("status") or "") == "PENDING"
  378. ),
  379. "decode_failed_count": sum(
  380. 1 for item in decode_items if str(item.get("status") or "") == "FAILED"
  381. ),
  382. }
  383. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  384. flow_config = config or load_flow_config()
  385. repository = HotContentRepository(flow_config.mysql)
  386. try:
  387. api_client = JsonApiClient(
  388. timeout_seconds=flow_config.request_timeout_seconds,
  389. verify_ssl=flow_config.https_verify_ssl,
  390. )
  391. service = HotContentFlowService(flow_config, repository, api_client)
  392. return service.run()
  393. finally:
  394. repository.close()
  395. def _to_int_or_none(value: Any) -> int | None:
  396. try:
  397. return int(value)
  398. except (TypeError, ValueError):
  399. return None