postprocess_service.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. build_export_rows_from_record,
  21. )
  22. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  23. from app.hot_content.demand_quality import run_demand_quality_pipeline
  24. from app.hot_content.wxindex_trend import calc_wxindex_trend
  25. from app.hot_content.wxindex_words import (
  26. ensure_word_full_scores,
  27. slice_scores_lookback,
  28. sync_words_from_trend_json,
  29. )
  30. class WxindexSelectionSkipped(Exception):
  31. """微信指数选词无效时跳过后续查询。"""
  32. def _extract_json_object(text: str) -> dict[str, Any]:
  33. raw = text.strip()
  34. if raw.startswith("```"):
  35. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  36. raw = re.sub(r"\s*```$", "", raw)
  37. try:
  38. parsed = json.loads(raw)
  39. if isinstance(parsed, dict):
  40. return parsed
  41. except json.JSONDecodeError:
  42. pass
  43. match = re.search(r"\{[\s\S]*\}", raw)
  44. if not match:
  45. raise HotContentFlowError("llm output is not json object")
  46. try:
  47. parsed = json.loads(match.group(0))
  48. except json.JSONDecodeError as exc:
  49. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  50. if not isinstance(parsed, dict):
  51. raise HotContentFlowError("llm output is not json object")
  52. return parsed
  53. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  54. today = datetime.now(SHANGHAI_TZ).date()
  55. end_date = today - timedelta(days=1)
  56. start_date = end_date - timedelta(days=max(lookback_days, 1))
  57. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  58. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  59. rows = ((wx_resp.get("data") or {}).get("data") or [])
  60. if not isinstance(rows, list):
  61. return []
  62. series: list[dict[str, Any]] = []
  63. for row in rows:
  64. if not isinstance(row, dict):
  65. continue
  66. ymd = str(row.get("ymd") or "").strip()
  67. total_score = ((row.get("channel_score") or {}).get("total_score"))
  68. try:
  69. score_num = float(total_score) if total_score is not None else None
  70. except (TypeError, ValueError):
  71. score_num = None
  72. if ymd and score_num is not None:
  73. series.append({"ymd": ymd, "total_score": score_num})
  74. series.sort(key=lambda x: x["ymd"])
  75. return series
  76. def _normalize_demand_key(value: str) -> str:
  77. return "".join(value.split())
  78. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  79. lookup: dict[str, str] = {}
  80. for item in demand_name_set:
  81. demand_name = str(item).strip()
  82. if not demand_name:
  83. continue
  84. lookup.setdefault(demand_name, demand_name)
  85. compact_key = _normalize_demand_key(demand_name)
  86. if compact_key:
  87. lookup.setdefault(compact_key, demand_name)
  88. return lookup
  89. def _resolve_demand_name(
  90. demand_name: str,
  91. demand_lookup: dict[str, str],
  92. ) -> str | None:
  93. value = demand_name.strip()
  94. if not value:
  95. return None
  96. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  97. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  98. demand_names: list[str] = []
  99. seen: set[str] = set()
  100. for row in matched_word_rows:
  101. if not isinstance(row, dict):
  102. continue
  103. match_rows = row.get("匹配需求列表") or []
  104. if not isinstance(match_rows, list):
  105. continue
  106. for match in match_rows:
  107. if not isinstance(match, dict):
  108. continue
  109. demand_name = str(match.get("demand_name") or "").strip()
  110. if demand_name and demand_name not in seen:
  111. seen.add(demand_name)
  112. demand_names.append(demand_name)
  113. return demand_names
  114. class ContributionPostprocessService:
  115. def __init__(
  116. self,
  117. config: FlowConfig,
  118. repository: HotContentRepository,
  119. api_client: JsonApiClient,
  120. ):
  121. self.config = config
  122. self.repository = repository
  123. self.api_client = api_client
  124. def run(self) -> dict[str, Any]:
  125. records = self.repository.list_postprocess_candidates(
  126. limit=max(self.config.postprocess_batch_size, 1)
  127. )
  128. if not records:
  129. return self._finalize_run_result(
  130. {
  131. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  132. "status": "success",
  133. "candidate_count": 0,
  134. "matched_count": 0,
  135. "wxindex_count": 0,
  136. "skipped_count": 0,
  137. "failed_count": 0,
  138. }
  139. )
  140. cache = DemandCacheService(
  141. self.config,
  142. self.repository,
  143. ).get_or_create_current_hour_cache()
  144. demand_cache_run_id = int(cache["id"])
  145. demand_name_set = cache["demand_name_set"]
  146. if not demand_name_set:
  147. return self._finalize_run_result(
  148. {
  149. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  150. "status": "skipped",
  151. "reason": "empty_demand_cache",
  152. "demand_cache_run_id": demand_cache_run_id,
  153. "cache_source": cache.get("source"),
  154. "processed_count": 0,
  155. }
  156. )
  157. matched_count = 0
  158. wxindex_count = 0
  159. quality_count = 0
  160. exported_count = 0
  161. skipped_count = 0
  162. failed_count = 0
  163. for record in records:
  164. record_id = int(record["id"])
  165. try:
  166. match_result = record.get("contribution_demand_match_json")
  167. if not isinstance(match_result, dict):
  168. match_result = self.match_record(
  169. record=record,
  170. demand_name_set=demand_name_set,
  171. demand_cache_run_id=demand_cache_run_id,
  172. )
  173. self.repository.save_contribution_demand_match(
  174. record_id=record_id,
  175. demand_cache_run_id=demand_cache_run_id,
  176. match_json=match_result,
  177. )
  178. matched_count += 1
  179. sanitized_match_result = self.sanitize_match_result(
  180. match_result,
  181. demand_name_set=demand_name_set,
  182. demand_cache_run_id=demand_cache_run_id,
  183. )
  184. if sanitized_match_result != match_result:
  185. match_result = sanitized_match_result
  186. self.repository.save_contribution_demand_match(
  187. record_id=record_id,
  188. demand_cache_run_id=demand_cache_run_id,
  189. match_json=match_result,
  190. )
  191. trend_result = self.build_wxindex_trend(record, match_result)
  192. if trend_result is None:
  193. self.repository.update_postprocess_status(
  194. record_id=record_id,
  195. status=PostprocessStatus.SKIPPED,
  196. error_message="no matched demand words",
  197. )
  198. self._save_empty_demand_quality(record_id=record_id)
  199. exported_count += self.export_demand_terms_if_needed(
  200. record=record,
  201. match_result=match_result,
  202. trend_result=None,
  203. )
  204. skipped_count += 1
  205. continue
  206. self.repository.save_wxindex_trend(
  207. record_id=record_id,
  208. trend_json=trend_result,
  209. )
  210. self.sync_wxindex_words(
  211. record_id=record_id,
  212. trend_result=trend_result,
  213. )
  214. event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
  215. record=record,
  216. match_result=match_result,
  217. trend_result=trend_result,
  218. )
  219. self.repository.save_demand_quality(
  220. record_id=record_id,
  221. event_sense_json=event_sense_json,
  222. senior_fit_json=senior_fit_json,
  223. )
  224. exported_count += self.export_demand_terms_if_needed(
  225. record=record,
  226. match_result=match_result,
  227. trend_result=trend_result,
  228. event_sense_json=event_sense_json,
  229. senior_fit_json=senior_fit_json,
  230. )
  231. wxindex_count += 1
  232. quality_count += 1
  233. except WxindexSelectionSkipped as exc:
  234. self.repository.update_postprocess_status(
  235. record_id=record_id,
  236. status=PostprocessStatus.SKIPPED,
  237. error_message=str(exc),
  238. )
  239. if isinstance(match_result, dict):
  240. self._save_empty_demand_quality(record_id=record_id)
  241. exported_count += self.export_demand_terms_if_needed(
  242. record=record,
  243. match_result=match_result,
  244. trend_result=None,
  245. )
  246. skipped_count += 1
  247. except Exception as exc:
  248. self.repository.update_postprocess_status(
  249. record_id=record_id,
  250. status=PostprocessStatus.FAILED,
  251. error_message=str(exc),
  252. )
  253. failed_count += 1
  254. return self._finalize_run_result(
  255. {
  256. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  257. "status": "success",
  258. "demand_cache_run_id": demand_cache_run_id,
  259. "cache_source": cache.get("source"),
  260. "cache_hour": str(cache.get("cache_hour") or ""),
  261. "demand_name_count": len(demand_name_set),
  262. "candidate_count": len(records),
  263. "matched_count": matched_count,
  264. "wxindex_count": wxindex_count,
  265. "quality_count": quality_count,
  266. "exported_count": exported_count,
  267. "skipped_count": skipped_count,
  268. "failed_count": failed_count,
  269. }
  270. )
  271. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  272. try:
  273. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  274. except Exception as exc:
  275. result["hive_sync_error"] = str(exc)
  276. return result
  277. def sync_wxindex_words(
  278. self,
  279. *,
  280. record_id: int,
  281. trend_result: dict[str, Any],
  282. verbose: bool = False,
  283. ) -> dict[str, int]:
  284. return sync_words_from_trend_json(
  285. self.repository,
  286. self.api_client,
  287. self.config.wxindex_api_url,
  288. trend_json=trend_result,
  289. record_id=record_id,
  290. verbose=verbose,
  291. )
  292. def _save_empty_demand_quality(self, *, record_id: int) -> None:
  293. self.repository.save_demand_quality(
  294. record_id=record_id,
  295. event_sense_json={},
  296. senior_fit_json={},
  297. update_status=False,
  298. )
  299. def run_demand_quality_judgment(
  300. self,
  301. *,
  302. record: dict[str, Any],
  303. match_result: dict[str, Any],
  304. trend_result: dict[str, Any],
  305. ) -> tuple[dict[str, Any], dict[str, Any]]:
  306. channel_content_id = str(
  307. match_result.get("channelContentId") or record.get("unique_key") or ""
  308. ).strip()
  309. base_export_rows = attach_wxindex_metadata(
  310. build_demand_export_rows(
  311. match_result,
  312. contribution_points=(
  313. record.get("contribution_points_json")
  314. if isinstance(record.get("contribution_points_json"), dict)
  315. else None
  316. ),
  317. trend_json=trend_result,
  318. ),
  319. trend_result,
  320. wxindex_threshold=self.config.wxindex_score_threshold,
  321. )
  322. return run_demand_quality_pipeline(
  323. channel_content_id=channel_content_id,
  324. export_rows=base_export_rows,
  325. wxindex_threshold=self.config.wxindex_score_threshold,
  326. event_threshold=self.config.demand_event_sense_threshold,
  327. senior_threshold=self.config.demand_senior_fit_threshold,
  328. model=self.config.demand_quality_llm_model,
  329. max_attempts=self.config.demand_quality_llm_max_attempts,
  330. retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
  331. max_tokens=self.config.demand_quality_llm_max_tokens,
  332. )
  333. def export_demand_terms_if_needed(
  334. self,
  335. *,
  336. record: dict[str, Any],
  337. match_result: dict[str, Any],
  338. trend_result: dict[str, Any] | None,
  339. event_sense_json: dict[str, Any] | None = None,
  340. senior_fit_json: dict[str, Any] | None = None,
  341. ) -> int:
  342. normalized_record = {
  343. "contribution_demand_match_json": match_result,
  344. "contribution_points_json": (
  345. record.get("contribution_points_json")
  346. if isinstance(record.get("contribution_points_json"), dict)
  347. else None
  348. ),
  349. "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
  350. "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
  351. "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
  352. }
  353. export_rows = build_export_rows_from_record(
  354. normalized_record,
  355. wxindex_threshold=self.config.wxindex_score_threshold,
  356. event_sense_json=normalized_record["demand_event_sense_json"],
  357. senior_fit_json=normalized_record["demand_senior_fit_json"],
  358. event_threshold=self.config.demand_event_sense_threshold,
  359. senior_threshold=self.config.demand_senior_fit_threshold,
  360. )
  361. self.repository.replace_demand_export_rows(
  362. record_id=int(record["id"]),
  363. source=str(record.get("source") or ""),
  364. hot_title=str(record.get("title") or ""),
  365. article_title=str(record.get("article_title") or ""),
  366. rows=export_rows,
  367. )
  368. return len(export_rows)
  369. def match_record(
  370. self,
  371. *,
  372. record: dict[str, Any],
  373. demand_name_set: list[str],
  374. demand_cache_run_id: int,
  375. ) -> dict[str, Any]:
  376. contribution_points = record.get("contribution_points_json")
  377. if not isinstance(contribution_points, dict):
  378. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  379. channel_content_id = str(
  380. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  381. ).strip()
  382. words_rows = contribution_points.get("高贡献词列表") or []
  383. if not isinstance(words_rows, list):
  384. words_rows = []
  385. word_list = [
  386. str(item.get("词") or "").strip()
  387. for item in words_rows
  388. if isinstance(item, dict) and str(item.get("词") or "").strip()
  389. ]
  390. llm_result = self.llm_match_single_article(
  391. channel_content_id=channel_content_id or str(record["unique_key"]),
  392. words=word_list,
  393. demand_name_set=demand_name_set,
  394. )
  395. demand_lookup = _build_demand_lookup(demand_name_set)
  396. matched_map: dict[str, list[dict[str, str]]] = {}
  397. for item in llm_result.get("matched") or []:
  398. if not isinstance(item, dict):
  399. continue
  400. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  401. demand_name = str(item.get("demand_name") or "").strip()
  402. reason = str(item.get("reason") or "").strip()
  403. if not word or word not in word_list:
  404. continue
  405. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  406. if canonical_demand_name is None:
  407. continue
  408. matched_map.setdefault(word, []).append(
  409. {
  410. "demand_name": canonical_demand_name,
  411. "reason": reason,
  412. }
  413. )
  414. output_words: list[dict[str, Any]] = []
  415. matched_word_rows: list[dict[str, Any]] = []
  416. for row in words_rows:
  417. if not isinstance(row, dict):
  418. continue
  419. word = str(row.get("词") or "").strip()
  420. item = {"词": word, "贡献度": row.get("贡献度")}
  421. if word in matched_map:
  422. item["匹配需求列表"] = matched_map[word]
  423. matched_word_rows.append(item)
  424. output_words.append(item)
  425. matched_points = self.filter_matched_points(
  426. contribution_points.get("点列表"),
  427. set(matched_map.keys()),
  428. )
  429. return {
  430. "channelContentId": channel_content_id,
  431. "demand_cache_run_id": demand_cache_run_id,
  432. "高贡献词列表": output_words,
  433. "匹配到需求的词列表": matched_word_rows,
  434. "点列表": matched_points,
  435. }
  436. def sanitize_match_result(
  437. self,
  438. match_result: dict[str, Any],
  439. *,
  440. demand_name_set: list[str],
  441. demand_cache_run_id: int,
  442. ) -> dict[str, Any]:
  443. demand_lookup = _build_demand_lookup(demand_name_set)
  444. words_rows = match_result.get("高贡献词列表") or []
  445. if not isinstance(words_rows, list):
  446. words_rows = []
  447. output_words: list[dict[str, Any]] = []
  448. matched_word_rows: list[dict[str, Any]] = []
  449. valid_words: set[str] = set()
  450. for row in words_rows:
  451. if not isinstance(row, dict):
  452. continue
  453. word = str(row.get("词") or "").strip()
  454. item = {"词": word, "贡献度": row.get("贡献度")}
  455. match_rows = row.get("匹配需求列表") or []
  456. if not isinstance(match_rows, list):
  457. match_rows = []
  458. valid_match_rows: list[dict[str, str]] = []
  459. for match in match_rows:
  460. if not isinstance(match, dict):
  461. continue
  462. demand_name = str(match.get("demand_name") or "").strip()
  463. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  464. if canonical_demand_name is None:
  465. continue
  466. valid_match_rows.append(
  467. {
  468. "demand_name": canonical_demand_name,
  469. "reason": str(match.get("reason") or "").strip(),
  470. }
  471. )
  472. if word and valid_match_rows:
  473. item["匹配需求列表"] = valid_match_rows
  474. matched_word_rows.append(item)
  475. valid_words.add(word)
  476. output_words.append(item)
  477. return {
  478. "channelContentId": str(match_result.get("channelContentId") or ""),
  479. "demand_cache_run_id": demand_cache_run_id,
  480. "高贡献词列表": output_words,
  481. "匹配到需求的词列表": matched_word_rows,
  482. "点列表": self.filter_matched_points(
  483. match_result.get("点列表"),
  484. valid_words,
  485. ),
  486. }
  487. def llm_match_single_article(
  488. self,
  489. *,
  490. channel_content_id: str,
  491. words: list[str],
  492. demand_name_set: list[str],
  493. ) -> dict[str, Any]:
  494. if not words:
  495. return {"source": channel_content_id, "matched": []}
  496. system_prompt = """
  497. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  498. # 任务
  499. 我会提供两组数据:
  500. 1. 热点词列表:一组待匹配的热点词语
  501. 2. 需求词库:一组已有的需求词语
  502. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  503. # 匹配标准
  504. 满足以下任意一条,则视为匹配成功:
  505. - 热点词与需求词含义相同或高度相近(如同义词、近义词)
  506. - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
  507. - 热点词与需求词在用户意图上高度一致
  508. 以下情况,不得视为匹配:
  509. - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
  510. - 热点词与需求词只有表面字符重叠,语义方向不同
  511. - 热点词是需求词的上位概念(范围过宽,含义不够精确)
  512. - 两者只是同属某个大类,但具体含义差异明显
  513. # 多词组成的需求词处理规则
  514. 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
  515. # 输出规则
  516. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  517. # 约束
  518. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  519. """
  520. user_payload = {
  521. "source": channel_content_id,
  522. "words": words,
  523. "demand_name_set": demand_name_set,
  524. "output_schema": {
  525. "source": "string",
  526. "matched": [
  527. {
  528. "title": "string, must be selected from words",
  529. "demand_name": "string, must be selected from demand_name_set",
  530. "reason": "string",
  531. }
  532. ],
  533. },
  534. }
  535. last_error: Exception | None = None
  536. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  537. try:
  538. resp = create_chat_completion(
  539. [
  540. {"role": "system", "content": system_prompt},
  541. {
  542. "role": "user",
  543. "content": json.dumps(user_payload, ensure_ascii=False),
  544. },
  545. ],
  546. model=self.config.contribution_match_llm_model or None,
  547. temperature=0,
  548. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  549. )
  550. parsed = _extract_json_object(str(resp.get("content") or ""))
  551. parsed.setdefault("source", channel_content_id)
  552. parsed.setdefault("matched", [])
  553. return parsed
  554. except (OpenRouterCallError, HotContentFlowError) as exc:
  555. last_error = exc
  556. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  557. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  558. raise HotContentFlowError(
  559. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  560. ) from last_error
  561. def build_wxindex_trend(
  562. self,
  563. record: dict[str, Any],
  564. match_result: dict[str, Any],
  565. ) -> dict[str, Any] | None:
  566. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  567. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  568. return None
  569. contribution_words = [
  570. str(row.get("词") or "").strip()
  571. for row in matched_word_rows
  572. if isinstance(row, dict) and str(row.get("词") or "").strip()
  573. ]
  574. if not contribution_words:
  575. return None
  576. channel_content_id = str(
  577. match_result.get("channelContentId") or record.get("unique_key") or ""
  578. )
  579. article_title, body_text = self.extract_article_text(record)
  580. matched_demands = _collect_matched_demand_names(matched_word_rows)
  581. pick = self.llm_extract_wxindex_words(
  582. channel_content_id=channel_content_id,
  583. article_title=article_title,
  584. body_text=body_text,
  585. contribution_words=contribution_words,
  586. matched_demands=matched_demands,
  587. )
  588. selected_words = pick["selected_words"]
  589. threshold = float(self.config.wxindex_score_threshold)
  590. wxindex_searches: list[dict[str, Any]] = []
  591. for keyword in selected_words:
  592. full_scores, _action = ensure_word_full_scores(
  593. self.repository,
  594. self.api_client,
  595. self.config.wxindex_api_url,
  596. keyword=keyword,
  597. )
  598. series, start_ymd, end_ymd = slice_scores_lookback(
  599. full_scores,
  600. self.config.wxindex_lookback_days,
  601. )
  602. latest_score = series[-1]["total_score"] if series else None
  603. wxindex_searches.append(
  604. {
  605. "keyword": keyword,
  606. "start_ymd": start_ymd,
  607. "end_ymd": end_ymd,
  608. "total_score_7d": series,
  609. "latest_total_score": latest_score,
  610. "threshold": threshold,
  611. "latest_gt_threshold": (
  612. False
  613. if latest_score is None
  614. else latest_score >= threshold
  615. ),
  616. "trend": calc_wxindex_trend(series),
  617. }
  618. )
  619. searchable = [
  620. item
  621. for item in wxindex_searches
  622. if item.get("latest_total_score") is not None
  623. ]
  624. if not searchable:
  625. raise WxindexSelectionSkipped(
  626. f"no wxindex score for any keyword in {channel_content_id}: "
  627. f"{selected_words}"
  628. )
  629. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  630. selected_word = str(best["keyword"])
  631. latest_score = best["latest_total_score"]
  632. series = best["total_score_7d"]
  633. return {
  634. "channelContentId": channel_content_id,
  635. "article_title": article_title,
  636. "llm_selected_words": selected_words,
  637. "llm_selected_word": selected_word,
  638. "llm_reason": pick["reason"],
  639. "wxindex_searches": wxindex_searches,
  640. "wxindex": {
  641. "keyword": selected_word,
  642. "keywords": selected_words,
  643. "start_ymd": best["start_ymd"],
  644. "end_ymd": best["end_ymd"],
  645. "total_score_7d": series,
  646. "latest_total_score": latest_score,
  647. "threshold": threshold,
  648. "latest_gt_threshold": latest_score >= threshold,
  649. "trend": best["trend"],
  650. },
  651. }
  652. def llm_extract_wxindex_words(
  653. self,
  654. *,
  655. channel_content_id: str,
  656. article_title: str,
  657. body_text: str,
  658. contribution_words: list[str],
  659. matched_demands: list[str],
  660. ) -> dict[str, Any]:
  661. system_prompt = """
  662. #角色
  663. 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
  664. # 任务
  665. 我会提供文章标题、正文,以及两类备选词来源:
  666. 1. 高贡献词:文章贡献度较高的关键词
  667. 2. 已匹配需求:已与需求库匹配上的需求名
  668. 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
  669. 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
  670. 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
  671. # 输出规则
  672. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  673. """
  674. user_payload = {
  675. "source": channel_content_id,
  676. "article_title": article_title,
  677. "article_body_text": body_text,
  678. "contribution_words": contribution_words,
  679. "matched_demands": matched_demands,
  680. "output_schema": {
  681. "source": "string",
  682. "selected_words": [
  683. "string, concise keyword for wxindex search, one or more"
  684. ],
  685. "reason": "string",
  686. },
  687. "constraints": [
  688. "selected_words 为数组,至少 1 个词,可多个",
  689. "每个词简洁(2-4 字),适合微信指数检索",
  690. "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
  691. "多个词应分别覆盖不同事件或角度,避免语义重复",
  692. "reason 简洁说明,不超过60字",
  693. "仅输出 JSON 对象,不要 markdown 代码块",
  694. ],
  695. }
  696. last_error: Exception | None = None
  697. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  698. try:
  699. resp = create_chat_completion(
  700. [
  701. {"role": "system", "content": system_prompt},
  702. {
  703. "role": "user",
  704. "content": json.dumps(user_payload, ensure_ascii=False),
  705. },
  706. ],
  707. model=self.config.wxindex_llm_model or None,
  708. temperature=0,
  709. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  710. )
  711. parsed = _extract_json_object(str(resp.get("content") or ""))
  712. raw_words = parsed.get("selected_words")
  713. if isinstance(raw_words, str):
  714. raw_words = [raw_words]
  715. if not isinstance(raw_words, list):
  716. legacy_word = str(parsed.get("selected_word") or "").strip()
  717. raw_words = [legacy_word] if legacy_word else []
  718. selected_words: list[str] = []
  719. seen: set[str] = set()
  720. for item in raw_words:
  721. word = str(item or "").strip()
  722. if word and word not in seen:
  723. seen.add(word)
  724. selected_words.append(word)
  725. reason = str(parsed.get("reason") or "").strip()
  726. if not selected_words:
  727. raise WxindexSelectionSkipped(
  728. f"selected_words empty for {channel_content_id}"
  729. )
  730. return {"selected_words": selected_words, "reason": reason}
  731. except WxindexSelectionSkipped:
  732. raise
  733. except (OpenRouterCallError, HotContentFlowError) as exc:
  734. last_error = exc
  735. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  736. continue
  737. raise HotContentFlowError(
  738. f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
  739. ) from last_error
  740. @staticmethod
  741. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  742. if not matched_words or not isinstance(raw_points, list):
  743. return []
  744. matched_points: list[dict[str, Any]] = []
  745. for point in raw_points:
  746. if not isinstance(point, dict):
  747. continue
  748. point_match_rows = point.get("匹配词列表") or []
  749. if not isinstance(point_match_rows, list):
  750. point_match_rows = []
  751. keep_rows = [
  752. row
  753. for row in point_match_rows
  754. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  755. ]
  756. if keep_rows:
  757. matched_points.append(
  758. {
  759. "来源": str(point.get("来源") or ""),
  760. "点": str(point.get("点") or ""),
  761. "点描述": str(point.get("点描述") or ""),
  762. "匹配词列表": keep_rows,
  763. }
  764. )
  765. return matched_points
  766. @staticmethod
  767. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  768. decode_result = record.get("decode_result_json")
  769. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  770. if not isinstance(target_post, dict):
  771. target_post = {}
  772. article_title = str(
  773. target_post.get("title") or record.get("article_title") or ""
  774. ).strip()
  775. body_text = str(
  776. target_post.get("body_text") or record.get("article_body") or ""
  777. ).strip()
  778. return article_title, body_text
  779. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  780. flow_config = config or load_flow_config()
  781. repository = HotContentRepository(flow_config.mysql)
  782. try:
  783. api_client = JsonApiClient(
  784. timeout_seconds=flow_config.request_timeout_seconds,
  785. verify_ssl=flow_config.https_verify_ssl,
  786. )
  787. service = ContributionPostprocessService(flow_config, repository, api_client)
  788. return service.run()
  789. finally:
  790. repository.close()