# postprocess_service.py
"""Post-processing service: contribution-point demand matching and WeChat-index trends."""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. )
  21. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  22. from app.hot_content.wxindex_trend import calc_wxindex_trend
class WxindexSelectionSkipped(Exception):
    """Raised when the WeChat-index keyword selection is unusable, so later queries are skipped."""
  25. def _extract_json_object(text: str) -> dict[str, Any]:
  26. raw = text.strip()
  27. if raw.startswith("```"):
  28. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  29. raw = re.sub(r"\s*```$", "", raw)
  30. try:
  31. parsed = json.loads(raw)
  32. if isinstance(parsed, dict):
  33. return parsed
  34. except json.JSONDecodeError:
  35. pass
  36. match = re.search(r"\{[\s\S]*\}", raw)
  37. if not match:
  38. raise HotContentFlowError("llm output is not json object")
  39. try:
  40. parsed = json.loads(match.group(0))
  41. except json.JSONDecodeError as exc:
  42. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  43. if not isinstance(parsed, dict):
  44. raise HotContentFlowError("llm output is not json object")
  45. return parsed
  46. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  47. today = datetime.now(SHANGHAI_TZ).date()
  48. end_date = today - timedelta(days=1)
  49. start_date = end_date - timedelta(days=max(lookback_days, 1))
  50. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  51. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  52. rows = ((wx_resp.get("data") or {}).get("data") or [])
  53. if not isinstance(rows, list):
  54. return []
  55. series: list[dict[str, Any]] = []
  56. for row in rows:
  57. if not isinstance(row, dict):
  58. continue
  59. ymd = str(row.get("ymd") or "").strip()
  60. total_score = ((row.get("channel_score") or {}).get("total_score"))
  61. try:
  62. score_num = float(total_score) if total_score is not None else None
  63. except (TypeError, ValueError):
  64. score_num = None
  65. if ymd and score_num is not None:
  66. series.append({"ymd": ymd, "total_score": score_num})
  67. series.sort(key=lambda x: x["ymd"])
  68. return series
  69. def _normalize_demand_key(value: str) -> str:
  70. return "".join(value.split())
  71. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  72. lookup: dict[str, str] = {}
  73. for item in demand_name_set:
  74. demand_name = str(item).strip()
  75. if not demand_name:
  76. continue
  77. lookup.setdefault(demand_name, demand_name)
  78. compact_key = _normalize_demand_key(demand_name)
  79. if compact_key:
  80. lookup.setdefault(compact_key, demand_name)
  81. return lookup
  82. def _resolve_demand_name(
  83. demand_name: str,
  84. demand_lookup: dict[str, str],
  85. ) -> str | None:
  86. value = demand_name.strip()
  87. if not value:
  88. return None
  89. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  90. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  91. demand_names: list[str] = []
  92. seen: set[str] = set()
  93. for row in matched_word_rows:
  94. if not isinstance(row, dict):
  95. continue
  96. match_rows = row.get("匹配需求列表") or []
  97. if not isinstance(match_rows, list):
  98. continue
  99. for match in match_rows:
  100. if not isinstance(match, dict):
  101. continue
  102. demand_name = str(match.get("demand_name") or "").strip()
  103. if demand_name and demand_name not in seen:
  104. seen.add(demand_name)
  105. demand_names.append(demand_name)
  106. return demand_names
  107. class ContributionPostprocessService:
  108. def __init__(
  109. self,
  110. config: FlowConfig,
  111. repository: HotContentRepository,
  112. api_client: JsonApiClient,
  113. ):
  114. self.config = config
  115. self.repository = repository
  116. self.api_client = api_client
  117. def run(self) -> dict[str, Any]:
  118. records = self.repository.list_postprocess_candidates(
  119. limit=max(self.config.postprocess_batch_size, 1)
  120. )
  121. if not records:
  122. return self._finalize_run_result(
  123. {
  124. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  125. "status": "success",
  126. "candidate_count": 0,
  127. "matched_count": 0,
  128. "wxindex_count": 0,
  129. "skipped_count": 0,
  130. "failed_count": 0,
  131. }
  132. )
  133. cache = DemandCacheService(
  134. self.config,
  135. self.repository,
  136. ).get_or_create_current_hour_cache()
  137. demand_cache_run_id = int(cache["id"])
  138. demand_name_set = cache["demand_name_set"]
  139. if not demand_name_set:
  140. return self._finalize_run_result(
  141. {
  142. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  143. "status": "skipped",
  144. "reason": "empty_demand_cache",
  145. "demand_cache_run_id": demand_cache_run_id,
  146. "cache_source": cache.get("source"),
  147. "processed_count": 0,
  148. }
  149. )
  150. matched_count = 0
  151. wxindex_count = 0
  152. exported_count = 0
  153. skipped_count = 0
  154. failed_count = 0
  155. for record in records:
  156. record_id = int(record["id"])
  157. try:
  158. match_result = record.get("contribution_demand_match_json")
  159. if not isinstance(match_result, dict):
  160. match_result = self.match_record(
  161. record=record,
  162. demand_name_set=demand_name_set,
  163. demand_cache_run_id=demand_cache_run_id,
  164. )
  165. self.repository.save_contribution_demand_match(
  166. record_id=record_id,
  167. demand_cache_run_id=demand_cache_run_id,
  168. match_json=match_result,
  169. )
  170. matched_count += 1
  171. sanitized_match_result = self.sanitize_match_result(
  172. match_result,
  173. demand_name_set=demand_name_set,
  174. demand_cache_run_id=demand_cache_run_id,
  175. )
  176. if sanitized_match_result != match_result:
  177. match_result = sanitized_match_result
  178. self.repository.save_contribution_demand_match(
  179. record_id=record_id,
  180. demand_cache_run_id=demand_cache_run_id,
  181. match_json=match_result,
  182. )
  183. trend_result = self.build_wxindex_trend(record, match_result)
  184. if trend_result is None:
  185. self.repository.update_postprocess_status(
  186. record_id=record_id,
  187. status=PostprocessStatus.SKIPPED,
  188. error_message="no matched demand words",
  189. )
  190. exported_count += self.export_demand_terms_if_needed(
  191. record=record,
  192. match_result=match_result,
  193. trend_result=None,
  194. )
  195. skipped_count += 1
  196. continue
  197. self.repository.save_wxindex_trend(
  198. record_id=record_id,
  199. trend_json=trend_result,
  200. )
  201. exported_count += self.export_demand_terms_if_needed(
  202. record=record,
  203. match_result=match_result,
  204. trend_result=trend_result,
  205. )
  206. wxindex_count += 1
  207. except WxindexSelectionSkipped as exc:
  208. self.repository.update_postprocess_status(
  209. record_id=record_id,
  210. status=PostprocessStatus.SKIPPED,
  211. error_message=str(exc),
  212. )
  213. if isinstance(match_result, dict):
  214. exported_count += self.export_demand_terms_if_needed(
  215. record=record,
  216. match_result=match_result,
  217. trend_result=None,
  218. )
  219. skipped_count += 1
  220. except Exception as exc:
  221. self.repository.update_postprocess_status(
  222. record_id=record_id,
  223. status=PostprocessStatus.FAILED,
  224. error_message=str(exc),
  225. )
  226. failed_count += 1
  227. return self._finalize_run_result(
  228. {
  229. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  230. "status": "success",
  231. "demand_cache_run_id": demand_cache_run_id,
  232. "cache_source": cache.get("source"),
  233. "cache_hour": str(cache.get("cache_hour") or ""),
  234. "demand_name_count": len(demand_name_set),
  235. "candidate_count": len(records),
  236. "matched_count": matched_count,
  237. "wxindex_count": wxindex_count,
  238. "exported_count": exported_count,
  239. "skipped_count": skipped_count,
  240. "failed_count": failed_count,
  241. }
  242. )
  243. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  244. try:
  245. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  246. except Exception as exc:
  247. result["hive_sync_error"] = str(exc)
  248. return result
  249. def export_demand_terms_if_needed(
  250. self,
  251. *,
  252. record: dict[str, Any],
  253. match_result: dict[str, Any],
  254. trend_result: dict[str, Any] | None,
  255. ) -> int:
  256. export_rows = attach_wxindex_metadata(
  257. build_demand_export_rows(
  258. match_result,
  259. contribution_points=(
  260. record.get("contribution_points_json")
  261. if isinstance(record.get("contribution_points_json"), dict)
  262. else None
  263. ),
  264. trend_json=trend_result if isinstance(trend_result, dict) else None,
  265. ),
  266. trend_result if isinstance(trend_result, dict) else None,
  267. )
  268. self.repository.replace_demand_export_rows(
  269. record_id=int(record["id"]),
  270. source=str(record.get("source") or ""),
  271. hot_title=str(record.get("title") or ""),
  272. article_title=str(record.get("article_title") or ""),
  273. rows=export_rows,
  274. )
  275. return len(export_rows)
  276. def match_record(
  277. self,
  278. *,
  279. record: dict[str, Any],
  280. demand_name_set: list[str],
  281. demand_cache_run_id: int,
  282. ) -> dict[str, Any]:
  283. contribution_points = record.get("contribution_points_json")
  284. if not isinstance(contribution_points, dict):
  285. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  286. channel_content_id = str(
  287. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  288. ).strip()
  289. words_rows = contribution_points.get("高贡献词列表") or []
  290. if not isinstance(words_rows, list):
  291. words_rows = []
  292. word_list = [
  293. str(item.get("词") or "").strip()
  294. for item in words_rows
  295. if isinstance(item, dict) and str(item.get("词") or "").strip()
  296. ]
  297. llm_result = self.llm_match_single_article(
  298. channel_content_id=channel_content_id or str(record["unique_key"]),
  299. words=word_list,
  300. demand_name_set=demand_name_set,
  301. )
  302. demand_lookup = _build_demand_lookup(demand_name_set)
  303. matched_map: dict[str, list[dict[str, str]]] = {}
  304. for item in llm_result.get("matched") or []:
  305. if not isinstance(item, dict):
  306. continue
  307. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  308. demand_name = str(item.get("demand_name") or "").strip()
  309. reason = str(item.get("reason") or "").strip()
  310. if not word or word not in word_list:
  311. continue
  312. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  313. if canonical_demand_name is None:
  314. continue
  315. matched_map.setdefault(word, []).append(
  316. {
  317. "demand_name": canonical_demand_name,
  318. "reason": reason,
  319. }
  320. )
  321. output_words: list[dict[str, Any]] = []
  322. matched_word_rows: list[dict[str, Any]] = []
  323. for row in words_rows:
  324. if not isinstance(row, dict):
  325. continue
  326. word = str(row.get("词") or "").strip()
  327. item = {"词": word, "贡献度": row.get("贡献度")}
  328. if word in matched_map:
  329. item["匹配需求列表"] = matched_map[word]
  330. matched_word_rows.append(item)
  331. output_words.append(item)
  332. matched_points = self.filter_matched_points(
  333. contribution_points.get("点列表"),
  334. set(matched_map.keys()),
  335. )
  336. return {
  337. "channelContentId": channel_content_id,
  338. "demand_cache_run_id": demand_cache_run_id,
  339. "高贡献词列表": output_words,
  340. "匹配到需求的词列表": matched_word_rows,
  341. "点列表": matched_points,
  342. }
  343. def sanitize_match_result(
  344. self,
  345. match_result: dict[str, Any],
  346. *,
  347. demand_name_set: list[str],
  348. demand_cache_run_id: int,
  349. ) -> dict[str, Any]:
  350. demand_lookup = _build_demand_lookup(demand_name_set)
  351. words_rows = match_result.get("高贡献词列表") or []
  352. if not isinstance(words_rows, list):
  353. words_rows = []
  354. output_words: list[dict[str, Any]] = []
  355. matched_word_rows: list[dict[str, Any]] = []
  356. valid_words: set[str] = set()
  357. for row in words_rows:
  358. if not isinstance(row, dict):
  359. continue
  360. word = str(row.get("词") or "").strip()
  361. item = {"词": word, "贡献度": row.get("贡献度")}
  362. match_rows = row.get("匹配需求列表") or []
  363. if not isinstance(match_rows, list):
  364. match_rows = []
  365. valid_match_rows: list[dict[str, str]] = []
  366. for match in match_rows:
  367. if not isinstance(match, dict):
  368. continue
  369. demand_name = str(match.get("demand_name") or "").strip()
  370. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  371. if canonical_demand_name is None:
  372. continue
  373. valid_match_rows.append(
  374. {
  375. "demand_name": canonical_demand_name,
  376. "reason": str(match.get("reason") or "").strip(),
  377. }
  378. )
  379. if word and valid_match_rows:
  380. item["匹配需求列表"] = valid_match_rows
  381. matched_word_rows.append(item)
  382. valid_words.add(word)
  383. output_words.append(item)
  384. return {
  385. "channelContentId": str(match_result.get("channelContentId") or ""),
  386. "demand_cache_run_id": demand_cache_run_id,
  387. "高贡献词列表": output_words,
  388. "匹配到需求的词列表": matched_word_rows,
  389. "点列表": self.filter_matched_points(
  390. match_result.get("点列表"),
  391. valid_words,
  392. ),
  393. }
  394. def llm_match_single_article(
  395. self,
  396. *,
  397. channel_content_id: str,
  398. words: list[str],
  399. demand_name_set: list[str],
  400. ) -> dict[str, Any]:
  401. if not words:
  402. return {"source": channel_content_id, "matched": []}
  403. system_prompt = """
  404. #角色
  405. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  406. # 任务
  407. 我会提供两组数据:
  408. 1. 热点词列表:一组待匹配的热点词语
  409. 2. 需求词库:一组已有的需求词语
  410. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  411. 热点词要等于需求词,或者属于需求词,或者表达了相同含义。
  412. # 输出规则
  413. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  414. # 约束
  415. 1. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  416. """
  417. user_payload = {
  418. "source": channel_content_id,
  419. "words": words,
  420. "demand_name_set": demand_name_set,
  421. "output_schema": {
  422. "source": "string",
  423. "matched": [
  424. {
  425. "title": "string, must be selected from words",
  426. "demand_name": "string, must be selected from demand_name_set",
  427. "reason": "string",
  428. }
  429. ],
  430. },
  431. }
  432. last_error: Exception | None = None
  433. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  434. try:
  435. resp = create_chat_completion(
  436. [
  437. {"role": "system", "content": system_prompt},
  438. {
  439. "role": "user",
  440. "content": json.dumps(user_payload, ensure_ascii=False),
  441. },
  442. ],
  443. model=self.config.contribution_match_llm_model or None,
  444. temperature=0,
  445. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  446. )
  447. parsed = _extract_json_object(str(resp.get("content") or ""))
  448. parsed.setdefault("source", channel_content_id)
  449. parsed.setdefault("matched", [])
  450. return parsed
  451. except (OpenRouterCallError, HotContentFlowError) as exc:
  452. last_error = exc
  453. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  454. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  455. raise HotContentFlowError(
  456. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  457. ) from last_error
  458. def build_wxindex_trend(
  459. self,
  460. record: dict[str, Any],
  461. match_result: dict[str, Any],
  462. ) -> dict[str, Any] | None:
  463. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  464. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  465. return None
  466. contribution_words = [
  467. str(row.get("词") or "").strip()
  468. for row in matched_word_rows
  469. if isinstance(row, dict) and str(row.get("词") or "").strip()
  470. ]
  471. if not contribution_words:
  472. return None
  473. channel_content_id = str(
  474. match_result.get("channelContentId") or record.get("unique_key") or ""
  475. )
  476. article_title, body_text = self.extract_article_text(record)
  477. matched_demands = _collect_matched_demand_names(matched_word_rows)
  478. pick = self.llm_extract_wxindex_words(
  479. channel_content_id=channel_content_id,
  480. article_title=article_title,
  481. body_text=body_text,
  482. contribution_words=contribution_words,
  483. matched_demands=matched_demands,
  484. )
  485. selected_words = pick["selected_words"]
  486. start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
  487. threshold = float(self.config.wxindex_score_threshold)
  488. wxindex_searches: list[dict[str, Any]] = []
  489. for keyword in selected_words:
  490. wx_payload = {
  491. "keyword": keyword,
  492. "start_ymd": start_ymd,
  493. "end_ymd": end_ymd,
  494. }
  495. wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
  496. series = _parse_total_scores(wx_resp)
  497. latest_score = series[-1]["total_score"] if series else None
  498. wxindex_searches.append(
  499. {
  500. "keyword": keyword,
  501. "start_ymd": start_ymd,
  502. "end_ymd": end_ymd,
  503. "total_score_7d": series,
  504. "latest_total_score": latest_score,
  505. "threshold": threshold,
  506. "latest_gt_threshold": (
  507. False
  508. if latest_score is None
  509. else latest_score >= threshold
  510. ),
  511. "trend": calc_wxindex_trend(series),
  512. }
  513. )
  514. searchable = [
  515. item
  516. for item in wxindex_searches
  517. if item.get("latest_total_score") is not None
  518. ]
  519. if not searchable:
  520. raise WxindexSelectionSkipped(
  521. f"no wxindex score for any keyword in {channel_content_id}: "
  522. f"{selected_words}"
  523. )
  524. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  525. selected_word = str(best["keyword"])
  526. latest_score = best["latest_total_score"]
  527. series = best["total_score_7d"]
  528. return {
  529. "channelContentId": channel_content_id,
  530. "article_title": article_title,
  531. "llm_selected_words": selected_words,
  532. "llm_selected_word": selected_word,
  533. "llm_reason": pick["reason"],
  534. "wxindex_searches": wxindex_searches,
  535. "wxindex": {
  536. "keyword": selected_word,
  537. "keywords": selected_words,
  538. "start_ymd": start_ymd,
  539. "end_ymd": end_ymd,
  540. "total_score_7d": series,
  541. "latest_total_score": latest_score,
  542. "threshold": threshold,
  543. "latest_gt_threshold": latest_score >= threshold,
  544. "trend": best["trend"],
  545. },
  546. }
  547. def llm_extract_wxindex_words(
  548. self,
  549. *,
  550. channel_content_id: str,
  551. article_title: str,
  552. body_text: str,
  553. contribution_words: list[str],
  554. matched_demands: list[str],
  555. ) -> dict[str, Any]:
  556. system_prompt = """
  557. #角色
  558. 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
  559. # 任务
  560. 我会提供文章标题、正文,以及两类备选词来源:
  561. 1. 高贡献词:文章贡献度较高的关键词
  562. 2. 已匹配需求:已与需求库匹配上的需求名
  563. 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
  564. 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
  565. 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
  566. # 输出规则
  567. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  568. """
  569. user_payload = {
  570. "source": channel_content_id,
  571. "article_title": article_title,
  572. "article_body_text": body_text,
  573. "contribution_words": contribution_words,
  574. "matched_demands": matched_demands,
  575. "output_schema": {
  576. "source": "string",
  577. "selected_words": [
  578. "string, concise keyword for wxindex search, one or more"
  579. ],
  580. "reason": "string",
  581. },
  582. "constraints": [
  583. "selected_words 为数组,至少 1 个词,可多个",
  584. "每个词简洁(2-4 字),适合微信指数检索",
  585. "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
  586. "多个词应分别覆盖不同事件或角度,避免语义重复",
  587. "reason 简洁说明,不超过60字",
  588. "仅输出 JSON 对象,不要 markdown 代码块",
  589. ],
  590. }
  591. last_error: Exception | None = None
  592. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  593. try:
  594. resp = create_chat_completion(
  595. [
  596. {"role": "system", "content": system_prompt},
  597. {
  598. "role": "user",
  599. "content": json.dumps(user_payload, ensure_ascii=False),
  600. },
  601. ],
  602. model=self.config.wxindex_llm_model or None,
  603. temperature=0,
  604. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  605. )
  606. parsed = _extract_json_object(str(resp.get("content") or ""))
  607. raw_words = parsed.get("selected_words")
  608. if isinstance(raw_words, str):
  609. raw_words = [raw_words]
  610. if not isinstance(raw_words, list):
  611. legacy_word = str(parsed.get("selected_word") or "").strip()
  612. raw_words = [legacy_word] if legacy_word else []
  613. selected_words: list[str] = []
  614. seen: set[str] = set()
  615. for item in raw_words:
  616. word = str(item or "").strip()
  617. if word and word not in seen:
  618. seen.add(word)
  619. selected_words.append(word)
  620. reason = str(parsed.get("reason") or "").strip()
  621. if not selected_words:
  622. raise WxindexSelectionSkipped(
  623. f"selected_words empty for {channel_content_id}"
  624. )
  625. return {"selected_words": selected_words, "reason": reason}
  626. except WxindexSelectionSkipped:
  627. raise
  628. except (OpenRouterCallError, HotContentFlowError) as exc:
  629. last_error = exc
  630. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  631. continue
  632. raise HotContentFlowError(
  633. f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
  634. ) from last_error
  635. @staticmethod
  636. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  637. if not matched_words or not isinstance(raw_points, list):
  638. return []
  639. matched_points: list[dict[str, Any]] = []
  640. for point in raw_points:
  641. if not isinstance(point, dict):
  642. continue
  643. point_match_rows = point.get("匹配词列表") or []
  644. if not isinstance(point_match_rows, list):
  645. point_match_rows = []
  646. keep_rows = [
  647. row
  648. for row in point_match_rows
  649. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  650. ]
  651. if keep_rows:
  652. matched_points.append(
  653. {
  654. "来源": str(point.get("来源") or ""),
  655. "点": str(point.get("点") or ""),
  656. "点描述": str(point.get("点描述") or ""),
  657. "匹配词列表": keep_rows,
  658. }
  659. )
  660. return matched_points
  661. @staticmethod
  662. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  663. decode_result = record.get("decode_result_json")
  664. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  665. if not isinstance(target_post, dict):
  666. target_post = {}
  667. article_title = str(
  668. target_post.get("title") or record.get("article_title") or ""
  669. ).strip()
  670. body_text = str(
  671. target_post.get("body_text") or record.get("article_body") or ""
  672. ).strip()
  673. return article_title, body_text
  674. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  675. flow_config = config or load_flow_config()
  676. repository = HotContentRepository(flow_config.mysql)
  677. try:
  678. api_client = JsonApiClient(
  679. timeout_seconds=flow_config.request_timeout_seconds,
  680. verify_ssl=flow_config.https_verify_ssl,
  681. )
  682. service = ContributionPostprocessService(flow_config, repository, api_client)
  683. return service.run()
  684. finally:
  685. repository.close()