postprocess_service.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. )
  21. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  22. from app.hot_content.wxindex_trend import calc_wxindex_trend
  23. class WxindexSelectionSkipped(Exception):
  24. """微信指数选词无效时跳过后续查询。"""
  25. def _extract_json_object(text: str) -> dict[str, Any]:
  26. raw = text.strip()
  27. if raw.startswith("```"):
  28. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  29. raw = re.sub(r"\s*```$", "", raw)
  30. try:
  31. parsed = json.loads(raw)
  32. if isinstance(parsed, dict):
  33. return parsed
  34. except json.JSONDecodeError:
  35. pass
  36. match = re.search(r"\{[\s\S]*\}", raw)
  37. if not match:
  38. raise HotContentFlowError("llm output is not json object")
  39. try:
  40. parsed = json.loads(match.group(0))
  41. except json.JSONDecodeError as exc:
  42. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  43. if not isinstance(parsed, dict):
  44. raise HotContentFlowError("llm output is not json object")
  45. return parsed
  46. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  47. today = datetime.now(SHANGHAI_TZ).date()
  48. end_date = today - timedelta(days=1)
  49. start_date = end_date - timedelta(days=max(lookback_days, 1))
  50. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  51. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  52. rows = ((wx_resp.get("data") or {}).get("data") or [])
  53. if not isinstance(rows, list):
  54. return []
  55. series: list[dict[str, Any]] = []
  56. for row in rows:
  57. if not isinstance(row, dict):
  58. continue
  59. ymd = str(row.get("ymd") or "").strip()
  60. total_score = ((row.get("channel_score") or {}).get("total_score"))
  61. try:
  62. score_num = float(total_score) if total_score is not None else None
  63. except (TypeError, ValueError):
  64. score_num = None
  65. if ymd and score_num is not None:
  66. series.append({"ymd": ymd, "total_score": score_num})
  67. series.sort(key=lambda x: x["ymd"])
  68. return series
  69. def _normalize_demand_key(value: str) -> str:
  70. return "".join(value.split())
  71. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  72. lookup: dict[str, str] = {}
  73. for item in demand_name_set:
  74. demand_name = str(item).strip()
  75. if not demand_name:
  76. continue
  77. lookup.setdefault(demand_name, demand_name)
  78. compact_key = _normalize_demand_key(demand_name)
  79. if compact_key:
  80. lookup.setdefault(compact_key, demand_name)
  81. return lookup
  82. def _resolve_demand_name(
  83. demand_name: str,
  84. demand_lookup: dict[str, str],
  85. ) -> str | None:
  86. value = demand_name.strip()
  87. if not value:
  88. return None
  89. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  90. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  91. demand_names: list[str] = []
  92. seen: set[str] = set()
  93. for row in matched_word_rows:
  94. if not isinstance(row, dict):
  95. continue
  96. match_rows = row.get("匹配需求列表") or []
  97. if not isinstance(match_rows, list):
  98. continue
  99. for match in match_rows:
  100. if not isinstance(match, dict):
  101. continue
  102. demand_name = str(match.get("demand_name") or "").strip()
  103. if demand_name and demand_name not in seen:
  104. seen.add(demand_name)
  105. demand_names.append(demand_name)
  106. return demand_names
  107. class ContributionPostprocessService:
  108. def __init__(
  109. self,
  110. config: FlowConfig,
  111. repository: HotContentRepository,
  112. api_client: JsonApiClient,
  113. ):
  114. self.config = config
  115. self.repository = repository
  116. self.api_client = api_client
  117. def run(self) -> dict[str, Any]:
  118. records = self.repository.list_postprocess_candidates(
  119. limit=max(self.config.postprocess_batch_size, 1)
  120. )
  121. if not records:
  122. return self._finalize_run_result(
  123. {
  124. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  125. "status": "success",
  126. "candidate_count": 0,
  127. "matched_count": 0,
  128. "wxindex_count": 0,
  129. "skipped_count": 0,
  130. "failed_count": 0,
  131. }
  132. )
  133. cache = DemandCacheService(
  134. self.config,
  135. self.repository,
  136. ).get_or_create_current_hour_cache()
  137. demand_cache_run_id = int(cache["id"])
  138. demand_name_set = cache["demand_name_set"]
  139. if not demand_name_set:
  140. return self._finalize_run_result(
  141. {
  142. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  143. "status": "skipped",
  144. "reason": "empty_demand_cache",
  145. "demand_cache_run_id": demand_cache_run_id,
  146. "cache_source": cache.get("source"),
  147. "processed_count": 0,
  148. }
  149. )
  150. matched_count = 0
  151. wxindex_count = 0
  152. exported_count = 0
  153. skipped_count = 0
  154. failed_count = 0
  155. for record in records:
  156. record_id = int(record["id"])
  157. try:
  158. match_result = record.get("contribution_demand_match_json")
  159. if not isinstance(match_result, dict):
  160. match_result = self.match_record(
  161. record=record,
  162. demand_name_set=demand_name_set,
  163. demand_cache_run_id=demand_cache_run_id,
  164. )
  165. self.repository.save_contribution_demand_match(
  166. record_id=record_id,
  167. demand_cache_run_id=demand_cache_run_id,
  168. match_json=match_result,
  169. )
  170. matched_count += 1
  171. sanitized_match_result = self.sanitize_match_result(
  172. match_result,
  173. demand_name_set=demand_name_set,
  174. demand_cache_run_id=demand_cache_run_id,
  175. )
  176. if sanitized_match_result != match_result:
  177. match_result = sanitized_match_result
  178. self.repository.save_contribution_demand_match(
  179. record_id=record_id,
  180. demand_cache_run_id=demand_cache_run_id,
  181. match_json=match_result,
  182. )
  183. trend_result = self.build_wxindex_trend(record, match_result)
  184. if trend_result is None:
  185. self.repository.update_postprocess_status(
  186. record_id=record_id,
  187. status=PostprocessStatus.SKIPPED,
  188. error_message="no matched demand words",
  189. )
  190. exported_count += self.export_demand_terms_if_needed(
  191. record=record,
  192. match_result=match_result,
  193. trend_result=None,
  194. )
  195. skipped_count += 1
  196. continue
  197. self.repository.save_wxindex_trend(
  198. record_id=record_id,
  199. trend_json=trend_result,
  200. )
  201. exported_count += self.export_demand_terms_if_needed(
  202. record=record,
  203. match_result=match_result,
  204. trend_result=trend_result,
  205. )
  206. wxindex_count += 1
  207. except WxindexSelectionSkipped as exc:
  208. self.repository.update_postprocess_status(
  209. record_id=record_id,
  210. status=PostprocessStatus.SKIPPED,
  211. error_message=str(exc),
  212. )
  213. if isinstance(match_result, dict):
  214. exported_count += self.export_demand_terms_if_needed(
  215. record=record,
  216. match_result=match_result,
  217. trend_result=None,
  218. )
  219. skipped_count += 1
  220. except Exception as exc:
  221. self.repository.update_postprocess_status(
  222. record_id=record_id,
  223. status=PostprocessStatus.FAILED,
  224. error_message=str(exc),
  225. )
  226. failed_count += 1
  227. return self._finalize_run_result(
  228. {
  229. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  230. "status": "success",
  231. "demand_cache_run_id": demand_cache_run_id,
  232. "cache_source": cache.get("source"),
  233. "cache_hour": str(cache.get("cache_hour") or ""),
  234. "demand_name_count": len(demand_name_set),
  235. "candidate_count": len(records),
  236. "matched_count": matched_count,
  237. "wxindex_count": wxindex_count,
  238. "exported_count": exported_count,
  239. "skipped_count": skipped_count,
  240. "failed_count": failed_count,
  241. }
  242. )
  243. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  244. try:
  245. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  246. except Exception as exc:
  247. result["hive_sync_error"] = str(exc)
  248. return result
  249. def export_demand_terms_if_needed(
  250. self,
  251. *,
  252. record: dict[str, Any],
  253. match_result: dict[str, Any],
  254. trend_result: dict[str, Any] | None,
  255. ) -> int:
  256. export_rows = attach_wxindex_metadata(
  257. build_demand_export_rows(
  258. match_result,
  259. contribution_points=(
  260. record.get("contribution_points_json")
  261. if isinstance(record.get("contribution_points_json"), dict)
  262. else None
  263. ),
  264. trend_json=trend_result if isinstance(trend_result, dict) else None,
  265. ),
  266. trend_result if isinstance(trend_result, dict) else None,
  267. wxindex_threshold=self.config.wxindex_score_threshold,
  268. )
  269. self.repository.replace_demand_export_rows(
  270. record_id=int(record["id"]),
  271. source=str(record.get("source") or ""),
  272. hot_title=str(record.get("title") or ""),
  273. article_title=str(record.get("article_title") or ""),
  274. rows=export_rows,
  275. )
  276. return len(export_rows)
  277. def match_record(
  278. self,
  279. *,
  280. record: dict[str, Any],
  281. demand_name_set: list[str],
  282. demand_cache_run_id: int,
  283. ) -> dict[str, Any]:
  284. contribution_points = record.get("contribution_points_json")
  285. if not isinstance(contribution_points, dict):
  286. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  287. channel_content_id = str(
  288. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  289. ).strip()
  290. words_rows = contribution_points.get("高贡献词列表") or []
  291. if not isinstance(words_rows, list):
  292. words_rows = []
  293. word_list = [
  294. str(item.get("词") or "").strip()
  295. for item in words_rows
  296. if isinstance(item, dict) and str(item.get("词") or "").strip()
  297. ]
  298. llm_result = self.llm_match_single_article(
  299. channel_content_id=channel_content_id or str(record["unique_key"]),
  300. words=word_list,
  301. demand_name_set=demand_name_set,
  302. )
  303. demand_lookup = _build_demand_lookup(demand_name_set)
  304. matched_map: dict[str, list[dict[str, str]]] = {}
  305. for item in llm_result.get("matched") or []:
  306. if not isinstance(item, dict):
  307. continue
  308. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  309. demand_name = str(item.get("demand_name") or "").strip()
  310. reason = str(item.get("reason") or "").strip()
  311. if not word or word not in word_list:
  312. continue
  313. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  314. if canonical_demand_name is None:
  315. continue
  316. matched_map.setdefault(word, []).append(
  317. {
  318. "demand_name": canonical_demand_name,
  319. "reason": reason,
  320. }
  321. )
  322. output_words: list[dict[str, Any]] = []
  323. matched_word_rows: list[dict[str, Any]] = []
  324. for row in words_rows:
  325. if not isinstance(row, dict):
  326. continue
  327. word = str(row.get("词") or "").strip()
  328. item = {"词": word, "贡献度": row.get("贡献度")}
  329. if word in matched_map:
  330. item["匹配需求列表"] = matched_map[word]
  331. matched_word_rows.append(item)
  332. output_words.append(item)
  333. matched_points = self.filter_matched_points(
  334. contribution_points.get("点列表"),
  335. set(matched_map.keys()),
  336. )
  337. return {
  338. "channelContentId": channel_content_id,
  339. "demand_cache_run_id": demand_cache_run_id,
  340. "高贡献词列表": output_words,
  341. "匹配到需求的词列表": matched_word_rows,
  342. "点列表": matched_points,
  343. }
  344. def sanitize_match_result(
  345. self,
  346. match_result: dict[str, Any],
  347. *,
  348. demand_name_set: list[str],
  349. demand_cache_run_id: int,
  350. ) -> dict[str, Any]:
  351. demand_lookup = _build_demand_lookup(demand_name_set)
  352. words_rows = match_result.get("高贡献词列表") or []
  353. if not isinstance(words_rows, list):
  354. words_rows = []
  355. output_words: list[dict[str, Any]] = []
  356. matched_word_rows: list[dict[str, Any]] = []
  357. valid_words: set[str] = set()
  358. for row in words_rows:
  359. if not isinstance(row, dict):
  360. continue
  361. word = str(row.get("词") or "").strip()
  362. item = {"词": word, "贡献度": row.get("贡献度")}
  363. match_rows = row.get("匹配需求列表") or []
  364. if not isinstance(match_rows, list):
  365. match_rows = []
  366. valid_match_rows: list[dict[str, str]] = []
  367. for match in match_rows:
  368. if not isinstance(match, dict):
  369. continue
  370. demand_name = str(match.get("demand_name") or "").strip()
  371. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  372. if canonical_demand_name is None:
  373. continue
  374. valid_match_rows.append(
  375. {
  376. "demand_name": canonical_demand_name,
  377. "reason": str(match.get("reason") or "").strip(),
  378. }
  379. )
  380. if word and valid_match_rows:
  381. item["匹配需求列表"] = valid_match_rows
  382. matched_word_rows.append(item)
  383. valid_words.add(word)
  384. output_words.append(item)
  385. return {
  386. "channelContentId": str(match_result.get("channelContentId") or ""),
  387. "demand_cache_run_id": demand_cache_run_id,
  388. "高贡献词列表": output_words,
  389. "匹配到需求的词列表": matched_word_rows,
  390. "点列表": self.filter_matched_points(
  391. match_result.get("点列表"),
  392. valid_words,
  393. ),
  394. }
  395. def llm_match_single_article(
  396. self,
  397. *,
  398. channel_content_id: str,
  399. words: list[str],
  400. demand_name_set: list[str],
  401. ) -> dict[str, Any]:
  402. if not words:
  403. return {"source": channel_content_id, "matched": []}
  404. system_prompt = """
  405. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  406. # 任务
  407. 我会提供两组数据:
  408. 1. 热点词列表:一组待匹配的热点词语
  409. 2. 需求词库:一组已有的需求词语
  410. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  411. # 匹配标准
  412. 满足以下任意一条,则视为匹配成功:
  413. - 热点词与需求词含义相同或高度相近(如同义词、近义词)
  414. - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
  415. - 热点词与需求词在用户意图上高度一致
  416. 以下情况,不得视为匹配:
  417. - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
  418. - 热点词与需求词只有表面字符重叠,语义方向不同
  419. - 热点词是需求词的上位概念(范围过宽,含义不够精确)
  420. - 两者只是同属某个大类,但具体含义差异明显
  421. # 多词组成的需求词处理规则
  422. 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
  423. # 输出规则
  424. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  425. # 约束
  426. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  427. """
  428. user_payload = {
  429. "source": channel_content_id,
  430. "words": words,
  431. "demand_name_set": demand_name_set,
  432. "output_schema": {
  433. "source": "string",
  434. "matched": [
  435. {
  436. "title": "string, must be selected from words",
  437. "demand_name": "string, must be selected from demand_name_set",
  438. "reason": "string",
  439. }
  440. ],
  441. },
  442. }
  443. last_error: Exception | None = None
  444. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  445. try:
  446. resp = create_chat_completion(
  447. [
  448. {"role": "system", "content": system_prompt},
  449. {
  450. "role": "user",
  451. "content": json.dumps(user_payload, ensure_ascii=False),
  452. },
  453. ],
  454. model=self.config.contribution_match_llm_model or None,
  455. temperature=0,
  456. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  457. )
  458. parsed = _extract_json_object(str(resp.get("content") or ""))
  459. parsed.setdefault("source", channel_content_id)
  460. parsed.setdefault("matched", [])
  461. return parsed
  462. except (OpenRouterCallError, HotContentFlowError) as exc:
  463. last_error = exc
  464. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  465. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  466. raise HotContentFlowError(
  467. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  468. ) from last_error
  469. def build_wxindex_trend(
  470. self,
  471. record: dict[str, Any],
  472. match_result: dict[str, Any],
  473. ) -> dict[str, Any] | None:
  474. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  475. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  476. return None
  477. contribution_words = [
  478. str(row.get("词") or "").strip()
  479. for row in matched_word_rows
  480. if isinstance(row, dict) and str(row.get("词") or "").strip()
  481. ]
  482. if not contribution_words:
  483. return None
  484. channel_content_id = str(
  485. match_result.get("channelContentId") or record.get("unique_key") or ""
  486. )
  487. article_title, body_text = self.extract_article_text(record)
  488. matched_demands = _collect_matched_demand_names(matched_word_rows)
  489. pick = self.llm_extract_wxindex_words(
  490. channel_content_id=channel_content_id,
  491. article_title=article_title,
  492. body_text=body_text,
  493. contribution_words=contribution_words,
  494. matched_demands=matched_demands,
  495. )
  496. selected_words = pick["selected_words"]
  497. start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
  498. threshold = float(self.config.wxindex_score_threshold)
  499. wxindex_searches: list[dict[str, Any]] = []
  500. for keyword in selected_words:
  501. wx_payload = {
  502. "keyword": keyword,
  503. "start_ymd": start_ymd,
  504. "end_ymd": end_ymd,
  505. }
  506. wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
  507. series = _parse_total_scores(wx_resp)
  508. latest_score = series[-1]["total_score"] if series else None
  509. wxindex_searches.append(
  510. {
  511. "keyword": keyword,
  512. "start_ymd": start_ymd,
  513. "end_ymd": end_ymd,
  514. "total_score_7d": series,
  515. "latest_total_score": latest_score,
  516. "threshold": threshold,
  517. "latest_gt_threshold": (
  518. False
  519. if latest_score is None
  520. else latest_score >= threshold
  521. ),
  522. "trend": calc_wxindex_trend(series),
  523. }
  524. )
  525. searchable = [
  526. item
  527. for item in wxindex_searches
  528. if item.get("latest_total_score") is not None
  529. ]
  530. if not searchable:
  531. raise WxindexSelectionSkipped(
  532. f"no wxindex score for any keyword in {channel_content_id}: "
  533. f"{selected_words}"
  534. )
  535. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  536. selected_word = str(best["keyword"])
  537. latest_score = best["latest_total_score"]
  538. series = best["total_score_7d"]
  539. return {
  540. "channelContentId": channel_content_id,
  541. "article_title": article_title,
  542. "llm_selected_words": selected_words,
  543. "llm_selected_word": selected_word,
  544. "llm_reason": pick["reason"],
  545. "wxindex_searches": wxindex_searches,
  546. "wxindex": {
  547. "keyword": selected_word,
  548. "keywords": selected_words,
  549. "start_ymd": start_ymd,
  550. "end_ymd": end_ymd,
  551. "total_score_7d": series,
  552. "latest_total_score": latest_score,
  553. "threshold": threshold,
  554. "latest_gt_threshold": latest_score >= threshold,
  555. "trend": best["trend"],
  556. },
  557. }
  558. def llm_extract_wxindex_words(
  559. self,
  560. *,
  561. channel_content_id: str,
  562. article_title: str,
  563. body_text: str,
  564. contribution_words: list[str],
  565. matched_demands: list[str],
  566. ) -> dict[str, Any]:
  567. system_prompt = """
  568. #角色
  569. 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
  570. # 任务
  571. 我会提供文章标题、正文,以及两类备选词来源:
  572. 1. 高贡献词:文章贡献度较高的关键词
  573. 2. 已匹配需求:已与需求库匹配上的需求名
  574. 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
  575. 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
  576. 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
  577. # 输出规则
  578. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  579. """
  580. user_payload = {
  581. "source": channel_content_id,
  582. "article_title": article_title,
  583. "article_body_text": body_text,
  584. "contribution_words": contribution_words,
  585. "matched_demands": matched_demands,
  586. "output_schema": {
  587. "source": "string",
  588. "selected_words": [
  589. "string, concise keyword for wxindex search, one or more"
  590. ],
  591. "reason": "string",
  592. },
  593. "constraints": [
  594. "selected_words 为数组,至少 1 个词,可多个",
  595. "每个词简洁(2-4 字),适合微信指数检索",
  596. "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
  597. "多个词应分别覆盖不同事件或角度,避免语义重复",
  598. "reason 简洁说明,不超过60字",
  599. "仅输出 JSON 对象,不要 markdown 代码块",
  600. ],
  601. }
  602. last_error: Exception | None = None
  603. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  604. try:
  605. resp = create_chat_completion(
  606. [
  607. {"role": "system", "content": system_prompt},
  608. {
  609. "role": "user",
  610. "content": json.dumps(user_payload, ensure_ascii=False),
  611. },
  612. ],
  613. model=self.config.wxindex_llm_model or None,
  614. temperature=0,
  615. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  616. )
  617. parsed = _extract_json_object(str(resp.get("content") or ""))
  618. raw_words = parsed.get("selected_words")
  619. if isinstance(raw_words, str):
  620. raw_words = [raw_words]
  621. if not isinstance(raw_words, list):
  622. legacy_word = str(parsed.get("selected_word") or "").strip()
  623. raw_words = [legacy_word] if legacy_word else []
  624. selected_words: list[str] = []
  625. seen: set[str] = set()
  626. for item in raw_words:
  627. word = str(item or "").strip()
  628. if word and word not in seen:
  629. seen.add(word)
  630. selected_words.append(word)
  631. reason = str(parsed.get("reason") or "").strip()
  632. if not selected_words:
  633. raise WxindexSelectionSkipped(
  634. f"selected_words empty for {channel_content_id}"
  635. )
  636. return {"selected_words": selected_words, "reason": reason}
  637. except WxindexSelectionSkipped:
  638. raise
  639. except (OpenRouterCallError, HotContentFlowError) as exc:
  640. last_error = exc
  641. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  642. continue
  643. raise HotContentFlowError(
  644. f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
  645. ) from last_error
  646. @staticmethod
  647. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  648. if not matched_words or not isinstance(raw_points, list):
  649. return []
  650. matched_points: list[dict[str, Any]] = []
  651. for point in raw_points:
  652. if not isinstance(point, dict):
  653. continue
  654. point_match_rows = point.get("匹配词列表") or []
  655. if not isinstance(point_match_rows, list):
  656. point_match_rows = []
  657. keep_rows = [
  658. row
  659. for row in point_match_rows
  660. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  661. ]
  662. if keep_rows:
  663. matched_points.append(
  664. {
  665. "来源": str(point.get("来源") or ""),
  666. "点": str(point.get("点") or ""),
  667. "点描述": str(point.get("点描述") or ""),
  668. "匹配词列表": keep_rows,
  669. }
  670. )
  671. return matched_points
  672. @staticmethod
  673. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  674. decode_result = record.get("decode_result_json")
  675. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  676. if not isinstance(target_post, dict):
  677. target_post = {}
  678. article_title = str(
  679. target_post.get("title") or record.get("article_title") or ""
  680. ).strip()
  681. body_text = str(
  682. target_post.get("body_text") or record.get("article_body") or ""
  683. ).strip()
  684. return article_title, body_text
  685. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  686. flow_config = config or load_flow_config()
  687. repository = HotContentRepository(flow_config.mysql)
  688. try:
  689. api_client = JsonApiClient(
  690. timeout_seconds=flow_config.request_timeout_seconds,
  691. verify_ssl=flow_config.https_verify_ssl,
  692. )
  693. service = ContributionPostprocessService(flow_config, repository, api_client)
  694. return service.run()
  695. finally:
  696. repository.close()