postprocess_service.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037
  1. """贡献点需求匹配与微信指数趋势后处理服务。"""
  2. from __future__ import annotations
  3. import json
  4. import re
  5. import time
  6. from datetime import datetime, timedelta
  7. from typing import Any
  8. from app.core.open_router_llm import OpenRouterCallError, create_chat_completion
  9. from app.hot_content.client import JsonApiClient
  10. from app.hot_content.config import load_flow_config
  11. from app.hot_content.demand_cache_service import DemandCacheService
  12. from app.hot_content.exceptions import HotContentFlowError
  13. from app.hot_content.repository import HotContentRepository
  14. from app.hot_content.status import PostprocessStatus
  15. from app.hot_content.timezone import SHANGHAI_TZ
  16. from app.hot_content.types import FlowConfig
  17. from app.hot_content.demand_export import (
  18. attach_wxindex_metadata,
  19. build_demand_export_rows,
  20. build_export_rows_from_record,
  21. )
  22. from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
  23. from app.hot_content.demand_quality import run_demand_quality_pipeline
  24. from app.hot_content.wxindex_trend import calc_wxindex_trend
  25. from app.hot_content.wxindex_words import (
  26. ensure_word_full_scores,
  27. slice_scores_lookback,
  28. sync_words_from_trend_json,
  29. )
  30. class WxindexSelectionSkipped(Exception):
  31. """微信指数选词无效时跳过后续查询。"""
  32. def _extract_json_object(text: str) -> dict[str, Any]:
  33. raw = text.strip()
  34. if raw.startswith("```"):
  35. raw = re.sub(r"^```(?:json)?\s*", "", raw)
  36. raw = re.sub(r"\s*```$", "", raw)
  37. try:
  38. parsed = json.loads(raw)
  39. if isinstance(parsed, dict):
  40. return parsed
  41. except json.JSONDecodeError:
  42. pass
  43. match = re.search(r"\{[\s\S]*\}", raw)
  44. if not match:
  45. raise HotContentFlowError("llm output is not json object")
  46. try:
  47. parsed = json.loads(match.group(0))
  48. except json.JSONDecodeError as exc:
  49. raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
  50. if not isinstance(parsed, dict):
  51. raise HotContentFlowError("llm output is not json object")
  52. return parsed
  53. def _get_recent_range(lookback_days: int) -> tuple[str, str]:
  54. today = datetime.now(SHANGHAI_TZ).date()
  55. end_date = today - timedelta(days=1)
  56. start_date = end_date - timedelta(days=max(lookback_days, 1))
  57. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  58. def _parse_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  59. rows = ((wx_resp.get("data") or {}).get("data") or [])
  60. if not isinstance(rows, list):
  61. return []
  62. series: list[dict[str, Any]] = []
  63. for row in rows:
  64. if not isinstance(row, dict):
  65. continue
  66. ymd = str(row.get("ymd") or "").strip()
  67. total_score = ((row.get("channel_score") or {}).get("total_score"))
  68. try:
  69. score_num = float(total_score) if total_score is not None else None
  70. except (TypeError, ValueError):
  71. score_num = None
  72. if ymd and score_num is not None:
  73. series.append({"ymd": ymd, "total_score": score_num})
  74. series.sort(key=lambda x: x["ymd"])
  75. return series
  76. def _normalize_demand_key(value: str) -> str:
  77. return "".join(value.split())
  78. def _build_demand_lookup(demand_name_set: list[str]) -> dict[str, str]:
  79. lookup: dict[str, str] = {}
  80. for item in demand_name_set:
  81. demand_name = str(item).strip()
  82. if not demand_name:
  83. continue
  84. lookup.setdefault(demand_name, demand_name)
  85. compact_key = _normalize_demand_key(demand_name)
  86. if compact_key:
  87. lookup.setdefault(compact_key, demand_name)
  88. return lookup
  89. def _resolve_demand_name(
  90. demand_name: str,
  91. demand_lookup: dict[str, str],
  92. ) -> str | None:
  93. value = demand_name.strip()
  94. if not value:
  95. return None
  96. return demand_lookup.get(value) or demand_lookup.get(_normalize_demand_key(value))
  97. def _collect_matched_demand_names(matched_word_rows: list[Any]) -> list[str]:
  98. demand_names: list[str] = []
  99. seen: set[str] = set()
  100. for row in matched_word_rows:
  101. if not isinstance(row, dict):
  102. continue
  103. match_rows = row.get("匹配需求列表") or []
  104. if not isinstance(match_rows, list):
  105. continue
  106. for match in match_rows:
  107. if not isinstance(match, dict):
  108. continue
  109. demand_name = str(match.get("demand_name") or "").strip()
  110. if demand_name and demand_name not in seen:
  111. seen.add(demand_name)
  112. demand_names.append(demand_name)
  113. return demand_names
  114. class ContributionPostprocessService:
  115. def __init__(
  116. self,
  117. config: FlowConfig,
  118. repository: HotContentRepository,
  119. api_client: JsonApiClient,
  120. ):
  121. self.config = config
  122. self.repository = repository
  123. self.api_client = api_client
  124. def run(self) -> dict[str, Any]:
  125. records = self.repository.list_postprocess_candidates(
  126. limit=max(self.config.postprocess_batch_size, 1)
  127. )
  128. if not records:
  129. return self._finalize_run_result(
  130. {
  131. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  132. "status": "success",
  133. "candidate_count": 0,
  134. "matched_count": 0,
  135. "wxindex_count": 0,
  136. "skipped_count": 0,
  137. "failed_count": 0,
  138. }
  139. )
  140. cache = DemandCacheService(
  141. self.config,
  142. self.repository,
  143. ).get_or_create_current_hour_cache()
  144. demand_cache_run_id = int(cache["id"])
  145. demand_name_set = cache["demand_name_set"]
  146. if not demand_name_set:
  147. return self._finalize_run_result(
  148. {
  149. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  150. "status": "skipped",
  151. "reason": "empty_demand_cache",
  152. "demand_cache_run_id": demand_cache_run_id,
  153. "cache_source": cache.get("source"),
  154. "processed_count": 0,
  155. }
  156. )
  157. matched_count = 0
  158. wxindex_count = 0
  159. quality_count = 0
  160. exported_count = 0
  161. export_failed_count = 0
  162. skipped_count = 0
  163. failed_count = 0
  164. for record in records:
  165. record_id = int(record["id"])
  166. quality_saved = False
  167. match_result: dict[str, Any] | None = None
  168. try:
  169. match_result = record.get("contribution_demand_match_json")
  170. if (
  171. not isinstance(match_result, dict)
  172. or int(record.get("demand_cache_run_id") or 0) != demand_cache_run_id
  173. ):
  174. match_result = self.match_record(
  175. record=record,
  176. demand_name_set=demand_name_set,
  177. demand_cache_run_id=demand_cache_run_id,
  178. )
  179. self.repository.save_contribution_demand_match(
  180. record_id=record_id,
  181. demand_cache_run_id=demand_cache_run_id,
  182. match_json=match_result,
  183. )
  184. matched_count += 1
  185. sanitized_match_result = self.sanitize_match_result(
  186. match_result,
  187. demand_name_set=demand_name_set,
  188. demand_cache_run_id=demand_cache_run_id,
  189. )
  190. if sanitized_match_result != match_result:
  191. match_result = sanitized_match_result
  192. self.repository.save_contribution_demand_match(
  193. record_id=record_id,
  194. demand_cache_run_id=demand_cache_run_id,
  195. match_json=match_result,
  196. )
  197. postprocess_status = int(record.get("postprocess_status") or 0)
  198. existing_trend = record.get("wxindex_trend_json")
  199. if (
  200. postprocess_status == PostprocessStatus.WXINDEX_DONE
  201. and isinstance(existing_trend, dict)
  202. ):
  203. trend_result = existing_trend
  204. else:
  205. trend_result = self.build_wxindex_trend(record, match_result)
  206. if trend_result is None:
  207. self.repository.update_postprocess_status(
  208. record_id=record_id,
  209. status=PostprocessStatus.SKIPPED,
  210. error_message="no matched demand words",
  211. )
  212. self._save_empty_demand_quality(record_id=record_id)
  213. export_rows_count, export_error = self._export_demand_terms_if_needed(
  214. record=record,
  215. match_result=match_result,
  216. trend_result=None,
  217. )
  218. if export_error:
  219. export_failed_count += 1
  220. else:
  221. exported_count += export_rows_count
  222. skipped_count += 1
  223. continue
  224. if postprocess_status != PostprocessStatus.WXINDEX_DONE:
  225. self.repository.save_wxindex_trend(
  226. record_id=record_id,
  227. trend_json=trend_result,
  228. )
  229. self.sync_wxindex_words(
  230. record_id=record_id,
  231. trend_result=trend_result,
  232. event_created_at=record.get("created_at"),
  233. )
  234. wxindex_count += 1
  235. event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
  236. record=record,
  237. match_result=match_result,
  238. trend_result=trend_result,
  239. )
  240. self.repository.save_demand_quality(
  241. record_id=record_id,
  242. event_sense_json=event_sense_json,
  243. senior_fit_json=senior_fit_json,
  244. )
  245. quality_saved = True
  246. quality_count += 1
  247. export_rows_count, export_error = self._export_demand_terms_if_needed(
  248. record=record,
  249. match_result=match_result,
  250. trend_result=trend_result,
  251. event_sense_json=event_sense_json,
  252. senior_fit_json=senior_fit_json,
  253. )
  254. if export_error:
  255. export_failed_count += 1
  256. else:
  257. exported_count += export_rows_count
  258. except WxindexSelectionSkipped as exc:
  259. self.repository.update_postprocess_status(
  260. record_id=record_id,
  261. status=PostprocessStatus.SKIPPED,
  262. error_message=str(exc),
  263. )
  264. if isinstance(match_result, dict):
  265. self._save_empty_demand_quality(record_id=record_id)
  266. export_rows_count, export_error = self._export_demand_terms_if_needed(
  267. record=record,
  268. match_result=match_result,
  269. trend_result=None,
  270. )
  271. if export_error:
  272. export_failed_count += 1
  273. else:
  274. exported_count += export_rows_count
  275. skipped_count += 1
  276. except Exception as exc:
  277. if not quality_saved:
  278. self.repository.update_postprocess_status(
  279. record_id=record_id,
  280. status=PostprocessStatus.FAILED,
  281. error_message=str(exc),
  282. )
  283. failed_count += 1
  284. return self._finalize_run_result(
  285. {
  286. "run_at": datetime.now(SHANGHAI_TZ).isoformat(),
  287. "status": "success",
  288. "demand_cache_run_id": demand_cache_run_id,
  289. "cache_source": cache.get("source"),
  290. "cache_hour": str(cache.get("cache_hour") or ""),
  291. "demand_name_count": len(demand_name_set),
  292. "candidate_count": len(records),
  293. "matched_count": matched_count,
  294. "wxindex_count": wxindex_count,
  295. "quality_count": quality_count,
  296. "exported_count": exported_count,
  297. "export_failed_count": export_failed_count,
  298. "skipped_count": skipped_count,
  299. "failed_count": failed_count,
  300. }
  301. )
  302. def _export_demand_terms_if_needed(
  303. self,
  304. *,
  305. record: dict[str, Any],
  306. match_result: dict[str, Any],
  307. trend_result: dict[str, Any] | None,
  308. event_sense_json: dict[str, Any] | None = None,
  309. senior_fit_json: dict[str, Any] | None = None,
  310. ) -> tuple[int, str | None]:
  311. """导出需求词,失败时返回错误信息且不改变 postprocess_status。"""
  312. try:
  313. exported_rows = self.export_demand_terms_if_needed(
  314. record=record,
  315. match_result=match_result,
  316. trend_result=trend_result,
  317. event_sense_json=event_sense_json,
  318. senior_fit_json=senior_fit_json,
  319. )
  320. except Exception as exc:
  321. return 0, str(exc)
  322. return exported_rows, None
  323. def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
  324. try:
  325. result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
  326. except Exception as exc:
  327. result["hive_sync_error"] = str(exc)
  328. if result.get("hive_sync_error"):
  329. if result.get("status") == "success":
  330. result["status"] = "partial_failure"
  331. if int(result.get("export_failed_count") or 0) > 0 and result.get("status") == "success":
  332. result["status"] = "partial_failure"
  333. return result
  334. def sync_wxindex_words(
  335. self,
  336. *,
  337. record_id: int,
  338. trend_result: dict[str, Any],
  339. event_created_at: datetime | None = None,
  340. verbose: bool = False,
  341. ) -> dict[str, int]:
  342. return sync_words_from_trend_json(
  343. self.repository,
  344. self.api_client,
  345. self.config.wxindex_api_url,
  346. trend_json=trend_result,
  347. record_id=record_id,
  348. event_created_at=event_created_at,
  349. verbose=verbose,
  350. update_meta_if_exists=True,
  351. )
  352. def _save_empty_demand_quality(self, *, record_id: int) -> None:
  353. self.repository.save_demand_quality(
  354. record_id=record_id,
  355. event_sense_json={},
  356. senior_fit_json={},
  357. update_status=False,
  358. )
  359. def run_demand_quality_judgment(
  360. self,
  361. *,
  362. record: dict[str, Any],
  363. match_result: dict[str, Any],
  364. trend_result: dict[str, Any],
  365. ) -> tuple[dict[str, Any], dict[str, Any]]:
  366. channel_content_id = str(
  367. match_result.get("channelContentId") or record.get("unique_key") or ""
  368. ).strip()
  369. base_export_rows = attach_wxindex_metadata(
  370. build_demand_export_rows(
  371. match_result,
  372. contribution_points=(
  373. record.get("contribution_points_json")
  374. if isinstance(record.get("contribution_points_json"), dict)
  375. else None
  376. ),
  377. trend_json=trend_result,
  378. ),
  379. trend_result,
  380. wxindex_threshold=self.config.wxindex_score_threshold,
  381. )
  382. return run_demand_quality_pipeline(
  383. channel_content_id=channel_content_id,
  384. export_rows=base_export_rows,
  385. wxindex_threshold=self.config.wxindex_score_threshold,
  386. event_threshold=self.config.demand_event_sense_threshold,
  387. senior_threshold=self.config.demand_senior_fit_threshold,
  388. model=self.config.demand_quality_llm_model,
  389. max_attempts=self.config.demand_quality_llm_max_attempts,
  390. retry_sleep_seconds=self.config.demand_quality_llm_retry_sleep_seconds,
  391. max_tokens=self.config.demand_quality_llm_max_tokens,
  392. )
  393. def export_demand_terms_if_needed(
  394. self,
  395. *,
  396. record: dict[str, Any],
  397. match_result: dict[str, Any],
  398. trend_result: dict[str, Any] | None,
  399. event_sense_json: dict[str, Any] | None = None,
  400. senior_fit_json: dict[str, Any] | None = None,
  401. ) -> int:
  402. normalized_record = {
  403. "contribution_demand_match_json": match_result,
  404. "contribution_points_json": (
  405. record.get("contribution_points_json")
  406. if isinstance(record.get("contribution_points_json"), dict)
  407. else None
  408. ),
  409. "wxindex_trend_json": trend_result if isinstance(trend_result, dict) else None,
  410. "demand_event_sense_json": event_sense_json if isinstance(event_sense_json, dict) else {},
  411. "demand_senior_fit_json": senior_fit_json if isinstance(senior_fit_json, dict) else {},
  412. }
  413. export_rows = build_export_rows_from_record(
  414. normalized_record,
  415. wxindex_threshold=self.config.wxindex_score_threshold,
  416. event_sense_json=normalized_record["demand_event_sense_json"],
  417. senior_fit_json=normalized_record["demand_senior_fit_json"],
  418. event_threshold=self.config.demand_event_sense_threshold,
  419. senior_threshold=self.config.demand_senior_fit_threshold,
  420. )
  421. self.repository.replace_demand_export_rows(
  422. record_id=int(record["id"]),
  423. source=str(record.get("source") or ""),
  424. hot_title=str(record.get("title") or ""),
  425. article_title=str(record.get("article_title") or ""),
  426. rows=export_rows,
  427. )
  428. return len(export_rows)
  429. def match_record(
  430. self,
  431. *,
  432. record: dict[str, Any],
  433. demand_name_set: list[str],
  434. demand_cache_run_id: int,
  435. ) -> dict[str, Any]:
  436. contribution_points = record.get("contribution_points_json")
  437. if not isinstance(contribution_points, dict):
  438. raise HotContentFlowError(f"invalid contribution_points_json id={record['id']}")
  439. channel_content_id = str(
  440. contribution_points.get("channelContentId") or record.get("unique_key") or ""
  441. ).strip()
  442. words_rows = contribution_points.get("高贡献词列表") or []
  443. if not isinstance(words_rows, list):
  444. words_rows = []
  445. word_list = [
  446. str(item.get("词") or "").strip()
  447. for item in words_rows
  448. if isinstance(item, dict) and str(item.get("词") or "").strip()
  449. ]
  450. llm_result = self.llm_match_single_article(
  451. channel_content_id=channel_content_id or str(record["unique_key"]),
  452. words=word_list,
  453. demand_name_set=demand_name_set,
  454. )
  455. demand_lookup = _build_demand_lookup(demand_name_set)
  456. matched_map: dict[str, list[dict[str, str]]] = {}
  457. for item in llm_result.get("matched") or []:
  458. if not isinstance(item, dict):
  459. continue
  460. word = str(item.get("title") or item.get("word") or item.get("词") or "").strip()
  461. demand_name = str(item.get("demand_name") or "").strip()
  462. reason = str(item.get("reason") or "").strip()
  463. if not word or word not in word_list:
  464. continue
  465. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  466. if canonical_demand_name is None:
  467. continue
  468. matched_map.setdefault(word, []).append(
  469. {
  470. "demand_name": canonical_demand_name,
  471. "reason": reason,
  472. }
  473. )
  474. output_words: list[dict[str, Any]] = []
  475. matched_word_rows: list[dict[str, Any]] = []
  476. for row in words_rows:
  477. if not isinstance(row, dict):
  478. continue
  479. word = str(row.get("词") or "").strip()
  480. item = {"词": word, "贡献度": row.get("贡献度")}
  481. if word in matched_map:
  482. item["匹配需求列表"] = matched_map[word]
  483. matched_word_rows.append(item)
  484. output_words.append(item)
  485. matched_points = self.filter_matched_points(
  486. contribution_points.get("点列表"),
  487. set(matched_map.keys()),
  488. )
  489. return {
  490. "channelContentId": channel_content_id,
  491. "demand_cache_run_id": demand_cache_run_id,
  492. "高贡献词列表": output_words,
  493. "匹配到需求的词列表": matched_word_rows,
  494. "点列表": matched_points,
  495. }
  496. def sanitize_match_result(
  497. self,
  498. match_result: dict[str, Any],
  499. *,
  500. demand_name_set: list[str],
  501. demand_cache_run_id: int,
  502. ) -> dict[str, Any]:
  503. demand_lookup = _build_demand_lookup(demand_name_set)
  504. words_rows = match_result.get("高贡献词列表") or []
  505. if not isinstance(words_rows, list):
  506. words_rows = []
  507. output_words: list[dict[str, Any]] = []
  508. matched_word_rows: list[dict[str, Any]] = []
  509. valid_words: set[str] = set()
  510. for row in words_rows:
  511. if not isinstance(row, dict):
  512. continue
  513. word = str(row.get("词") or "").strip()
  514. item = {"词": word, "贡献度": row.get("贡献度")}
  515. match_rows = row.get("匹配需求列表") or []
  516. if not isinstance(match_rows, list):
  517. match_rows = []
  518. valid_match_rows: list[dict[str, str]] = []
  519. for match in match_rows:
  520. if not isinstance(match, dict):
  521. continue
  522. demand_name = str(match.get("demand_name") or "").strip()
  523. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  524. if canonical_demand_name is None:
  525. continue
  526. valid_match_rows.append(
  527. {
  528. "demand_name": canonical_demand_name,
  529. "reason": str(match.get("reason") or "").strip(),
  530. }
  531. )
  532. if word and valid_match_rows:
  533. item["匹配需求列表"] = valid_match_rows
  534. matched_word_rows.append(item)
  535. valid_words.add(word)
  536. output_words.append(item)
  537. return {
  538. "channelContentId": str(match_result.get("channelContentId") or ""),
  539. "demand_cache_run_id": demand_cache_run_id,
  540. "高贡献词列表": output_words,
  541. "匹配到需求的词列表": matched_word_rows,
  542. "点列表": self.filter_matched_points(
  543. match_result.get("点列表"),
  544. valid_words,
  545. ),
  546. }
  547. def llm_match_single_article(
  548. self,
  549. *,
  550. channel_content_id: str,
  551. words: list[str],
  552. demand_name_set: list[str],
  553. ) -> dict[str, Any]:
  554. if not words:
  555. return {"source": channel_content_id, "matched": []}
  556. system_prompt = """
  557. 你是一个专业的语义匹配分析专家,擅长判断词语之间的语义关联性。
  558. # 任务
  559. 我会提供两组数据:
  560. 1. 热点词列表:一组待匹配的热点词语
  561. 2. 需求词库:一组已有的需求词语
  562. 请你逐一分析每个热点词,判断它是否能与需求词库中的某个/某些需求词匹配上。
  563. # 匹配标准
  564. 满足以下任意一条,则视为匹配成功:
  565. - 热点词与需求词含义相同或高度相近(如同义词、近义词)
  566. - 热点词是需求词的下位概念(热点词所指的事物属于需求词所描述的范畴)
  567. - 热点词与需求词在用户意图上高度一致
  568. 以下情况,不得视为匹配:
  569. - 热点词仅与需求词中的某一个字/词相关,但未覆盖需求词的完整含义
  570. - 热点词与需求词只有表面字符重叠,语义方向不同
  571. - 热点词是需求词的上位概念(范围过宽,含义不够精确)
  572. - 两者只是同属某个大类,但具体含义差异明显
  573. # 多词组成的需求词处理规则
  574. 若需求词由多个词语组成(如"XX类 YY问题"),热点词必须能够同时覆盖该需求词的所有关键语义成分,缺少任意一个关键成分则不视为匹配。
  575. # 输出规则
  576. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。
  577. # 约束
  578. 热点词和需求词必须来自给定词语,不能创造给定词语之外的词。
  579. """
  580. user_payload = {
  581. "source": channel_content_id,
  582. "words": words,
  583. "demand_name_set": demand_name_set,
  584. "output_schema": {
  585. "source": "string",
  586. "matched": [
  587. {
  588. "title": "string, must be selected from words",
  589. "demand_name": "string, must be selected from demand_name_set",
  590. "reason": "string",
  591. }
  592. ],
  593. },
  594. }
  595. last_error: Exception | None = None
  596. for attempt in range(1, max(self.config.contribution_match_llm_max_attempts, 1) + 1):
  597. try:
  598. resp = create_chat_completion(
  599. [
  600. {"role": "system", "content": system_prompt},
  601. {
  602. "role": "user",
  603. "content": json.dumps(user_payload, ensure_ascii=False),
  604. },
  605. ],
  606. model=self.config.contribution_match_llm_model or None,
  607. temperature=0,
  608. max_tokens=max(self.config.contribution_match_llm_max_tokens, 1),
  609. )
  610. parsed = _extract_json_object(str(resp.get("content") or ""))
  611. parsed.setdefault("source", channel_content_id)
  612. parsed.setdefault("matched", [])
  613. return parsed
  614. except (OpenRouterCallError, HotContentFlowError) as exc:
  615. last_error = exc
  616. if attempt < max(self.config.contribution_match_llm_max_attempts, 1):
  617. time.sleep(max(self.config.contribution_match_llm_retry_sleep_seconds, 0))
  618. raise HotContentFlowError(
  619. f"llm match failed for channelContentId={channel_content_id}: {last_error}"
  620. ) from last_error
  621. @staticmethod
  622. def _build_word_demand_match_result(
  623. *,
  624. word: str,
  625. llm_result: dict[str, Any],
  626. demand_lookup: dict[str, str],
  627. ) -> dict[str, Any]:
  628. target_word = str(word or "").strip()
  629. match_list: list[dict[str, str]] = []
  630. seen: set[tuple[str, str]] = set()
  631. for item in llm_result.get("matched") or []:
  632. if not isinstance(item, dict):
  633. continue
  634. matched_word = str(
  635. item.get("title") or item.get("word") or item.get("词") or ""
  636. ).strip()
  637. demand_name = str(item.get("demand_name") or "").strip()
  638. reason = str(item.get("reason") or "").strip()
  639. if matched_word != target_word:
  640. continue
  641. canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
  642. if canonical_demand_name is None:
  643. continue
  644. dedupe_key = (canonical_demand_name, reason)
  645. if dedupe_key in seen:
  646. continue
  647. seen.add(dedupe_key)
  648. match_list.append(
  649. {
  650. "demand_name": canonical_demand_name,
  651. "reason": reason,
  652. }
  653. )
  654. matched_demand_names: list[str] = []
  655. matched_seen: set[str] = set()
  656. for item in match_list:
  657. demand_name = str(item.get("demand_name") or "").strip()
  658. if demand_name and demand_name not in matched_seen:
  659. matched_seen.add(demand_name)
  660. matched_demand_names.append(demand_name)
  661. return {
  662. "word": target_word,
  663. "matched": bool(match_list),
  664. "matched_demand": " ".join(matched_demand_names),
  665. "match_list": match_list,
  666. }
  667. def match_words_to_demand_pool(
  668. self,
  669. *,
  670. words: list[str],
  671. demand_name_set: list[str],
  672. source_id: str | None = None,
  673. ) -> dict[str, dict[str, Any]]:
  674. """批量热词与票圈内部需求池匹配,返回 word -> match_result。"""
  675. cleaned: list[str] = []
  676. seen: set[str] = set()
  677. for raw in words:
  678. word = str(raw or "").strip()
  679. if not word or word in seen:
  680. continue
  681. seen.add(word)
  682. cleaned.append(word)
  683. if not cleaned:
  684. return {}
  685. llm_result = self.llm_match_single_article(
  686. channel_content_id=source_id or f"wxindex_words:{','.join(cleaned[:3])}",
  687. words=cleaned,
  688. demand_name_set=demand_name_set,
  689. )
  690. demand_lookup = _build_demand_lookup(demand_name_set)
  691. return {
  692. word: self._build_word_demand_match_result(
  693. word=word,
  694. llm_result=llm_result,
  695. demand_lookup=demand_lookup,
  696. )
  697. for word in cleaned
  698. }
  699. def match_single_word_to_demand_pool(
  700. self,
  701. *,
  702. word: str,
  703. demand_name_set: list[str],
  704. source_id: str | None = None,
  705. ) -> dict[str, Any]:
  706. """单热词与票圈内部需求池匹配(复用主流程 LLM 规则)。"""
  707. target_word = str(word or "").strip()
  708. if not target_word:
  709. return {
  710. "word": "",
  711. "matched": False,
  712. "matched_demand": "",
  713. "match_list": [],
  714. }
  715. batch_result = self.match_words_to_demand_pool(
  716. words=[target_word],
  717. demand_name_set=demand_name_set,
  718. source_id=source_id,
  719. )
  720. return batch_result.get(
  721. target_word,
  722. {
  723. "word": target_word,
  724. "matched": False,
  725. "matched_demand": "",
  726. "match_list": [],
  727. },
  728. )
  729. def build_wxindex_trend(
  730. self,
  731. record: dict[str, Any],
  732. match_result: dict[str, Any],
  733. ) -> dict[str, Any] | None:
  734. matched_word_rows = match_result.get("匹配到需求的词列表") or []
  735. if not isinstance(matched_word_rows, list) or not matched_word_rows:
  736. return None
  737. contribution_words = [
  738. str(row.get("词") or "").strip()
  739. for row in matched_word_rows
  740. if isinstance(row, dict) and str(row.get("词") or "").strip()
  741. ]
  742. if not contribution_words:
  743. return None
  744. channel_content_id = str(
  745. match_result.get("channelContentId") or record.get("unique_key") or ""
  746. )
  747. article_title, body_text = self.extract_article_text(record)
  748. matched_demands = _collect_matched_demand_names(matched_word_rows)
  749. pick = self.llm_extract_wxindex_words(
  750. channel_content_id=channel_content_id,
  751. article_title=article_title,
  752. body_text=body_text,
  753. contribution_words=contribution_words,
  754. matched_demands=matched_demands,
  755. )
  756. selected_words = pick["selected_words"]
  757. threshold = float(self.config.wxindex_score_threshold)
  758. wxindex_searches: list[dict[str, Any]] = []
  759. event_created_at = record.get("created_at")
  760. for keyword in selected_words:
  761. full_scores, _action = ensure_word_full_scores(
  762. self.repository,
  763. self.api_client,
  764. self.config.wxindex_api_url,
  765. keyword=keyword,
  766. event_created_at=event_created_at,
  767. update_meta_if_exists=True,
  768. )
  769. series, start_ymd, end_ymd = slice_scores_lookback(
  770. full_scores,
  771. self.config.wxindex_lookback_days,
  772. )
  773. latest_score = series[-1]["total_score"] if series else None
  774. wxindex_searches.append(
  775. {
  776. "keyword": keyword,
  777. "start_ymd": start_ymd,
  778. "end_ymd": end_ymd,
  779. "total_score_7d": series,
  780. "latest_total_score": latest_score,
  781. "threshold": threshold,
  782. "latest_gt_threshold": (
  783. False
  784. if latest_score is None
  785. else latest_score >= threshold
  786. ),
  787. "trend": calc_wxindex_trend(series),
  788. }
  789. )
  790. searchable = [
  791. item
  792. for item in wxindex_searches
  793. if item.get("latest_total_score") is not None
  794. ]
  795. if not searchable:
  796. raise WxindexSelectionSkipped(
  797. f"no wxindex score for any keyword in {channel_content_id}: "
  798. f"{selected_words}"
  799. )
  800. best = max(searchable, key=lambda item: float(item["latest_total_score"]))
  801. selected_word = str(best["keyword"])
  802. latest_score = best["latest_total_score"]
  803. series = best["total_score_7d"]
  804. return {
  805. "channelContentId": channel_content_id,
  806. "article_title": article_title,
  807. "llm_selected_words": selected_words,
  808. "llm_selected_word": selected_word,
  809. "llm_reason": pick["reason"],
  810. "wxindex_searches": wxindex_searches,
  811. "wxindex": {
  812. "keyword": selected_word,
  813. "keywords": selected_words,
  814. "start_ymd": best["start_ymd"],
  815. "end_ymd": best["end_ymd"],
  816. "total_score_7d": series,
  817. "latest_total_score": latest_score,
  818. "threshold": threshold,
  819. "latest_gt_threshold": latest_score >= threshold,
  820. "trend": best["trend"],
  821. },
  822. }
  def llm_extract_wxindex_words(
      self,
      *,
      channel_content_id: str,
      article_title: str,
      body_text: str,
      contribution_words: list[str],
      matched_demands: list[str],
  ) -> dict[str, Any]:
      """Ask the chat model for wxindex search keywords for one article.

      A fixed system prompt plus a JSON-encoded user payload (article title,
      body, candidate words, expected output schema and constraints) is sent
      through ``create_chat_completion``. The reply is parsed with
      ``_extract_json_object`` and its ``selected_words`` value is normalised:
      a bare string becomes a one-element list, and when the value is neither
      a string nor a list the single ``selected_word`` key is used instead.
      Words are stringified, stripped, and de-duplicated in first-seen order.

      Args:
          channel_content_id: Identifier sent as ``source`` and used in
              error messages.
          article_title: Article title forwarded to the model.
          body_text: Article body forwarded to the model.
          contribution_words: Candidate keywords forwarded to the model.
          matched_demands: Matched demand names forwarded to the model.

      Returns:
          ``{"selected_words": [...], "reason": "..."}`` with at least one
          word.

      Raises:
          WxindexSelectionSkipped: The reply parsed but yielded no usable
              word. This is raised immediately and is not retried.
          HotContentFlowError: Every attempt failed with
              ``OpenRouterCallError`` or ``HotContentFlowError``; the last
              failure is chained as the cause.
      """
      # The prompt is runtime text sent to the model; it is spelled one
      # literal per line so its exact content does not depend on how this
      # method is indented.
      system_prompt = (
          "\n"
          "#角色\n"
          "你是一个专业的语义分析专家,擅长从文章中提取简洁、精准的热搜检索词。\n"
          "# 任务\n"
          "我会提供文章标题、正文,以及两类备选词来源:\n"
          "1. 高贡献词:文章贡献度较高的关键词\n"
          "2. 已匹配需求:已与需求库匹配上的需求名\n"
          "请结合标题、正文与上述备选词,提取用于「微信指数」热度检索的词。\n"
          "需自行从标题中识别可检索的关键词;词应简洁(2-4 字)、概括、精准覆盖事件。\n"
          "若文章涉及多个子事件,可分别提取多个词,每个词覆盖部分事件。\n"
          "# 输出规则\n"
          "1. 严格输出 JSON 对象,禁止输出 JSON 之外的任何内容。\n"
      )
      output_schema = {
          "source": "string",
          "selected_words": [
              "string, concise keyword for wxindex search, one or more"
          ],
          "reason": "string",
      }
      constraints = [
          "selected_words 为数组,至少 1 个词,可多个",
          "每个词简洁(2-4 字),适合微信指数检索",
          "结合标题、高贡献词、已匹配需求提炼,可合并改写,不必逐字照搬",
          "多个词应分别覆盖不同事件或角度,避免语义重复",
          "reason 简洁说明,不超过60字",
          "仅输出 JSON 对象,不要 markdown 代码块",
      ]
      # The payload is identical for every attempt, so it is serialised once.
      user_content = json.dumps(
          {
              "source": channel_content_id,
              "article_title": article_title,
              "article_body_text": body_text,
              "contribution_words": contribution_words,
              "matched_demands": matched_demands,
              "output_schema": output_schema,
              "constraints": constraints,
          },
          ensure_ascii=False,
      )

      # At least one attempt is always made, even if the config says 0.
      attempts_allowed = max(self.config.wxindex_llm_max_attempts, 1)
      failure: Exception | None = None
      for _ in range(attempts_allowed):
          try:
              completion = create_chat_completion(
                  [
                      {"role": "system", "content": system_prompt},
                      {"role": "user", "content": user_content},
                  ],
                  model=self.config.wxindex_llm_model or None,
                  temperature=0,
                  max_tokens=max(self.config.wxindex_llm_max_tokens, 1),
              )
              parsed = _extract_json_object(str(completion.get("content") or ""))

              candidates = parsed.get("selected_words")
              if isinstance(candidates, str):
                  candidates = [candidates]
              elif not isinstance(candidates, list):
                  # Fall back to the single-word key when no list is present.
                  fallback = str(parsed.get("selected_word") or "").strip()
                  candidates = [fallback] if fallback else []

              # dict.fromkeys keeps the first occurrence of each word in order.
              stripped = (str(entry or "").strip() for entry in candidates)
              unique_words = [word for word in dict.fromkeys(stripped) if word]
              explanation = str(parsed.get("reason") or "").strip()
              if not unique_words:
                  raise WxindexSelectionSkipped(
                      f"selected_words empty for {channel_content_id}"
                  )
              return {"selected_words": unique_words, "reason": explanation}
          except WxindexSelectionSkipped:
              # Must stay ahead of the retry handler so a skip is never retried.
              raise
          except (OpenRouterCallError, HotContentFlowError) as exc:
              # Remember the failure and fall through to the next attempt.
              failure = exc

      raise HotContentFlowError(
          f"llm extract wxindex words failed for {channel_content_id}: {failure}"
      ) from failure
  911. @staticmethod
  912. def filter_matched_points(raw_points: Any, matched_words: set[str]) -> list[dict[str, Any]]:
  913. if not matched_words or not isinstance(raw_points, list):
  914. return []
  915. matched_points: list[dict[str, Any]] = []
  916. for point in raw_points:
  917. if not isinstance(point, dict):
  918. continue
  919. point_match_rows = point.get("匹配词列表") or []
  920. if not isinstance(point_match_rows, list):
  921. point_match_rows = []
  922. keep_rows = [
  923. row
  924. for row in point_match_rows
  925. if isinstance(row, dict) and str(row.get("词") or "").strip() in matched_words
  926. ]
  927. if keep_rows:
  928. matched_points.append(
  929. {
  930. "来源": str(point.get("来源") or ""),
  931. "点": str(point.get("点") or ""),
  932. "点描述": str(point.get("点描述") or ""),
  933. "匹配词列表": keep_rows,
  934. }
  935. )
  936. return matched_points
  937. @staticmethod
  938. def extract_article_text(record: dict[str, Any]) -> tuple[str, str]:
  939. decode_result = record.get("decode_result_json")
  940. target_post = decode_result.get("target_post") if isinstance(decode_result, dict) else {}
  941. if not isinstance(target_post, dict):
  942. target_post = {}
  943. article_title = str(
  944. target_post.get("title") or record.get("article_title") or ""
  945. ).strip()
  946. body_text = str(
  947. target_post.get("body_text") or record.get("article_body") or ""
  948. ).strip()
  949. return article_title, body_text
  def run_once(config: FlowConfig | None = None) -> dict[str, Any]:
      """Run the contribution post-process service a single time.

      Args:
          config: Flow configuration to use. When omitted (or falsy),
              ``load_flow_config()`` supplies it.

      Returns:
          Whatever ``ContributionPostprocessService.run()`` returns.

      The repository is closed on every exit path, including when building
      the API client or running the service raises.
      """
      settings = config or load_flow_config()
      repo = HotContentRepository(settings.mysql)
      try:
          client = JsonApiClient(
              timeout_seconds=settings.request_timeout_seconds,
              verify_ssl=settings.https_verify_ssl,
          )
          return ContributionPostprocessService(settings, repo, client).run()
      finally:
          repo.close()