postprocess_service.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. )
  21. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  22. from app.hot_content.wxindex_trend import calc_wxindex_trend
  23. class WxindexSelectionSkipped(Exception):
  24. """微信指数选词无效时跳过后续查询。"""
  25. def _extract_json_object(text: str) -> dict[str, Any]:
  26. raw = text.strip()
  27. if raw.startswith("```"):
  28. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  29. raw = re.sub(r"\s*```$", "", raw)
  30. try:
  31. parsed = json.loads(raw)
  32. if isinstance(parsed, dict):
  33. return parsed
  34. except json.JSONDecodeError:
  35. pass
  36. match = re.search(r"\{[\s\S]*\}", raw)
  37. if not match:
  38. raise HotContentFlowError("llm output is not json object")
  39. try:
  40. parsed = json.loads(match.group(0))
  41. except json.JSONDecodeError as exc:
  42. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  43. if not isinstance(parsed, dict):
  44. raise HotContentFlowError("llm output is not json object")
  45. return parsed
  46. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  47. today = datetime.now(SHANGHAI_TZ).date()
  48. end_date = today - timedelta(days=1)
  49. start_date = end_date - timedelta(days=max(lookback_days, 1))
  50. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  51. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  52. rows = ((wx_resp.get("data") or {}).get("data") or [])
  53. if not isinstance(rows, list):
  54. return []
  55. series: list[dict[str, Any]] = []
  56. for row in rows:
  57. if not isinstance(row, dict):
  58. continue
  59. ymd = str(row.get("ymd") or "").strip()
  60. total_score = ((row.get("channel_score") or {}).get("total_score"))
  61. try:
  62. score_num = float(total_score) if total_score is not None else None
  63. except (TypeError, ValueError):
  64. score_num = None
  65. if ymd and score_num is not None:
  66. series.append({"ymd": ymd, "total_score": score_num})
  67. series.sort(key=lambda x: x["ymd"])
  68. return series
  69. def _normalize_demand_key(value: str) -> str:
  70. return "".join(value.split())
  71. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  72. lookup: dict[str, str] = {}
  73. for item in demand_name_set:
  74. demand_name = str(item).strip()
  75. if not demand_name:
  76. continue
  77. lookup.setdefault(demand_name, demand_name)
  78. compact_key = _normalize_demand_key(demand_name)
  79. if compact_key:
  80. lookup.setdefault(compact_key, demand_name)
  81. return lookup
  82. def _resolve_demand_name(
  83. demand_name: str,
  84. demand_lookup: dict[str, str],
  85. ) -> str | None:
  86. value = demand_name.strip()
  87. if not value:
  88. return None
  89. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  90. class ContributionPostprocessService:
  91. def __init__(
  92. self,
  93. config: FlowConfig,
  94. repository: HotContentRepository,
  95. api_client: JsonApiClient,
  96. ):
  97. self.config = config
  98. self.repository = repository
  99. self.api_client = api_client
  100. def run(self) -> dict[str, Any]:
  101. records = self.repository.list_postprocess_candidates(
  102. limit=max(self.config.postprocess_batch_size, 1)
  103. )
  104. if not records:
  105. return self._finalize_run_result(
  106. {
  107. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  108. "status": "success",
  109. "candidate_count": 0,
  110. "matched_count": 0,
  111. "wxindex_count": 0,
  112. "skipped_count": 0,
  113. "failed_count": 0,
  114. }
  115. )
  116. cache = DemandCacheService(
  117. self.config,
  118. self.repository,
  119. ).get_or_create_current_hour_cache()
  120. demand_cache_run_id = int(cache["id"])
  121. demand_name_set = cache["demand_name_set"]
  122. if not demand_name_set:
  123. return self._finalize_run_result(
  124. {
  125. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  126. "status": "skipped",
  127. "reason": "empty_demand_cache",
  128. "demand_cache_run_id": demand_cache_run_id,
  129. "cache_source": cache.get("source"),
  130. "processed_count": 0,
  131. }
  132. )
  133. matched_count = 0
  134. wxindex_count = 0
  135. exported_count = 0
  136. skipped_count = 0
  137. failed_count = 0
  138. for record in records:
  139. record_id = int(record["id"])
  140. try:
  141. match_result = record.get("contribution_demand_match_json")
  142. if not isinstance(match_result, dict):
  143. match_result = self.match_record(
  144. record=record,
  145. demand_name_set=demand_name_set,
  146. demand_cache_run_id=demand_cache_run_id,
  147. )
  148. self.repository.save_contribution_demand_match(
  149. record_id=record_id,
  150. demand_cache_run_id=demand_cache_run_id,
  151. match_json=match_result,
  152. )
  153. matched_count += 1
  154. sanitized_match_result = self.sanitize_match_result(
  155. match_result,
  156. demand_name_set=demand_name_set,
  157. demand_cache_run_id=demand_cache_run_id,
  158. )
  159. if sanitized_match_result != match_result:
  160. match_result = sanitized_match_result
  161. self.repository.save_contribution_demand_match(
  162. record_id=record_id,
  163. demand_cache_run_id=demand_cache_run_id,
  164. match_json=match_result,
  165. )
  166. trend_result = self.build_wxindex_trend(record, match_result)
  167. if trend_result is None:
  168. self.repository.update_postprocess_status(
  169. record_id=record_id,
  170. status=PostprocessStatus.SKIPPED,
  171. error_message="no matched demand words",
  172. )
  173. exported_count += self.export_demand_terms_if_needed(
  174. record=record,
  175. match_result=match_result,
  176. trend_result=None,
  177. )
  178. skipped_count += 1
  179. continue
  180. self.repository.save_wxindex_trend(
  181. record_id=record_id,
  182. trend_json=trend_result,
  183. )
  184. exported_count += self.export_demand_terms_if_needed(
  185. record=record,
  186. match_result=match_result,
  187. trend_result=trend_result,
  188. )
  189. wxindex_count += 1
  190. except WxindexSelectionSkipped as exc:
  191. self.repository.update_postprocess_status(
  192. record_id=record_id,
  193. status=PostprocessStatus.SKIPPED,
  194. error_message=str(exc),
  195. )
  196. if isinstance(match_result, dict):
  197. exported_count += self.export_demand_terms_if_needed(
  198. record=record,
  199. match_result=match_result,
  200. trend_result=None,
  201. )
  202. skipped_count += 1
  203. except Exception as exc:
  204. self.repository.update_postprocess_status(
  205. record_id=record_id,
  206. status=PostprocessStatus.FAILED,
  207. error_message=str(exc),
  208. )
  209. failed_count += 1
  210. return self._finalize_run_result(
  211. {
  212. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  213. "status": "success",
  214. "demand_cache_run_id": demand_cache_run_id,
  215. "cache_source": cache.get("source"),
  216. "cache_hour": str(cache.get("cache_hour") or ""),
  217. "demand_name_count": len(demand_name_set),
  218. "candidate_count": len(records),
  219. "matched_count": matched_count,
  220. "wxindex_count": wxindex_count,
  221. "exported_count": exported_count,
  222. "skipped_count": skipped_count,
  223. "failed_count": failed_count,
  224. }
  225. )
  226. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  227. try:
  228. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  229. except Exception as exc:
  230. result["hive_sync_error"] = str(exc)
  231. return result
  232. def export_demand_terms_if_needed(
  233. self,
  234. *,
  235. record: dict[str, Any],
  236. match_result: dict[str, Any],
  237. trend_result: dict[str, Any] | None,
  238. ) -> int:
  239. export_rows = attach_wxindex_metadata(
  240. build_demand_export_rows(
  241. match_result,
  242. contribution_points=(
  243. record.get("contribution_points_json")
  244. if isinstance(record.get("contribution_points_json"), dict)
  245. else None
  246. ),
  247. trend_json=trend_result if isinstance(trend_result, dict) else None,
  248. ),
  249. trend_result if isinstance(trend_result, dict) else None,
  250. )
  251. self.repository.replace_demand_export_rows(
  252. record_id=int(record["id"]),
  253. source=str(record.get("source") or ""),
  254. hot_title=str(record.get("title") or ""),
  255. article_title=str(record.get("article_title") or ""),
  256. rows=export_rows,
  257. )
  258. return len(export_rows)
  259. def match_record(
  260. self,
  261. *,
  262. record: dict[str, Any],
  263. demand_name_set: list[str],
  264. demand_cache_run_id: int,
  265. ) -> dict[str, Any]:
  266. contribution_points = record.get("contribution_points_json")
  267. if not isinstance(contribution_points, dict):
  268. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  269. channel_content_id = str(
  270. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  271. ).strip()
  272. words_rows = contribution_points.get("高贡献词列表") or []
  273. if not isinstance(words_rows, list):
  274. words_rows = []
  275. word_list = [
  276. str(item.get("词") or "").strip()
  277. for item in words_rows
  278. if isinstance(item, dict) and str(item.get("词") or "").strip()
  279. ]
  280. llm_result = self.llm_match_single_article(
  281. channel_content_id=channel_content_id or str(record["unique_key"]),
  282. words=word_list,
  283. demand_name_set=demand_name_set,
  284. )
  285. demand_lookup = _build_demand_lookup(demand_name_set)
  286. matched_map: dict[str, list[dict[str, str]]] = {}
  287. for item in llm_result.get("matched") or []:
  288. if not isinstance(item, dict):
  289. continue
  290. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  291. demand_name = str(item.get("demand_name") or "").strip()
  292. reason = str(item.get("reason") or "").strip()
  293. if not word or word not in word_list:
  294. continue
  295. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  296. if canonical_demand_name is None:
  297. continue
  298. matched_map.setdefault(word, []).append(
  299. {
  300. "demand_name": canonical_demand_name,
  301. "reason": reason,
  302. }
  303. )
  304. output_words: list[dict[str, Any]] = []
  305. matched_word_rows: list[dict[str, Any]] = []
  306. for row in words_rows:
  307. if not isinstance(row, dict):
  308. continue
  309. word = str(row.get("词") or "").strip()
  310. item = {"词": word, "贡献度": row.get("贡献度")}
  311. if word in matched_map:
  312. item["匹配需求列表"] = matched_map[word]
  313. matched_word_rows.append(item)
  314. output_words.append(item)
  315. matched_points = self.filter_matched_points(
  316. contribution_points.get("点列表"),
  317. set(matched_map.keys()),
  318. )
  319. return {
  320. "channelContentId": channel_content_id,
  321. "demand_cache_run_id": demand_cache_run_id,
  322. "高贡献词列表": output_words,
  323. "匹配到需求的词列表": matched_word_rows,
  324. "点列表": matched_points,
  325. }
  326. def sanitize_match_result(
  327. self,
  328. match_result: dict[str, Any],
  329. *,
  330. demand_name_set: list[str],
  331. demand_cache_run_id: int,
  332. ) -> dict[str, Any]:
  333. demand_lookup = _build_demand_lookup(demand_name_set)
  334. words_rows = match_result.get("高贡献词列表") or []
  335. if not isinstance(words_rows, list):
  336. words_rows = []
  337. output_words: list[dict[str, Any]] = []
  338. matched_word_rows: list[dict[str, Any]] = []
  339. valid_words: set[str] = set()
  340. for row in words_rows:
  341. if not isinstance(row, dict):
  342. continue
  343. word = str(row.get("词") or "").strip()
  344. item = {"词": word, "贡献度": row.get("贡献度")}
  345. match_rows = row.get("匹配需求列表") or []
  346. if not isinstance(match_rows, list):
  347. match_rows = []
  348. valid_match_rows: list[dict[str, str]] = []
  349. for match in match_rows:
  350. if not isinstance(match, dict):
  351. continue
  352. demand_name = str(match.get("demand_name") or "").strip()
  353. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  354. if canonical_demand_name is None:
  355. continue
  356. valid_match_rows.append(
  357. {
  358. "demand_name": canonical_demand_name,
  359. "reason": str(match.get("reason") or "").strip(),
  360. }
  361. )
  362. if word and valid_match_rows:
  363. item["匹配需求列表"] = valid_match_rows
  364. matched_word_rows.append(item)
  365. valid_words.add(word)
  366. output_words.append(item)
  367. return {
  368. "channelContentId": str(match_result.get("channelContentId") or ""),
  369. "demand_cache_run_id": demand_cache_run_id,
  370. "高贡献词列表": output_words,
  371. "匹配到需求的词列表": matched_word_rows,
  372. "点列表": self.filter_matched_points(
  373. match_result.get("点列表"),
  374. valid_words,
  375. ),
  376. }
  377. def llm_match_single_article(
  378. self,
  379. *,
  380. channel_content_id: str,
  381. words: list[str],
  382. demand_name_set: list[str],
  383. ) -> dict[str, Any]:
  384. if not words:
  385. return {"source": channel_content_id, "matched": []}
  386. system_prompt = """
  387. #角色
  388. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  389. # 任务
  390. 我会提供两组数据:
  391. 1. 热点词列表:一组待匹配的热点词语
  392. 2. 需求词库:一组已有的需求词语
  393. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  394. 热点词要等于需求词,或者属于需求词,或者表达了相同含义。
  395. # 输出规则
  396. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  397. # 约束
  398. 1. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  399. """
  400. user_payload = {
  401. "source": channel_content_id,
  402. "words": words,
  403. "demand_name_set": demand_name_set,
  404. "output_schema": {
  405. "source": "string",
  406. "matched": [
  407. {
  408. "title": "string, must be selected from words",
  409. "demand_name": "string, must be selected from demand_name_set",
  410. "reason": "string",
  411. }
  412. ],
  413. },
  414. }
  415. last_error: Exception | None = None
  416. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  417. try:
  418. resp = create_chat_completion(
  419. [
  420. {"role": "system", "content": system_prompt},
  421. {
  422. "role": "user",
  423. "content": json.dumps(user_payload, ensure_ascii=False),
  424. },
  425. ],
  426. model=self.config.contribution_match_llm_model or None,
  427. temperature=0,
  428. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  429. )
  430. parsed = _extract_json_object(str(resp.get("content") or ""))
  431. parsed.setdefault("source", channel_content_id)
  432. parsed.setdefault("matched", [])
  433. return parsed
  434. except (OpenRouterCallError, HotContentFlowError) as exc:
  435. last_error = exc
  436. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  437. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  438. raise HotContentFlowError(
  439. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  440. ) from last_error
  441. def build_wxindex_trend(
  442. self,
  443. record: dict[str, Any],
  444. match_result: dict[str, Any],
  445. ) -> dict[str, Any] | None:
  446. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  447. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  448. return None
  449. candidate_words = [
  450. str(row.get("词") or "").strip()
  451. for row in matched_word_rows
  452. if isinstance(row, dict) and str(row.get("词") or "").strip()
  453. ]
  454. if not candidate_words:
  455. return None
  456. channel_content_id = str(
  457. match_result.get("channelContentId") or record.get("unique_key") or ""
  458. )
  459. article_title, body_text = self.extract_article_text(record)
  460. if len(candidate_words) == 1:
  461. pick = {
  462. "selected_word": candidate_words[0],
  463. "reason": "only one candidate word",
  464. }
  465. else:
  466. pick = self.llm_pick_best_word(
  467. channel_content_id=channel_content_id,
  468. article_title=article_title,
  469. body_text=body_text,
  470. candidate_words=candidate_words,
  471. )
  472. selected_word = pick["selected_word"]
  473. start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
  474. wx_payload = {
  475. "keyword": selected_word,
  476. "start_ymd": start_ymd,
  477. "end_ymd": end_ymd,
  478. }
  479. wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
  480. series = _parse_total_scores(wx_resp)
  481. latest_score = series[-1]["total_score"] if series else None
  482. threshold = float(self.config.wxindex_score_threshold)
  483. return {
  484. "channelContentId": channel_content_id,
  485. "article_title": article_title,
  486. "llm_selected_word": selected_word,
  487. "llm_reason": pick["reason"],
  488. "wxindex": {
  489. "keyword": selected_word,
  490. "start_ymd": start_ymd,
  491. "end_ymd": end_ymd,
  492. "total_score_7d": series,
  493. "latest_total_score": latest_score,
  494. "threshold": threshold,
  495. "latest_gt_threshold": (
  496. False
  497. if latest_score is None
  498. else latest_score >= threshold
  499. ),
  500. "trend": calc_wxindex_trend(series),
  501. },
  502. }
  503. def llm_pick_best_word(
  504. self,
  505. *,
  506. channel_content_id: str,
  507. article_title: str,
  508. body_text: str,
  509. candidate_words: list[str],
  510. ) -> dict[str, str]:
  511. system_prompt = """
  512. #角色
  513. 你是一个专业的语义分析专家,擅长精准概括整篇文章。
  514. # 任务
  515. 我会提供一篇文章的标题、正文和候选词列表,请你选择一个最能代表文章内容的词。
  516. # 输出规则
  517. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  518. """
  519. user_payload = {
  520. "source": channel_content_id,
  521. "article_title": article_title,
  522. "article_body_text": body_text,
  523. "candidate_words": candidate_words,
  524. "output_schema": {
  525. "source": "string",
  526. "selected_word": "string, must be selected from candidate_words",
  527. "reason": "string",
  528. },
  529. "constraints": [
  530. "selected_word 必须来自 candidate_words",
  531. "reason 简洁说明,不超过40字",
  532. "仅输出 JSON 对象,不要 markdown 代码块",
  533. ],
  534. }
  535. last_error: Exception | None = None
  536. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  537. try:
  538. resp = create_chat_completion(
  539. [
  540. {"role": "system", "content": system_prompt},
  541. {
  542. "role": "user",
  543. "content": json.dumps(user_payload, ensure_ascii=False),
  544. },
  545. ],
  546. model=self.config.wxindex_llm_model or None,
  547. temperature=0,
  548. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  549. )
  550. parsed = _extract_json_object(str(resp.get("content") or ""))
  551. selected_word = str(parsed.get("selected_word") or "").strip()
  552. reason = str(parsed.get("reason") or "").strip()
  553. if selected_word not in candidate_words:
  554. raise WxindexSelectionSkipped(
  555. f"selected_word not in candidates for {channel_content_id}: "
  556. f"{selected_word}"
  557. )
  558. return {"selected_word": selected_word, "reason": reason}
  559. except (OpenRouterCallError, HotContentFlowError) as exc:
  560. last_error = exc
  561. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  562. continue
  563. raise HotContentFlowError(
  564. f"llm pick word failed for {channel_content_id}: {last_error}"
  565. ) from last_error
  566. @staticmethod
  567. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  568. if not matched_words or not isinstance(raw_points, list):
  569. return []
  570. matched_points: list[dict[str, Any]] = []
  571. for point in raw_points:
  572. if not isinstance(point, dict):
  573. continue
  574. point_match_rows = point.get("匹配词列表") or []
  575. if not isinstance(point_match_rows, list):
  576. point_match_rows = []
  577. keep_rows = [
  578. row
  579. for row in point_match_rows
  580. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  581. ]
  582. if keep_rows:
  583. matched_points.append(
  584. {
  585. "来源": str(point.get("来源") or ""),
  586. "点": str(point.get("点") or ""),
  587. "点描述": str(point.get("点描述") or ""),
  588. "匹配词列表": keep_rows,
  589. }
  590. )
  591. return matched_points
  592. @staticmethod
  593. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  594. decode_result = record.get("decode_result_json")
  595. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  596. if not isinstance(target_post, dict):
  597. target_post = {}
  598. article_title = str(
  599. target_post.get("title") or record.get("article_title") or ""
  600. ).strip()
  601. body_text = str(
  602. target_post.get("body_text") or record.get("article_body") or ""
  603. ).strip()
  604. return article_title, body_text
  605. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  606. flow_config = config or load_flow_config()
  607. repository = HotContentRepository(flow_config.mysql)
  608. try:
  609. api_client = JsonApiClient(
  610. timeout_seconds=flow_config.request_timeout_seconds,
  611. verify_ssl=flow_config.https_verify_ssl,
  612. )
  613. service = ContributionPostprocessService(flow_config, repository, api_client)
  614. return service.run()
  615. finally:
  616. repository.close()