postprocess_service.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. build_export_rows_from_record,
  21. )
  22. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  23. from app.hot_content.demand_quality import run_demand_quality_pipeline
  24. from app.hot_content.wxindex_trend import calc_wxindex_trend
  25. from app.hot_content.wxindex_words import (
  26. ensure_word_full_scores,
  27. slice_scores_lookback,
  28. sync_words_from_trend_json,
  29. )
  30. class WxindexSelectionSkipped(Exception):
  31. """微信指数选词无效时跳过后续查询。"""
  32. def _extract_json_object(text: str) -> dict[str, Any]:
  33. raw = text.strip()
  34. if raw.startswith("```"):
  35. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  36. raw = re.sub(r"\s*```$", "", raw)
  37. try:
  38. parsed = json.loads(raw)
  39. if isinstance(parsed, dict):
  40. return parsed
  41. except json.JSONDecodeError:
  42. pass
  43. match = re.search(r"\{[\s\S]*\}", raw)
  44. if not match:
  45. raise HotContentFlowError("llm output is not json object")
  46. try:
  47. parsed = json.loads(match.group(0))
  48. except json.JSONDecodeError as exc:
  49. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  50. if not isinstance(parsed, dict):
  51. raise HotContentFlowError("llm output is not json object")
  52. return parsed
  53. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  54. today = datetime.now(SHANGHAI_TZ).date()
  55. end_date = today - timedelta(days=1)
  56. start_date = end_date - timedelta(days=max(lookback_days, 1))
  57. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  58. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  59. rows = ((wx_resp.get("data") or {}).get("data") or [])
  60. if not isinstance(rows, list):
  61. return []
  62. series: list[dict[str, Any]] = []
  63. for row in rows:
  64. if not isinstance(row, dict):
  65. continue
  66. ymd = str(row.get("ymd") or "").strip()
  67. total_score = ((row.get("channel_score") or {}).get("total_score"))
  68. try:
  69. score_num = float(total_score) if total_score is not None else None
  70. except (TypeError, ValueError):
  71. score_num = None
  72. if ymd and score_num is not None:
  73. series.append({"ymd": ymd, "total_score": score_num})
  74. series.sort(key=lambda x: x["ymd"])
  75. return series
  76. def _normalize_demand_key(value: str) -> str:
  77. return "".join(value.split())
  78. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  79. lookup: dict[str, str] = {}
  80. for item in demand_name_set:
  81. demand_name = str(item).strip()
  82. if not demand_name:
  83. continue
  84. lookup.setdefault(demand_name, demand_name)
  85. compact_key = _normalize_demand_key(demand_name)
  86. if compact_key:
  87. lookup.setdefault(compact_key, demand_name)
  88. return lookup
  89. def _resolve_demand_name(
  90. demand_name: str,
  91. demand_lookup: dict[str, str],
  92. ) -> str | None:
  93. value = demand_name.strip()
  94. if not value:
  95. return None
  96. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  97. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  98. demand_names: list[str] = []
  99. seen: set[str] = set()
  100. for row in matched_word_rows:
  101. if not isinstance(row, dict):
  102. continue
  103. match_rows = row.get("匹配需求列表") or []
  104. if not isinstance(match_rows, list):
  105. continue
  106. for match in match_rows:
  107. if not isinstance(match, dict):
  108. continue
  109. demand_name = str(match.get("demand_name") or "").strip()
  110. if demand_name and demand_name not in seen:
  111. seen.add(demand_name)
  112. demand_names.append(demand_name)
  113. return demand_names
  114. class ContributionPostprocessService:
  115. def __init__(
  116. self,
  117. config: FlowConfig,
  118. repository: HotContentRepository,
  119. api_client: JsonApiClient,
  120. ):
  121. self.config = config
  122. self.repository = repository
  123. self.api_client = api_client
  124. def run(self) -> dict[str, Any]:
  125. records = self.repository.list_postprocess_candidates(
  126. limit=max(self.config.postprocess_batch_size, 1)
  127. )
  128. if not records:
  129. return self._finalize_run_result(
  130. {
  131. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  132. "status": "success",
  133. "candidate_count": 0,
  134. "matched_count": 0,
  135. "wxindex_count": 0,
  136. "skipped_count": 0,
  137. "failed_count": 0,
  138. }
  139. )
  140. cache = DemandCacheService(
  141. self.config,
  142. self.repository,
  143. ).get_or_create_current_hour_cache()
  144. demand_cache_run_id = int(cache["id"])
  145. demand_name_set = cache["demand_name_set"]
  146. if not demand_name_set:
  147. return self._finalize_run_result(
  148. {
  149. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  150. "status": "skipped",
  151. "reason": "empty_demand_cache",
  152. "demand_cache_run_id": demand_cache_run_id,
  153. "cache_source": cache.get("source"),
  154. "processed_count": 0,
  155. }
  156. )
  157. matched_count = 0
  158. wxindex_count = 0
  159. quality_count = 0
  160. exported_count = 0
  161. skipped_count = 0
  162. failed_count = 0
  163. for record in records:
  164. record_id = int(record["id"])
  165. try:
  166. match_result = record.get("contribution_demand_match_json")
  167. if not isinstance(match_result, dict):
  168. match_result = self.match_record(
  169. record=record,
  170. demand_name_set=demand_name_set,
  171. demand_cache_run_id=demand_cache_run_id,
  172. )
  173. self.repository.save_contribution_demand_match(
  174. record_id=record_id,
  175. demand_cache_run_id=demand_cache_run_id,
  176. match_json=match_result,
  177. )
  178. matched_count += 1
  179. sanitized_match_result = self.sanitize_match_result(
  180. match_result,
  181. demand_name_set=demand_name_set,
  182. demand_cache_run_id=demand_cache_run_id,
  183. )
  184. if sanitized_match_result != match_result:
  185. match_result = sanitized_match_result
  186. self.repository.save_contribution_demand_match(
  187. record_id=record_id,
  188. demand_cache_run_id=demand_cache_run_id,
  189. match_json=match_result,
  190. )
  191. trend_result = self.build_wxindex_trend(record, match_result)
  192. if trend_result is None:
  193. self.repository.update_postprocess_status(
  194. record_id=record_id,
  195. status=PostprocessStatus.SKIPPED,
  196. error_message="no matched demand words",
  197. )
  198. self._save_empty_demand_quality(record_id=record_id)
  199. exported_count += self.export_demand_terms_if_needed(
  200. record=record,
  201. match_result=match_result,
  202. trend_result=None,
  203. )
  204. skipped_count += 1
  205. continue
  206. self.repository.save_wxindex_trend(
  207. record_id=record_id,
  208. trend_json=trend_result,
  209. )
  210. self.sync_wxindex_words(
  211. record_id=record_id,
  212. trend_result=trend_result,
  213. event_created_at=record.get("created_at"),
  214. )
  215. event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
  216. record=record,
  217. match_result=match_result,
  218. trend_result=trend_result,
  219. )
  220. self.repository.save_demand_quality(
  221. record_id=record_id,
  222. event_sense_json=event_sense_json,
  223. senior_fit_json=senior_fit_json,
  224. )
  225. exported_count += self.export_demand_terms_if_needed(
  226. record=record,
  227. match_result=match_result,
  228. trend_result=trend_result,
  229. event_sense_json=event_sense_json,
  230. senior_fit_json=senior_fit_json,
  231. )
  232. wxindex_count += 1
  233. quality_count += 1
  234. except WxindexSelectionSkipped as exc:
  235. self.repository.update_postprocess_status(
  236. record_id=record_id,
  237. status=PostprocessStatus.SKIPPED,
  238. error_message=str(exc),
  239. )
  240. if isinstance(match_result, dict):
  241. self._save_empty_demand_quality(record_id=record_id)
  242. exported_count += self.export_demand_terms_if_needed(
  243. record=record,
  244. match_result=match_result,
  245. trend_result=None,
  246. )
  247. skipped_count += 1
  248. except Exception as exc:
  249. self.repository.update_postprocess_status(
  250. record_id=record_id,
  251. status=PostprocessStatus.FAILED,
  252. error_message=str(exc),
  253. )
  254. failed_count += 1
  255. return self._finalize_run_result(
  256. {
  257. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  258. "status": "success",
  259. "demand_cache_run_id": demand_cache_run_id,
  260. "cache_source": cache.get("source"),
  261. "cache_hour": str(cache.get("cache_hour") or ""),
  262. "demand_name_count": len(demand_name_set),
  263. "candidate_count": len(records),
  264. "matched_count": matched_count,
  265. "wxindex_count": wxindex_count,
  266. "quality_count": quality_count,
  267. "exported_count": exported_count,
  268. "skipped_count": skipped_count,
  269. "failed_count": failed_count,
  270. }
  271. )
  272. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  273. try:
  274. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  275. except Exception as exc:
  276. result["hive_sync_error"] = str(exc)
  277. return result
  278. def sync_wxindex_words(
  279. self,
  280. *,
  281. record_id: int,
  282. trend_result: dict[str, Any],
  283. event_created_at: datetime | None = None,
  284. verbose: bool = False,
  285. ) -> dict[str, int]:
  286. return sync_words_from_trend_json(
  287. self.repository,
  288. self.api_client,
  289. self.config.wxindex_api_url,
  290. trend_json=trend_result,
  291. record_id=record_id,
  292. event_created_at=event_created_at,
  293. verbose=verbose,
  294. update_meta_if_exists=True,
  295. )
  296. def _save_empty_demand_quality(self, *, record_id: int) -> None:
  297. self.repository.save_demand_quality(
  298. record_id=record_id,
  299. event_sense_json={},
  300. senior_fit_json={},
  301. update_status=False,
  302. )
  303. def run_demand_quality_judgment(
  304. self,
  305. *,
  306. record: dict[str, Any],
  307. match_result: dict[str, Any],
  308. trend_result: dict[str, Any],
  309. ) -> tuple[dict[str, Any], dict[str, Any]]:
  310. channel_content_id = str(
  311. match_result.get("channelContentId") or record.get("unique_key") or ""
  312. ).strip()
  313. base_export_rows = attach_wxindex_metadata(
  314. build_demand_export_rows(
  315. match_result,
  316. contribution_points=(
  317. record.get("contribution_points_json")
  318. if isinstance(record.get("contribution_points_json"), dict)
  319. else None
  320. ),
  321. trend_json=trend_result,
  322. ),
  323. trend_result,
  324. wxindex_threshold=self.config.wxindex_score_threshold,
  325. )
  326. return run_demand_quality_pipeline(
  327. channel_content_id=channel_content_id,
  328. export_rows=base_export_rows,
  329. wxindex_threshold=self.config.wxindex_score_threshold,
  330. event_threshold=self.config.demand_event_sense_threshold,
  331. senior_threshold=self.config.demand_senior_fit_threshold,
  332. model=self.config.demand_quality_llm_model,
  333. max_attempts=self.config.demand_quality_llm_max_attempts,
  334. retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
  335. max_tokens=self.config.demand_quality_llm_max_tokens,
  336. )
  337. def export_demand_terms_if_needed(
  338. self,
  339. *,
  340. record: dict[str, Any],
  341. match_result: dict[str, Any],
  342. trend_result: dict[str, Any] | None,
  343. event_sense_json: dict[str, Any] | None = None,
  344. senior_fit_json: dict[str, Any] | None = None,
  345. ) -> int:
  346. normalized_record = {
  347. "contribution_demand_match_json": match_result,
  348. "contribution_points_json": (
  349. record.get("contribution_points_json")
  350. if isinstance(record.get("contribution_points_json"), dict)
  351. else None
  352. ),
  353. "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
  354. "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
  355. "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
  356. }
  357. export_rows = build_export_rows_from_record(
  358. normalized_record,
  359. wxindex_threshold=self.config.wxindex_score_threshold,
  360. event_sense_json=normalized_record["demand_event_sense_json"],
  361. senior_fit_json=normalized_record["demand_senior_fit_json"],
  362. event_threshold=self.config.demand_event_sense_threshold,
  363. senior_threshold=self.config.demand_senior_fit_threshold,
  364. )
  365. self.repository.replace_demand_export_rows(
  366. record_id=int(record["id"]),
  367. source=str(record.get("source") or ""),
  368. hot_title=str(record.get("title") or ""),
  369. article_title=str(record.get("article_title") or ""),
  370. rows=export_rows,
  371. )
  372. return len(export_rows)
  373. def match_record(
  374. self,
  375. *,
  376. record: dict[str, Any],
  377. demand_name_set: list[str],
  378. demand_cache_run_id: int,
  379. ) -> dict[str, Any]:
  380. contribution_points = record.get("contribution_points_json")
  381. if not isinstance(contribution_points, dict):
  382. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  383. channel_content_id = str(
  384. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  385. ).strip()
  386. words_rows = contribution_points.get("高贡献词列表") or []
  387. if not isinstance(words_rows, list):
  388. words_rows = []
  389. word_list = [
  390. str(item.get("词") or "").strip()
  391. for item in words_rows
  392. if isinstance(item, dict) and str(item.get("词") or "").strip()
  393. ]
  394. llm_result = self.llm_match_single_article(
  395. channel_content_id=channel_content_id or str(record["unique_key"]),
  396. words=word_list,
  397. demand_name_set=demand_name_set,
  398. )
  399. demand_lookup = _build_demand_lookup(demand_name_set)
  400. matched_map: dict[str, list[dict[str, str]]] = {}
  401. for item in llm_result.get("matched") or []:
  402. if not isinstance(item, dict):
  403. continue
  404. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  405. demand_name = str(item.get("demand_name") or "").strip()
  406. reason = str(item.get("reason") or "").strip()
  407. if not word or word not in word_list:
  408. continue
  409. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  410. if canonical_demand_name is None:
  411. continue
  412. matched_map.setdefault(word, []).append(
  413. {
  414. "demand_name": canonical_demand_name,
  415. "reason": reason,
  416. }
  417. )
  418. output_words: list[dict[str, Any]] = []
  419. matched_word_rows: list[dict[str, Any]] = []
  420. for row in words_rows:
  421. if not isinstance(row, dict):
  422. continue
  423. word = str(row.get("词") or "").strip()
  424. item = {"词": word, "贡献度": row.get("贡献度")}
  425. if word in matched_map:
  426. item["匹配需求列表"] = matched_map[word]
  427. matched_word_rows.append(item)
  428. output_words.append(item)
  429. matched_points = self.filter_matched_points(
  430. contribution_points.get("点列表"),
  431. set(matched_map.keys()),
  432. )
  433. return {
  434. "channelContentId": channel_content_id,
  435. "demand_cache_run_id": demand_cache_run_id,
  436. "高贡献词列表": output_words,
  437. "匹配到需求的词列表": matched_word_rows,
  438. "点列表": matched_points,
  439. }
  440. def sanitize_match_result(
  441. self,
  442. match_result: dict[str, Any],
  443. *,
  444. demand_name_set: list[str],
  445. demand_cache_run_id: int,
  446. ) -> dict[str, Any]:
  447. demand_lookup = _build_demand_lookup(demand_name_set)
  448. words_rows = match_result.get("高贡献词列表") or []
  449. if not isinstance(words_rows, list):
  450. words_rows = []
  451. output_words: list[dict[str, Any]] = []
  452. matched_word_rows: list[dict[str, Any]] = []
  453. valid_words: set[str] = set()
  454. for row in words_rows:
  455. if not isinstance(row, dict):
  456. continue
  457. word = str(row.get("词") or "").strip()
  458. item = {"词": word, "贡献度": row.get("贡献度")}
  459. match_rows = row.get("匹配需求列表") or []
  460. if not isinstance(match_rows, list):
  461. match_rows = []
  462. valid_match_rows: list[dict[str, str]] = []
  463. for match in match_rows:
  464. if not isinstance(match, dict):
  465. continue
  466. demand_name = str(match.get("demand_name") or "").strip()
  467. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  468. if canonical_demand_name is None:
  469. continue
  470. valid_match_rows.append(
  471. {
  472. "demand_name": canonical_demand_name,
  473. "reason": str(match.get("reason") or "").strip(),
  474. }
  475. )
  476. if word and valid_match_rows:
  477. item["匹配需求列表"] = valid_match_rows
  478. matched_word_rows.append(item)
  479. valid_words.add(word)
  480. output_words.append(item)
  481. return {
  482. "channelContentId": str(match_result.get("channelContentId") or ""),
  483. "demand_cache_run_id": demand_cache_run_id,
  484. "高贡献词列表": output_words,
  485. "匹配到需求的词列表": matched_word_rows,
  486. "点列表": self.filter_matched_points(
  487. match_result.get("点列表"),
  488. valid_words,
  489. ),
  490. }
  491. def llm_match_single_article(
  492. self,
  493. *,
  494. channel_content_id: str,
  495. words: list[str],
  496. demand_name_set: list[str],
  497. ) -> dict[str, Any]:
  498. if not words:
  499. return {"source": channel_content_id, "matched": []}
  500. system_prompt = """
  501. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  502. # 任务
  503. 我会提供两组数据:
  504. 1. 热点词列表:一组待匹配的热点词语
  505. 2. 需求词库:一组已有的需求词语
  506. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  507. # 匹配标准
  508. 满足以下任意一条,则视为匹配成功:
  509. - 热点词与需求词含义相同或高度相近(如同义词、近义词)
  510. - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
  511. - 热点词与需求词在用户意图上高度一致
  512. 以下情况,不得视为匹配:
  513. - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
  514. - 热点词与需求词只有表面字符重叠,语义方向不同
  515. - 热点词是需求词的上位概念(范围过宽,含义不够精确)
  516. - 两者只是同属某个大类,但具体含义差异明显
  517. # 多词组成的需求词处理规则
  518. 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
  519. # 输出规则
  520. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  521. # 约束
  522. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  523. """
  524. user_payload = {
  525. "source": channel_content_id,
  526. "words": words,
  527. "demand_name_set": demand_name_set,
  528. "output_schema": {
  529. "source": "string",
  530. "matched": [
  531. {
  532. "title": "string, must be selected from words",
  533. "demand_name": "string, must be selected from demand_name_set",
  534. "reason": "string",
  535. }
  536. ],
  537. },
  538. }
  539. last_error: Exception | None = None
  540. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  541. try:
  542. resp = create_chat_completion(
  543. [
  544. {"role": "system", "content": system_prompt},
  545. {
  546. "role": "user",
  547. "content": json.dumps(user_payload, ensure_ascii=False),
  548. },
  549. ],
  550. model=self.config.contribution_match_llm_model or None,
  551. temperature=0,
  552. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  553. )
  554. parsed = _extract_json_object(str(resp.get("content") or ""))
  555. parsed.setdefault("source", channel_content_id)
  556. parsed.setdefault("matched", [])
  557. return parsed
  558. except (OpenRouterCallError, HotContentFlowError) as exc:
  559. last_error = exc
  560. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  561. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  562. raise HotContentFlowError(
  563. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  564. ) from last_error
  565. def build_wxindex_trend(
  566. self,
  567. record: dict[str, Any],
  568. match_result: dict[str, Any],
  569. ) -> dict[str, Any] | None:
  570. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  571. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  572. return None
  573. contribution_words = [
  574. str(row.get("词") or "").strip()
  575. for row in matched_word_rows
  576. if isinstance(row, dict) and str(row.get("词") or "").strip()
  577. ]
  578. if not contribution_words:
  579. return None
  580. channel_content_id = str(
  581. match_result.get("channelContentId") or record.get("unique_key") or ""
  582. )
  583. article_title, body_text = self.extract_article_text(record)
  584. matched_demands = _collect_matched_demand_names(matched_word_rows)
  585. pick = self.llm_extract_wxindex_words(
  586. channel_content_id=channel_content_id,
  587. article_title=article_title,
  588. body_text=body_text,
  589. contribution_words=contribution_words,
  590. matched_demands=matched_demands,
  591. )
  592. selected_words = pick["selected_words"]
  593. threshold = float(self.config.wxindex_score_threshold)
  594. wxindex_searches: list[dict[str, Any]] = []
  595. event_created_at = record.get("created_at")
  596. for keyword in selected_words:
  597. full_scores, _action = ensure_word_full_scores(
  598. self.repository,
  599. self.api_client,
  600. self.config.wxindex_api_url,
  601. keyword=keyword,
  602. event_created_at=event_created_at,
  603. update_meta_if_exists=True,
  604. )
  605. series, start_ymd, end_ymd = slice_scores_lookback(
  606. full_scores,
  607. self.config.wxindex_lookback_days,
  608. )
  609. latest_score = series[-1]["total_score"] if series else None
  610. wxindex_searches.append(
  611. {
  612. "keyword": keyword,
  613. "start_ymd": start_ymd,
  614. "end_ymd": end_ymd,
  615. "total_score_7d": series,
  616. "latest_total_score": latest_score,
  617. "threshold": threshold,
  618. "latest_gt_threshold": (
  619. False
  620. if latest_score is None
  621. else latest_score >= threshold
  622. ),
  623. "trend": calc_wxindex_trend(series),
  624. }
  625. )
  626. searchable = [
  627. item
  628. for item in wxindex_searches
  629. if item.get("latest_total_score") is not None
  630. ]
  631. if not searchable:
  632. raise WxindexSelectionSkipped(
  633. f"no wxindex score for any keyword in {channel_content_id}: "
  634. f"{selected_words}"
  635. )
  636. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  637. selected_word = str(best["keyword"])
  638. latest_score = best["latest_total_score"]
  639. series = best["total_score_7d"]
  640. return {
  641. "channelContentId": channel_content_id,
  642. "article_title": article_title,
  643. "llm_selected_words": selected_words,
  644. "llm_selected_word": selected_word,
  645. "llm_reason": pick["reason"],
  646. "wxindex_searches": wxindex_searches,
  647. "wxindex": {
  648. "keyword": selected_word,
  649. "keywords": selected_words,
  650. "start_ymd": best["start_ymd"],
  651. "end_ymd": best["end_ymd"],
  652. "total_score_7d": series,
  653. "latest_total_score": latest_score,
  654. "threshold": threshold,
  655. "latest_gt_threshold": latest_score >= threshold,
  656. "trend": best["trend"],
  657. },
  658. }
  659. def llm_extract_wxindex_words(
  660. self,
  661. *,
  662. channel_content_id: str,
  663. article_title: str,
  664. body_text: str,
  665. contribution_words: list[str],
  666. matched_demands: list[str],
  667. ) -> dict[str, Any]:
  668. system_prompt = """
  669. #角色
  670. 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
  671. # 任务
  672. 我会提供文章标题、正文,以及两类备选词来源:
  673. 1. 高贡献词:文章贡献度较高的关键词
  674. 2. 已匹配需求:已与需求库匹配上的需求名
  675. 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
  676. 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
  677. 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
  678. # 输出规则
  679. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  680. """
  681. user_payload = {
  682. "source": channel_content_id,
  683. "article_title": article_title,
  684. "article_body_text": body_text,
  685. "contribution_words": contribution_words,
  686. "matched_demands": matched_demands,
  687. "output_schema": {
  688. "source": "string",
  689. "selected_words": [
  690. "string, concise keyword for wxindex search, one or more"
  691. ],
  692. "reason": "string",
  693. },
  694. "constraints": [
  695. "selected_words 为数组,至少 1 个词,可多个",
  696. "每个词简洁(2-4 字),适合微信指数检索",
  697. "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
  698. "多个词应分别覆盖不同事件或角度,避免语义重复",
  699. "reason 简洁说明,不超过60字",
  700. "仅输出 JSON 对象,不要 markdown 代码块",
  701. ],
  702. }
  703. last_error: Exception | None = None
  704. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  705. try:
  706. resp = create_chat_completion(
  707. [
  708. {"role": "system", "content": system_prompt},
  709. {
  710. "role": "user",
  711. "content": json.dumps(user_payload, ensure_ascii=False),
  712. },
  713. ],
  714. model=self.config.wxindex_llm_model or None,
  715. temperature=0,
  716. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  717. )
  718. parsed = _extract_json_object(str(resp.get("content") or ""))
  719. raw_words = parsed.get("selected_words")
  720. if isinstance(raw_words, str):
  721. raw_words = [raw_words]
  722. if not isinstance(raw_words, list):
  723. legacy_word = str(parsed.get("selected_word") or "").strip()
  724. raw_words = [legacy_word] if legacy_word else []
  725. selected_words: list[str] = []
  726. seen: set[str] = set()
  727. for item in raw_words:
  728. word = str(item or "").strip()
  729. if word and word not in seen:
  730. seen.add(word)
  731. selected_words.append(word)
  732. reason = str(parsed.get("reason") or "").strip()
  733. if not selected_words:
  734. raise WxindexSelectionSkipped(
  735. f"selected_words empty for {channel_content_id}"
  736. )
  737. return {"selected_words": selected_words, "reason": reason}
  738. except WxindexSelectionSkipped:
  739. raise
  740. except (OpenRouterCallError, HotContentFlowError) as exc:
  741. last_error = exc
  742. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  743. continue
  744. raise HotContentFlowError(
  745. f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
  746. ) from last_error
  747. @staticmethod
  748. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  749. if not matched_words or not isinstance(raw_points, list):
  750. return []
  751. matched_points: list[dict[str, Any]] = []
  752. for point in raw_points:
  753. if not isinstance(point, dict):
  754. continue
  755. point_match_rows = point.get("匹配词列表") or []
  756. if not isinstance(point_match_rows, list):
  757. point_match_rows = []
  758. keep_rows = [
  759. row
  760. for row in point_match_rows
  761. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  762. ]
  763. if keep_rows:
  764. matched_points.append(
  765. {
  766. "来源": str(point.get("来源") or ""),
  767. "点": str(point.get("点") or ""),
  768. "点描述": str(point.get("点描述") or ""),
  769. "匹配词列表": keep_rows,
  770. }
  771. )
  772. return matched_points
  773. @staticmethod
  774. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  775. decode_result = record.get("decode_result_json")
  776. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  777. if not isinstance(target_post, dict):
  778. target_post = {}
  779. article_title = str(
  780. target_post.get("title") or record.get("article_title") or ""
  781. ).strip()
  782. body_text = str(
  783. target_post.get("body_text") or record.get("article_body") or ""
  784. ).strip()
  785. return article_title, body_text
  786. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  787. flow_config = config or load_flow_config()
  788. repository = HotContentRepository(flow_config.mysql)
  789. try:
  790. api_client = JsonApiClient(
  791. timeout_seconds=flow_config.request_timeout_seconds,
  792. verify_ssl=flow_config.https_verify_ssl,
  793. )
  794. service = ContributionPostprocessService(flow_config, repository, api_client)
  795. return service.run()
  796. finally:
  797. repository.close()