postprocess_service.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. build_export_rows_from_record,
  21. )
  22. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  23. from app.hot_content.demand_quality import run_demand_quality_pipeline
  24. from app.hot_content.wxindex_trend import calc_wxindex_trend
  25. class WxindexSelectionSkipped(Exception):
  26. """微信指数选词无效时跳过后续查询。"""
  27. def _extract_json_object(text: str) -> dict[str, Any]:
  28. raw = text.strip()
  29. if raw.startswith("```"):
  30. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  31. raw = re.sub(r"\s*```$", "", raw)
  32. try:
  33. parsed = json.loads(raw)
  34. if isinstance(parsed, dict):
  35. return parsed
  36. except json.JSONDecodeError:
  37. pass
  38. match = re.search(r"\{[\s\S]*\}", raw)
  39. if not match:
  40. raise HotContentFlowError("llm output is not json object")
  41. try:
  42. parsed = json.loads(match.group(0))
  43. except json.JSONDecodeError as exc:
  44. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  45. if not isinstance(parsed, dict):
  46. raise HotContentFlowError("llm output is not json object")
  47. return parsed
  48. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  49. today = datetime.now(SHANGHAI_TZ).date()
  50. end_date = today - timedelta(days=1)
  51. start_date = end_date - timedelta(days=max(lookback_days, 1))
  52. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  53. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  54. rows = ((wx_resp.get("data") or {}).get("data") or [])
  55. if not isinstance(rows, list):
  56. return []
  57. series: list[dict[str, Any]] = []
  58. for row in rows:
  59. if not isinstance(row, dict):
  60. continue
  61. ymd = str(row.get("ymd") or "").strip()
  62. total_score = ((row.get("channel_score") or {}).get("total_score"))
  63. try:
  64. score_num = float(total_score) if total_score is not None else None
  65. except (TypeError, ValueError):
  66. score_num = None
  67. if ymd and score_num is not None:
  68. series.append({"ymd": ymd, "total_score": score_num})
  69. series.sort(key=lambda x: x["ymd"])
  70. return series
  71. def _normalize_demand_key(value: str) -> str:
  72. return "".join(value.split())
  73. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  74. lookup: dict[str, str] = {}
  75. for item in demand_name_set:
  76. demand_name = str(item).strip()
  77. if not demand_name:
  78. continue
  79. lookup.setdefault(demand_name, demand_name)
  80. compact_key = _normalize_demand_key(demand_name)
  81. if compact_key:
  82. lookup.setdefault(compact_key, demand_name)
  83. return lookup
  84. def _resolve_demand_name(
  85. demand_name: str,
  86. demand_lookup: dict[str, str],
  87. ) -> str | None:
  88. value = demand_name.strip()
  89. if not value:
  90. return None
  91. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  92. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  93. demand_names: list[str] = []
  94. seen: set[str] = set()
  95. for row in matched_word_rows:
  96. if not isinstance(row, dict):
  97. continue
  98. match_rows = row.get("匹配需求列表") or []
  99. if not isinstance(match_rows, list):
  100. continue
  101. for match in match_rows:
  102. if not isinstance(match, dict):
  103. continue
  104. demand_name = str(match.get("demand_name") or "").strip()
  105. if demand_name and demand_name not in seen:
  106. seen.add(demand_name)
  107. demand_names.append(demand_name)
  108. return demand_names
  109. class ContributionPostprocessService:
  110. def __init__(
  111. self,
  112. config: FlowConfig,
  113. repository: HotContentRepository,
  114. api_client: JsonApiClient,
  115. ):
  116. self.config = config
  117. self.repository = repository
  118. self.api_client = api_client
  119. def run(self) -> dict[str, Any]:
  120. records = self.repository.list_postprocess_candidates(
  121. limit=max(self.config.postprocess_batch_size, 1)
  122. )
  123. if not records:
  124. return self._finalize_run_result(
  125. {
  126. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  127. "status": "success",
  128. "candidate_count": 0,
  129. "matched_count": 0,
  130. "wxindex_count": 0,
  131. "skipped_count": 0,
  132. "failed_count": 0,
  133. }
  134. )
  135. cache = DemandCacheService(
  136. self.config,
  137. self.repository,
  138. ).get_or_create_current_hour_cache()
  139. demand_cache_run_id = int(cache["id"])
  140. demand_name_set = cache["demand_name_set"]
  141. if not demand_name_set:
  142. return self._finalize_run_result(
  143. {
  144. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  145. "status": "skipped",
  146. "reason": "empty_demand_cache",
  147. "demand_cache_run_id": demand_cache_run_id,
  148. "cache_source": cache.get("source"),
  149. "processed_count": 0,
  150. }
  151. )
  152. matched_count = 0
  153. wxindex_count = 0
  154. quality_count = 0
  155. exported_count = 0
  156. skipped_count = 0
  157. failed_count = 0
  158. for record in records:
  159. record_id = int(record["id"])
  160. try:
  161. match_result = record.get("contribution_demand_match_json")
  162. if not isinstance(match_result, dict):
  163. match_result = self.match_record(
  164. record=record,
  165. demand_name_set=demand_name_set,
  166. demand_cache_run_id=demand_cache_run_id,
  167. )
  168. self.repository.save_contribution_demand_match(
  169. record_id=record_id,
  170. demand_cache_run_id=demand_cache_run_id,
  171. match_json=match_result,
  172. )
  173. matched_count += 1
  174. sanitized_match_result = self.sanitize_match_result(
  175. match_result,
  176. demand_name_set=demand_name_set,
  177. demand_cache_run_id=demand_cache_run_id,
  178. )
  179. if sanitized_match_result != match_result:
  180. match_result = sanitized_match_result
  181. self.repository.save_contribution_demand_match(
  182. record_id=record_id,
  183. demand_cache_run_id=demand_cache_run_id,
  184. match_json=match_result,
  185. )
  186. trend_result = self.build_wxindex_trend(record, match_result)
  187. if trend_result is None:
  188. self.repository.update_postprocess_status(
  189. record_id=record_id,
  190. status=PostprocessStatus.SKIPPED,
  191. error_message="no matched demand words",
  192. )
  193. self._save_empty_demand_quality(record_id=record_id)
  194. exported_count += self.export_demand_terms_if_needed(
  195. record=record,
  196. match_result=match_result,
  197. trend_result=None,
  198. )
  199. skipped_count += 1
  200. continue
  201. self.repository.save_wxindex_trend(
  202. record_id=record_id,
  203. trend_json=trend_result,
  204. )
  205. event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
  206. record=record,
  207. match_result=match_result,
  208. trend_result=trend_result,
  209. )
  210. self.repository.save_demand_quality(
  211. record_id=record_id,
  212. event_sense_json=event_sense_json,
  213. senior_fit_json=senior_fit_json,
  214. )
  215. exported_count += self.export_demand_terms_if_needed(
  216. record=record,
  217. match_result=match_result,
  218. trend_result=trend_result,
  219. event_sense_json=event_sense_json,
  220. senior_fit_json=senior_fit_json,
  221. )
  222. wxindex_count += 1
  223. quality_count += 1
  224. except WxindexSelectionSkipped as exc:
  225. self.repository.update_postprocess_status(
  226. record_id=record_id,
  227. status=PostprocessStatus.SKIPPED,
  228. error_message=str(exc),
  229. )
  230. if isinstance(match_result, dict):
  231. self._save_empty_demand_quality(record_id=record_id)
  232. exported_count += self.export_demand_terms_if_needed(
  233. record=record,
  234. match_result=match_result,
  235. trend_result=None,
  236. )
  237. skipped_count += 1
  238. except Exception as exc:
  239. self.repository.update_postprocess_status(
  240. record_id=record_id,
  241. status=PostprocessStatus.FAILED,
  242. error_message=str(exc),
  243. )
  244. failed_count += 1
  245. return self._finalize_run_result(
  246. {
  247. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  248. "status": "success",
  249. "demand_cache_run_id": demand_cache_run_id,
  250. "cache_source": cache.get("source"),
  251. "cache_hour": str(cache.get("cache_hour") or ""),
  252. "demand_name_count": len(demand_name_set),
  253. "candidate_count": len(records),
  254. "matched_count": matched_count,
  255. "wxindex_count": wxindex_count,
  256. "quality_count": quality_count,
  257. "exported_count": exported_count,
  258. "skipped_count": skipped_count,
  259. "failed_count": failed_count,
  260. }
  261. )
  262. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  263. try:
  264. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  265. except Exception as exc:
  266. result["hive_sync_error"] = str(exc)
  267. return result
  268. def _save_empty_demand_quality(self, *, record_id: int) -> None:
  269. self.repository.save_demand_quality(
  270. record_id=record_id,
  271. event_sense_json={},
  272. senior_fit_json={},
  273. update_status=False,
  274. )
  275. def run_demand_quality_judgment(
  276. self,
  277. *,
  278. record: dict[str, Any],
  279. match_result: dict[str, Any],
  280. trend_result: dict[str, Any],
  281. ) -> tuple[dict[str, Any], dict[str, Any]]:
  282. channel_content_id = str(
  283. match_result.get("channelContentId") or record.get("unique_key") or ""
  284. ).strip()
  285. base_export_rows = attach_wxindex_metadata(
  286. build_demand_export_rows(
  287. match_result,
  288. contribution_points=(
  289. record.get("contribution_points_json")
  290. if isinstance(record.get("contribution_points_json"), dict)
  291. else None
  292. ),
  293. trend_json=trend_result,
  294. ),
  295. trend_result,
  296. wxindex_threshold=self.config.wxindex_score_threshold,
  297. )
  298. return run_demand_quality_pipeline(
  299. channel_content_id=channel_content_id,
  300. export_rows=base_export_rows,
  301. wxindex_threshold=self.config.wxindex_score_threshold,
  302. event_threshold=self.config.demand_event_sense_threshold,
  303. senior_threshold=self.config.demand_senior_fit_threshold,
  304. model=self.config.demand_quality_llm_model,
  305. max_attempts=self.config.demand_quality_llm_max_attempts,
  306. retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
  307. max_tokens=self.config.demand_quality_llm_max_tokens,
  308. )
  309. def export_demand_terms_if_needed(
  310. self,
  311. *,
  312. record: dict[str, Any],
  313. match_result: dict[str, Any],
  314. trend_result: dict[str, Any] | None,
  315. event_sense_json: dict[str, Any] | None = None,
  316. senior_fit_json: dict[str, Any] | None = None,
  317. ) -> int:
  318. normalized_record = {
  319. "contribution_demand_match_json": match_result,
  320. "contribution_points_json": (
  321. record.get("contribution_points_json")
  322. if isinstance(record.get("contribution_points_json"), dict)
  323. else None
  324. ),
  325. "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
  326. "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
  327. "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
  328. }
  329. export_rows = build_export_rows_from_record(
  330. normalized_record,
  331. wxindex_threshold=self.config.wxindex_score_threshold,
  332. event_sense_json=normalized_record["demand_event_sense_json"],
  333. senior_fit_json=normalized_record["demand_senior_fit_json"],
  334. event_threshold=self.config.demand_event_sense_threshold,
  335. senior_threshold=self.config.demand_senior_fit_threshold,
  336. )
  337. self.repository.replace_demand_export_rows(
  338. record_id=int(record["id"]),
  339. source=str(record.get("source") or ""),
  340. hot_title=str(record.get("title") or ""),
  341. article_title=str(record.get("article_title") or ""),
  342. rows=export_rows,
  343. )
  344. return len(export_rows)
  345. def match_record(
  346. self,
  347. *,
  348. record: dict[str, Any],
  349. demand_name_set: list[str],
  350. demand_cache_run_id: int,
  351. ) -> dict[str, Any]:
  352. contribution_points = record.get("contribution_points_json")
  353. if not isinstance(contribution_points, dict):
  354. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  355. channel_content_id = str(
  356. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  357. ).strip()
  358. words_rows = contribution_points.get("高贡献词列表") or []
  359. if not isinstance(words_rows, list):
  360. words_rows = []
  361. word_list = [
  362. str(item.get("词") or "").strip()
  363. for item in words_rows
  364. if isinstance(item, dict) and str(item.get("词") or "").strip()
  365. ]
  366. llm_result = self.llm_match_single_article(
  367. channel_content_id=channel_content_id or str(record["unique_key"]),
  368. words=word_list,
  369. demand_name_set=demand_name_set,
  370. )
  371. demand_lookup = _build_demand_lookup(demand_name_set)
  372. matched_map: dict[str, list[dict[str, str]]] = {}
  373. for item in llm_result.get("matched") or []:
  374. if not isinstance(item, dict):
  375. continue
  376. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  377. demand_name = str(item.get("demand_name") or "").strip()
  378. reason = str(item.get("reason") or "").strip()
  379. if not word or word not in word_list:
  380. continue
  381. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  382. if canonical_demand_name is None:
  383. continue
  384. matched_map.setdefault(word, []).append(
  385. {
  386. "demand_name": canonical_demand_name,
  387. "reason": reason,
  388. }
  389. )
  390. output_words: list[dict[str, Any]] = []
  391. matched_word_rows: list[dict[str, Any]] = []
  392. for row in words_rows:
  393. if not isinstance(row, dict):
  394. continue
  395. word = str(row.get("词") or "").strip()
  396. item = {"词": word, "贡献度": row.get("贡献度")}
  397. if word in matched_map:
  398. item["匹配需求列表"] = matched_map[word]
  399. matched_word_rows.append(item)
  400. output_words.append(item)
  401. matched_points = self.filter_matched_points(
  402. contribution_points.get("点列表"),
  403. set(matched_map.keys()),
  404. )
  405. return {
  406. "channelContentId": channel_content_id,
  407. "demand_cache_run_id": demand_cache_run_id,
  408. "高贡献词列表": output_words,
  409. "匹配到需求的词列表": matched_word_rows,
  410. "点列表": matched_points,
  411. }
  412. def sanitize_match_result(
  413. self,
  414. match_result: dict[str, Any],
  415. *,
  416. demand_name_set: list[str],
  417. demand_cache_run_id: int,
  418. ) -> dict[str, Any]:
  419. demand_lookup = _build_demand_lookup(demand_name_set)
  420. words_rows = match_result.get("高贡献词列表") or []
  421. if not isinstance(words_rows, list):
  422. words_rows = []
  423. output_words: list[dict[str, Any]] = []
  424. matched_word_rows: list[dict[str, Any]] = []
  425. valid_words: set[str] = set()
  426. for row in words_rows:
  427. if not isinstance(row, dict):
  428. continue
  429. word = str(row.get("词") or "").strip()
  430. item = {"词": word, "贡献度": row.get("贡献度")}
  431. match_rows = row.get("匹配需求列表") or []
  432. if not isinstance(match_rows, list):
  433. match_rows = []
  434. valid_match_rows: list[dict[str, str]] = []
  435. for match in match_rows:
  436. if not isinstance(match, dict):
  437. continue
  438. demand_name = str(match.get("demand_name") or "").strip()
  439. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  440. if canonical_demand_name is None:
  441. continue
  442. valid_match_rows.append(
  443. {
  444. "demand_name": canonical_demand_name,
  445. "reason": str(match.get("reason") or "").strip(),
  446. }
  447. )
  448. if word and valid_match_rows:
  449. item["匹配需求列表"] = valid_match_rows
  450. matched_word_rows.append(item)
  451. valid_words.add(word)
  452. output_words.append(item)
  453. return {
  454. "channelContentId": str(match_result.get("channelContentId") or ""),
  455. "demand_cache_run_id": demand_cache_run_id,
  456. "高贡献词列表": output_words,
  457. "匹配到需求的词列表": matched_word_rows,
  458. "点列表": self.filter_matched_points(
  459. match_result.get("点列表"),
  460. valid_words,
  461. ),
  462. }
  463. def llm_match_single_article(
  464. self,
  465. *,
  466. channel_content_id: str,
  467. words: list[str],
  468. demand_name_set: list[str],
  469. ) -> dict[str, Any]:
  470. if not words:
  471. return {"source": channel_content_id, "matched": []}
  472. system_prompt = """
  473. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  474. # 任务
  475. 我会提供两组数据:
  476. 1. 热点词列表:一组待匹配的热点词语
  477. 2. 需求词库:一组已有的需求词语
  478. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  479. # 匹配标准
  480. 满足以下任意一条,则视为匹配成功:
  481. - 热点词与需求词含义相同或高度相近(如同义词、近义词)
  482. - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
  483. - 热点词与需求词在用户意图上高度一致
  484. 以下情况,不得视为匹配:
  485. - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
  486. - 热点词与需求词只有表面字符重叠,语义方向不同
  487. - 热点词是需求词的上位概念(范围过宽,含义不够精确)
  488. - 两者只是同属某个大类,但具体含义差异明显
  489. # 多词组成的需求词处理规则
  490. 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
  491. # 输出规则
  492. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  493. # 约束
  494. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  495. """
  496. user_payload = {
  497. "source": channel_content_id,
  498. "words": words,
  499. "demand_name_set": demand_name_set,
  500. "output_schema": {
  501. "source": "string",
  502. "matched": [
  503. {
  504. "title": "string, must be selected from words",
  505. "demand_name": "string, must be selected from demand_name_set",
  506. "reason": "string",
  507. }
  508. ],
  509. },
  510. }
  511. last_error: Exception | None = None
  512. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  513. try:
  514. resp = create_chat_completion(
  515. [
  516. {"role": "system", "content": system_prompt},
  517. {
  518. "role": "user",
  519. "content": json.dumps(user_payload, ensure_ascii=False),
  520. },
  521. ],
  522. model=self.config.contribution_match_llm_model or None,
  523. temperature=0,
  524. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  525. )
  526. parsed = _extract_json_object(str(resp.get("content") or ""))
  527. parsed.setdefault("source", channel_content_id)
  528. parsed.setdefault("matched", [])
  529. return parsed
  530. except (OpenRouterCallError, HotContentFlowError) as exc:
  531. last_error = exc
  532. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  533. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  534. raise HotContentFlowError(
  535. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  536. ) from last_error
  537. def build_wxindex_trend(
  538. self,
  539. record: dict[str, Any],
  540. match_result: dict[str, Any],
  541. ) -> dict[str, Any] | None:
  542. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  543. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  544. return None
  545. contribution_words = [
  546. str(row.get("词") or "").strip()
  547. for row in matched_word_rows
  548. if isinstance(row, dict) and str(row.get("词") or "").strip()
  549. ]
  550. if not contribution_words:
  551. return None
  552. channel_content_id = str(
  553. match_result.get("channelContentId") or record.get("unique_key") or ""
  554. )
  555. article_title, body_text = self.extract_article_text(record)
  556. matched_demands = _collect_matched_demand_names(matched_word_rows)
  557. pick = self.llm_extract_wxindex_words(
  558. channel_content_id=channel_content_id,
  559. article_title=article_title,
  560. body_text=body_text,
  561. contribution_words=contribution_words,
  562. matched_demands=matched_demands,
  563. )
  564. selected_words = pick["selected_words"]
  565. start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
  566. threshold = float(self.config.wxindex_score_threshold)
  567. wxindex_searches: list[dict[str, Any]] = []
  568. for keyword in selected_words:
  569. wx_payload = {
  570. "keyword": keyword,
  571. "start_ymd": start_ymd,
  572. "end_ymd": end_ymd,
  573. }
  574. wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
  575. series = _parse_total_scores(wx_resp)
  576. latest_score = series[-1]["total_score"] if series else None
  577. wxindex_searches.append(
  578. {
  579. "keyword": keyword,
  580. "start_ymd": start_ymd,
  581. "end_ymd": end_ymd,
  582. "total_score_7d": series,
  583. "latest_total_score": latest_score,
  584. "threshold": threshold,
  585. "latest_gt_threshold": (
  586. False
  587. if latest_score is None
  588. else latest_score >= threshold
  589. ),
  590. "trend": calc_wxindex_trend(series),
  591. }
  592. )
  593. searchable = [
  594. item
  595. for item in wxindex_searches
  596. if item.get("latest_total_score") is not None
  597. ]
  598. if not searchable:
  599. raise WxindexSelectionSkipped(
  600. f"no wxindex score for any keyword in {channel_content_id}: "
  601. f"{selected_words}"
  602. )
  603. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  604. selected_word = str(best["keyword"])
  605. latest_score = best["latest_total_score"]
  606. series = best["total_score_7d"]
  607. return {
  608. "channelContentId": channel_content_id,
  609. "article_title": article_title,
  610. "llm_selected_words": selected_words,
  611. "llm_selected_word": selected_word,
  612. "llm_reason": pick["reason"],
  613. "wxindex_searches": wxindex_searches,
  614. "wxindex": {
  615. "keyword": selected_word,
  616. "keywords": selected_words,
  617. "start_ymd": start_ymd,
  618. "end_ymd": end_ymd,
  619. "total_score_7d": series,
  620. "latest_total_score": latest_score,
  621. "threshold": threshold,
  622. "latest_gt_threshold": latest_score >= threshold,
  623. "trend": best["trend"],
  624. },
  625. }
  626. def llm_extract_wxindex_words(
  627. self,
  628. *,
  629. channel_content_id: str,
  630. article_title: str,
  631. body_text: str,
  632. contribution_words: list[str],
  633. matched_demands: list[str],
  634. ) -> dict[str, Any]:
  635. system_prompt = """
  636. #角色
  637. 你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。
  638. # 任务
  639. 我会提供文章标题、正文,以及两类备选词来源:
  640. 1. 高贡献词:文章贡献度较高的关键词
  641. 2. 已匹配需求:已与需求库匹配上的需求名
  642. 请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。
  643. 需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。
  644. 若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。
  645. # 输出规则
  646. 1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  647. """
  648. user_payload = {
  649. "source": channel_content_id,
  650. "article_title": article_title,
  651. "article_body_text": body_text,
  652. "contribution_words": contribution_words,
  653. "matched_demands": matched_demands,
  654. "output_schema": {
  655. "source": "string",
  656. "selected_words": [
  657. "string, concise keyword for wxindex search, one or more"
  658. ],
  659. "reason": "string",
  660. },
  661. "constraints": [
  662. "selected_words 为数组,至少 1 个词,可多个",
  663. "每个词简洁(2-4 字),适合微信指数检索",
  664. "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
  665. "多个词应分别覆盖不同事件或角度,避免语义重复",
  666. "reason 简洁说明,不超过60字",
  667. "仅输出 JSON 对象,不要 markdown 代码块",
  668. ],
  669. }
  670. last_error: Exception | None = None
  671. for attempt in range(1, max(self.config.wxindex_llm_max_attempts, 1) + 1):
  672. try:
  673. resp = create_chat_completion(
  674. [
  675. {"role": "system", "content": system_prompt},
  676. {
  677. "role": "user",
  678. "content": json.dumps(user_payload, ensure_ascii=False),
  679. },
  680. ],
  681. model=self.config.wxindex_llm_model or None,
  682. temperature=0,
  683. max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
  684. )
  685. parsed = _extract_json_object(str(resp.get("content") or ""))
  686. raw_words = parsed.get("selected_words")
  687. if isinstance(raw_words, str):
  688. raw_words = [raw_words]
  689. if not isinstance(raw_words, list):
  690. legacy_word = str(parsed.get("selected_word") or "").strip()
  691. raw_words = [legacy_word] if legacy_word else []
  692. selected_words: list[str] = []
  693. seen: set[str] = set()
  694. for item in raw_words:
  695. word = str(item or "").strip()
  696. if word and word not in seen:
  697. seen.add(word)
  698. selected_words.append(word)
  699. reason = str(parsed.get("reason") or "").strip()
  700. if not selected_words:
  701. raise WxindexSelectionSkipped(
  702. f"selected_words empty for {channel_content_id}"
  703. )
  704. return {"selected_words": selected_words, "reason": reason}
  705. except WxindexSelectionSkipped:
  706. raise
  707. except (OpenRouterCallError, HotContentFlowError) as exc:
  708. last_error = exc
  709. if attempt < max(self.config.wxindex_llm_max_attempts, 1):
  710. continue
  711. raise HotContentFlowError(
  712. f"llm extract wxindex words failed for {channel_content_id}: {last_error}"
  713. ) from last_error
  714. @staticmethod
  715. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  716. if not matched_words or not isinstance(raw_points, list):
  717. return []
  718. matched_points: list[dict[str, Any]] = []
  719. for point in raw_points:
  720. if not isinstance(point, dict):
  721. continue
  722. point_match_rows = point.get("匹配词列表") or []
  723. if not isinstance(point_match_rows, list):
  724. point_match_rows = []
  725. keep_rows = [
  726. row
  727. for row in point_match_rows
  728. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  729. ]
  730. if keep_rows:
  731. matched_points.append(
  732. {
  733. "来源": str(point.get("来源") or ""),
  734. "点": str(point.get("点") or ""),
  735. "点描述": str(point.get("点描述") or ""),
  736. "匹配词列表": keep_rows,
  737. }
  738. )
  739. return matched_points
  740. @staticmethod
  741. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  742. decode_result = record.get("decode_result_json")
  743. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  744. if not isinstance(target_post, dict):
  745. target_post = {}
  746. article_title = str(
  747. target_post.get("title") or record.get("article_title") or ""
  748. ).strip()
  749. body_text = str(
  750. target_post.get("body_text") or record.get("article_body") or ""
  751. ).strip()
  752. return article_title, body_text
  753. def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
  754. flow_config = config or load_flow_config()
  755. repository = HotContentRepository(flow_config.mysql)
  756. try:
  757. api_client = JsonApiClient(
  758. timeout_seconds=flow_config.request_timeout_seconds,
  759. verify_ssl=flow_config.https_verify_ssl,
  760. )
  761. service = ContributionPostprocessService(flow_config, repository, api_client)
  762. return service.run()
  763. finally:
  764. repository.close()