"""微信指数检索词汇总:从 wxindex_trend_json 提取全部检索词并持久化每日指数。""" from __future__ import annotations from datetime import date, datetime, timedelta from typing import Any from app.hot_content.client import JsonApiClient from app.hot_content.demand_export import get_wxindex_keywords from app.hot_content.exceptions import HotContentFlowError from app.hot_content.repository import HotContentRepository from app.hot_content.timezone import SHANGHAI_TZ WXINDEX_WORDS_LOOKBACK_DAYS = 7 WXINDEX_WORDS_UPDATE_WINDOW_DAYS = 7 WXINDEX_WORDS_RECORD_SINCE = date(2026, 6, 11) WXINDEX_WORDS_MIN_MAX_SCORE = 100_000.0 def get_fetch_start_ymd_from_event( event_created_at: datetime, *, lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS, ) -> str: """数据窗口左边界:事件创建日往前 N 天(yyyymmdd)。""" event_date = normalize_event_created_at(event_created_at).date() start_date = event_date - timedelta(days=lookback_days) return start_date.strftime("%Y%m%d") def get_fetch_end_ymd_from_event( event_created_at: datetime, *, forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS, ) -> str: """数据窗口右边界:事件创建日后 N 天(yyyymmdd)。""" event_date = normalize_event_created_at(event_created_at).date() end_date = event_date + timedelta(days=forward_days) return end_date.strftime("%Y%m%d") def get_fetch_ymd_bounds_from_event( event_created_at: datetime, *, lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS, forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS, ) -> tuple[str, str]: return ( get_fetch_start_ymd_from_event( event_created_at, lookback_days=lookback_days, ), get_fetch_end_ymd_from_event( event_created_at, forward_days=forward_days, ), ) def get_word_data_window_ymd_bounds( event_created_at: datetime, *, window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS, ) -> tuple[str, str]: """事件创建日前后 N 天的数据窗口 [start_ymd, end_ymd]。""" event_date = normalize_event_created_at(event_created_at).date() start_date = event_date - timedelta(days=window_days) end_date = event_date + timedelta(days=window_days) return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d") def get_wxindex_fetch_start_ymd( *, today: date | None = None, lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS, ) -> str: """首次拉取起始日期:近 N 日(截至昨日)。""" start_ymd, _end_ymd = get_lookback_range(lookback_days, today=today) return start_ymd def normalize_event_created_at(value: datetime | None) -> datetime: if value is None: return datetime.now(SHANGHAI_TZ) if value.tzinfo is None: return value.replace(tzinfo=SHANGHAI_TZ) return value.astimezone(SHANGHAI_TZ) def is_word_update_active( event_created_at: datetime, *, today: date | None = None, window_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS, ) -> bool: """事件创建后 window_days 天内继续更新,超出则停止。""" current = today or datetime.now(SHANGHAI_TZ).date() event_date = normalize_event_created_at(event_created_at).date() return (current - event_date).days <= window_days def get_wxindex_end_ymd(*, today: date | None = None) -> str: current = today or datetime.now(SHANGHAI_TZ).date() return (current - timedelta(days=1)).strftime("%Y%m%d") def get_lookback_range(lookback_days: int, *, today: date | None = None) -> tuple[str, str]: """原流程使用的近 N 日区间(截至昨日)。""" current = today or datetime.now(SHANGHAI_TZ).date() end_date = current - timedelta(days=1) start_date = end_date - timedelta(days=max(lookback_days, 1)) return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d") def extract_searched_words(trend_json: dict[str, Any] | None) -> list[str]: """提取 wxindex_trend_json 中实际检索过微信指数的全部词(非仅最高分词)。""" if not isinstance(trend_json, dict): return [] words: list[str] = [] seen: set[str] = set() for item in trend_json.get("wxindex_searches") or []: if not isinstance(item, dict): continue keyword = str(item.get("keyword") or "").strip() if keyword and keyword not in seen: seen.add(keyword) words.append(keyword) if words: return words return get_wxindex_keywords(trend_json) def parse_wxindex_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]: rows = ((wx_resp.get("data") or {}).get("data") or []) if not isinstance(rows, list): return [] series: list[dict[str, Any]] = [] for row in rows: if not isinstance(row, dict): continue ymd = str(row.get("ymd") or "").strip() total_score = (row.get("channel_score") or {}).get("total_score") try: score_num = float(total_score) if total_score is not None else None except (TypeError, ValueError): score_num = None if ymd and score_num is not None: series.append({"ymd": ymd, "total_score": score_num}) series.sort(key=lambda item: item["ymd"]) return series def fetch_wxindex_scores( api_client: JsonApiClient, api_url: str, *, keyword: str, start_ymd: str, end_ymd: str | None = None, ) -> list[dict[str, Any]]: payload = { "keyword": keyword, "start_ymd": start_ymd, "end_ymd": end_ymd or get_wxindex_end_ymd(), } wx_resp = api_client.post_json(api_url, payload) return parse_wxindex_total_scores(wx_resp) def get_max_total_score(scores: list[dict[str, Any]]) -> float | None: """从指数序列中取 total_score 最大值。""" values: list[float] = [] for item in scores: if not isinstance(item, dict): continue try: values.append(float(item["total_score"])) except (TypeError, ValueError, KeyError): continue if not values: return None return max(values) def word_meets_max_score_threshold( scores: list[dict[str, Any]], *, min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE, ) -> bool: """新增词时:最大值需严格大于阈值(不超过阈值则不添加)。""" max_score = get_max_total_score(scores) if max_score is None: return False return max_score > min_max_score def filter_scores_in_ymd_window( scores: list[dict[str, Any]], *, start_ymd: str, end_ymd: str, ) -> list[dict[str, Any]]: start = str(start_ymd or "").strip() end = str(end_ymd or "").strip() if not start or not end: return [] filtered: list[dict[str, Any]] = [] for item in scores: if not isinstance(item, dict): continue ymd = str(item.get("ymd") or item.get("dt") or "").strip() if not ymd or ymd < start or ymd > end: continue filtered.append(item) filtered.sort(key=lambda row: str(row.get("ymd") or row.get("dt") or "")) return filtered def word_has_high_score_in_window( scores: list[dict[str, Any]], *, start_ymd: str, end_ymd: str, min_score: float = WXINDEX_WORDS_MIN_MAX_SCORE, ) -> bool: """窗口内是否存在严格大于阈值的微信指数。""" window_scores = filter_scores_in_ymd_window( scores, start_ymd=start_ymd, end_ymd=end_ymd, ) for item in window_scores: try: score = float(item["total_score"]) except (TypeError, ValueError, KeyError): continue if score > min_score: return True return False def merge_wxindex_score_series( *series_list: list[dict[str, Any]], ) -> list[dict[str, Any]]: merged: dict[str, dict[str, Any]] = {} for series in series_list: for item in series: if not isinstance(item, dict): continue ymd = str(item.get("ymd") or item.get("dt") or "").strip() if not ymd: continue try: total_score = float(item["total_score"]) except (TypeError, ValueError, KeyError): continue merged[ymd] = {"ymd": ymd, "total_score": total_score} return sorted(merged.values(), key=lambda row: row["ymd"]) def get_word_score_bounds( scores: list[dict[str, Any]], ) -> tuple[str | None, str | None]: ymds = [ str(item.get("ymd") or item.get("dt") or "").strip() for item in scores if isinstance(item, dict) and str(item.get("ymd") or item.get("dt") or "").strip() ] if not ymds: return None, None return min(ymds), max(ymds) def word_scores_need_supplement( scores: list[dict[str, Any]], *, end_ymd: str | None = None, start_ymd: str, ) -> tuple[bool, str]: """判断词是否需要补数:缺起始段、缺最新日期,或完全无数据。""" if not scores: return True, "empty" target_end = end_ymd or get_wxindex_end_ymd() earliest_ymd, latest_ymd = get_word_score_bounds(scores) if earliest_ymd is None or latest_ymd is None: return True, "empty" if earliest_ymd > start_ymd: return True, "missing_start" if latest_ymd < target_end: return True, "missing_end" return False, "complete" def get_supplement_fetch_range( scores: list[dict[str, Any]], *, end_ymd: str | None = None, start_ymd: str, ) -> tuple[str, str] | None: """计算补数 API 查询区间;无需补数时返回 None。""" need_supplement, reason = word_scores_need_supplement( scores, end_ymd=end_ymd, start_ymd=start_ymd, ) if not need_supplement: return None target_end = end_ymd or get_wxindex_end_ymd() if reason == "empty": return start_ymd, target_end earliest_ymd, latest_ymd = get_word_score_bounds(scores) if earliest_ymd is None or latest_ymd is None: return start_ymd, target_end if earliest_ymd > start_ymd: return start_ymd, target_end return next_ymd(latest_ymd), target_end def slice_scores_lookback( full_scores: list[dict[str, Any]], lookback_days: int, *, today: date | None = None, ) -> tuple[list[dict[str, Any]], str, str]: """从全量序列截取原流程所需的近 N 日数据。""" start_ymd, end_ymd = get_lookback_range(lookback_days, today=today) series = [ item for item in full_scores if isinstance(item, dict) and start_ymd <= str(item.get("ymd") or "") <= end_ymd ] series.sort(key=lambda item: str(item.get("ymd") or "")) if series: return series, start_ymd, end_ymd return [], start_ymd, end_ymd def next_ymd(ymd: str) -> str: current = datetime.strptime(ymd, "%Y%m%d").date() return (current + timedelta(days=1)).strftime("%Y%m%d") def refresh_stale_wxindex_words( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, end_ymd: str | None = None, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int]: """补全已存在但缺少最新日期数据的词(仅含 meta 的新词)。""" target_end = end_ymd or get_wxindex_end_ymd() summary = { "target_end_ymd": target_end, "stale_words": 0, "refreshed": 0, "inserted_rows": 0, "skipped_rows": 0, "fetch_failed": 0, "api_empty": 0, "no_new_range": 0, } stale_words = repository.list_stale_wxindex_words( end_ymd=target_end, update_window_days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS, ) summary["stale_words"] = len(stale_words) if not stale_words: return summary for item in stale_words: name = str(item.get("name") or "").strip() if not name: continue word_start_ymd = str(item.get("fetch_start_ymd") or "").strip() if not word_start_ymd: word_start_ymd = get_wxindex_fetch_start_ymd() stored_scores = repository.list_wxindex_word_scores(name) fetch_range = get_supplement_fetch_range( stored_scores, end_ymd=target_end, start_ymd=word_start_ymd, ) if fetch_range is None: summary["no_new_range"] += 1 if verbose: print(f"skip complete word={name}") continue start_ymd, end_ymd = fetch_range if start_ymd > end_ymd: summary["no_new_range"] += 1 if verbose: print(f"skip up-to-date word={name}") continue if dry_run: summary["refreshed"] += 1 if verbose: print(f"[dry-run] would refresh word={name} {start_ymd}->{end_ymd}") continue try: api_scores = fetch_wxindex_scores( api_client, api_url, keyword=name, start_ymd=start_ymd, end_ymd=end_ymd, ) if not api_scores: summary["api_empty"] += 1 if verbose: print(f"api empty word={name} range={start_ymd}->{end_ymd}") continue inserted, skipped = repository.save_wxindex_daily_scores( name=name, scores=api_scores, ) except Exception as exc: summary["fetch_failed"] += 1 if verbose: print(f"refresh failed word={name}: {exc}") continue if inserted <= 0: summary["api_empty"] += 1 if verbose: print( f"no new rows word={name} range={start_ymd}->{end_ymd} " f"api_rows={len(api_scores)} skipped={skipped}" ) continue summary["refreshed"] += 1 summary["inserted_rows"] += inserted summary["skipped_rows"] += skipped if verbose: print( f"refreshed word={name} range={start_ymd}->{end_ymd} " f"inserted={inserted} skipped={skipped}" ) return summary def sync_wxindex_words_from_meta( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, end_ymd: str | None = None, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """ 按 hot_content_wxindex_word_meta 同步 hot_content_wxindex_words。 1. 删除 meta 中不存在的词 2. 删除窗口 [fetch_start_ymd, fetch_end_ymd] 外的日期 3. 补全窗口内缺失日期(含 fetch_start 早于昨日的历史段,如 20260615 之前) """ target_end = end_ymd or get_wxindex_end_ymd() summary: dict[str, Any] = { "target_end_ymd": target_end, "meta_count": 0, "deleted_without_meta_rows": 0, "deleted_outside_window_rows": 0, "words_need_refresh": 0, "refreshed": 0, "inserted_rows": 0, "skipped_rows": 0, "fetch_failed": 0, "api_empty": 0, "no_new_range": 0, "dry_run": dry_run, } if dry_run: summary["deleted_without_meta_rows"] = repository.count_wxindex_words_without_meta() summary["deleted_outside_window_rows"] = ( repository.count_wxindex_words_outside_event_window() ) else: summary["deleted_without_meta_rows"] = repository.delete_wxindex_words_without_meta() summary["deleted_outside_window_rows"] = ( repository.delete_wxindex_words_outside_event_window() ) meta_rows = repository.list_all_wxindex_word_meta() summary["meta_count"] = len(meta_rows) for meta in meta_rows: name = str(meta.get("name") or "").strip() fetch_start = str(meta.get("fetch_start_ymd") or "").strip() fetch_end = str(meta.get("fetch_end_ymd") or "").strip() if not name or not fetch_start or not fetch_end: continue api_end = min(fetch_end, target_end) if fetch_start > api_end: summary["no_new_range"] += 1 if verbose: print( f"skip out-of-range word={name} " f"window={fetch_start}~{fetch_end} api_end={api_end}" ) continue stored_scores = repository.list_wxindex_word_scores(name) fetch_range = get_supplement_fetch_range( stored_scores, end_ymd=api_end, start_ymd=fetch_start, ) if fetch_range is None: summary["no_new_range"] += 1 if verbose: print(f"skip complete word={name} window={fetch_start}~{fetch_end}") continue summary["words_need_refresh"] += 1 start_ymd, range_end = fetch_range if start_ymd > range_end: summary["no_new_range"] += 1 if verbose: print(f"skip up-to-date word={name}") continue if dry_run: summary["refreshed"] += 1 if verbose: print( f"[dry-run] would fetch word={name} " f"{start_ymd}->{range_end} " f"save_window={fetch_start}~{fetch_end}" ) continue try: api_scores = fetch_wxindex_scores( api_client, api_url, keyword=name, start_ymd=start_ymd, end_ymd=range_end, ) window_scores = filter_scores_in_ymd_window( api_scores, start_ymd=fetch_start, end_ymd=fetch_end, ) if not window_scores: summary["api_empty"] += 1 if verbose: print( f"api empty word={name} fetch={start_ymd}->{range_end} " f"window={fetch_start}~{fetch_end}" ) continue inserted, skipped = repository.save_wxindex_daily_scores( name=name, scores=window_scores, ) except Exception as exc: summary["fetch_failed"] += 1 if verbose: print(f"sync failed word={name}: {exc}") continue if inserted <= 0: summary["api_empty"] += 1 if verbose: print( f"no new rows word={name} fetch={start_ymd}->{range_end} " f"api_rows={len(window_scores)} skipped={skipped}" ) continue summary["refreshed"] += 1 summary["inserted_rows"] += inserted summary["skipped_rows"] += skipped if verbose: print( f"synced word={name} fetch={start_ymd}->{range_end} " f"inserted={inserted} skipped={skipped}" ) return summary def cleanup_low_max_wxindex_words( repository: HotContentRepository, *, min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int | float]: """删除各 dt 最大值低于阈值的词(按 name 整词删除)。""" summary: dict[str, int | float] = { "min_max_score": min_max_score, "low_max_words": 0, "deleted_rows": 0, } low_words = repository.list_low_max_wxindex_words(min_max_score=min_max_score) summary["low_max_words"] = len(low_words) if not low_words: return summary if dry_run: if verbose: for item in low_words: print( f"[dry-run] would delete word={item['name']} " f"max_score={item['max_score']:.0f} rows={item['row_count']}" ) summary["deleted_rows"] = sum(int(item["row_count"]) for item in low_words) return summary names = [str(item["name"]) for item in low_words if str(item.get("name") or "").strip()] deleted_rows = repository.delete_wxindex_words_by_names(names) summary["deleted_rows"] = deleted_rows if verbose: for item in low_words: print( f"deleted word={item['name']} " f"max_score={item['max_score']:.0f} rows={item['row_count']}" ) return summary def try_register_wxindex_word_meta( repository: HotContentRepository, *, word: str, event_created_at: datetime | None = None, event_map: dict[str, datetime] | None = None, first_row_created_at: datetime | None = None, include_expired: bool = False, dry_run: bool = False, update_if_exists: bool = False, ) -> tuple[dict[str, Any] | None, str]: """补写 meta;返回 (meta, reason)。""" name = str(word or "").strip() if not name: return None, "empty" existing = repository.get_wxindex_word_meta(name) resolved_event_at = event_created_at if resolved_event_at is None and event_map is not None: resolved_event_at = event_map.get(name) if resolved_event_at is None: resolved_event_at = first_row_created_at if resolved_event_at is None: resolved_event_at = repository.get_wxindex_word_first_row_created_at(name) if resolved_event_at is None: if existing: return existing, "exists" return None, "no_event" normalized_event_at = normalize_event_created_at(resolved_event_at) if not include_expired and not is_word_update_active(normalized_event_at): if existing and not update_if_exists: return existing, "exists" if not existing: return None, "expired" fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at) fetch_end_ymd = get_fetch_end_ymd_from_event(normalized_event_at) if dry_run: return { "name": name, "event_created_at": normalized_event_at, "fetch_start_ymd": fetch_start_ymd, "fetch_end_ymd": fetch_end_ymd, }, "dry_run" if existing and update_if_exists: repository.update_wxindex_word_meta( name=name, event_created_at=normalized_event_at, fetch_start_ymd=fetch_start_ymd, fetch_end_ymd=fetch_end_ymd, ) meta = repository.get_wxindex_word_meta(name) if meta is None: raise HotContentFlowError(f"failed to update wxindex word meta: {name}") return meta, "updated" if existing: return existing, "exists" meta = repository.ensure_wxindex_word_meta( name=name, event_created_at=normalized_event_at, fetch_start_ymd=fetch_start_ymd, fetch_end_ymd=fetch_end_ymd, ) return meta, "registered" def fix_wxindex_word_meta_fetch_bounds( repository: HotContentRepository, *, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int]: """按 event_created_at 修正 meta.fetch_start_ymd / fetch_end_ymd。""" rows = repository.list_all_wxindex_word_meta() summary = { "total": len(rows), "updated": 0, "unchanged": 0, } for row in rows: name = str(row.get("name") or "").strip() event_created_at = row.get("event_created_at") old_fetch_start = str(row.get("fetch_start_ymd") or "").strip() old_fetch_end = str(row.get("fetch_end_ymd") or "").strip() if not name or event_created_at is None: continue new_fetch_start, new_fetch_end = get_fetch_ymd_bounds_from_event(event_created_at) if new_fetch_start == old_fetch_start and new_fetch_end == old_fetch_end: summary["unchanged"] += 1 continue if dry_run: summary["updated"] += 1 if verbose: print( f"[dry-run] word={name} " f"start {old_fetch_start}->{new_fetch_start} " f"end {old_fetch_end}->{new_fetch_end}" ) continue repository.update_wxindex_word_meta( name=name, event_created_at=event_created_at, fetch_start_ymd=new_fetch_start, fetch_end_ymd=new_fetch_end, ) summary["updated"] += 1 if verbose: print( f"updated word={name} " f"start {old_fetch_start}->{new_fetch_start} " f"end {old_fetch_end}->{new_fetch_end}" ) return summary def fix_wxindex_word_meta_fetch_start_ymd( repository: HotContentRepository, *, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int]: """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。""" return fix_wxindex_word_meta_fetch_bounds( repository, dry_run=dry_run, verbose=verbose, ) def cleanup_wxindex_words_outside_event_window( repository: HotContentRepository, *, window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int]: """删除 hot_content_wxindex_words 中超出 event_created_at 前后 window_days 的数据。""" to_delete = repository.count_wxindex_words_outside_event_window( window_days=window_days, ) summary = { "window_days": window_days, "rows_to_delete": to_delete, "deleted_rows": 0, "words_without_meta_rows": repository.count_wxindex_words_without_meta(), } if to_delete <= 0: return summary if dry_run: if verbose: samples = repository.list_wxindex_words_outside_event_window_samples( window_days=window_days, limit=20, ) for item in samples: print( f"[dry-run] word={item['name']} dt={item['dt']} " f"window={item['start_ymd']}~{item['end_ymd']} " f"event_created_at={item['event_created_at']}" ) return summary summary["deleted_rows"] = repository.delete_wxindex_words_outside_event_window( window_days=window_days, ) if verbose: print(f"deleted_rows={summary['deleted_rows']}") return summary def build_word_earliest_event_map( repository: HotContentRepository, *, since_dt: datetime, ) -> dict[str, datetime]: """从热点记录中汇总每个检索词对应的最早事件创建时间。""" return repository.list_word_earliest_event_times(since_dt=since_dt) def backfill_wxindex_word_meta( repository: HotContentRepository, *, since_date: date = WXINDEX_WORDS_RECORD_SINCE, include_expired: bool = True, fix_fetch_start: bool = True, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """为 hot_content_wxindex_words 中缺 meta 的词补登记,并修正 fetch_start_ymd。""" since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ) event_map = build_word_earliest_event_map(repository, since_dt=since_dt) candidates = repository.list_wxindex_word_bounds_without_meta() register_summary: dict[str, int | str | bool] = { "since_date": since_date.isoformat(), "include_expired": include_expired, "candidates": len(candidates), "registered": 0, "skipped_expired": 0, "skipped_no_event": 0, } for item in candidates: name = str(item.get("name") or "").strip() if not name: continue meta, reason = try_register_wxindex_word_meta( repository, word=name, event_map=event_map, first_row_created_at=item.get("first_created_at"), include_expired=include_expired, dry_run=dry_run, ) if reason in {"registered", "dry_run", "updated"}: register_summary["registered"] += 1 if verbose: label = "[dry-run] would register" if dry_run else "registered" print(f"{label} meta word={name} event_at={meta['event_created_at']}") elif reason == "expired": register_summary["skipped_expired"] += 1 if verbose: print(f"skip expired word={name}") elif reason == "no_event": register_summary["skipped_no_event"] += 1 if verbose: print(f"skip no_event word={name}") fetch_start_summary: dict[str, int] | None = None if fix_fetch_start: fetch_start_summary = fix_wxindex_word_meta_fetch_start_ymd( repository, dry_run=dry_run, verbose=verbose, ) return { "register": register_summary, "fetch_start_fix": fetch_start_summary, } def backfill_active_wxindex_word_meta( repository: HotContentRepository, *, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int | str]: """为表中仍处 7 天窗口内、但缺少 meta 的词补登记。""" current = datetime.now(SHANGHAI_TZ).date() since_dt = datetime.combine( current - timedelta(days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS), datetime.min.time(), ).replace(tzinfo=SHANGHAI_TZ) event_map = build_word_earliest_event_map(repository, since_dt=since_dt) candidates = repository.list_wxindex_word_bounds_without_meta() summary: dict[str, int | str] = { "active_since": since_dt.date().isoformat(), "candidates": len(candidates), "registered": 0, "skipped_expired": 0, "skipped_no_event": 0, } for item in candidates: name = str(item.get("name") or "").strip() if not name: continue meta, reason = try_register_wxindex_word_meta( repository, word=name, event_map=event_map, first_row_created_at=item.get("first_created_at"), dry_run=dry_run, ) if reason == "registered": summary["registered"] += 1 if verbose: print(f"registered meta word={name} event_at={meta['event_created_at']}") elif reason == "dry_run": summary["registered"] += 1 if verbose: print(f"[dry-run] would register meta word={name}") elif reason == "expired": summary["skipped_expired"] += 1 if verbose: print(f"skip expired word={name}") elif reason == "no_event": summary["skipped_no_event"] += 1 if verbose: print(f"skip no_event word={name}") return summary def run_wxindex_words_daily_job( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, end_ymd: str | None = None, min_max_score: float = WXINDEX_WORDS_MIN_MAX_SCORE, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """定时任务:补 meta、补全缺失日期、清理低最大值词。""" meta_summary = backfill_active_wxindex_word_meta( repository, dry_run=dry_run, verbose=verbose, ) refresh_summary = refresh_stale_wxindex_words( repository, api_client, api_url, end_ymd=end_ymd, dry_run=dry_run, verbose=verbose, ) cleanup_summary = cleanup_low_max_wxindex_words( repository, min_max_score=min_max_score, dry_run=dry_run, verbose=verbose, ) return { "meta_backfill": meta_summary, "refresh": refresh_summary, "cleanup": cleanup_summary, } def ensure_word_full_scores( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, keyword: str, end_ymd: str | None = None, event_created_at: datetime | None = None, include_expired: bool = False, force_refresh: bool = False, dry_run: bool = False, update_meta_if_exists: bool = False, ) -> tuple[list[dict[str, Any]], str]: """ 获取词微信指数并入库。 - meta 表:窗口内存在指数 > 10 万才写入/更新 - wxindex_words:仅保留 [fetch_start_ymd, fetch_end_ymd] 区间内数据 - 超过 7 天窗口:不再更新 返回 (scores, action)。 """ word = str(keyword or "").strip() if not word: return [], "empty" target_end = end_ymd or get_wxindex_end_ymd() stored_scores = repository.list_wxindex_word_scores(word) meta = repository.get_wxindex_word_meta(word) should_register_meta = meta is None and event_created_at is not None should_update_meta = ( update_meta_if_exists and meta is not None and event_created_at is not None ) fetch_start_ymd: str | None = None fetch_end_ymd: str | None = None if event_created_at is not None: normalized_event_at = normalize_event_created_at(event_created_at) if not include_expired and not is_word_update_active(normalized_event_at): if meta is None: return stored_scores, "expired" if not should_update_meta: return stored_scores, "expired" fetch_start_ymd, fetch_end_ymd = get_fetch_ymd_bounds_from_event( normalized_event_at ) elif meta is not None: fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip() fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip() if not fetch_end_ymd: fetch_end_ymd = get_fetch_end_ymd_from_event(meta["event_created_at"]) if not include_expired and not is_word_update_active(meta["event_created_at"]): return stored_scores, "expired" else: return stored_scores, "legacy" if not fetch_start_ymd or not fetch_end_ymd: return stored_scores, "legacy" api_end_ymd = min(fetch_end_ymd, target_end) fetch_range = None if force_refresh else get_supplement_fetch_range( stored_scores, end_ymd=api_end_ymd, start_ymd=fetch_start_ymd, ) if fetch_range is None and stored_scores and meta is not None and not should_update_meta: merged_scores = merge_wxindex_score_series(stored_scores) window_scores = filter_scores_in_ymd_window( merged_scores, start_ymd=fetch_start_ymd, end_ymd=fetch_end_ymd, ) return window_scores, "cached" if dry_run: return [], "dry_run" had_data = bool(stored_scores) start_ymd, fetch_end_ymd_api = fetch_range or (fetch_start_ymd, api_end_ymd) api_scores: list[dict[str, Any]] = [] if fetch_range is not None or not stored_scores or force_refresh: api_scores = fetch_wxindex_scores( api_client, api_url, keyword=word, start_ymd=start_ymd, end_ymd=fetch_end_ymd_api, ) merged_scores = merge_wxindex_score_series(stored_scores, api_scores) window_scores = filter_scores_in_ymd_window( merged_scores, start_ymd=fetch_start_ymd, end_ymd=fetch_end_ymd, ) if should_register_meta or should_update_meta: if not word_has_high_score_in_window( window_scores, start_ymd=fetch_start_ymd, end_ymd=fetch_end_ymd, ): return stored_scores, "below_threshold" meta, register_reason = try_register_wxindex_word_meta( repository, word=word, event_created_at=event_created_at, include_expired=include_expired, dry_run=dry_run, update_if_exists=should_update_meta, ) if meta is None: if register_reason == "expired": return stored_scores, "expired" return stored_scores, "legacy" elif meta is None: return stored_scores, "below_threshold" if meta is None: return stored_scores, "legacy" if not api_scores and not window_scores: return stored_scores, "api_empty" if not had_data and not word_has_high_score_in_window( window_scores, start_ymd=fetch_start_ymd, end_ymd=fetch_end_ymd, ): return [], "below_threshold" inserted, _skipped = repository.save_wxindex_daily_scores( name=word, scores=window_scores, ) final_scores = filter_scores_in_ymd_window( repository.list_wxindex_word_scores(word), start_ymd=fetch_start_ymd, end_ymd=fetch_end_ymd, ) if inserted > 0: action = "updated" if had_data else "inserted" elif final_scores: action = "cached" else: action = "api_empty" return final_scores or window_scores, action def sync_words_from_trend_json( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, trend_json: dict[str, Any], record_id: int, event_created_at: datetime | None = None, dry_run: bool = False, verbose: bool = False, update_meta_if_exists: bool = False, ) -> dict[str, int]: """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(近 7 日数据)。""" summary = { "words_found": 0, "inserted": 0, "updated": 0, "cached": 0, "legacy": 0, "expired": 0, "api_empty": 0, "below_threshold": 0, "fetch_failed": 0, } words = extract_searched_words(trend_json) summary["words_found"] = len(words) if not words: return summary for name in words: try: _, action = ensure_word_full_scores( repository, api_client, api_url, keyword=name, event_created_at=event_created_at, dry_run=dry_run, update_meta_if_exists=update_meta_if_exists, ) except Exception as exc: summary["fetch_failed"] += 1 if verbose: print(f" fetch failed word={name}: {exc}") continue if action == "inserted": summary["inserted"] += 1 elif action == "updated": summary["updated"] += 1 elif action == "cached": summary["cached"] += 1 elif action == "legacy": summary["legacy"] += 1 elif action == "expired": summary["expired"] += 1 elif action == "api_empty": summary["api_empty"] += 1 elif action == "below_threshold": summary["below_threshold"] += 1 elif action == "dry_run": summary["inserted"] += 1 if verbose: print(f" word={name} action={action}") return summary def backfill_wxindex_words( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, since_date: date = WXINDEX_WORDS_RECORD_SINCE, dry_run: bool = False, verbose: bool = False, ) -> dict[str, int]: """扫描 hot_content_records,汇总 6/11 起全部微信指数检索词(历史回填调 API)。""" summary = { "records_scanned": 0, "records_with_words": 0, "words_found": 0, "inserted": 0, "updated": 0, "cached": 0, "legacy": 0, "expired": 0, "api_empty": 0, "below_threshold": 0, "fetch_failed": 0, "invalid_json": 0, } since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ) records = repository.list_records_with_wxindex_trend(since_dt=since_dt) for row in records: summary["records_scanned"] += 1 record_id = int(row["id"]) try: trend_json = row.get("wxindex_trend_json") if not isinstance(trend_json, dict): summary["invalid_json"] += 1 continue except (TypeError, ValueError): summary["invalid_json"] += 1 continue words = extract_searched_words(trend_json) if not words: continue summary["records_with_words"] += 1 if verbose: print(f"id={record_id} words={words}") result = sync_words_from_trend_json( repository, api_client, api_url, trend_json=trend_json, record_id=record_id, event_created_at=row.get("created_at"), dry_run=dry_run, verbose=verbose, ) for key in ( "words_found", "inserted", "updated", "cached", "legacy", "expired", "api_empty", "below_threshold", "fetch_failed", ): summary[key] += result[key] return summary def build_word_event_map_from_records( records: list[dict[str, Any]], ) -> dict[str, datetime]: word_events: dict[str, datetime] = {} for row in records: created_at = row.get("created_at") if not isinstance(created_at, datetime): continue for word in extract_searched_words(row.get("wxindex_trend_json")): existing = word_events.get(word) if existing is None or created_at < existing: word_events[word] = created_at return word_events def audit_wxindex_words_from_records( repository: HotContentRepository, *, after_created_at: datetime, ) -> dict[str, Any]: """检查指定时间后的热点记录,其微信指数词是否已在汇总表和 meta 表。""" records = repository.list_records_with_wxindex_trend_after( after_created_at=after_created_at, ) word_events = build_word_event_map_from_records(records) missing_words: list[str] = [] missing_meta: list[str] = [] for word in sorted(word_events): if not repository.has_wxindex_word(word): missing_words.append(word) elif repository.get_wxindex_word_meta(word) is None: missing_meta.append(word) return { "after_created_at": after_created_at.isoformat(sep=" ", timespec="seconds"), "records_scanned": len(records), "words_found": len(word_events), "missing_words_count": len(missing_words), "missing_meta_count": len(missing_meta), "missing_words": missing_words, "missing_meta": missing_meta, } def supplement_wxindex_words_from_records( repository: HotContentRepository, api_client: JsonApiClient, api_url: str, *, after_created_at: datetime, dry_run: bool = False, verbose: bool = False, ) -> dict[str, Any]: """补全指定时间后热点记录涉及、但缺失的 wxindex 词与 meta。""" audit = audit_wxindex_words_from_records( repository, after_created_at=after_created_at, ) records = repository.list_records_with_wxindex_trend_after( after_created_at=after_created_at, ) word_events = build_word_event_map_from_records(records) summary: dict[str, Any] = { "audit_before": { "records_scanned": audit["records_scanned"], "words_found": audit["words_found"], "missing_words_count": audit["missing_words_count"], "missing_meta_count": audit["missing_meta_count"], }, "supplemented_words": 0, "inserted": 0, "updated": 0, "cached": 0, "meta_registered": 0, "api_empty": 0, "below_threshold": 0, "fetch_failed": 0, } for word, event_at in sorted(word_events.items()): had_meta = repository.get_wxindex_word_meta(word) is not None try: _, action = ensure_word_full_scores( repository, api_client, api_url, keyword=word, event_created_at=event_at, include_expired=True, dry_run=dry_run, ) except Exception as exc: summary["fetch_failed"] += 1 if verbose: print(f"fetch failed word={word}: {exc}") continue summary["supplemented_words"] += 1 if action == "inserted": summary["inserted"] += 1 elif action == "updated": summary["updated"] += 1 elif action == "cached": summary["cached"] += 1 elif action == "api_empty": summary["api_empty"] += 1 elif action == "below_threshold": summary["below_threshold"] += 1 elif action == "dry_run": summary["inserted"] += 1 if not had_meta: if dry_run or repository.get_wxindex_word_meta(word) is not None: summary["meta_registered"] += 1 if verbose: print(f"word={word} event_at={event_at} action={action}") audit_after = audit_wxindex_words_from_records( repository, after_created_at=after_created_at, ) summary["audit_after"] = { "missing_words_count": audit_after["missing_words_count"], "missing_meta_count": audit_after["missing_meta_count"], "missing_words": audit_after["missing_words"], "missing_meta": audit_after["missing_meta"], } return summary