Explorar o código

增加微信词统计

xueyiming hai 1 semana
pai
achega
45355d56cf

+ 10 - 0
app/core/config.py

@@ -158,6 +158,8 @@ class Settings:
     wxindex_llm_max_tokens: int = 4000
     wxindex_api_url: str = "http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex"
     wxindex_lookback_days: int = 7
+    wxindex_words_cron_hour: int = 7
+    wxindex_words_cron_minute: int = 30
     demand_event_sense_threshold: float = 6.0
     demand_senior_fit_threshold: float = 6.0
     demand_quality_llm_model: str = "anthropic/claude-haiku-4-5"
@@ -340,6 +342,14 @@ class Settings:
                 "WXINDEX_LOOKBACK_DAYS",
                 defaults.wxindex_lookback_days,
             ),
+            wxindex_words_cron_hour=_env_int(
+                "WXINDEX_WORDS_CRON_HOUR",
+                defaults.wxindex_words_cron_hour,
+            ),
+            wxindex_words_cron_minute=_env_int(
+                "WXINDEX_WORDS_CRON_MINUTE",
+                defaults.wxindex_words_cron_minute,
+            ),
             demand_event_sense_threshold=_env_float(
                 "DEMAND_EVENT_SENSE_THRESHOLD",
                 defaults.demand_event_sense_threshold,

+ 8 - 0
app/hot_content/config.py

@@ -254,6 +254,14 @@ def load_flow_config(interval_override: int | None = None) -> FlowConfig:
             "WXINDEX_LOOKBACK_DAYS",
             settings.wxindex_lookback_days,
         ),
+        wxindex_words_cron_hour=_get_env_int(
+            "WXINDEX_WORDS_CRON_HOUR",
+            settings.wxindex_words_cron_hour,
+        ),
+        wxindex_words_cron_minute=_get_env_int(
+            "WXINDEX_WORDS_CRON_MINUTE",
+            settings.wxindex_words_cron_minute,
+        ),
         demand_event_sense_threshold=_get_env_float(
             "DEMAND_EVENT_SENSE_THRESHOLD",
             settings.demand_event_sense_threshold,

+ 37 - 10
app/hot_content/postprocess_service.py

@@ -25,6 +25,11 @@ from app.hot_content.demand_export import (
 from app.hot_content.demand_pool_writer import sync_hot_demands_to_hive
 from app.hot_content.demand_quality import run_demand_quality_pipeline
 from app.hot_content.wxindex_trend import calc_wxindex_trend
+from app.hot_content.wxindex_words import (
+    ensure_word_full_scores,
+    slice_scores_lookback,
+    sync_words_from_trend_json,
+)
 
 
 class WxindexSelectionSkipped(Exception):
@@ -230,6 +235,10 @@ class ContributionPostprocessService:
                     record_id=record_id,
                     trend_json=trend_result,
                 )
+                self.sync_wxindex_words(
+                    record_id=record_id,
+                    trend_result=trend_result,
+                )
                 event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                     record=record,
                     match_result=match_result,
@@ -296,6 +305,22 @@ class ContributionPostprocessService:
             result["hive_sync_error"] = str(exc)
         return result
 
+    def sync_wxindex_words(
+        self,
+        *,
+        record_id: int,
+        trend_result: dict[str, Any],
+        verbose: bool = False,
+    ) -> dict[str, int]:
+        return sync_words_from_trend_json(
+            self.repository,
+            self.api_client,
+            self.config.wxindex_api_url,
+            trend_json=trend_result,
+            record_id=record_id,
+            verbose=verbose,
+        )
+
     def _save_empty_demand_quality(self, *, record_id: int) -> None:
         self.repository.save_demand_quality(
             record_id=record_id,
@@ -617,18 +642,20 @@ class ContributionPostprocessService:
             matched_demands=matched_demands,
         )
         selected_words = pick["selected_words"]
-        start_ymd, end_ymd = _get_recent_range(self.config.wxindex_lookback_days)
         threshold = float(self.config.wxindex_score_threshold)
 
         wxindex_searches: list[dict[str, Any]] = []
         for keyword in selected_words:
-            wx_payload = {
-                "keyword": keyword,
-                "start_ymd": start_ymd,
-                "end_ymd": end_ymd,
-            }
-            wx_resp = self.api_client.post_json(self.config.wxindex_api_url, wx_payload)
-            series = _parse_total_scores(wx_resp)
+            full_scores, _action = ensure_word_full_scores(
+                self.repository,
+                self.api_client,
+                self.config.wxindex_api_url,
+                keyword=keyword,
+            )
+            series, start_ymd, end_ymd = slice_scores_lookback(
+                full_scores,
+                self.config.wxindex_lookback_days,
+            )
             latest_score = series[-1]["total_score"] if series else None
             wxindex_searches.append(
                 {
@@ -672,8 +699,8 @@ class ContributionPostprocessService:
             "wxindex": {
                 "keyword": selected_word,
                 "keywords": selected_words,
-                "start_ymd": start_ymd,
-                "end_ymd": end_ymd,
+                "start_ymd": best["start_ymd"],
+                "end_ymd": best["end_ymd"],
                 "total_score_7d": series,
                 "latest_total_score": latest_score,
                 "threshold": threshold,

+ 209 - 0
app/hot_content/repository.py

@@ -1082,6 +1082,215 @@ class HotContentRepository:
                 """
             )
 
+    def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
+        word = str(name or "").strip()
+        if not word:
+            return []
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT dt, total_score
+            FROM hot_content_wxindex_words
+            WHERE name = %s
+            ORDER BY dt ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word,))
+            rows = cursor.fetchall()
+        scores: list[dict[str, Any]] = []
+        for row in rows:
+            dt = str(row.get("dt") or "").strip()
+            if not dt:
+                continue
+            try:
+                total_score = float(row["total_score"])
+            except (TypeError, ValueError, KeyError):
+                continue
+            scores.append({"ymd": dt, "total_score": total_score})
+        return scores
+
+    def list_stale_wxindex_words(self, *, end_ymd: str) -> list[dict[str, Any]]:
+        """返回已存在但最新日期早于 end_ymd 的词。"""
+        target_end = str(end_ymd or "").strip()
+        if not target_end:
+            return []
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT name, MAX(dt) AS latest_dt
+            FROM hot_content_wxindex_words
+            GROUP BY name
+            HAVING MAX(dt) < %s
+            ORDER BY name ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (target_end,))
+            rows = cursor.fetchall()
+        stale_words: list[dict[str, Any]] = []
+        for row in rows:
+            name = str(row.get("name") or "").strip()
+            latest_dt = str(row.get("latest_dt") or "").strip()
+            if name and latest_dt:
+                stale_words.append({"name": name, "latest_dt": latest_dt})
+        return stale_words
+
+    def list_low_avg_wxindex_words(
+        self,
+        *,
+        min_avg_score: float,
+    ) -> list[dict[str, Any]]:
+        """按 name 聚合,返回平均分低于阈值的词。"""
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT
+                name,
+                AVG(total_score) AS avg_score,
+                COUNT(*) AS row_count
+            FROM hot_content_wxindex_words
+            GROUP BY name
+            HAVING AVG(total_score) < %s
+            ORDER BY name ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (min_avg_score,))
+            rows = cursor.fetchall()
+
+        low_words: list[dict[str, Any]] = []
+        for row in rows:
+            name = str(row.get("name") or "").strip()
+            if not name:
+                continue
+            try:
+                avg_score = float(row["avg_score"])
+                row_count = int(row["row_count"])
+            except (TypeError, ValueError, KeyError):
+                continue
+            low_words.append(
+                {
+                    "name": name,
+                    "avg_score": avg_score,
+                    "row_count": row_count,
+                }
+            )
+        return low_words
+
+    def delete_wxindex_words_by_names(self, names: list[str]) -> int:
+        cleaned = [str(name or "").strip() for name in names if str(name or "").strip()]
+        if not cleaned:
+            return 0
+        self._ensure_wxindex_words_table()
+        placeholders = ", ".join(["%s"] * len(cleaned))
+        sql = f"""
+            DELETE FROM hot_content_wxindex_words
+            WHERE name IN ({placeholders})
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, tuple(cleaned))
+            return int(cursor.rowcount or 0)
+
+    def get_wxindex_word_latest_dt(self, name: str) -> str | None:
+        word = str(name or "").strip()
+        if not word:
+            return None
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT MAX(dt) AS latest_dt
+            FROM hot_content_wxindex_words
+            WHERE name = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word,))
+            row = cursor.fetchone() or {}
+        latest_dt = str(row.get("latest_dt") or "").strip()
+        return latest_dt or None
+
+    def save_wxindex_daily_scores(
+        self,
+        *,
+        name: str,
+        scores: list[dict[str, Any]],
+    ) -> tuple[int, int]:
+        """按词+日期写入每日指数,重复行跳过。返回 (inserted, skipped)。"""
+        word = str(name or "").strip()
+        if not word or not scores:
+            return 0, 0
+        self._ensure_wxindex_words_table()
+        sql = """
+            INSERT IGNORE INTO hot_content_wxindex_words (
+                name,
+                dt,
+                total_score
+            )
+            VALUES (%s, %s, %s)
+        """
+        rows: list[tuple[str, str, float]] = []
+        seen: set[tuple[str, str]] = set()
+        for item in scores:
+            if not isinstance(item, dict):
+                continue
+            dt = str(item.get("ymd") or item.get("dt") or "").strip()
+            if not dt:
+                continue
+            try:
+                total_score = float(item["total_score"])
+            except (TypeError, ValueError, KeyError):
+                continue
+            key = (word, dt)
+            if key in seen:
+                continue
+            seen.add(key)
+            rows.append((word, dt, total_score))
+        if not rows:
+            return 0, 0
+
+        with self.conn.cursor() as cursor:
+            cursor.executemany(sql, rows)
+            inserted = int(cursor.rowcount or 0)
+        skipped = len(rows) - inserted
+        return inserted, skipped
+
+    def list_records_with_wxindex_trend(
+        self,
+        *,
+        since_dt: datetime,
+    ) -> list[dict[str, Any]]:
+        sql = """
+            SELECT id, wxindex_trend_json
+            FROM hot_content_records
+            WHERE created_at >= %s
+              AND wxindex_trend_json IS NOT NULL
+              AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
+            ORDER BY id ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (since_dt,))
+            rows = cursor.fetchall()
+
+        records: list[dict[str, Any]] = []
+        for row in rows:
+            records.append(
+                {
+                    "id": int(row["id"]),
+                    "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
+                }
+            )
+        return records
+
+    def _ensure_wxindex_words_table(self) -> None:
+        sql = """
+            CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
+                name VARCHAR(256) NOT NULL COMMENT '词',
+                dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
+                total_score DOUBLE NOT NULL COMMENT '微信指数',
+                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+                PRIMARY KEY (id),
+                UNIQUE KEY uk_name_dt (name, dt),
+                KEY idx_name (name),
+                KEY idx_dt (dt)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+
     def _ensure_odps_sync_log_table(self) -> None:
         sql = """
             CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (

+ 2 - 0
app/hot_content/types.py

@@ -53,6 +53,8 @@ class FlowConfig:
     wxindex_llm_max_tokens: int
     wxindex_api_url: str
     wxindex_lookback_days: int
+    wxindex_words_cron_hour: int
+    wxindex_words_cron_minute: int
     demand_event_sense_threshold: float
     demand_senior_fit_threshold: float
     demand_quality_llm_model: str

+ 434 - 0
app/hot_content/wxindex_words.py

@@ -0,0 +1,434 @@
+"""微信指数检索词汇总:从 wxindex_trend_json 提取全部检索词并持久化每日指数。"""
+
+from __future__ import annotations
+
+from datetime import date, datetime, timedelta
+from typing import Any
+
+from app.hot_content.client import JsonApiClient
+from app.hot_content.demand_export import get_wxindex_keywords
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.timezone import SHANGHAI_TZ
+
+# Default start date (yyyymmdd) of the full score history requested by fetch_wxindex_scores.
+WXINDEX_WORDS_START_YMD = "20260601"
+# Default earliest record creation date scanned by backfill_wxindex_words.
+WXINDEX_WORDS_RECORD_SINCE = date(2026, 6, 11)
+# Default average-score floor for cleanup_low_avg_wxindex_words; words averaging below it are deleted.
+WXINDEX_WORDS_MIN_AVG_SCORE = 100_000.0
+
+
+def get_wxindex_end_ymd(*, today: date | None = None) -> str:
+    current = today or datetime.now(SHANGHAI_TZ).date()
+    return (current - timedelta(days=1)).strftime("%Y%m%d")
+
+
+def get_lookback_range(lookback_days: int, *, today: date | None = None) -> tuple[str, str]:
+    """原流程使用的近 N 日区间(截至昨日)。"""
+    current = today or datetime.now(SHANGHAI_TZ).date()
+    end_date = current - timedelta(days=1)
+    start_date = end_date - timedelta(days=max(lookback_days, 1))
+    return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
+
+
+def extract_searched_words(trend_json: dict[str, Any] | None) -> list[str]:
+    """提取 wxindex_trend_json 中实际检索过微信指数的全部词(非仅最高分词)。"""
+    if not isinstance(trend_json, dict):
+        return []
+
+    words: list[str] = []
+    seen: set[str] = set()
+    for item in trend_json.get("wxindex_searches") or []:
+        if not isinstance(item, dict):
+            continue
+        keyword = str(item.get("keyword") or "").strip()
+        if keyword and keyword not in seen:
+            seen.add(keyword)
+            words.append(keyword)
+
+    if words:
+        return words
+    return get_wxindex_keywords(trend_json)
+
+
+def parse_wxindex_total_scores(wx_resp: dict[str, Any]) -> list[dict[str, Any]]:
+    rows = ((wx_resp.get("data") or {}).get("data") or [])
+    if not isinstance(rows, list):
+        return []
+
+    series: list[dict[str, Any]] = []
+    for row in rows:
+        if not isinstance(row, dict):
+            continue
+        ymd = str(row.get("ymd") or "").strip()
+        total_score = (row.get("channel_score") or {}).get("total_score")
+        try:
+            score_num = float(total_score) if total_score is not None else None
+        except (TypeError, ValueError):
+            score_num = None
+        if ymd and score_num is not None:
+            series.append({"ymd": ymd, "total_score": score_num})
+    series.sort(key=lambda item: item["ymd"])
+    return series
+
+
+def fetch_wxindex_scores(
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    keyword: str,
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
+    end_ymd: str | None = None,
+) -> list[dict[str, Any]]:
+    payload = {
+        "keyword": keyword,
+        "start_ymd": start_ymd,
+        "end_ymd": end_ymd or get_wxindex_end_ymd(),
+    }
+    wx_resp = api_client.post_json(api_url, payload)
+    return parse_wxindex_total_scores(wx_resp)
+
+
+def scores_need_refresh(
+    scores: list[dict[str, Any]],
+    *,
+    end_ymd: str | None = None,
+) -> bool:
+    if not scores:
+        return True
+    target_end = end_ymd or get_wxindex_end_ymd()
+    latest_ymd = max(
+        str(item.get("ymd") or "")
+        for item in scores
+        if isinstance(item, dict) and str(item.get("ymd") or "").strip()
+    )
+    return latest_ymd < target_end
+
+
+def slice_scores_lookback(
+    full_scores: list[dict[str, Any]],
+    lookback_days: int,
+    *,
+    today: date | None = None,
+) -> tuple[list[dict[str, Any]], str, str]:
+    """从全量序列截取原流程所需的近 N 日数据。"""
+    start_ymd, end_ymd = get_lookback_range(lookback_days, today=today)
+    series = [
+        item
+        for item in full_scores
+        if isinstance(item, dict)
+        and start_ymd <= str(item.get("ymd") or "") <= end_ymd
+    ]
+    series.sort(key=lambda item: str(item.get("ymd") or ""))
+    if series:
+        return series, start_ymd, end_ymd
+    return [], start_ymd, end_ymd
+
+
+def next_ymd(ymd: str) -> str:
+    current = datetime.strptime(ymd, "%Y%m%d").date()
+    return (current + timedelta(days=1)).strftime("%Y%m%d")
+
+
+def refresh_stale_wxindex_words(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    end_ymd: str | None = None,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """补全已存在但缺少最新日期数据的词。"""
+    target_end = end_ymd or get_wxindex_end_ymd()
+    summary = {
+        "target_end_ymd": target_end,
+        "stale_words": 0,
+        "refreshed": 0,
+        "inserted_rows": 0,
+        "skipped_rows": 0,
+        "fetch_failed": 0,
+        "no_new_range": 0,
+    }
+
+    stale_words = repository.list_stale_wxindex_words(end_ymd=target_end)
+    summary["stale_words"] = len(stale_words)
+    if not stale_words:
+        return summary
+
+    for item in stale_words:
+        name = str(item.get("name") or "").strip()
+        latest_dt = str(item.get("latest_dt") or "").strip()
+        if not name or not latest_dt:
+            continue
+
+        start_ymd = next_ymd(latest_dt)
+        if start_ymd > target_end:
+            summary["no_new_range"] += 1
+            if verbose:
+                print(f"skip up-to-date word={name} latest_dt={latest_dt}")
+            continue
+
+        if dry_run:
+            summary["refreshed"] += 1
+            if verbose:
+                print(f"[dry-run] would refresh word={name} {start_ymd}->{target_end}")
+            continue
+
+        try:
+            api_scores = fetch_wxindex_scores(
+                api_client,
+                api_url,
+                keyword=name,
+                start_ymd=start_ymd,
+                end_ymd=target_end,
+            )
+            inserted, skipped = repository.save_wxindex_daily_scores(
+                name=name,
+                scores=api_scores,
+            )
+        except Exception as exc:
+            summary["fetch_failed"] += 1
+            if verbose:
+                print(f"refresh failed word={name}: {exc}")
+            continue
+
+        summary["refreshed"] += 1
+        summary["inserted_rows"] += inserted
+        summary["skipped_rows"] += skipped
+        if verbose:
+            print(
+                f"refreshed word={name} range={start_ymd}->{target_end} "
+                f"inserted={inserted} skipped={skipped}"
+            )
+
+    return summary
+
+
+def cleanup_low_avg_wxindex_words(
+    repository: HotContentRepository,
+    *,
+    min_avg_score: float = WXINDEX_WORDS_MIN_AVG_SCORE,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int | float]:
+    """删除各 dt 平均分低于阈值的词(按 name 整词删除)。"""
+    summary: dict[str, int | float] = {
+        "min_avg_score": min_avg_score,
+        "low_avg_words": 0,
+        "deleted_rows": 0,
+    }
+    low_words = repository.list_low_avg_wxindex_words(min_avg_score=min_avg_score)
+    summary["low_avg_words"] = len(low_words)
+    if not low_words:
+        return summary
+
+    if dry_run:
+        if verbose:
+            for item in low_words:
+                print(
+                    f"[dry-run] would delete word={item['name']} "
+                    f"avg_score={item['avg_score']:.0f} rows={item['row_count']}"
+                )
+        summary["deleted_rows"] = sum(int(item["row_count"]) for item in low_words)
+        return summary
+
+    names = [str(item["name"]) for item in low_words if str(item.get("name") or "").strip()]
+    deleted_rows = repository.delete_wxindex_words_by_names(names)
+    summary["deleted_rows"] = deleted_rows
+    if verbose:
+        for item in low_words:
+            print(
+                f"deleted word={item['name']} "
+                f"avg_score={item['avg_score']:.0f} rows={item['row_count']}"
+            )
+    return summary
+
+
+def run_wxindex_words_daily_job(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    end_ymd: str | None = None,
+    min_avg_score: float = WXINDEX_WORDS_MIN_AVG_SCORE,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, Any]:
+    """定时任务:先补全缺失日期,再清理低均值词。"""
+    refresh_summary = refresh_stale_wxindex_words(
+        repository,
+        api_client,
+        api_url,
+        end_ymd=end_ymd,
+        dry_run=dry_run,
+        verbose=verbose,
+    )
+    cleanup_summary = cleanup_low_avg_wxindex_words(
+        repository,
+        min_avg_score=min_avg_score,
+        dry_run=dry_run,
+        verbose=verbose,
+    )
+    return {
+        "refresh": refresh_summary,
+        "cleanup": cleanup_summary,
+    }
+
+
+def ensure_word_full_scores(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    keyword: str,
+    end_ymd: str | None = None,
+    force_refresh: bool = False,
+    dry_run: bool = False,
+) -> tuple[list[dict[str, Any]], str]:
+    """
+    获取词的全量微信指数(20260601 起至昨日),按词+日期逐行入库。
+
+    返回 (scores, action),action 为 inserted / updated / cached / dry_run。
+    """
+    word = str(keyword or "").strip()
+    if not word:
+        return [], "empty"
+
+    target_end = end_ymd or get_wxindex_end_ymd()
+    stored_scores = repository.list_wxindex_word_scores(word)
+    if stored_scores and not force_refresh and not scores_need_refresh(
+        stored_scores,
+        end_ymd=target_end,
+    ):
+        return stored_scores, "cached"
+
+    if dry_run:
+        return [], "dry_run"
+
+    had_data = bool(stored_scores)
+    api_scores = fetch_wxindex_scores(
+        api_client,
+        api_url,
+        keyword=word,
+        end_ymd=target_end,
+    )
+    inserted, _skipped = repository.save_wxindex_daily_scores(
+        name=word,
+        scores=api_scores,
+    )
+    final_scores = repository.list_wxindex_word_scores(word)
+    if inserted > 0:
+        action = "updated" if had_data else "inserted"
+    else:
+        action = "cached"
+    return final_scores or api_scores, action
+
+
+def sync_words_from_trend_json(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    trend_json: dict[str, Any],
+    record_id: int,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(全量数据)。"""
+    summary = {
+        "words_found": 0,
+        "inserted": 0,
+        "updated": 0,
+        "cached": 0,
+        "fetch_failed": 0,
+    }
+    words = extract_searched_words(trend_json)
+    summary["words_found"] = len(words)
+    if not words:
+        return summary
+
+    for name in words:
+        try:
+            _, action = ensure_word_full_scores(
+                repository,
+                api_client,
+                api_url,
+                keyword=name,
+                dry_run=dry_run,
+            )
+        except Exception as exc:
+            summary["fetch_failed"] += 1
+            if verbose:
+                print(f"  fetch failed word={name}: {exc}")
+            continue
+
+        if action == "inserted":
+            summary["inserted"] += 1
+        elif action == "updated":
+            summary["updated"] += 1
+        elif action == "cached":
+            summary["cached"] += 1
+        elif action == "dry_run":
+            summary["inserted"] += 1
+
+        if verbose:
+            print(f"  word={name} action={action}")
+
+    return summary
+
+
+def backfill_wxindex_words(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    since_date: date = WXINDEX_WORDS_RECORD_SINCE,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """扫描 hot_content_records,汇总 6/11 起全部微信指数检索词(历史回填调 API)。"""
+    summary = {
+        "records_scanned": 0,
+        "records_with_words": 0,
+        "words_found": 0,
+        "inserted": 0,
+        "updated": 0,
+        "cached": 0,
+        "fetch_failed": 0,
+        "invalid_json": 0,
+    }
+
+    since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
+    records = repository.list_records_with_wxindex_trend(since_dt=since_dt)
+
+    for row in records:
+        summary["records_scanned"] += 1
+        record_id = int(row["id"])
+        try:
+            trend_json = row.get("wxindex_trend_json")
+            if not isinstance(trend_json, dict):
+                summary["invalid_json"] += 1
+                continue
+        except (TypeError, ValueError):
+            summary["invalid_json"] += 1
+            continue
+
+        words = extract_searched_words(trend_json)
+        if not words:
+            continue
+
+        summary["records_with_words"] += 1
+        if verbose:
+            print(f"id={record_id} words={words}")
+
+        result = sync_words_from_trend_json(
+            repository,
+            api_client,
+            api_url,
+            trend_json=trend_json,
+            record_id=record_id,
+            dry_run=dry_run,
+            verbose=verbose,
+        )
+        for key in ("words_found", "inserted", "updated", "cached", "fetch_failed"):
+            summary[key] += result[key]
+
+    return summary

+ 51 - 3
app/scheduler.py

@@ -13,12 +13,15 @@ PROJECT_ROOT = Path(__file__).resolve().parents[1]
 if str(PROJECT_ROOT) not in sys.path:
     sys.path.insert(0, str(PROJECT_ROOT))
 
+from app.hot_content.client import JsonApiClient
 from app.hot_content.decode_result_service import run_once as run_decode_result_once
 from app.hot_content.config import load_flow_config
 from app.hot_content.postprocess_service import run_once as run_postprocess_once
+from app.hot_content.repository import HotContentRepository
 from app.hot_content.service import run_once
 from app.hot_content.timezone import SHANGHAI_TZ
 from app.hot_content.types import FlowConfig
+from app.hot_content.wxindex_words import run_wxindex_words_daily_job
 
 
 def _import_blocking_scheduler() -> Any:
@@ -61,6 +64,31 @@ def run_postprocess_job(config: FlowConfig) -> None:
         print(f"postprocess flow failed: {exc}", file=sys.stderr)
 
 
+def run_wxindex_words_refresh_job(config: FlowConfig) -> None:
+    repository = HotContentRepository(config.mysql)
+    api_client = JsonApiClient(
+        timeout_seconds=config.request_timeout_seconds,
+        verify_ssl=config.https_verify_ssl,
+    )
+    try:
+        summary = run_wxindex_words_daily_job(
+            repository,
+            api_client,
+            config.wxindex_api_url,
+        )
+        print(
+            json.dumps(
+                {"job": "wxindex_words_refresh", "summary": summary},
+                ensure_ascii=False,
+                indent=2,
+            )
+        )
+    except Exception as exc:
+        print(f"wxindex words refresh failed: {exc}", file=sys.stderr)
+    finally:
+        repository.close()
+
+
 def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
     scheduler.add_job(
         run_hot_content_job,
@@ -93,17 +121,35 @@ def register_decode_result_job(scheduler: Any, config: FlowConfig) -> None:
     )
 
 
+def register_wxindex_words_refresh_job(scheduler: Any, config: FlowConfig) -> None:
+    scheduler.add_job(
+        run_wxindex_words_refresh_job,
+        trigger="cron",
+        hour=config.wxindex_words_cron_hour,
+        minute=config.wxindex_words_cron_minute,
+        timezone=SHANGHAI_TZ,
+        args=[config],
+        id="wxindex_words_refresh",
+        name="微信指数词汇总表补全缺失日期并清理低均值词",
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+    )
+
+
 def start_scheduler() -> None:
     BlockingScheduler = _import_blocking_scheduler()
     scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
     config = load_flow_config()
     register_hot_content_job(scheduler, config)
     register_decode_result_job(scheduler, config)
+    register_wxindex_words_refresh_job(scheduler, config)
     print(
         "scheduler started, timezone=Asia/Shanghai, "
-        "jobs=['hot_content_flow', 'decode_result_flow'], "
+        "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh'], "
         f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
-        f"decode_result_interval={config.decode_result_interval_seconds}s"
+        f"decode_result_interval={config.decode_result_interval_seconds}s, "
+        f"wxindex_words_cron={config.wxindex_words_cron_hour}:{config.wxindex_words_cron_minute:02d}"
     )
     scheduler.start()
 
@@ -113,7 +159,7 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
     parser.add_argument(
         "--job",
-        choices=("all", "hot-content", "decode-result", "postprocess"),
+        choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh"),
         default="all",
         help="--once 时选择执行哪个任务",
     )
@@ -159,6 +205,8 @@ def main() -> None:
                     indent=2,
                 )
             )
+        if args.job in {"wxindex-refresh"}:
+            run_wxindex_words_refresh_job(config)
         return
     start_scheduler()