# wxindex_words.py
  1. """微信指数检索词汇总:从 wxindex_trend_json 提取全部检索词并持久化每日指数。"""
  2. from __future__ import annotations
  3. from datetime import date, datetime, timedelta
  4. from typing import Any
  5. from app.hot_content.client import JsonApiClient
  6. from app.hot_content.demand_export import get_wxindex_keywords
  7. from app.hot_content.exceptions import HotContentFlowError
  8. from app.hot_content.repository import HotContentRepository
  9. from app.hot_content.timezone import SHANGHAI_TZ
# Default number of days before the event creation date that a word's data window reaches back.
WXINDEX_WORDS_LOOKBACK_DAYS = 7
# Default number of days after the event creation date during which a word keeps being updated;
# also the default forward extent of the data window.
WXINDEX_WORDS_UPDATE_WINDOW_DAYS = 7
# Default `since_date` used by backfill_wxindex_word_meta when collecting event times.
WXINDEX_WORDS_RECORD_SINCE = date(2026, 6, 11)
# Score threshold: the helpers in this module require a total_score strictly greater than this.
WXINDEX_WORDS_MIN_MAX_SCORE = 100_000.0
  14. def get_fetch_start_ymd_from_event(
  15. event_created_at: datetime,
  16. *,
  17. lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
  18. ) -> str:
  19. """数据窗口左边界:事件创建日往前 N 天(yyyymmdd)。"""
  20. event_date = normalize_event_created_at(event_created_at).date()
  21. start_date = event_date - timedelta(days=lookback_days)
  22. return start_date.strftime("%Y%m%d")
  23. def get_fetch_end_ymd_from_event(
  24. event_created_at: datetime,
  25. *,
  26. forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
  27. ) -> str:
  28. """数据窗口右边界:事件创建日后 N 天(yyyymmdd)。"""
  29. event_date = normalize_event_created_at(event_created_at).date()
  30. end_date = event_date + timedelta(days=forward_days)
  31. return end_date.strftime("%Y%m%d")
  32. def get_fetch_ymd_bounds_from_event(
  33. event_created_at: datetime,
  34. *,
  35. lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
  36. forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
  37. ) -> tuple[str, str]:
  38. return (
  39. get_fetch_start_ymd_from_event(
  40. event_created_at,
  41. lookback_days=lookback_days,
  42. ),
  43. get_fetch_end_ymd_from_event(
  44. event_created_at,
  45. forward_days=forward_days,
  46. ),
  47. )
  48. def get_word_data_window_ymd_bounds(
  49. event_created_at: datetime,
  50. *,
  51. window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
  52. ) -> tuple[str, str]:
  53. """事件创建日前后 N 天的数据窗口 [start_ymd, end_ymd]。"""
  54. event_date = normalize_event_created_at(event_created_at).date()
  55. start_date = event_date - timedelta(days=window_days)
  56. end_date = event_date + timedelta(days=window_days)
  57. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  58. def get_wxindex_fetch_start_ymd(
  59. *,
  60. today: date | None = None,
  61. lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
  62. ) -> str:
  63. """首次拉取起始日期:近 N 日(截至昨日)。"""
  64. start_ymd, _end_ymd = get_lookback_range(lookback_days, today=today)
  65. return start_ymd
  66. def normalize_event_created_at(value: datetime | None) -> datetime:
  67. if value is None:
  68. return datetime.now(SHANGHAI_TZ)
  69. if value.tzinfo is None:
  70. return value.replace(tzinfo=SHANGHAI_TZ)
  71. return value.astimezone(SHANGHAI_TZ)
  72. def is_word_update_active(
  73. event_created_at: datetime,
  74. *,
  75. today: date | None = None,
  76. window_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
  77. ) -> bool:
  78. """事件创建后 window_days 天内继续更新,超出则停止。"""
  79. current = today or datetime.now(SHANGHAI_TZ).date()
  80. event_date = normalize_event_created_at(event_created_at).date()
  81. return (current - event_date).days <= window_days
  82. def get_wxindex_end_ymd(*, today: date | None = None) -> str:
  83. current = today or datetime.now(SHANGHAI_TZ).date()
  84. return (current - timedelta(days=1)).strftime("%Y%m%d")
  85. def get_lookback_range(lookback_days: int, *, today: date | None = None) -> tuple[str, str]:
  86. """原流程使用的近 N 日区间(截至昨日)。"""
  87. current = today or datetime.now(SHANGHAI_TZ).date()
  88. end_date = current - timedelta(days=1)
  89. start_date = end_date - timedelta(days=max(lookback_days, 1))
  90. return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
  91. def extract_searched_words(trend_json: dict[str, Any] | None) -> list[str]:
  92. """提取 wxindex_trend_json 中实际检索过微信指数的全部词(非仅最高分词)。"""
  93. if not isinstance(trend_json, dict):
  94. return []
  95. words: list[str] = []
  96. seen: set[str] = set()
  97. for item in trend_json.get("wxindex_searches") or []:
  98. if not isinstance(item, dict):
  99. continue
  100. keyword = str(item.get("keyword") or "").strip()
  101. if keyword and keyword not in seen:
  102. seen.add(keyword)
  103. words.append(keyword)
  104. if words:
  105. return words
  106. return get_wxindex_keywords(trend_json)
  107. def parse_wxindex_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
  108. rows = ((wx_resp.get("data") or {}).get("data") or [])
  109. if not isinstance(rows, list):
  110. return []
  111. series: list[dict[str, Any]] = []
  112. for row in rows:
  113. if not isinstance(row, dict):
  114. continue
  115. ymd = str(row.get("ymd") or "").strip()
  116. total_score = (row.get("channel_score") or {}).get("total_score")
  117. try:
  118. score_num = float(total_score) if total_score is not None else None
  119. except (TypeError, ValueError):
  120. score_num = None
  121. if ymd and score_num is not None:
  122. series.append({"ymd": ymd, "total_score": score_num})
  123. series.sort(key=lambda item: item["ymd"])
  124. return series
  125. def fetch_wxindex_scores(
  126. api_client: JsonApiClient,
  127. api_url: str,
  128. *,
  129. keyword: str,
  130. start_ymd: str,
  131. end_ymd: str | None = None,
  132. ) -> list[dict[str, Any]]:
  133. payload = {
  134. "keyword": keyword,
  135. "start_ymd": start_ymd,
  136. "end_ymd": end_ymd or get_wxindex_end_ymd(),
  137. }
  138. wx_resp = api_client.post_json(api_url, payload)
  139. return parse_wxindex_total_scores(wx_resp)
  140. def get_max_total_score(scores: list[dict[str, Any]]) -> float | None:
  141. """从指数序列中取 total_score 最大值。"""
  142. values: list[float] = []
  143. for item in scores:
  144. if not isinstance(item, dict):
  145. continue
  146. try:
  147. values.append(float(item["total_score"]))
  148. except (TypeError, ValueError, KeyError):
  149. continue
  150. if not values:
  151. return None
  152. return max(values)
  153. def word_meets_max_score_threshold(
  154. scores: list[dict[str, Any]],
  155. *,
  156. min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
  157. ) -> bool:
  158. """新增词时:最大值需严格大于阈值(不超过阈值则不添加)。"""
  159. max_score = get_max_total_score(scores)
  160. if max_score is None:
  161. return False
  162. return max_score > min_max_score
  163. def filter_scores_in_ymd_window(
  164. scores: list[dict[str, Any]],
  165. *,
  166. start_ymd: str,
  167. end_ymd: str,
  168. ) -> list[dict[str, Any]]:
  169. start = str(start_ymd or "").strip()
  170. end = str(end_ymd or "").strip()
  171. if not start or not end:
  172. return []
  173. filtered: list[dict[str, Any]] = []
  174. for item in scores:
  175. if not isinstance(item, dict):
  176. continue
  177. ymd = str(item.get("ymd") or item.get("dt") or "").strip()
  178. if not ymd or ymd < start or ymd > end:
  179. continue
  180. filtered.append(item)
  181. filtered.sort(key=lambda row: str(row.get("ymd") or row.get("dt") or ""))
  182. return filtered
  183. def word_has_high_score_in_window(
  184. scores: list[dict[str, Any]],
  185. *,
  186. start_ymd: str,
  187. end_ymd: str,
  188. min_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
  189. ) -> bool:
  190. """窗口内是否存在严格大于阈值的微信指数。"""
  191. window_scores = filter_scores_in_ymd_window(
  192. scores,
  193. start_ymd=start_ymd,
  194. end_ymd=end_ymd,
  195. )
  196. for item in window_scores:
  197. try:
  198. score = float(item["total_score"])
  199. except (TypeError, ValueError, KeyError):
  200. continue
  201. if score > min_score:
  202. return True
  203. return False
  204. def merge_wxindex_score_series(
  205. *series_list: list[dict[str, Any]],
  206. ) -> list[dict[str, Any]]:
  207. merged: dict[str, dict[str, Any]] = {}
  208. for series in series_list:
  209. for item in series:
  210. if not isinstance(item, dict):
  211. continue
  212. ymd = str(item.get("ymd") or item.get("dt") or "").strip()
  213. if not ymd:
  214. continue
  215. try:
  216. total_score = float(item["total_score"])
  217. except (TypeError, ValueError, KeyError):
  218. continue
  219. merged[ymd] = {"ymd": ymd, "total_score": total_score}
  220. return sorted(merged.values(), key=lambda row: row["ymd"])
  221. def get_word_score_bounds(
  222. scores: list[dict[str, Any]],
  223. ) -> tuple[str | None, str | None]:
  224. ymds = [
  225. str(item.get("ymd") or item.get("dt") or "").strip()
  226. for item in scores
  227. if isinstance(item, dict) and str(item.get("ymd") or item.get("dt") or "").strip()
  228. ]
  229. if not ymds:
  230. return None, None
  231. return min(ymds), max(ymds)
  232. def word_scores_need_supplement(
  233. scores: list[dict[str, Any]],
  234. *,
  235. end_ymd: str | None = None,
  236. start_ymd: str,
  237. ) -> tuple[bool, str]:
  238. """判断词是否需要补数:缺起始段、缺最新日期,或完全无数据。"""
  239. if not scores:
  240. return True, "empty"
  241. target_end = end_ymd or get_wxindex_end_ymd()
  242. earliest_ymd, latest_ymd = get_word_score_bounds(scores)
  243. if earliest_ymd is None or latest_ymd is None:
  244. return True, "empty"
  245. if earliest_ymd > start_ymd:
  246. return True, "missing_start"
  247. if latest_ymd < target_end:
  248. return True, "missing_end"
  249. return False, "complete"
  250. def get_supplement_fetch_range(
  251. scores: list[dict[str, Any]],
  252. *,
  253. end_ymd: str | None = None,
  254. start_ymd: str,
  255. ) -> tuple[str, str] | None:
  256. """计算补数 API 查询区间;无需补数时返回 None。"""
  257. need_supplement, reason = word_scores_need_supplement(
  258. scores,
  259. end_ymd=end_ymd,
  260. start_ymd=start_ymd,
  261. )
  262. if not need_supplement:
  263. return None
  264. target_end = end_ymd or get_wxindex_end_ymd()
  265. if reason == "empty":
  266. return start_ymd, target_end
  267. earliest_ymd, latest_ymd = get_word_score_bounds(scores)
  268. if earliest_ymd is None or latest_ymd is None:
  269. return start_ymd, target_end
  270. if earliest_ymd > start_ymd:
  271. return start_ymd, target_end
  272. return next_ymd(latest_ymd), target_end
  273. def slice_scores_lookback(
  274. full_scores: list[dict[str, Any]],
  275. lookback_days: int,
  276. *,
  277. today: date | None = None,
  278. ) -> tuple[list[dict[str, Any]], str, str]:
  279. """从全量序列截取原流程所需的近 N 日数据。"""
  280. start_ymd, end_ymd = get_lookback_range(lookback_days, today=today)
  281. series = [
  282. item
  283. for item in full_scores
  284. if isinstance(item, dict)
  285. and start_ymd <= str(item.get("ymd") or "") <= end_ymd
  286. ]
  287. series.sort(key=lambda item: str(item.get("ymd") or ""))
  288. if series:
  289. return series, start_ymd, end_ymd
  290. return [], start_ymd, end_ymd
  291. def next_ymd(ymd: str) -> str:
  292. current = datetime.strptime(ymd, "%Y%m%d").date()
  293. return (current + timedelta(days=1)).strftime("%Y%m%d")
  294. def refresh_stale_wxindex_words(
  295. repository: HotContentRepository,
  296. api_client: JsonApiClient,
  297. api_url: str,
  298. *,
  299. end_ymd: str | None = None,
  300. dry_run: bool = False,
  301. verbose: bool = False,
  302. ) -> dict[str, int]:
  303. """补全已存在但缺少最新日期数据的词(仅含 meta 的新词)。"""
  304. target_end = end_ymd or get_wxindex_end_ymd()
  305. summary = {
  306. "target_end_ymd": target_end,
  307. "stale_words": 0,
  308. "refreshed": 0,
  309. "inserted_rows": 0,
  310. "skipped_rows": 0,
  311. "fetch_failed": 0,
  312. "api_empty": 0,
  313. "no_new_range": 0,
  314. }
  315. stale_words = repository.list_stale_wxindex_words(
  316. end_ymd=target_end,
  317. update_window_days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
  318. )
  319. summary["stale_words"] = len(stale_words)
  320. if not stale_words:
  321. return summary
  322. for item in stale_words:
  323. name = str(item.get("name") or "").strip()
  324. if not name:
  325. continue
  326. word_start_ymd = str(item.get("fetch_start_ymd") or "").strip()
  327. if not word_start_ymd:
  328. word_start_ymd = get_wxindex_fetch_start_ymd()
  329. stored_scores = repository.list_wxindex_word_scores(name)
  330. fetch_range = get_supplement_fetch_range(
  331. stored_scores,
  332. end_ymd=target_end,
  333. start_ymd=word_start_ymd,
  334. )
  335. if fetch_range is None:
  336. summary["no_new_range"] += 1
  337. if verbose:
  338. print(f"skip complete word={name}")
  339. continue
  340. start_ymd, end_ymd = fetch_range
  341. if start_ymd > end_ymd:
  342. summary["no_new_range"] += 1
  343. if verbose:
  344. print(f"skip up-to-date word={name}")
  345. continue
  346. if dry_run:
  347. summary["refreshed"] += 1
  348. if verbose:
  349. print(f"[dry-run] would refresh word={name} {start_ymd}->{end_ymd}")
  350. continue
  351. try:
  352. api_scores = fetch_wxindex_scores(
  353. api_client,
  354. api_url,
  355. keyword=name,
  356. start_ymd=start_ymd,
  357. end_ymd=end_ymd,
  358. )
  359. if not api_scores:
  360. summary["api_empty"] += 1
  361. if verbose:
  362. print(f"api empty word={name} range={start_ymd}->{end_ymd}")
  363. continue
  364. inserted, skipped = repository.save_wxindex_daily_scores(
  365. name=name,
  366. scores=api_scores,
  367. )
  368. except Exception as exc:
  369. summary["fetch_failed"] += 1
  370. if verbose:
  371. print(f"refresh failed word={name}: {exc}")
  372. continue
  373. if inserted <= 0:
  374. summary["api_empty"] += 1
  375. if verbose:
  376. print(
  377. f"no new rows word={name} range={start_ymd}->{end_ymd} "
  378. f"api_rows={len(api_scores)} skipped={skipped}"
  379. )
  380. continue
  381. summary["refreshed"] += 1
  382. summary["inserted_rows"] += inserted
  383. summary["skipped_rows"] += skipped
  384. if verbose:
  385. print(
  386. f"refreshed word={name} range={start_ymd}->{end_ymd} "
  387. f"inserted={inserted} skipped={skipped}"
  388. )
  389. return summary
def sync_wxindex_words_from_meta(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    end_ymd: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """
    Synchronise hot_content_wxindex_words against hot_content_wxindex_word_meta.

    1. Delete words that are not present in meta.
    2. Delete dates outside the window [fetch_start_ymd, fetch_end_ymd].
    3. Fill in missing dates inside the window (including historical segments
       where fetch_start is earlier than yesterday, e.g. before 20260615).

    In dry-run mode steps 1 and 2 only count the rows that would be deleted,
    and step 3 only counts the words that would be fetched.

    Returns a summary dict with ``target_end_ymd``, ``dry_run`` and the counters
    ``meta_count``, ``deleted_without_meta_rows``, ``deleted_outside_window_rows``,
    ``words_need_refresh``, ``refreshed``, ``inserted_rows``, ``skipped_rows``,
    ``fetch_failed``, ``api_empty`` and ``no_new_range``.
    """
    target_end = end_ymd or get_wxindex_end_ymd()
    summary: dict[str, Any] = {
        "target_end_ymd": target_end,
        "meta_count": 0,
        "deleted_without_meta_rows": 0,
        "deleted_outside_window_rows": 0,
        "words_need_refresh": 0,
        "refreshed": 0,
        "inserted_rows": 0,
        "skipped_rows": 0,
        "fetch_failed": 0,
        "api_empty": 0,
        "no_new_range": 0,
        "dry_run": dry_run,
    }
    # Steps 1 and 2: count (dry run) or actually delete orphaned / out-of-window rows.
    if dry_run:
        summary["deleted_without_meta_rows"] = repository.count_wxindex_words_without_meta()
        summary["deleted_outside_window_rows"] = (
            repository.count_wxindex_words_outside_event_window()
        )
    else:
        summary["deleted_without_meta_rows"] = repository.delete_wxindex_words_without_meta()
        summary["deleted_outside_window_rows"] = (
            repository.delete_wxindex_words_outside_event_window()
        )
    # Step 3: fill gaps for every meta row that has a complete window definition.
    meta_rows = repository.list_all_wxindex_word_meta()
    summary["meta_count"] = len(meta_rows)
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end = str(meta.get("fetch_end_ymd") or "").strip()
        if not name or not fetch_start or not fetch_end:
            continue
        # Never query past the target end date, even if the window extends further.
        api_end = min(fetch_end, target_end)
        if fetch_start > api_end:
            summary["no_new_range"] += 1
            if verbose:
                print(
                    f"skip out-of-range word={name} "
                    f"window={fetch_start}~{fetch_end} api_end={api_end}"
                )
            continue
        stored_scores = repository.list_wxindex_word_scores(name)
        fetch_range = get_supplement_fetch_range(
            stored_scores,
            end_ymd=api_end,
            start_ymd=fetch_start,
        )
        if fetch_range is None:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip complete word={name} window={fetch_start}~{fetch_end}")
            continue
        # Counted before the empty-range check below, so a word can be tallied in
        # both "words_need_refresh" and "no_new_range".
        summary["words_need_refresh"] += 1
        start_ymd, range_end = fetch_range
        if start_ymd > range_end:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip up-to-date word={name}")
            continue
        if dry_run:
            summary["refreshed"] += 1
            if verbose:
                print(
                    f"[dry-run] would fetch word={name} "
                    f"{start_ymd}->{range_end} "
                    f"save_window={fetch_start}~{fetch_end}"
                )
            continue
        try:
            api_scores = fetch_wxindex_scores(
                api_client,
                api_url,
                keyword=name,
                start_ymd=start_ymd,
                end_ymd=range_end,
            )
            # Only rows inside the word's own window are persisted.
            window_scores = filter_scores_in_ymd_window(
                api_scores,
                start_ymd=fetch_start,
                end_ymd=fetch_end,
            )
            if not window_scores:
                summary["api_empty"] += 1
                if verbose:
                    print(
                        f"api empty word={name} fetch={start_ymd}->{range_end} "
                        f"window={fetch_start}~{fetch_end}"
                    )
                continue
            inserted, skipped = repository.save_wxindex_daily_scores(
                name=name,
                scores=window_scores,
            )
        except Exception as exc:
            # Best-effort per word: count the failure and continue with the next one.
            summary["fetch_failed"] += 1
            if verbose:
                print(f"sync failed word={name}: {exc}")
            continue
        if inserted <= 0:
            # Rows came back but none was new; tallied under "api_empty" as well.
            summary["api_empty"] += 1
            if verbose:
                print(
                    f"no new rows word={name} fetch={start_ymd}->{range_end} "
                    f"api_rows={len(window_scores)} skipped={skipped}"
                )
            continue
        summary["refreshed"] += 1
        summary["inserted_rows"] += inserted
        summary["skipped_rows"] += skipped
        if verbose:
            print(
                f"synced word={name} fetch={start_ymd}->{range_end} "
                f"inserted={inserted} skipped={skipped}"
            )
    return summary
  521. def cleanup_low_max_wxindex_words(
  522. repository: HotContentRepository,
  523. *,
  524. min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
  525. dry_run: bool = False,
  526. verbose: bool = False,
  527. ) -> dict[str, int | float]:
  528. """删除各 dt 最大值低于阈值的词(按 name 整词删除)。"""
  529. summary: dict[str, int | float] = {
  530. "min_max_score": min_max_score,
  531. "low_max_words": 0,
  532. "deleted_rows": 0,
  533. }
  534. low_words = repository.list_low_max_wxindex_words(min_max_score=min_max_score)
  535. summary["low_max_words"] = len(low_words)
  536. if not low_words:
  537. return summary
  538. if dry_run:
  539. if verbose:
  540. for item in low_words:
  541. print(
  542. f"[dry-run] would delete word={item['name']} "
  543. f"max_score={item['max_score']:.0f} rows={item['row_count']}"
  544. )
  545. summary["deleted_rows"] = sum(int(item["row_count"]) for item in low_words)
  546. return summary
  547. names = [str(item["name"]) for item in low_words if str(item.get("name") or "").strip()]
  548. deleted_rows = repository.delete_wxindex_words_by_names(names)
  549. summary["deleted_rows"] = deleted_rows
  550. if verbose:
  551. for item in low_words:
  552. print(
  553. f"deleted word={item['name']} "
  554. f"max_score={item['max_score']:.0f} rows={item['row_count']}"
  555. )
  556. return summary
def try_register_wxindex_word_meta(
    repository: HotContentRepository,
    *,
    word: str,
    event_created_at: datetime | None = None,
    event_map: dict[str, datetime] | None = None,
    first_row_created_at: datetime | None = None,
    include_expired: bool = False,
    dry_run: bool = False,
    update_if_exists: bool = False,
) -> tuple[dict[str, Any] | None, str]:
    """Register (or update) the meta row for ``word``; return ``(meta, reason)``.

    The event time is resolved from the first available source, in order:
    ``event_created_at``, ``event_map[word]``, ``first_row_created_at``, then
    ``repository.get_wxindex_word_first_row_created_at``.

    ``reason`` is one of:
        * ``"empty"``      - ``word`` is blank (meta is ``None``).
        * ``"no_event"``   - no event time could be resolved and no meta exists.
        * ``"expired"``    - the word is outside its update window and has no meta.
        * ``"exists"``     - meta already exists and is returned unchanged.
        * ``"dry_run"``    - dry run; the returned dict is the meta that would be written.
        * ``"updated"``    - existing meta was rewritten (``update_if_exists``).
        * ``"registered"`` - a new meta row was created.

    Raises:
        HotContentFlowError: if the meta cannot be read back after an update.
    """
    name = str(word or "").strip()
    if not name:
        return None, "empty"
    existing = repository.get_wxindex_word_meta(name)
    # Resolve the event time from the most specific source down to the repository fallback.
    resolved_event_at = event_created_at
    if resolved_event_at is None and event_map is not None:
        resolved_event_at = event_map.get(name)
    if resolved_event_at is None:
        resolved_event_at = first_row_created_at
    if resolved_event_at is None:
        resolved_event_at = repository.get_wxindex_word_first_row_created_at(name)
    if resolved_event_at is None:
        if existing:
            return existing, "exists"
        return None, "no_event"
    normalized_event_at = normalize_event_created_at(resolved_event_at)
    if not include_expired and not is_word_update_active(normalized_event_at):
        if existing and not update_if_exists:
            return existing, "exists"
        if not existing:
            return None, "expired"
        # Falls through only for an expired word that exists and must be updated.
    fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at)
    fetch_end_ymd = get_fetch_end_ymd_from_event(normalized_event_at)
    # NOTE(review): the dry-run return happens before the `existing` checks below, so a
    # dry run reports "dry_run" even for a word a real run would report as "exists".
    if dry_run:
        return {
            "name": name,
            "event_created_at": normalized_event_at,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
        }, "dry_run"
    if existing and update_if_exists:
        repository.update_wxindex_word_meta(
            name=name,
            event_created_at=normalized_event_at,
            fetch_start_ymd=fetch_start_ymd,
            fetch_end_ymd=fetch_end_ymd,
        )
        # Read the row back so the caller gets the stored state.
        meta = repository.get_wxindex_word_meta(name)
        if meta is None:
            raise HotContentFlowError(f"failed to update wxindex word meta: {name}")
        return meta, "updated"
    if existing:
        return existing, "exists"
    meta = repository.ensure_wxindex_word_meta(
        name=name,
        event_created_at=normalized_event_at,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
    )
    return meta, "registered"
  619. def fix_wxindex_word_meta_fetch_bounds(
  620. repository: HotContentRepository,
  621. *,
  622. dry_run: bool = False,
  623. verbose: bool = False,
  624. ) -> dict[str, int]:
  625. """按 event_created_at 修正 meta.fetch_start_ymd / fetch_end_ymd。"""
  626. rows = repository.list_all_wxindex_word_meta()
  627. summary = {
  628. "total": len(rows),
  629. "updated": 0,
  630. "unchanged": 0,
  631. }
  632. for row in rows:
  633. name = str(row.get("name") or "").strip()
  634. event_created_at = row.get("event_created_at")
  635. old_fetch_start = str(row.get("fetch_start_ymd") or "").strip()
  636. old_fetch_end = str(row.get("fetch_end_ymd") or "").strip()
  637. if not name or event_created_at is None:
  638. continue
  639. new_fetch_start, new_fetch_end = get_fetch_ymd_bounds_from_event(event_created_at)
  640. if new_fetch_start == old_fetch_start and new_fetch_end == old_fetch_end:
  641. summary["unchanged"] += 1
  642. continue
  643. if dry_run:
  644. summary["updated"] += 1
  645. if verbose:
  646. print(
  647. f"[dry-run] word={name} "
  648. f"start {old_fetch_start}->{new_fetch_start} "
  649. f"end {old_fetch_end}->{new_fetch_end}"
  650. )
  651. continue
  652. repository.update_wxindex_word_meta(
  653. name=name,
  654. event_created_at=event_created_at,
  655. fetch_start_ymd=new_fetch_start,
  656. fetch_end_ymd=new_fetch_end,
  657. )
  658. summary["updated"] += 1
  659. if verbose:
  660. print(
  661. f"updated word={name} "
  662. f"start {old_fetch_start}->{new_fetch_start} "
  663. f"end {old_fetch_end}->{new_fetch_end}"
  664. )
  665. return summary
  666. def fix_wxindex_word_meta_fetch_start_ymd(
  667. repository: HotContentRepository,
  668. *,
  669. dry_run: bool = False,
  670. verbose: bool = False,
  671. ) -> dict[str, int]:
  672. """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。"""
  673. return fix_wxindex_word_meta_fetch_bounds(
  674. repository,
  675. dry_run=dry_run,
  676. verbose=verbose,
  677. )
  678. def cleanup_wxindex_words_outside_event_window(
  679. repository: HotContentRepository,
  680. *,
  681. window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
  682. dry_run: bool = False,
  683. verbose: bool = False,
  684. ) -> dict[str, int]:
  685. """删除 hot_content_wxindex_words 中超出 event_created_at 前后 window_days 的数据。"""
  686. to_delete = repository.count_wxindex_words_outside_event_window(
  687. window_days=window_days,
  688. )
  689. summary = {
  690. "window_days": window_days,
  691. "rows_to_delete": to_delete,
  692. "deleted_rows": 0,
  693. "words_without_meta_rows": repository.count_wxindex_words_without_meta(),
  694. }
  695. if to_delete <= 0:
  696. return summary
  697. if dry_run:
  698. if verbose:
  699. samples = repository.list_wxindex_words_outside_event_window_samples(
  700. window_days=window_days,
  701. limit=20,
  702. )
  703. for item in samples:
  704. print(
  705. f"[dry-run] word={item['name']} dt={item['dt']} "
  706. f"window={item['start_ymd']}~{item['end_ymd']} "
  707. f"event_created_at={item['event_created_at']}"
  708. )
  709. return summary
  710. summary["deleted_rows"] = repository.delete_wxindex_words_outside_event_window(
  711. window_days=window_days,
  712. )
  713. if verbose:
  714. print(f"deleted_rows={summary['deleted_rows']}")
  715. return summary
  716. def build_word_earliest_event_map(
  717. repository: HotContentRepository,
  718. *,
  719. since_dt: datetime,
  720. ) -> dict[str, datetime]:
  721. """从热点记录中汇总每个检索词对应的最早事件创建时间。"""
  722. return repository.list_word_earliest_event_times(since_dt=since_dt)
  723. def backfill_wxindex_word_meta(
  724. repository: HotContentRepository,
  725. *,
  726. since_date: date = WXINDEX_WORDS_RECORD_SINCE,
  727. include_expired: bool = True,
  728. fix_fetch_start: bool = True,
  729. dry_run: bool = False,
  730. verbose: bool = False,
  731. ) -> dict[str, Any]:
  732. """为 hot_content_wxindex_words 中缺 meta 的词补登记,并修正 fetch_start_ymd。"""
  733. since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
  734. event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
  735. candidates = repository.list_wxindex_word_bounds_without_meta()
  736. register_summary: dict[str, int | str | bool] = {
  737. "since_date": since_date.isoformat(),
  738. "include_expired": include_expired,
  739. "candidates": len(candidates),
  740. "registered": 0,
  741. "skipped_expired": 0,
  742. "skipped_no_event": 0,
  743. }
  744. for item in candidates:
  745. name = str(item.get("name") or "").strip()
  746. if not name:
  747. continue
  748. meta, reason = try_register_wxindex_word_meta(
  749. repository,
  750. word=name,
  751. event_map=event_map,
  752. first_row_created_at=item.get("first_created_at"),
  753. include_expired=include_expired,
  754. dry_run=dry_run,
  755. )
  756. if reason in {"registered", "dry_run", "updated"}:
  757. register_summary["registered"] += 1
  758. if verbose:
  759. label = "[dry-run] would register" if dry_run else "registered"
  760. print(f"{label} meta word={name} event_at={meta['event_created_at']}")
  761. elif reason == "expired":
  762. register_summary["skipped_expired"] += 1
  763. if verbose:
  764. print(f"skip expired word={name}")
  765. elif reason == "no_event":
  766. register_summary["skipped_no_event"] += 1
  767. if verbose:
  768. print(f"skip no_event word={name}")
  769. fetch_start_summary: dict[str, int] | None = None
  770. if fix_fetch_start:
  771. fetch_start_summary = fix_wxindex_word_meta_fetch_start_ymd(
  772. repository,
  773. dry_run=dry_run,
  774. verbose=verbose,
  775. )
  776. return {
  777. "register": register_summary,
  778. "fetch_start_fix": fetch_start_summary,
  779. }
  780. def backfill_active_wxindex_word_meta(
  781. repository: HotContentRepository,
  782. *,
  783. dry_run: bool = False,
  784. verbose: bool = False,
  785. ) -> dict[str, int | str]:
  786. """为表中仍处 7 天窗口内、但缺少 meta 的词补登记。"""
  787. current = datetime.now(SHANGHAI_TZ).date()
  788. since_dt = datetime.combine(
  789. current - timedelta(days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS),
  790. datetime.min.time(),
  791. ).replace(tzinfo=SHANGHAI_TZ)
  792. event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
  793. candidates = repository.list_wxindex_word_bounds_without_meta()
  794. summary: dict[str, int | str] = {
  795. "active_since": since_dt.date().isoformat(),
  796. "candidates": len(candidates),
  797. "registered": 0,
  798. "skipped_expired": 0,
  799. "skipped_no_event": 0,
  800. }
  801. for item in candidates:
  802. name = str(item.get("name") or "").strip()
  803. if not name:
  804. continue
  805. meta, reason = try_register_wxindex_word_meta(
  806. repository,
  807. word=name,
  808. event_map=event_map,
  809. first_row_created_at=item.get("first_created_at"),
  810. dry_run=dry_run,
  811. )
  812. if reason == "registered":
  813. summary["registered"] += 1
  814. if verbose:
  815. print(f"registered meta word={name} event_at={meta['event_created_at']}")
  816. elif reason == "dry_run":
  817. summary["registered"] += 1
  818. if verbose:
  819. print(f"[dry-run] would register meta word={name}")
  820. elif reason == "expired":
  821. summary["skipped_expired"] += 1
  822. if verbose:
  823. print(f"skip expired word={name}")
  824. elif reason == "no_event":
  825. summary["skipped_no_event"] += 1
  826. if verbose:
  827. print(f"skip no_event word={name}")
  828. return summary
  829. def run_wxindex_words_daily_job(
  830. repository: HotContentRepository,
  831. api_client: JsonApiClient,
  832. api_url: str,
  833. *,
  834. end_ymd: str | None = None,
  835. min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
  836. dry_run: bool = False,
  837. verbose: bool = False,
  838. ) -> dict[str, Any]:
  839. """定时任务:补 meta、补全缺失日期、清理低最大值词。"""
  840. meta_summary = backfill_active_wxindex_word_meta(
  841. repository,
  842. dry_run=dry_run,
  843. verbose=verbose,
  844. )
  845. refresh_summary = refresh_stale_wxindex_words(
  846. repository,
  847. api_client,
  848. api_url,
  849. end_ymd=end_ymd,
  850. dry_run=dry_run,
  851. verbose=verbose,
  852. )
  853. cleanup_summary = cleanup_low_max_wxindex_words(
  854. repository,
  855. min_max_score=min_max_score,
  856. dry_run=dry_run,
  857. verbose=verbose,
  858. )
  859. return {
  860. "meta_backfill": meta_summary,
  861. "refresh": refresh_summary,
  862. "cleanup": cleanup_summary,
  863. }
  864. def ensure_word_full_scores(
  865. repository: HotContentRepository,
  866. api_client: JsonApiClient,
  867. api_url: str,
  868. *,
  869. keyword: str,
  870. end_ymd: str | None = None,
  871. event_created_at: datetime | None = None,
  872. include_expired: bool = False,
  873. force_refresh: bool = False,
  874. dry_run: bool = False,
  875. update_meta_if_exists: bool = False,
  876. ) -> tuple[list[dict[str, Any]], str]:
  877. """
  878. 获取词微信指数并入库。
  879. - meta 表:窗口内存在指数 > 10 万才写入/更新
  880. - wxindex_words:仅保留 [fetch_start_ymd, fetch_end_ymd] 区间内数据
  881. - 超过 7 天窗口:不再更新
  882. 返回 (scores, action)。
  883. """
  884. word = str(keyword or "").strip()
  885. if not word:
  886. return [], "empty"
  887. target_end = end_ymd or get_wxindex_end_ymd()
  888. stored_scores = repository.list_wxindex_word_scores(word)
  889. meta = repository.get_wxindex_word_meta(word)
  890. should_register_meta = meta is None and event_created_at is not None
  891. should_update_meta = (
  892. update_meta_if_exists
  893. and meta is not None
  894. and event_created_at is not None
  895. )
  896. fetch_start_ymd: str | None = None
  897. fetch_end_ymd: str | None = None
  898. if event_created_at is not None:
  899. normalized_event_at = normalize_event_created_at(event_created_at)
  900. if not include_expired and not is_word_update_active(normalized_event_at):
  901. if meta is None:
  902. return stored_scores, "expired"
  903. if not should_update_meta:
  904. return stored_scores, "expired"
  905. fetch_start_ymd, fetch_end_ymd = get_fetch_ymd_bounds_from_event(
  906. normalized_event_at
  907. )
  908. elif meta is not None:
  909. fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
  910. fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
  911. if not fetch_end_ymd:
  912. fetch_end_ymd = get_fetch_end_ymd_from_event(meta["event_created_at"])
  913. if not include_expired and not is_word_update_active(meta["event_created_at"]):
  914. return stored_scores, "expired"
  915. else:
  916. return stored_scores, "legacy"
  917. if not fetch_start_ymd or not fetch_end_ymd:
  918. return stored_scores, "legacy"
  919. api_end_ymd = min(fetch_end_ymd, target_end)
  920. fetch_range = None if force_refresh else get_supplement_fetch_range(
  921. stored_scores,
  922. end_ymd=api_end_ymd,
  923. start_ymd=fetch_start_ymd,
  924. )
  925. if fetch_range is None and stored_scores and meta is not None and not should_update_meta:
  926. merged_scores = merge_wxindex_score_series(stored_scores)
  927. window_scores = filter_scores_in_ymd_window(
  928. merged_scores,
  929. start_ymd=fetch_start_ymd,
  930. end_ymd=fetch_end_ymd,
  931. )
  932. return window_scores, "cached"
  933. if dry_run:
  934. return [], "dry_run"
  935. had_data = bool(stored_scores)
  936. start_ymd, fetch_end_ymd_api = fetch_range or (fetch_start_ymd, api_end_ymd)
  937. api_scores: list[dict[str, Any]] = []
  938. if fetch_range is not None or not stored_scores or force_refresh:
  939. api_scores = fetch_wxindex_scores(
  940. api_client,
  941. api_url,
  942. keyword=word,
  943. start_ymd=start_ymd,
  944. end_ymd=fetch_end_ymd_api,
  945. )
  946. merged_scores = merge_wxindex_score_series(stored_scores, api_scores)
  947. window_scores = filter_scores_in_ymd_window(
  948. merged_scores,
  949. start_ymd=fetch_start_ymd,
  950. end_ymd=fetch_end_ymd,
  951. )
  952. if should_register_meta or should_update_meta:
  953. if not word_has_high_score_in_window(
  954. window_scores,
  955. start_ymd=fetch_start_ymd,
  956. end_ymd=fetch_end_ymd,
  957. ):
  958. return stored_scores, "below_threshold"
  959. meta, register_reason = try_register_wxindex_word_meta(
  960. repository,
  961. word=word,
  962. event_created_at=event_created_at,
  963. include_expired=include_expired,
  964. dry_run=dry_run,
  965. update_if_exists=should_update_meta,
  966. )
  967. if meta is None:
  968. if register_reason == "expired":
  969. return stored_scores, "expired"
  970. return stored_scores, "legacy"
  971. elif meta is None:
  972. return stored_scores, "below_threshold"
  973. if meta is None:
  974. return stored_scores, "legacy"
  975. if not api_scores and not window_scores:
  976. return stored_scores, "api_empty"
  977. if not had_data and not word_has_high_score_in_window(
  978. window_scores,
  979. start_ymd=fetch_start_ymd,
  980. end_ymd=fetch_end_ymd,
  981. ):
  982. return [], "below_threshold"
  983. inserted, _skipped = repository.save_wxindex_daily_scores(
  984. name=word,
  985. scores=window_scores,
  986. )
  987. final_scores = filter_scores_in_ymd_window(
  988. repository.list_wxindex_word_scores(word),
  989. start_ymd=fetch_start_ymd,
  990. end_ymd=fetch_end_ymd,
  991. )
  992. if inserted > 0:
  993. action = "updated" if had_data else "inserted"
  994. elif final_scores:
  995. action = "cached"
  996. else:
  997. action = "api_empty"
  998. return final_scores or window_scores, action
  999. def sync_words_from_trend_json(
  1000. repository: HotContentRepository,
  1001. api_client: JsonApiClient,
  1002. api_url: str,
  1003. *,
  1004. trend_json: dict[str, Any],
  1005. record_id: int,
  1006. event_created_at: datetime | None = None,
  1007. dry_run: bool = False,
  1008. verbose: bool = False,
  1009. update_meta_if_exists: bool = False,
  1010. ) -> dict[str, int]:
  1011. """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(近 7 日数据)。"""
  1012. summary = {
  1013. "words_found": 0,
  1014. "inserted": 0,
  1015. "updated": 0,
  1016. "cached": 0,
  1017. "legacy": 0,
  1018. "expired": 0,
  1019. "api_empty": 0,
  1020. "below_threshold": 0,
  1021. "fetch_failed": 0,
  1022. }
  1023. words = extract_searched_words(trend_json)
  1024. summary["words_found"] = len(words)
  1025. if not words:
  1026. return summary
  1027. for name in words:
  1028. try:
  1029. _, action = ensure_word_full_scores(
  1030. repository,
  1031. api_client,
  1032. api_url,
  1033. keyword=name,
  1034. event_created_at=event_created_at,
  1035. dry_run=dry_run,
  1036. update_meta_if_exists=update_meta_if_exists,
  1037. )
  1038. except Exception as exc:
  1039. summary["fetch_failed"] += 1
  1040. if verbose:
  1041. print(f" fetch failed word={name}: {exc}")
  1042. continue
  1043. if action == "inserted":
  1044. summary["inserted"] += 1
  1045. elif action == "updated":
  1046. summary["updated"] += 1
  1047. elif action == "cached":
  1048. summary["cached"] += 1
  1049. elif action == "legacy":
  1050. summary["legacy"] += 1
  1051. elif action == "expired":
  1052. summary["expired"] += 1
  1053. elif action == "api_empty":
  1054. summary["api_empty"] += 1
  1055. elif action == "below_threshold":
  1056. summary["below_threshold"] += 1
  1057. elif action == "dry_run":
  1058. summary["inserted"] += 1
  1059. if verbose:
  1060. print(f" word={name} action={action}")
  1061. return summary
  1062. def backfill_wxindex_words(
  1063. repository: HotContentRepository,
  1064. api_client: JsonApiClient,
  1065. api_url: str,
  1066. *,
  1067. since_date: date = WXINDEX_WORDS_RECORD_SINCE,
  1068. dry_run: bool = False,
  1069. verbose: bool = False,
  1070. ) -> dict[str, int]:
  1071. """扫描 hot_content_records,汇总 6/11 起全部微信指数检索词(历史回填调 API)。"""
  1072. summary = {
  1073. "records_scanned": 0,
  1074. "records_with_words": 0,
  1075. "words_found": 0,
  1076. "inserted": 0,
  1077. "updated": 0,
  1078. "cached": 0,
  1079. "legacy": 0,
  1080. "expired": 0,
  1081. "api_empty": 0,
  1082. "below_threshold": 0,
  1083. "fetch_failed": 0,
  1084. "invalid_json": 0,
  1085. }
  1086. since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
  1087. records = repository.list_records_with_wxindex_trend(since_dt=since_dt)
  1088. for row in records:
  1089. summary["records_scanned"] += 1
  1090. record_id = int(row["id"])
  1091. try:
  1092. trend_json = row.get("wxindex_trend_json")
  1093. if not isinstance(trend_json, dict):
  1094. summary["invalid_json"] += 1
  1095. continue
  1096. except (TypeError, ValueError):
  1097. summary["invalid_json"] += 1
  1098. continue
  1099. words = extract_searched_words(trend_json)
  1100. if not words:
  1101. continue
  1102. summary["records_with_words"] += 1
  1103. if verbose:
  1104. print(f"id={record_id} words={words}")
  1105. result = sync_words_from_trend_json(
  1106. repository,
  1107. api_client,
  1108. api_url,
  1109. trend_json=trend_json,
  1110. record_id=record_id,
  1111. event_created_at=row.get("created_at"),
  1112. dry_run=dry_run,
  1113. verbose=verbose,
  1114. )
  1115. for key in (
  1116. "words_found",
  1117. "inserted",
  1118. "updated",
  1119. "cached",
  1120. "legacy",
  1121. "expired",
  1122. "api_empty",
  1123. "below_threshold",
  1124. "fetch_failed",
  1125. ):
  1126. summary[key] += result[key]
  1127. return summary
  1128. def build_word_event_map_from_records(
  1129. records: list[dict[str, Any]],
  1130. ) -> dict[str, datetime]:
  1131. word_events: dict[str, datetime] = {}
  1132. for row in records:
  1133. created_at = row.get("created_at")
  1134. if not isinstance(created_at, datetime):
  1135. continue
  1136. for word in extract_searched_words(row.get("wxindex_trend_json")):
  1137. existing = word_events.get(word)
  1138. if existing is None or created_at < existing:
  1139. word_events[word] = created_at
  1140. return word_events
  1141. def audit_wxindex_words_from_records(
  1142. repository: HotContentRepository,
  1143. *,
  1144. after_created_at: datetime,
  1145. ) -> dict[str, Any]:
  1146. """检查指定时间后的热点记录,其微信指数词是否已在汇总表和 meta 表。"""
  1147. records = repository.list_records_with_wxindex_trend_after(
  1148. after_created_at=after_created_at,
  1149. )
  1150. word_events = build_word_event_map_from_records(records)
  1151. missing_words: list[str] = []
  1152. missing_meta: list[str] = []
  1153. for word in sorted(word_events):
  1154. if not repository.has_wxindex_word(word):
  1155. missing_words.append(word)
  1156. elif repository.get_wxindex_word_meta(word) is None:
  1157. missing_meta.append(word)
  1158. return {
  1159. "after_created_at": after_created_at.isoformat(sep=" ", timespec="seconds"),
  1160. "records_scanned": len(records),
  1161. "words_found": len(word_events),
  1162. "missing_words_count": len(missing_words),
  1163. "missing_meta_count": len(missing_meta),
  1164. "missing_words": missing_words,
  1165. "missing_meta": missing_meta,
  1166. }
  1167. def supplement_wxindex_words_from_records(
  1168. repository: HotContentRepository,
  1169. api_client: JsonApiClient,
  1170. api_url: str,
  1171. *,
  1172. after_created_at: datetime,
  1173. dry_run: bool = False,
  1174. verbose: bool = False,
  1175. ) -> dict[str, Any]:
  1176. """补全指定时间后热点记录涉及、但缺失的 wxindex 词与 meta。"""
  1177. audit = audit_wxindex_words_from_records(
  1178. repository,
  1179. after_created_at=after_created_at,
  1180. )
  1181. records = repository.list_records_with_wxindex_trend_after(
  1182. after_created_at=after_created_at,
  1183. )
  1184. word_events = build_word_event_map_from_records(records)
  1185. summary: dict[str, Any] = {
  1186. "audit_before": {
  1187. "records_scanned": audit["records_scanned"],
  1188. "words_found": audit["words_found"],
  1189. "missing_words_count": audit["missing_words_count"],
  1190. "missing_meta_count": audit["missing_meta_count"],
  1191. },
  1192. "supplemented_words": 0,
  1193. "inserted": 0,
  1194. "updated": 0,
  1195. "cached": 0,
  1196. "meta_registered": 0,
  1197. "api_empty": 0,
  1198. "below_threshold": 0,
  1199. "fetch_failed": 0,
  1200. }
  1201. for word, event_at in sorted(word_events.items()):
  1202. had_meta = repository.get_wxindex_word_meta(word) is not None
  1203. try:
  1204. _, action = ensure_word_full_scores(
  1205. repository,
  1206. api_client,
  1207. api_url,
  1208. keyword=word,
  1209. event_created_at=event_at,
  1210. include_expired=True,
  1211. dry_run=dry_run,
  1212. )
  1213. except Exception as exc:
  1214. summary["fetch_failed"] += 1
  1215. if verbose:
  1216. print(f"fetch failed word={word}: {exc}")
  1217. continue
  1218. summary["supplemented_words"] += 1
  1219. if action == "inserted":
  1220. summary["inserted"] += 1
  1221. elif action == "updated":
  1222. summary["updated"] += 1
  1223. elif action == "cached":
  1224. summary["cached"] += 1
  1225. elif action == "api_empty":
  1226. summary["api_empty"] += 1
  1227. elif action == "below_threshold":
  1228. summary["below_threshold"] += 1
  1229. elif action == "dry_run":
  1230. summary["inserted"] += 1
  1231. if not had_meta:
  1232. if dry_run or repository.get_wxindex_word_meta(word) is not None:
  1233. summary["meta_registered"] += 1
  1234. if verbose:
  1235. print(f"word={word} event_at={event_at} action={action}")
  1236. audit_after = audit_wxindex_words_from_records(
  1237. repository,
  1238. after_created_at=after_created_at,
  1239. )
  1240. summary["audit_after"] = {
  1241. "missing_words_count": audit_after["missing_words_count"],
  1242. "missing_meta_count": audit_after["missing_meta_count"],
  1243. "missing_words": audit_after["missing_words"],
  1244. "missing_meta": audit_after["missing_meta"],
  1245. }
  1246. return summary