| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404 |
- """微信指数检索词汇总:从 wxindex_trend_json 提取全部检索词并持久化每日指数。"""
- from __future__ import annotations
- from datetime import date, datetime, timedelta
- from typing import Any
- from app.hot_content.client import JsonApiClient
- from app.hot_content.demand_export import get_wxindex_keywords
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.repository import HotContentRepository
- from app.hot_content.timezone import SHANGHAI_TZ
# Days before an event's creation date that belong to a word's data window.
WXINDEX_WORDS_LOOKBACK_DAYS = 7
# Days after an event's creation date during which a word keeps being updated.
WXINDEX_WORDS_UPDATE_WINDOW_DAYS = 7
# Default earliest date used when backfilling meta from hot-content records.
WXINDEX_WORDS_RECORD_SINCE = date(year=2026, month=6, day=11)
# WeChat index threshold (100k); a word qualifies only when a score is strictly above it.
WXINDEX_WORDS_MIN_MAX_SCORE = 1e5
def get_fetch_start_ymd_from_event(
    event_created_at: datetime,
    *,
    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
) -> str:
    """Return the left edge of a word's data window as ``yyyymmdd``.

    The edge is the event's creation date (normalised to Shanghai time by
    ``normalize_event_created_at``) minus ``lookback_days`` calendar days.
    """
    anchor_day = normalize_event_created_at(event_created_at).date()
    window_start = anchor_day - timedelta(days=lookback_days)
    return window_start.strftime("%Y%m%d")
def get_fetch_end_ymd_from_event(
    event_created_at: datetime,
    *,
    forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
) -> str:
    """Return the right edge of a word's data window as ``yyyymmdd``.

    The edge is the event's creation date (normalised to Shanghai time by
    ``normalize_event_created_at``) plus ``forward_days`` calendar days.
    """
    anchor_day = normalize_event_created_at(event_created_at).date()
    window_end = anchor_day + timedelta(days=forward_days)
    return window_end.strftime("%Y%m%d")
def get_fetch_ymd_bounds_from_event(
    event_created_at: datetime,
    *,
    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
    forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
) -> tuple[str, str]:
    """Return ``(start_ymd, end_ymd)`` of the data window around an event.

    ``start_ymd`` is the event date minus ``lookback_days``; ``end_ymd`` is
    the event date plus ``forward_days``. Both are ``yyyymmdd`` strings.
    """
    start_ymd = get_fetch_start_ymd_from_event(
        event_created_at,
        lookback_days=lookback_days,
    )
    end_ymd = get_fetch_end_ymd_from_event(
        event_created_at,
        forward_days=forward_days,
    )
    return start_ymd, end_ymd
def get_word_data_window_ymd_bounds(
    event_created_at: datetime,
    *,
    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
) -> tuple[str, str]:
    """Return the symmetric ``(start_ymd, end_ymd)`` window around an event.

    Both edges sit ``window_days`` calendar days before/after the event's
    creation date (normalised to Shanghai time) and are ``yyyymmdd`` strings.
    """
    anchor_day = normalize_event_created_at(event_created_at).date()
    half_width = timedelta(days=window_days)
    ymd_format = "%Y%m%d"
    return (
        (anchor_day - half_width).strftime(ymd_format),
        (anchor_day + half_width).strftime(ymd_format),
    )
def get_wxindex_fetch_start_ymd(
    *,
    today: date | None = None,
    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
) -> str:
    """Return the start date (``yyyymmdd``) for a word's first fetch.

    This is the left edge of ``get_lookback_range(lookback_days)``, i.e. the
    recent-N-day window that ends yesterday.
    """
    return get_lookback_range(lookback_days, today=today)[0]
def normalize_event_created_at(value: datetime | None) -> datetime:
    """Coerce an event timestamp into a timezone-aware Shanghai ``datetime``.

    - ``None``  -> the current time in Shanghai.
    - naive     -> treated as Shanghai wall-clock time (tz attached, no shift).
    - aware     -> converted to Shanghai time.
    """
    if value is not None and value.tzinfo is not None:
        return value.astimezone(SHANGHAI_TZ)
    if value is not None:
        return value.replace(tzinfo=SHANGHAI_TZ)
    return datetime.now(SHANGHAI_TZ)
def is_word_update_active(
    event_created_at: datetime,
    *,
    today: date | None = None,
    window_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
) -> bool:
    """Tell whether a word is still inside its post-event update window.

    Returns True while at most ``window_days`` calendar days separate the
    event's creation date (Shanghai time) from ``today`` (default: the current
    Shanghai date). The last day of the window still counts as active.
    """
    reference_day = today if today else datetime.now(SHANGHAI_TZ).date()
    event_day = normalize_event_created_at(event_created_at).date()
    elapsed_days = (reference_day - event_day).days
    return elapsed_days <= window_days
def get_wxindex_end_ymd(*, today: date | None = None) -> str:
    """Return yesterday's date as ``yyyymmdd``.

    ``today`` defaults to the current date in Shanghai time. The result is
    the default end date for WeChat index queries in this module.
    """
    if not today:
        today = datetime.now(SHANGHAI_TZ).date()
    yesterday = today - timedelta(days=1)
    return yesterday.strftime("%Y%m%d")
def get_lookback_range(lookback_days: int, *, today: date | None = None) -> tuple[str, str]:
    """Return the ``(start_ymd, end_ymd)`` range used by the original flow.

    The range ends yesterday (relative to ``today``, default: current
    Shanghai date) and starts ``max(lookback_days, 1)`` days before that.
    Both ends are inclusive, so it covers ``max(lookback_days, 1) + 1``
    calendar days.
    """
    reference_day = today or datetime.now(SHANGHAI_TZ).date()
    last_day = reference_day - timedelta(days=1)
    # At least one day of history is always requested.
    span_days = max(lookback_days, 1)
    first_day = last_day - timedelta(days=span_days)
    ymd_format = "%Y%m%d"
    return first_day.strftime(ymd_format), last_day.strftime(ymd_format)
def extract_searched_words(trend_json: dict[str, Any] | None) -> list[str]:
    """Collect every keyword that was actually searched on the WeChat index.

    Reads ``trend_json["wxindex_searches"]``. Entries must be dicts; each
    ``keyword`` is stripped, blank ones are dropped, and duplicates are
    removed while keeping first-seen order. All searched words are returned,
    not only the top-scoring one. When no usable entry exists, falls back to
    ``get_wxindex_keywords(trend_json)``. A non-dict input yields ``[]``.
    """
    if not isinstance(trend_json, dict):
        return []
    # A dict doubles as an insertion-ordered set.
    ordered: dict[str, None] = {}
    for entry in trend_json.get("wxindex_searches") or []:
        if isinstance(entry, dict):
            candidate = str(entry.get("keyword") or "").strip()
            if candidate:
                ordered.setdefault(candidate, None)
    return list(ordered) if ordered else get_wxindex_keywords(trend_json)
def parse_wxindex_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract a ``[{"ymd", "total_score"}, ...]`` series from an API reply.

    The lookups below expect roughly::

        {"data": {"data": [{"ymd": ..., "channel_score": {"total_score": ...}}]}}

    Rows with a blank ``ymd`` or a ``total_score`` that ``float()`` cannot
    convert are dropped. The result is sorted by ``ymd``.

    Malformed containers (a non-dict ``wx_resp``, ``data`` or
    ``channel_score``, or a non-list row collection) are treated as missing
    data instead of raising ``AttributeError``.
    """
    if not isinstance(wx_resp, dict):
        return []
    outer = wx_resp.get("data")
    rows = outer.get("data") if isinstance(outer, dict) else None
    if not isinstance(rows, list):
        return []
    series: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        ymd = str(row.get("ymd") or "").strip()
        channel = row.get("channel_score")
        # Guard: a truthy non-dict channel_score used to crash with AttributeError.
        total_score = channel.get("total_score") if isinstance(channel, dict) else None
        try:
            score_num = float(total_score) if total_score is not None else None
        except (TypeError, ValueError):
            score_num = None
        if ymd and score_num is not None:
            series.append({"ymd": ymd, "total_score": score_num})
    series.sort(key=lambda item: item["ymd"])
    return series
def fetch_wxindex_scores(
    api_client: JsonApiClient,
    api_url: str,
    *,
    keyword: str,
    start_ymd: str,
    end_ymd: str | None = None,
) -> list[dict[str, Any]]:
    """POST one keyword query to the WeChat index API and parse the reply.

    ``end_ymd`` defaults to yesterday (``get_wxindex_end_ymd``). The reply is
    turned into a date-sorted ``[{"ymd", "total_score"}]`` series by
    ``parse_wxindex_total_scores``. Errors raised by ``api_client.post_json``
    propagate to the caller.
    """
    request_body = {
        "keyword": keyword,
        "start_ymd": start_ymd,
        "end_ymd": end_ymd if end_ymd else get_wxindex_end_ymd(),
    }
    response = api_client.post_json(api_url, request_body)
    return parse_wxindex_total_scores(response)
def get_max_total_score(scores: list[dict[str, Any]]) -> float | None:
    """Return the largest ``total_score`` in ``scores`` as a float.

    Non-dict entries, and entries whose ``total_score`` is missing or cannot
    be converted by ``float()``, are ignored. Returns ``None`` when no usable
    score remains.
    """

    def _score_of(entry: Any) -> float | None:
        # Convert one entry's score, or return None if it is unusable.
        if not isinstance(entry, dict):
            return None
        try:
            return float(entry["total_score"])
        except (TypeError, ValueError, KeyError):
            return None

    usable = (value for value in map(_score_of, scores) if value is not None)
    return max(usable, default=None)
def word_meets_max_score_threshold(
    scores: list[dict[str, Any]],
    *,
    min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
) -> bool:
    """Decide whether a new word qualifies for addition.

    The peak ``total_score`` (see ``get_max_total_score``) must be strictly
    greater than ``min_max_score``. A series with no usable score never
    qualifies.
    """
    peak = get_max_total_score(scores)
    return peak is not None and peak > min_max_score
def filter_scores_in_ymd_window(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
) -> list[dict[str, Any]]:
    """Keep the entries whose date lies within ``[start_ymd, end_ymd]``.

    The date comes from ``ymd`` (falling back to ``dt``), is stripped and
    compared as a string, so all dates are expected as ``yyyymmdd``. A blank
    bound yields ``[]``. The original dict objects are returned, sorted by
    date.
    """
    lower = str(start_ymd or "").strip()
    upper = str(end_ymd or "").strip()
    if not (lower and upper):
        return []

    def _day_of(entry: dict[str, Any]) -> str:
        return str(entry.get("ymd") or entry.get("dt") or "").strip()

    in_window = [
        entry
        for entry in scores
        if isinstance(entry, dict) and _day_of(entry) and lower <= _day_of(entry) <= upper
    ]
    in_window.sort(key=lambda row: str(row.get("ymd") or row.get("dt") or ""))
    return in_window
def word_has_high_score_in_window(
    scores: list[dict[str, Any]],
    *,
    start_ymd: str,
    end_ymd: str,
    min_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
) -> bool:
    """Report whether any score inside the window strictly exceeds ``min_score``.

    The window ``[start_ymd, end_ymd]`` is applied via
    ``filter_scores_in_ymd_window``. Entries whose ``total_score`` is missing
    or not convertible by ``float()`` are ignored.
    """

    def _as_float(entry: dict[str, Any]) -> float | None:
        try:
            return float(entry["total_score"])
        except (TypeError, ValueError, KeyError):
            return None

    in_window = filter_scores_in_ymd_window(
        scores,
        start_ymd=start_ymd,
        end_ymd=end_ymd,
    )
    return any(
        value is not None and value > min_score
        for value in map(_as_float, in_window)
    )
def merge_wxindex_score_series(
    *series_list: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Merge several score series into one entry per date.

    Dates come from ``ymd`` (falling back to ``dt``). Non-dict entries,
    entries without a date, and entries whose ``total_score`` cannot be
    converted by ``float()`` are skipped. When a date repeats, the entry seen
    last (later series, or later position) wins. The output consists of new
    ``{"ymd", "total_score"}`` dicts sorted by date.
    """
    by_day: dict[str, dict[str, Any]] = {}
    flattened = (entry for series in series_list for entry in series)
    for entry in flattened:
        if not isinstance(entry, dict):
            continue
        day = str(entry.get("ymd") or entry.get("dt") or "").strip()
        if not day:
            continue
        try:
            value = float(entry["total_score"])
        except (TypeError, ValueError, KeyError):
            continue
        by_day[day] = {"ymd": day, "total_score": value}
    return sorted(by_day.values(), key=lambda row: row["ymd"])
def get_word_score_bounds(
    scores: list[dict[str, Any]],
) -> tuple[str | None, str | None]:
    """Return ``(earliest_ymd, latest_ymd)`` over all dated entries.

    Dates are read from ``ymd`` (falling back to ``dt``), stripped, and
    compared as strings. Returns ``(None, None)`` when no entry has a date.
    """
    days: list[str] = []
    for entry in scores:
        if not isinstance(entry, dict):
            continue
        day = str(entry.get("ymd") or entry.get("dt") or "").strip()
        if day:
            days.append(day)
    if days:
        return min(days), max(days)
    return None, None
def word_scores_need_supplement(
    scores: list[dict[str, Any]],
    *,
    end_ymd: str | None = None,
    start_ymd: str,
) -> tuple[bool, str]:
    """Decide whether a stored series needs more data from the API.

    Returns ``(needs_fetch, reason)`` with ``reason`` one of:

    - ``"empty"``: no entries, or none of them carries a date;
    - ``"missing_start"``: the earliest stored date is after ``start_ymd``;
    - ``"missing_end"``: the latest stored date is before the target end
      (``end_ymd``, or yesterday when not given);
    - ``"complete"``: both edges are covered. Gaps in between are not checked.
    """
    if not scores:
        return True, "empty"
    goal_end = end_ymd or get_wxindex_end_ymd()
    first_day, last_day = get_word_score_bounds(scores)
    if first_day is None or last_day is None:
        verdict = "empty"
    elif first_day > start_ymd:
        verdict = "missing_start"
    elif last_day < goal_end:
        verdict = "missing_end"
    else:
        verdict = "complete"
    return verdict != "complete", verdict
def get_supplement_fetch_range(
    scores: list[dict[str, Any]],
    *,
    end_ymd: str | None = None,
    start_ymd: str,
) -> tuple[str, str] | None:
    """Compute the API query range needed to complete a stored series.

    Returns ``None`` when ``word_scores_need_supplement`` reports the series
    as complete. When only the tail is missing, fetching resumes on the day
    after the latest stored date. In every other case (empty series, or
    missing head) the whole ``[start_ymd, target_end]`` range is queried
    again. ``target_end`` is ``end_ymd``, or yesterday when not given.
    """
    needed, why = word_scores_need_supplement(
        scores,
        end_ymd=end_ymd,
        start_ymd=start_ymd,
    )
    if not needed:
        return None
    range_end = end_ymd or get_wxindex_end_ymd()
    if why == "missing_end":
        _earliest, latest = get_word_score_bounds(scores)
        if latest is not None:
            return next_ymd(latest), range_end
    return start_ymd, range_end
def slice_scores_lookback(
    full_scores: list[dict[str, Any]],
    lookback_days: int,
    *,
    today: date | None = None,
) -> tuple[list[dict[str, Any]], str, str]:
    """Cut the original flow's recent-N-day slice out of a full series.

    The window comes from ``get_lookback_range`` and ends yesterday. Entries
    are kept when their ``ymd`` string falls inside the window. Returns
    ``(series, start_ymd, end_ymd)``; ``series`` is sorted by ``ymd`` and is
    ``[]`` when nothing matches.
    """
    window_start, window_end = get_lookback_range(lookback_days, today=today)
    # NOTE(review): only the "ymd" key is consulted here (unstripped), unlike
    # the window helpers that also accept "dt" -- confirm callers never pass
    # dt-keyed rows.
    picked = sorted(
        (
            entry
            for entry in full_scores
            if isinstance(entry, dict)
            and window_start <= str(entry.get("ymd") or "") <= window_end
        ),
        key=lambda entry: str(entry.get("ymd") or ""),
    )
    return picked, window_start, window_end
def next_ymd(ymd: str) -> str:
    """Return the calendar day after ``ymd``; both are ``yyyymmdd`` strings.

    Raises ``ValueError`` (from ``strptime``) when ``ymd`` is malformed.
    """
    ymd_format = "%Y%m%d"
    following = datetime.strptime(ymd, ymd_format) + timedelta(days=1)
    return following.strftime(ymd_format)
def refresh_stale_wxindex_words(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    end_ymd: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Fill in missing dates for words the repository reports as stale.

    Stale words come from ``repository.list_stale_wxindex_words`` (called
    with the target end date and ``WXINDEX_WORDS_UPDATE_WINDOW_DAYS``). For
    each word, the missing range is computed by ``get_supplement_fetch_range``,
    starting at the word's ``fetch_start_ymd`` (or the default lookback start
    when that is blank). The range is fetched from the API and stored via
    ``repository.save_wxindex_daily_scores``.

    Args:
        repository: Data access object for the wxindex tables.
        api_client: Client used to POST queries to ``api_url``.
        api_url: WeChat index endpoint.
        end_ymd: Last date to cover (``yyyymmdd``); defaults to yesterday.
        dry_run: Only count the words that would be refreshed. No API calls
            and no writes are made.
        verbose: Print one line per word.

    Returns:
        A summary with ``target_end_ymd`` (str) and the int counters
        ``stale_words``, ``refreshed``, ``inserted_rows``, ``skipped_rows``,
        ``fetch_failed``, ``api_empty`` and ``no_new_range``.
        ``skipped_rows`` also includes rows skipped for words where nothing
        new was inserted.

    A failure for one word (API or save) is counted in ``fetch_failed`` and
    does not abort the run.
    """
    target_end = end_ymd or get_wxindex_end_ymd()
    summary: dict[str, Any] = {
        "target_end_ymd": target_end,
        "stale_words": 0,
        "refreshed": 0,
        "inserted_rows": 0,
        "skipped_rows": 0,
        "fetch_failed": 0,
        "api_empty": 0,
        "no_new_range": 0,
    }
    stale_words = repository.list_stale_wxindex_words(
        end_ymd=target_end,
        update_window_days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
    )
    summary["stale_words"] = len(stale_words)
    if not stale_words:
        return summary
    for item in stale_words:
        name = str(item.get("name") or "").strip()
        if not name:
            continue
        word_start_ymd = str(item.get("fetch_start_ymd") or "").strip()
        if not word_start_ymd:
            word_start_ymd = get_wxindex_fetch_start_ymd()
        stored_scores = repository.list_wxindex_word_scores(name)
        fetch_range = get_supplement_fetch_range(
            stored_scores,
            end_ymd=target_end,
            start_ymd=word_start_ymd,
        )
        if fetch_range is None:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip complete word={name}")
            continue
        # Dedicated names so the ``end_ymd`` parameter is not rebound.
        range_start, range_end = fetch_range
        if range_start > range_end:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip up-to-date word={name}")
            continue
        if dry_run:
            summary["refreshed"] += 1
            if verbose:
                print(f"[dry-run] would refresh word={name} {range_start}->{range_end}")
            continue
        try:
            api_scores = fetch_wxindex_scores(
                api_client,
                api_url,
                keyword=name,
                start_ymd=range_start,
                end_ymd=range_end,
            )
            if not api_scores:
                summary["api_empty"] += 1
                if verbose:
                    print(f"api empty word={name} range={range_start}->{range_end}")
                continue
            inserted, skipped = repository.save_wxindex_daily_scores(
                name=name,
                scores=api_scores,
            )
        except Exception as exc:
            # Deliberate best effort: one failing word must not stop the batch.
            summary["fetch_failed"] += 1
            if verbose:
                print(f"refresh failed word={name}: {exc}")
            continue
        # Skipped rows are counted whether or not anything new was inserted.
        summary["skipped_rows"] += skipped
        if inserted <= 0:
            summary["api_empty"] += 1
            if verbose:
                print(
                    f"no new rows word={name} range={range_start}->{range_end} "
                    f"api_rows={len(api_scores)} skipped={skipped}"
                )
            continue
        summary["refreshed"] += 1
        summary["inserted_rows"] += inserted
        if verbose:
            print(
                f"refreshed word={name} range={range_start}->{range_end} "
                f"inserted={inserted} skipped={skipped}"
            )
    return summary
def sync_wxindex_words_from_meta(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    end_ymd: str | None = None,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Synchronise ``hot_content_wxindex_words`` with ``hot_content_wxindex_word_meta``.

    Steps:

    1. Delete rows of words that have no meta entry.
    2. Delete rows dated outside each word's ``[fetch_start_ymd, fetch_end_ymd]``
       window.
    3. Fetch and store the missing dates inside each window, capped at the
       target end date (``end_ymd``, default yesterday). This includes
       historical dates when ``fetch_start_ymd`` lies in the past.

    With ``dry_run``, steps 1-2 are only counted (repository ``count_*``
    methods) and step 3 makes no API calls or writes.

    Returns:
        A summary with ``target_end_ymd``, ``dry_run`` and counters for
        deletions, ``meta_count``, ``words_need_refresh`` (words with a
        non-empty range to fetch), ``refreshed``, ``inserted_rows``,
        ``skipped_rows`` (including words where nothing new was inserted),
        ``fetch_failed``, ``api_empty`` and ``no_new_range``.

    A failure for one word (API or save) is counted in ``fetch_failed`` and
    does not abort the run.
    """
    target_end = end_ymd or get_wxindex_end_ymd()
    summary: dict[str, Any] = {
        "target_end_ymd": target_end,
        "meta_count": 0,
        "deleted_without_meta_rows": 0,
        "deleted_outside_window_rows": 0,
        "words_need_refresh": 0,
        "refreshed": 0,
        "inserted_rows": 0,
        "skipped_rows": 0,
        "fetch_failed": 0,
        "api_empty": 0,
        "no_new_range": 0,
        "dry_run": dry_run,
    }
    if dry_run:
        summary["deleted_without_meta_rows"] = repository.count_wxindex_words_without_meta()
        summary["deleted_outside_window_rows"] = (
            repository.count_wxindex_words_outside_event_window()
        )
    else:
        summary["deleted_without_meta_rows"] = repository.delete_wxindex_words_without_meta()
        summary["deleted_outside_window_rows"] = (
            repository.delete_wxindex_words_outside_event_window()
        )
    meta_rows = repository.list_all_wxindex_word_meta()
    summary["meta_count"] = len(meta_rows)
    for meta in meta_rows:
        name = str(meta.get("name") or "").strip()
        fetch_start = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end = str(meta.get("fetch_end_ymd") or "").strip()
        if not name or not fetch_start or not fetch_end:
            continue
        # Never query beyond the target end (default: yesterday).
        api_end = min(fetch_end, target_end)
        if fetch_start > api_end:
            summary["no_new_range"] += 1
            if verbose:
                print(
                    f"skip out-of-range word={name} "
                    f"window={fetch_start}~{fetch_end} api_end={api_end}"
                )
            continue
        stored_scores = repository.list_wxindex_word_scores(name)
        fetch_range = get_supplement_fetch_range(
            stored_scores,
            end_ymd=api_end,
            start_ymd=fetch_start,
        )
        if fetch_range is None:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip complete word={name} window={fetch_start}~{fetch_end}")
            continue
        range_start, range_end = fetch_range
        if range_start > range_end:
            summary["no_new_range"] += 1
            if verbose:
                print(f"skip up-to-date word={name}")
            continue
        # Counted only once a non-empty range exists, so a word is never
        # reported in both words_need_refresh and no_new_range.
        summary["words_need_refresh"] += 1
        if dry_run:
            summary["refreshed"] += 1
            if verbose:
                print(
                    f"[dry-run] would fetch word={name} "
                    f"{range_start}->{range_end} "
                    f"save_window={fetch_start}~{fetch_end}"
                )
            continue
        try:
            api_scores = fetch_wxindex_scores(
                api_client,
                api_url,
                keyword=name,
                start_ymd=range_start,
                end_ymd=range_end,
            )
            # Defensive: only dates inside the meta window are stored.
            window_scores = filter_scores_in_ymd_window(
                api_scores,
                start_ymd=fetch_start,
                end_ymd=fetch_end,
            )
            if not window_scores:
                summary["api_empty"] += 1
                if verbose:
                    print(
                        f"api empty word={name} fetch={range_start}->{range_end} "
                        f"window={fetch_start}~{fetch_end}"
                    )
                continue
            inserted, skipped = repository.save_wxindex_daily_scores(
                name=name,
                scores=window_scores,
            )
        except Exception as exc:
            # Deliberate best effort: one failing word must not stop the batch.
            summary["fetch_failed"] += 1
            if verbose:
                print(f"sync failed word={name}: {exc}")
            continue
        # Skipped rows are counted whether or not anything new was inserted.
        summary["skipped_rows"] += skipped
        if inserted <= 0:
            summary["api_empty"] += 1
            if verbose:
                print(
                    f"no new rows word={name} fetch={range_start}->{range_end} "
                    f"api_rows={len(window_scores)} skipped={skipped}"
                )
            continue
        summary["refreshed"] += 1
        summary["inserted_rows"] += inserted
        if verbose:
            print(
                f"synced word={name} fetch={range_start}->{range_end} "
                f"inserted={inserted} skipped={skipped}"
            )
    return summary
def cleanup_low_max_wxindex_words(
    repository: HotContentRepository,
    *,
    min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int | float]:
    """Delete whole words whose peak score is too low.

    Candidate words, each with ``name``, ``max_score`` and ``row_count``,
    come from ``repository.list_low_max_wxindex_words(min_max_score=...)``;
    the exact comparison against the threshold is defined there. All rows of
    each candidate with a non-blank name are deleted by name. With
    ``dry_run``, nothing is deleted and ``deleted_rows`` is the sum of the
    candidates' ``row_count`` values.
    """
    low_words = repository.list_low_max_wxindex_words(min_max_score=min_max_score)
    summary: dict[str, int | float] = {
        "min_max_score": min_max_score,
        "low_max_words": len(low_words),
        "deleted_rows": 0,
    }
    if not low_words:
        return summary

    def _describe(prefix: str, item: dict[str, Any]) -> str:
        return (
            f"{prefix} word={item['name']} "
            f"max_score={item['max_score']:.0f} rows={item['row_count']}"
        )

    if dry_run:
        if verbose:
            for item in low_words:
                print(_describe("[dry-run] would delete", item))
        summary["deleted_rows"] = sum(int(item["row_count"]) for item in low_words)
        return summary

    doomed_names = [
        str(item["name"]) for item in low_words if str(item.get("name") or "").strip()
    ]
    summary["deleted_rows"] = repository.delete_wxindex_words_by_names(doomed_names)
    if verbose:
        for item in low_words:
            print(_describe("deleted", item))
    return summary
def try_register_wxindex_word_meta(
    repository: HotContentRepository,
    *,
    word: str,
    event_created_at: datetime | None = None,
    event_map: dict[str, datetime] | None = None,
    first_row_created_at: datetime | None = None,
    include_expired: bool = False,
    dry_run: bool = False,
    update_if_exists: bool = False,
) -> tuple[dict[str, Any] | None, str]:
    """Register, or optionally rewrite, the meta row of one word.

    The event time is resolved from, in order: ``event_created_at``,
    ``event_map[word]``, ``first_row_created_at``, and finally
    ``repository.get_wxindex_word_first_row_created_at``. The fetch window is
    derived from it via ``get_fetch_start_ymd_from_event`` and
    ``get_fetch_end_ymd_from_event``.

    Returns ``(meta, reason)`` where ``reason`` is one of:

    - ``"empty"``: blank word; ``meta`` is None.
    - ``"exists"``: a meta row exists and either ``update_if_exists`` is
      False or no event time could be resolved; it is returned unchanged.
      Dry runs report this too, mirroring a real run.
    - ``"no_event"``: no meta row and no resolvable event time.
    - ``"expired"``: no meta row and the event is outside the update window
      (skipped unless ``include_expired``).
    - ``"dry_run"``: the meta that would be written; nothing is persisted.
    - ``"updated"``: the existing row was rewritten.
    - ``"registered"``: a new row was created via ``ensure_wxindex_word_meta``.

    Raises:
        HotContentFlowError: if the row cannot be read back after an update.
    """
    name = str(word or "").strip()
    if not name:
        return None, "empty"
    existing = repository.get_wxindex_word_meta(name)
    # An existing row is only touched on request. Checking this before the
    # dry-run branch keeps dry runs faithful to real runs; previously a dry
    # run reported "dry_run" for a row a real run would leave as "exists".
    if existing and not update_if_exists:
        return existing, "exists"
    resolved_event_at = event_created_at
    if resolved_event_at is None and event_map is not None:
        resolved_event_at = event_map.get(name)
    if resolved_event_at is None:
        resolved_event_at = first_row_created_at
    if resolved_event_at is None:
        resolved_event_at = repository.get_wxindex_word_first_row_created_at(name)
    if resolved_event_at is None:
        if existing:
            return existing, "exists"
        return None, "no_event"
    normalized_event_at = normalize_event_created_at(resolved_event_at)
    # Expired events may still rewrite an existing row (update_if_exists), but
    # never create a new one unless include_expired is set.
    if (
        not existing
        and not include_expired
        and not is_word_update_active(normalized_event_at)
    ):
        return None, "expired"
    fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at)
    fetch_end_ymd = get_fetch_end_ymd_from_event(normalized_event_at)
    if dry_run:
        return {
            "name": name,
            "event_created_at": normalized_event_at,
            "fetch_start_ymd": fetch_start_ymd,
            "fetch_end_ymd": fetch_end_ymd,
        }, "dry_run"
    if existing:
        # Reaching here implies update_if_exists is True.
        repository.update_wxindex_word_meta(
            name=name,
            event_created_at=normalized_event_at,
            fetch_start_ymd=fetch_start_ymd,
            fetch_end_ymd=fetch_end_ymd,
        )
        meta = repository.get_wxindex_word_meta(name)
        if meta is None:
            raise HotContentFlowError(f"failed to update wxindex word meta: {name}")
        return meta, "updated"
    meta = repository.ensure_wxindex_word_meta(
        name=name,
        event_created_at=normalized_event_at,
        fetch_start_ymd=fetch_start_ymd,
        fetch_end_ymd=fetch_end_ymd,
    )
    return meta, "registered"
def fix_wxindex_word_meta_fetch_bounds(
    repository: HotContentRepository,
    *,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int]:
    """Recompute every meta row's fetch window from its ``event_created_at``.

    New bounds come from ``get_fetch_ymd_bounds_from_event``. Rows without a
    name or event time are skipped and counted only in ``total``. Rows whose
    stored ``fetch_start_ymd``/``fetch_end_ymd`` already match are counted as
    ``unchanged``. The others are rewritten via
    ``repository.update_wxindex_word_meta``, or only counted with ``dry_run``.
    """
    meta_rows = repository.list_all_wxindex_word_meta()
    summary = {"total": len(meta_rows), "updated": 0, "unchanged": 0}
    for row in meta_rows:
        name = str(row.get("name") or "").strip()
        event_at = row.get("event_created_at")
        prev_start = str(row.get("fetch_start_ymd") or "").strip()
        prev_end = str(row.get("fetch_end_ymd") or "").strip()
        if not name or event_at is None:
            continue
        new_start, new_end = get_fetch_ymd_bounds_from_event(event_at)
        if (new_start, new_end) == (prev_start, prev_end):
            summary["unchanged"] += 1
            continue
        if not dry_run:
            repository.update_wxindex_word_meta(
                name=name,
                event_created_at=event_at,
                fetch_start_ymd=new_start,
                fetch_end_ymd=new_end,
            )
        summary["updated"] += 1
        if verbose:
            change = f"word={name} start {prev_start}->{new_start} end {prev_end}->{new_end}"
            print(f"[dry-run] {change}" if dry_run else f"updated {change}")
    return summary
def fix_wxindex_word_meta_fetch_start_ymd(
    repository: HotContentRepository,
    *,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int]:
    """Backward-compatible alias of ``fix_wxindex_word_meta_fetch_bounds``.

    Despite its name, this recomputes BOTH ``fetch_start_ymd`` and
    ``fetch_end_ymd`` of every meta row from ``event_created_at``:

    - start = event date minus ``WXINDEX_WORDS_LOOKBACK_DAYS``
    - end = event date plus ``WXINDEX_WORDS_UPDATE_WINDOW_DAYS``

    It returns the same ``total``/``updated``/``unchanged`` summary.
    """
    return fix_wxindex_word_meta_fetch_bounds(
        repository,
        dry_run=dry_run,
        verbose=verbose,
    )
def cleanup_wxindex_words_outside_event_window(
    repository: HotContentRepository,
    *,
    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int]:
    """Delete word rows that fall outside ``event_created_at`` +/- ``window_days``.

    The count, sample listing and deletion are all delegated to repository
    methods that take ``window_days``. The summary also reports how many rows
    have no meta (``words_without_meta_rows``). With ``dry_run``, nothing is
    deleted; with ``verbose``, up to 20 sample rows are printed instead.
    """
    pending = repository.count_wxindex_words_outside_event_window(
        window_days=window_days,
    )
    summary = {
        "window_days": window_days,
        "rows_to_delete": pending,
        "deleted_rows": 0,
        "words_without_meta_rows": repository.count_wxindex_words_without_meta(),
    }
    if pending > 0 and not dry_run:
        summary["deleted_rows"] = repository.delete_wxindex_words_outside_event_window(
            window_days=window_days,
        )
        if verbose:
            print(f"deleted_rows={summary['deleted_rows']}")
    elif pending > 0 and verbose:
        samples = repository.list_wxindex_words_outside_event_window_samples(
            window_days=window_days,
            limit=20,
        )
        for sample in samples:
            print(
                f"[dry-run] word={sample['name']} dt={sample['dt']} "
                f"window={sample['start_ymd']}~{sample['end_ymd']} "
                f"event_created_at={sample['event_created_at']}"
            )
    return summary
def build_word_earliest_event_map(
    repository: HotContentRepository,
    *,
    since_dt: datetime,
) -> dict[str, datetime]:
    """Map each searched word to the earliest creation time of its events.

    The aggregation over hot-content records, including how ``since_dt``
    restricts them, is done by ``repository.list_word_earliest_event_times``.
    """
    earliest_by_word = repository.list_word_earliest_event_times(since_dt=since_dt)
    return earliest_by_word
def backfill_wxindex_word_meta(
    repository: HotContentRepository,
    *,
    since_date: date = WXINDEX_WORDS_RECORD_SINCE,
    include_expired: bool = True,
    fix_fetch_start: bool = True,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Register meta for stored words that lack it, then optionally fix windows.

    Event times come from ``build_word_earliest_event_map`` (starting at
    ``since_date`` 00:00 Shanghai time), falling back to each candidate's
    ``first_created_at``. Candidates are the rows returned by
    ``repository.list_wxindex_word_bounds_without_meta``. When
    ``fix_fetch_start`` is set, every meta row's fetch window is recomputed
    afterwards.

    Returns ``{"register": <counts>, "fetch_start_fix": <summary or None>}``.
    """
    since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
    candidates = repository.list_wxindex_word_bounds_without_meta()
    register_summary: dict[str, int | str | bool] = {
        "since_date": since_date.isoformat(),
        "include_expired": include_expired,
        "candidates": len(candidates),
        "registered": 0,
        "skipped_expired": 0,
        "skipped_no_event": 0,
    }
    register_label = "[dry-run] would register" if dry_run else "registered"
    for candidate in candidates:
        word = str(candidate.get("name") or "").strip()
        if not word:
            continue
        meta, outcome = try_register_wxindex_word_meta(
            repository,
            word=word,
            event_map=event_map,
            first_row_created_at=candidate.get("first_created_at"),
            include_expired=include_expired,
            dry_run=dry_run,
        )
        if outcome in {"registered", "dry_run", "updated"}:
            register_summary["registered"] += 1
            if verbose:
                print(f"{register_label} meta word={word} event_at={meta['event_created_at']}")
        elif outcome in ("expired", "no_event"):
            # Builds the "skipped_expired" / "skipped_no_event" counter keys.
            register_summary[f"skipped_{outcome}"] += 1
            if verbose:
                print(f"skip {outcome} word={word}")
    fetch_start_summary = (
        fix_wxindex_word_meta_fetch_start_ymd(
            repository,
            dry_run=dry_run,
            verbose=verbose,
        )
        if fix_fetch_start
        else None
    )
    return {
        "register": register_summary,
        "fetch_start_fix": fetch_start_summary,
    }
def backfill_active_wxindex_word_meta(
    repository: HotContentRepository,
    *,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int | str]:
    """Register meta for stored words that lack it and are still in the update window.

    Event times are looked up from ``WXINDEX_WORDS_UPDATE_WINDOW_DAYS`` days
    ago (00:00 Shanghai time), falling back to each candidate's
    ``first_created_at``. Expired events are skipped, because
    ``include_expired`` keeps its default of False.
    """
    window_start_day = datetime.now(SHANGHAI_TZ).date() - timedelta(
        days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS
    )
    since_dt = datetime.combine(window_start_day, datetime.min.time()).replace(
        tzinfo=SHANGHAI_TZ
    )
    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
    candidates = repository.list_wxindex_word_bounds_without_meta()
    summary: dict[str, int | str] = {
        "active_since": since_dt.date().isoformat(),
        "candidates": len(candidates),
        "registered": 0,
        "skipped_expired": 0,
        "skipped_no_event": 0,
    }
    for candidate in candidates:
        word = str(candidate.get("name") or "").strip()
        if not word:
            continue
        meta, outcome = try_register_wxindex_word_meta(
            repository,
            word=word,
            event_map=event_map,
            first_row_created_at=candidate.get("first_created_at"),
            dry_run=dry_run,
        )
        if outcome in ("registered", "dry_run"):
            summary["registered"] += 1
            if verbose and outcome == "dry_run":
                print(f"[dry-run] would register meta word={word}")
            elif verbose:
                print(f"registered meta word={word} event_at={meta['event_created_at']}")
        elif outcome in ("expired", "no_event"):
            # Builds the "skipped_expired" / "skipped_no_event" counter keys.
            summary[f"skipped_{outcome}"] += 1
            if verbose:
                print(f"skip {outcome} word={word}")
    return summary
def run_wxindex_words_daily_job(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    end_ymd: str | None = None,
    min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Run the scheduled daily maintenance for WeChat index words.

    The steps run in this order:

    1. ``backfill_active_wxindex_word_meta``: register missing meta.
    2. ``refresh_stale_wxindex_words``: fill missing dates up to ``end_ymd``.
    3. ``cleanup_low_max_wxindex_words``: delete words below ``min_max_score``.

    Returns each step's summary under ``meta_backfill``, ``refresh`` and
    ``cleanup``.
    """
    summaries: dict[str, Any] = {}
    summaries["meta_backfill"] = backfill_active_wxindex_word_meta(
        repository,
        dry_run=dry_run,
        verbose=verbose,
    )
    summaries["refresh"] = refresh_stale_wxindex_words(
        repository,
        api_client,
        api_url,
        end_ymd=end_ymd,
        dry_run=dry_run,
        verbose=verbose,
    )
    summaries["cleanup"] = cleanup_low_max_wxindex_words(
        repository,
        min_max_score=min_max_score,
        dry_run=dry_run,
        verbose=verbose,
    )
    return summaries
def ensure_word_full_scores(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    keyword: str,
    end_ymd: str | None = None,
    event_created_at: datetime | None = None,
    include_expired: bool = False,
    force_refresh: bool = False,
    dry_run: bool = False,
    update_meta_if_exists: bool = False,
) -> tuple[list[dict[str, Any]], str]:
    """Fetch a word's WeChat-index scores and persist them.

    Rules (from the original author's notes):

    - meta table: a row is written/updated only when the window contains a
      high score, as judged by ``word_has_high_score_in_window``
      (documented as "index > 100,000").
    - wxindex_words: only data inside ``[fetch_start_ymd, fetch_end_ymd]``
      is kept.
    - Words past their update window (documented as 7 days) are no longer
      updated unless ``include_expired`` is set.

    Args:
        keyword: Search word; it is stripped, and an empty word short-circuits.
        end_ymd: Upper bound for API requests; defaults to
            ``get_wxindex_end_ymd()``.
        event_created_at: Event time that anchors the fetch window. When it
            is ``None``, the window comes from the existing meta row.
        include_expired: Process words even when their update window is over.
        force_refresh: Re-fetch the whole window from the API instead of
            only the missing dates, and never short-circuit to ``"cached"``.
        dry_run: Return ``"dry_run"`` before any API call or write.
        update_meta_if_exists: Allow an existing meta row to be updated (only
            when ``event_created_at`` is given).

    Returns:
        ``(scores, action)``, where ``action`` is one of ``"empty"``,
        ``"expired"``, ``"legacy"``, ``"cached"``, ``"dry_run"``,
        ``"below_threshold"``, ``"api_empty"``, ``"inserted"`` or
        ``"updated"``.
    """
    word = str(keyword or "").strip()
    if not word:
        return [], "empty"
    target_end = end_ymd or get_wxindex_end_ymd()
    stored_scores = repository.list_wxindex_word_scores(word)
    meta = repository.get_wxindex_word_meta(word)
    should_register_meta = meta is None and event_created_at is not None
    should_update_meta = (
        update_meta_if_exists
        and meta is not None
        and event_created_at is not None
    )

    # Resolve the [fetch_start_ymd, fetch_end_ymd] window, either from the
    # event time or from the stored meta row.
    fetch_start_ymd: str | None = None
    fetch_end_ymd: str | None = None
    if event_created_at is not None:
        normalized_event_at = normalize_event_created_at(event_created_at)
        if not include_expired and not is_word_update_active(normalized_event_at):
            # An expired event may continue only when an existing meta row is
            # explicitly being updated.
            if meta is None:
                return stored_scores, "expired"
            if not should_update_meta:
                return stored_scores, "expired"
        fetch_start_ymd, fetch_end_ymd = get_fetch_ymd_bounds_from_event(
            normalized_event_at
        )
    elif meta is not None:
        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
        if not fetch_end_ymd:
            fetch_end_ymd = get_fetch_end_ymd_from_event(meta["event_created_at"])
        if not include_expired and not is_word_update_active(meta["event_created_at"]):
            return stored_scores, "expired"
    else:
        # Neither an event time nor a meta row: nothing anchors a window.
        return stored_scores, "legacy"
    if not fetch_start_ymd or not fetch_end_ymd:
        return stored_scores, "legacy"

    api_end_ymd = min(fetch_end_ymd, target_end)
    fetch_range = None if force_refresh else get_supplement_fetch_range(
        stored_scores,
        end_ymd=api_end_ymd,
        start_ymd=fetch_start_ymd,
    )
    # Nothing missing and nothing to (re)register: serve stored data.
    # force_refresh must bypass this. It forces fetch_range to None above,
    # which would otherwise look like "no gap" and skip the forced refetch.
    if (
        fetch_range is None
        and not force_refresh
        and stored_scores
        and meta is not None
        and not should_update_meta
    ):
        merged_scores = merge_wxindex_score_series(stored_scores)
        window_scores = filter_scores_in_ymd_window(
            merged_scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        )
        return window_scores, "cached"
    if dry_run:
        return [], "dry_run"

    had_data = bool(stored_scores)
    # Without a computed gap (forced refresh or first registration), request
    # the whole window up to api_end_ymd.
    start_ymd, fetch_end_ymd_api = fetch_range or (fetch_start_ymd, api_end_ymd)
    api_scores: list[dict[str, Any]] = []
    if fetch_range is not None or not stored_scores or force_refresh:
        api_scores = fetch_wxindex_scores(
            api_client,
            api_url,
            keyword=word,
            start_ymd=start_ymd,
            end_ymd=fetch_end_ymd_api,
        )
    # NOTE(review): whether API values override stored ones on overlapping
    # dates depends on merge_wxindex_score_series; confirm.
    merged_scores = merge_wxindex_score_series(stored_scores, api_scores)
    window_scores = filter_scores_in_ymd_window(
        merged_scores,
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
    )

    if should_register_meta or should_update_meta:
        if not word_has_high_score_in_window(
            window_scores,
            start_ymd=fetch_start_ymd,
            end_ymd=fetch_end_ymd,
        ):
            return stored_scores, "below_threshold"
        # dry_run is always False here (dry_run returned above).
        meta, register_reason = try_register_wxindex_word_meta(
            repository,
            word=word,
            event_created_at=event_created_at,
            include_expired=include_expired,
            dry_run=dry_run,
            update_if_exists=should_update_meta,
        )
        if meta is None:
            if register_reason == "expired":
                return stored_scores, "expired"
            return stored_scores, "legacy"
    elif meta is None:
        # Defensive: with meta None and no event time we returned "legacy" above.
        return stored_scores, "below_threshold"
    if meta is None:
        # Defensive: every path that leaves meta None has already returned.
        return stored_scores, "legacy"

    if not api_scores and not window_scores:
        return stored_scores, "api_empty"
    if not had_data and not word_has_high_score_in_window(
        window_scores,
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
    ):
        return [], "below_threshold"

    inserted, _skipped = repository.save_wxindex_daily_scores(
        name=word,
        scores=window_scores,
    )
    final_scores = filter_scores_in_ymd_window(
        repository.list_wxindex_word_scores(word),
        start_ymd=fetch_start_ymd,
        end_ymd=fetch_end_ymd,
    )
    if inserted > 0:
        action = "updated" if had_data else "inserted"
    elif final_scores:
        action = "cached"
    else:
        action = "api_empty"
    return final_scores or window_scores, action
def sync_words_from_trend_json(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    trend_json: dict[str, Any],
    record_id: int,
    event_created_at: datetime | None = None,
    dry_run: bool = False,
    verbose: bool = False,
    update_meta_if_exists: bool = False,
) -> dict[str, int]:
    """Write/refresh summary-table rows for the search words of one record.

    The words come from ``extract_searched_words(trend_json)``. The original
    note describes this as recent 7-day data. Each word is passed to
    ``ensure_word_full_scores``, and the returned action is tallied:

    - ``"dry_run"`` counts toward ``"inserted"``;
    - a raised exception counts toward ``"fetch_failed"``;
    - unrecognized actions are not counted.

    ``record_id`` is accepted for interface compatibility and is not used here.

    Returns:
        Counter dict with ``"words_found"`` plus one counter per action.
    """
    tally: dict[str, int] = dict.fromkeys(
        (
            "words_found",
            "inserted",
            "updated",
            "cached",
            "legacy",
            "expired",
            "api_empty",
            "below_threshold",
            "fetch_failed",
        ),
        0,
    )
    # Action returned by ensure_word_full_scores -> counter it increments.
    counter_for_action = {
        "inserted": "inserted",
        "updated": "updated",
        "cached": "cached",
        "legacy": "legacy",
        "expired": "expired",
        "api_empty": "api_empty",
        "below_threshold": "below_threshold",
        "dry_run": "inserted",
    }

    searched_words = extract_searched_words(trend_json)
    tally["words_found"] = len(searched_words)
    for term in searched_words:
        try:
            _, outcome = ensure_word_full_scores(
                repository,
                api_client,
                api_url,
                keyword=term,
                event_created_at=event_created_at,
                dry_run=dry_run,
                update_meta_if_exists=update_meta_if_exists,
            )
        except Exception as exc:
            # Best effort: one failing word must not abort the whole record.
            tally["fetch_failed"] += 1
            if verbose:
                print(f" fetch failed word={term}: {exc}")
            continue
        counter_key = counter_for_action.get(outcome)
        if counter_key is not None:
            tally[counter_key] += 1
        if verbose:
            print(f" word={term} action={outcome}")
    return tally
def backfill_wxindex_words(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    since_date: date = WXINDEX_WORDS_RECORD_SINCE,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, int]:
    """Scan hot_content_records and collect their WeChat-index search words.

    Records come from ``repository.list_records_with_wxindex_trend`` starting
    at midnight of ``since_date`` (tz ``SHANGHAI_TZ``). The original note says
    this covers everything from 6/11 onward and calls the API for historical
    backfill. A record whose ``wxindex_trend_json`` is not a ``dict`` counts as
    ``"invalid_json"``. Every other record is handed to
    ``sync_words_from_trend_json``, and its counters are rolled up here.

    Returns:
        Counter dict: records scanned, records with words, the rolled-up
        per-word counters, and ``"invalid_json"``.
    """
    # Counters produced by sync_words_from_trend_json that are summed here.
    per_record_keys = (
        "words_found",
        "inserted",
        "updated",
        "cached",
        "legacy",
        "expired",
        "api_empty",
        "below_threshold",
        "fetch_failed",
    )
    totals: dict[str, int] = {
        key: 0
        for key in ("records_scanned", "records_with_words", *per_record_keys, "invalid_json")
    }

    midnight = datetime.combine(since_date, datetime.min.time())
    since_dt = midnight.replace(tzinfo=SHANGHAI_TZ)
    for record in repository.list_records_with_wxindex_trend(since_dt=since_dt):
        totals["records_scanned"] += 1
        record_id = int(record["id"])
        try:
            payload = record.get("wxindex_trend_json")
        except (TypeError, ValueError):
            totals["invalid_json"] += 1
            continue
        if not isinstance(payload, dict):
            totals["invalid_json"] += 1
            continue

        words = extract_searched_words(payload)
        if not words:
            continue
        totals["records_with_words"] += 1
        if verbose:
            print(f"id={record_id} words={words}")

        per_record = sync_words_from_trend_json(
            repository,
            api_client,
            api_url,
            trend_json=payload,
            record_id=record_id,
            event_created_at=record.get("created_at"),
            dry_run=dry_run,
            verbose=verbose,
        )
        for key in per_record_keys:
            totals[key] += per_record[key]
    return totals
def build_word_event_map_from_records(
    records: list[dict[str, Any]],
) -> dict[str, datetime]:
    """Map each search word to the earliest ``created_at`` among its records.

    Words come from ``extract_searched_words(record.get("wxindex_trend_json"))``.
    Records whose ``created_at`` is not a ``datetime`` are ignored. When two
    times are equal, the first one seen is kept.
    """
    earliest_by_word: dict[str, datetime] = {}
    for record in records:
        event_at = record.get("created_at")
        if isinstance(event_at, datetime):
            for term in extract_searched_words(record.get("wxindex_trend_json")):
                known = earliest_by_word.get(term)
                # Overwrite only with a strictly earlier time.
                if known is None or event_at < known:
                    earliest_by_word[term] = event_at
    return earliest_by_word
def audit_wxindex_words_from_records(
    repository: HotContentRepository,
    *,
    after_created_at: datetime,
) -> dict[str, Any]:
    """Check that words from hot records after a cutoff are fully stored.

    Loads the records created after ``after_created_at`` and visits their
    search words in sorted order:

    - a word absent from the summary table (``has_wxindex_word`` is falsy)
      goes to ``missing_words``;
    - a word present there but without a meta row goes to ``missing_meta``.

    Returns:
        The cutoff as ``"YYYY-MM-DD HH:MM:SS"``-style text, counts, and both
        lists.
    """
    trend_records = repository.list_records_with_wxindex_trend_after(
        after_created_at=after_created_at,
    )
    events_by_word = build_word_event_map_from_records(trend_records)

    absent_words: list[str] = []
    absent_meta: list[str] = []
    for term in sorted(events_by_word):
        if not repository.has_wxindex_word(term):
            absent_words.append(term)
            continue
        if repository.get_wxindex_word_meta(term) is None:
            absent_meta.append(term)

    return {
        "after_created_at": after_created_at.isoformat(sep=" ", timespec="seconds"),
        "records_scanned": len(trend_records),
        "words_found": len(events_by_word),
        "missing_words_count": len(absent_words),
        "missing_meta_count": len(absent_meta),
        "missing_words": absent_words,
        "missing_meta": absent_meta,
    }
def supplement_wxindex_words_from_records(
    repository: HotContentRepository,
    api_client: JsonApiClient,
    api_url: str,
    *,
    after_created_at: datetime,
    dry_run: bool = False,
    verbose: bool = False,
) -> dict[str, Any]:
    """Fill in wxindex words and meta rows for hot records after a cutoff.

    Audits before and after the run. Every word found in the records is passed
    to ``ensure_word_full_scores`` with ``include_expired=True``, using the
    earliest ``created_at`` of the records mentioning it as the event time.

    Action counters:

    - ``"dry_run"`` counts toward ``"inserted"``;
    - ``"legacy"`` and ``"expired"`` are counted too, so the counters add up to
      ``"supplemented_words"``;
    - exceptions count toward ``"fetch_failed"`` and do not count as
      supplemented.

    ``"meta_registered"`` counts words that had no meta row before the call
    and have one afterwards. Under ``dry_run``, every such word is counted.

    Returns:
        Summary dict with ``"audit_before"``, the counters, and
        ``"audit_after"``.
    """
    audit = audit_wxindex_words_from_records(
        repository,
        after_created_at=after_created_at,
    )
    # NOTE(review): the audit above already ran this same query. Reusing its
    # records would save one round-trip and guarantee both see the same set.
    records = repository.list_records_with_wxindex_trend_after(
        after_created_at=after_created_at,
    )
    word_events = build_word_event_map_from_records(records)
    summary: dict[str, Any] = {
        "audit_before": {
            "records_scanned": audit["records_scanned"],
            "words_found": audit["words_found"],
            "missing_words_count": audit["missing_words_count"],
            "missing_meta_count": audit["missing_meta_count"],
        },
        "supplemented_words": 0,
        "inserted": 0,
        "updated": 0,
        "cached": 0,
        "meta_registered": 0,
        "legacy": 0,
        "expired": 0,
        "api_empty": 0,
        "below_threshold": 0,
        "fetch_failed": 0,
    }
    # Action returned by ensure_word_full_scores -> counter it increments.
    counter_for_action = {
        "inserted": "inserted",
        "updated": "updated",
        "cached": "cached",
        "legacy": "legacy",
        "expired": "expired",
        "api_empty": "api_empty",
        "below_threshold": "below_threshold",
        "dry_run": "inserted",
    }

    for word, event_at in sorted(word_events.items()):
        had_meta = repository.get_wxindex_word_meta(word) is not None
        try:
            _, action = ensure_word_full_scores(
                repository,
                api_client,
                api_url,
                keyword=word,
                event_created_at=event_at,
                include_expired=True,
                dry_run=dry_run,
            )
        except Exception as exc:
            # Best effort: keep supplementing the remaining words.
            summary["fetch_failed"] += 1
            if verbose:
                print(f"fetch failed word={word}: {exc}")
            continue
        summary["supplemented_words"] += 1
        counter_key = counter_for_action.get(action)
        if counter_key is not None:
            summary[counter_key] += 1
        if not had_meta:
            if dry_run or repository.get_wxindex_word_meta(word) is not None:
                summary["meta_registered"] += 1
        if verbose:
            print(f"word={word} event_at={event_at} action={action}")

    audit_after = audit_wxindex_words_from_records(
        repository,
        after_created_at=after_created_at,
    )
    summary["audit_after"] = {
        "missing_words_count": audit_after["missing_words_count"],
        "missing_meta_count": audit_after["missing_meta_count"],
        "missing_words": audit_after["missing_words"],
        "missing_meta": audit_after["missing_meta"],
    }
    return summary
|