Jelajahi Sumber

微信热度词过滤问题

xueyiming 1 Minggu lalu
induk
melakukan
f502627254
2 mengubah file dengan 497 tambahan dan 112 penghapusan
  1. 141 67
      app/hot_content/repository.py
  2. 356 45
      app/hot_content/wxindex_words.py

+ 141 - 67
app/hot_content/repository.py

@@ -1283,26 +1283,18 @@ class HotContentRepository:
     def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
         self._ensure_wxindex_word_meta_table()
         sql = """
-            SELECT name, event_created_at, fetch_start_ymd
+            SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
             FROM hot_content_wxindex_word_meta
-            ORDER BY name ASC
+            ORDER BY id ASC
         """
         with self.conn.cursor() as cursor:
             cursor.execute(sql)
             rows = cursor.fetchall()
         result: list[dict[str, Any]] = []
         for row in rows:
-            name = str(row.get("name") or "").strip()
-            fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
-            event_created_at = row.get("event_created_at")
-            if name and fetch_start_ymd and event_created_at is not None:
-                result.append(
-                    {
-                        "name": name,
-                        "event_created_at": event_created_at,
-                        "fetch_start_ymd": fetch_start_ymd,
-                    }
-                )
+            meta = self._normalize_wxindex_word_meta_row(row)
+            if meta is not None:
+                result.append(meta)
         return result
 
     def update_wxindex_word_meta_fetch_start(
@@ -1330,10 +1322,12 @@ class HotContentRepository:
         name: str,
         event_created_at: datetime,
         fetch_start_ymd: str,
+        fetch_end_ymd: str,
     ) -> None:
         word = str(name or "").strip()
         target_start = str(fetch_start_ymd or "").strip()
-        if not word or not target_start:
+        target_end = str(fetch_end_ymd or "").strip()
+        if not word or not target_start or not target_end:
             raise HotContentFlowError("invalid wxindex word meta payload")
         self._ensure_wxindex_word_meta_table()
         event_at = event_created_at
@@ -1342,11 +1336,12 @@ class HotContentRepository:
         sql = """
             UPDATE hot_content_wxindex_word_meta
             SET event_created_at = %s,
-                fetch_start_ymd = %s
+                fetch_start_ymd = %s,
+                fetch_end_ymd = %s
             WHERE name = %s
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (event_at, target_start, word))
+            cursor.execute(sql, (event_at, target_start, target_end, word))
 
     def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
         word = str(name or "").strip()
@@ -1354,7 +1349,7 @@ class HotContentRepository:
             return None
         self._ensure_wxindex_word_meta_table()
         sql = """
-            SELECT name, event_created_at, fetch_start_ymd
+            SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
             FROM hot_content_wxindex_word_meta
             WHERE name = %s
         """
@@ -1363,15 +1358,7 @@ class HotContentRepository:
             row = cursor.fetchone()
         if not row:
             return None
-        fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
-        event_created_at = row.get("event_created_at")
-        if not fetch_start_ymd or event_created_at is None:
-            return None
-        return {
-            "name": str(row.get("name") or "").strip(),
-            "event_created_at": event_created_at,
-            "fetch_start_ymd": fetch_start_ymd,
-        }
+        return self._normalize_wxindex_word_meta_row(row)
 
     def ensure_wxindex_word_meta(
         self,
@@ -1379,10 +1366,12 @@ class HotContentRepository:
         name: str,
         event_created_at: datetime,
         fetch_start_ymd: str,
+        fetch_end_ymd: str,
     ) -> dict[str, Any]:
         word = str(name or "").strip()
         target_start = str(fetch_start_ymd or "").strip()
-        if not word or not target_start:
+        target_end = str(fetch_end_ymd or "").strip()
+        if not word or not target_start or not target_end:
             raise HotContentFlowError("invalid wxindex word meta payload")
         self._ensure_wxindex_word_meta_table()
         event_at = event_created_at
@@ -1392,20 +1381,34 @@ class HotContentRepository:
             INSERT INTO hot_content_wxindex_word_meta (
                 name,
                 event_created_at,
-                fetch_start_ymd
+                fetch_start_ymd,
+                fetch_end_ymd
             )
-            VALUES (%s, %s, %s)
+            VALUES (%s, %s, %s, %s)
             ON DUPLICATE KEY UPDATE
                 event_created_at = VALUES(event_created_at),
-                fetch_start_ymd = VALUES(fetch_start_ymd)
+                fetch_start_ymd = VALUES(fetch_start_ymd),
+                fetch_end_ymd = VALUES(fetch_end_ymd)
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (word, event_at, target_start))
+            cursor.execute(sql, (word, event_at, target_start, target_end))
         meta = self.get_wxindex_word_meta(word)
         if meta is None:
             raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
         return meta
 
+    def delete_wxindex_word_meta_by_names(self, names: list[str]) -> int:
+        words = [str(name or "").strip() for name in names]
+        words = [name for name in words if name]
+        if not words:
+            return 0
+        self._ensure_wxindex_word_meta_table()
+        placeholders = ", ".join(["%s"] * len(words))
+        sql = f"DELETE FROM hot_content_wxindex_word_meta WHERE name IN ({placeholders})"
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, tuple(words))
+            return int(cursor.rowcount or 0)
+
     def list_low_max_wxindex_words(
         self,
         *,
@@ -1457,17 +1460,11 @@ class HotContentRepository:
             SELECT COUNT(*) AS row_count
             FROM hot_content_wxindex_words w
             INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
+            WHERE w.dt < m.fetch_start_ymd
+               OR w.dt > m.fetch_end_ymd
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days))
+            cursor.execute(sql)
             row = cursor.fetchone() or {}
         return int(row.get("row_count") or 0)
 
@@ -1484,29 +1481,17 @@ class HotContentRepository:
                 w.name,
                 w.dt,
                 m.event_created_at,
-                DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                ) AS start_ymd,
-                DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                ) AS end_ymd
+                m.fetch_start_ymd AS start_ymd,
+                m.fetch_end_ymd AS end_ymd
             FROM hot_content_wxindex_words w
             INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
+            WHERE w.dt < m.fetch_start_ymd
+               OR w.dt > m.fetch_end_ymd
             ORDER BY w.name ASC, w.dt ASC
             LIMIT %s
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days, window_days, window_days, limit))
+            cursor.execute(sql, (limit,))
             rows = cursor.fetchall()
         samples: list[dict[str, Any]] = []
         for row in rows:
@@ -1535,17 +1520,24 @@ class HotContentRepository:
             DELETE w
             FROM hot_content_wxindex_words w
             INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
+            WHERE w.dt < m.fetch_start_ymd
+               OR w.dt > m.fetch_end_ymd
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            return int(cursor.rowcount or 0)
+
+    def delete_wxindex_words_without_meta(self) -> int:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            DELETE w
+            FROM hot_content_wxindex_words w
+            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE m.name IS NULL
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days))
+            cursor.execute(sql)
             return int(cursor.rowcount or 0)
 
     def count_wxindex_words_without_meta(self) -> int:
@@ -1701,16 +1693,98 @@ class HotContentRepository:
     def _ensure_wxindex_word_meta_table(self) -> None:
         sql = """
             CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
                 name VARCHAR(256) NOT NULL COMMENT '词',
                 event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
                 fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
+                fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天',
                 meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
-                PRIMARY KEY (name),
+                PRIMARY KEY (id),
+                UNIQUE KEY uk_name (name),
                 KEY idx_event_created_at (event_created_at)
             ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
         """
         with self.conn.cursor() as cursor:
             cursor.execute(sql)
+            self._ensure_wxindex_word_meta_id_column(cursor)
+            self._ensure_wxindex_word_meta_fetch_end_column(cursor)
+
+    def _ensure_wxindex_word_meta_fetch_end_column(self, cursor: Any) -> None:
+        cursor.execute(
+            """
+            SELECT COUNT(*) AS cnt
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = 'hot_content_wxindex_word_meta'
+              AND COLUMN_NAME = 'fetch_end_ymd'
+            """
+        )
+        if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+            cursor.execute(
+                """
+                ALTER TABLE hot_content_wxindex_word_meta
+                    ADD COLUMN fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT ''
+                        COMMENT '数据窗口右边界:事件创建日后7天'
+                        AFTER fetch_start_ymd
+                """
+            )
+        cursor.execute(
+            """
+            UPDATE hot_content_wxindex_word_meta
+            SET fetch_end_ymd = DATE_FORMAT(
+                DATE_ADD(DATE(event_created_at), INTERVAL 7 DAY),
+                '%Y%m%d'
+            )
+            WHERE fetch_end_ymd IS NULL
+               OR TRIM(fetch_end_ymd) = ''
+            """
+        )
+
+    def _ensure_wxindex_word_meta_id_column(self, cursor: Any) -> None:
+        cursor.execute(
+            """
+            SELECT COUNT(*) AS cnt
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = 'hot_content_wxindex_word_meta'
+              AND COLUMN_NAME = 'id'
+            """
+        )
+        if int((cursor.fetchone() or {}).get("cnt") or 0) > 0:
+            return
+        cursor.execute(
+            """
+            ALTER TABLE hot_content_wxindex_word_meta
+                DROP PRIMARY KEY,
+                ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST,
+                ADD UNIQUE KEY uk_name (name)
+            """
+        )
+
+    @staticmethod
+    def _normalize_wxindex_word_meta_row(row: dict[str, Any]) -> dict[str, Any] | None:
+        name = str(row.get("name") or "").strip()
+        fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
+        fetch_end_ymd = str(row.get("fetch_end_ymd") or "").strip()
+        event_created_at = row.get("event_created_at")
+        if not name or not fetch_start_ymd or event_created_at is None:
+            return None
+        if not fetch_end_ymd and isinstance(event_created_at, datetime):
+            event_date = event_created_at.date()
+            fetch_end_ymd = (event_date + timedelta(days=7)).strftime("%Y%m%d")
+        if not fetch_end_ymd:
+            return None
+        try:
+            meta_id = int(row.get("id"))
+        except (TypeError, ValueError):
+            return None
+        return {
+            "id": meta_id,
+            "name": name,
+            "event_created_at": event_created_at,
+            "fetch_start_ymd": fetch_start_ymd,
+            "fetch_end_ymd": fetch_end_ymd,
+        }
 
     def _ensure_wxindex_words_table(self) -> None:
         sql = """

+ 356 - 45
app/hot_content/wxindex_words.py

@@ -28,6 +28,35 @@ def get_fetch_start_ymd_from_event(
     return start_date.strftime("%Y%m%d")
 
 
+def get_fetch_end_ymd_from_event(
+    event_created_at: datetime,
+    *,
+    forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
+) -> str:
+    """数据窗口右边界:事件创建日后 N 天(yyyymmdd)。"""
+    event_date = normalize_event_created_at(event_created_at).date()
+    end_date = event_date + timedelta(days=forward_days)
+    return end_date.strftime("%Y%m%d")
+
+
+def get_fetch_ymd_bounds_from_event(
+    event_created_at: datetime,
+    *,
+    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
+    forward_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
+) -> tuple[str, str]:
+    return (
+        get_fetch_start_ymd_from_event(
+            event_created_at,
+            lookback_days=lookback_days,
+        ),
+        get_fetch_end_ymd_from_event(
+            event_created_at,
+            forward_days=forward_days,
+        ),
+    )
+
+
 def get_word_data_window_ymd_bounds(
     event_created_at: datetime,
     *,
@@ -168,6 +197,70 @@ def word_meets_max_score_threshold(
     return max_score > min_max_score
 
 
+def filter_scores_in_ymd_window(
+    scores: list[dict[str, Any]],
+    *,
+    start_ymd: str,
+    end_ymd: str,
+) -> list[dict[str, Any]]:
+    start = str(start_ymd or "").strip()
+    end = str(end_ymd or "").strip()
+    if not start or not end:
+        return []
+    filtered: list[dict[str, Any]] = []
+    for item in scores:
+        if not isinstance(item, dict):
+            continue
+        ymd = str(item.get("ymd") or item.get("dt") or "").strip()
+        if not ymd or ymd < start or ymd > end:
+            continue
+        filtered.append(item)
+    filtered.sort(key=lambda row: str(row.get("ymd") or row.get("dt") or ""))
+    return filtered
+
+
+def word_has_high_score_in_window(
+    scores: list[dict[str, Any]],
+    *,
+    start_ymd: str,
+    end_ymd: str,
+    min_score: float = WXINDEX_WORDS_MIN_MAX_SCORE,
+) -> bool:
+    """窗口内是否存在严格大于阈值的微信指数。"""
+    window_scores = filter_scores_in_ymd_window(
+        scores,
+        start_ymd=start_ymd,
+        end_ymd=end_ymd,
+    )
+    for item in window_scores:
+        try:
+            score = float(item["total_score"])
+        except (TypeError, ValueError, KeyError):
+            continue
+        if score > min_score:
+            return True
+    return False
+
+
+def merge_wxindex_score_series(
+    *series_list: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    merged: dict[str, dict[str, Any]] = {}
+    for series in series_list:
+        for item in series:
+            if not isinstance(item, dict):
+                continue
+            ymd = str(item.get("ymd") or item.get("dt") or "").strip()
+            if not ymd:
+                continue
+            try:
+                total_score = float(item["total_score"])
+            except (TypeError, ValueError, KeyError):
+                continue
+            merged[ymd] = {"ymd": ymd, "total_score": total_score}
+    return sorted(merged.values(), key=lambda row: row["ymd"])
+
+
 def get_word_score_bounds(
     scores: list[dict[str, Any]],
 ) -> tuple[str | None, str | None]:
@@ -362,6 +455,152 @@ def refresh_stale_wxindex_words(
     return summary
 
 
+def sync_wxindex_words_from_meta(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    end_ymd: str | None = None,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, Any]:
+    """
+    按 hot_content_wxindex_word_meta 同步 hot_content_wxindex_words。
+
+    1. 删除 meta 中不存在的词
+    2. 删除窗口 [fetch_start_ymd, fetch_end_ymd] 外的日期
+    3. 补全窗口内缺失日期(含 fetch_start 早于昨日的历史段,如 20260615 之前)
+    """
+    target_end = end_ymd or get_wxindex_end_ymd()
+    summary: dict[str, Any] = {
+        "target_end_ymd": target_end,
+        "meta_count": 0,
+        "deleted_without_meta_rows": 0,
+        "deleted_outside_window_rows": 0,
+        "words_need_refresh": 0,
+        "refreshed": 0,
+        "inserted_rows": 0,
+        "skipped_rows": 0,
+        "fetch_failed": 0,
+        "api_empty": 0,
+        "no_new_range": 0,
+        "dry_run": dry_run,
+    }
+
+    if dry_run:
+        summary["deleted_without_meta_rows"] = repository.count_wxindex_words_without_meta()
+        summary["deleted_outside_window_rows"] = (
+            repository.count_wxindex_words_outside_event_window()
+        )
+    else:
+        summary["deleted_without_meta_rows"] = repository.delete_wxindex_words_without_meta()
+        summary["deleted_outside_window_rows"] = (
+            repository.delete_wxindex_words_outside_event_window()
+        )
+
+    meta_rows = repository.list_all_wxindex_word_meta()
+    summary["meta_count"] = len(meta_rows)
+
+    for meta in meta_rows:
+        name = str(meta.get("name") or "").strip()
+        fetch_start = str(meta.get("fetch_start_ymd") or "").strip()
+        fetch_end = str(meta.get("fetch_end_ymd") or "").strip()
+        if not name or not fetch_start or not fetch_end:
+            continue
+
+        api_end = min(fetch_end, target_end)
+        if fetch_start > api_end:
+            summary["no_new_range"] += 1
+            if verbose:
+                print(
+                    f"skip out-of-range word={name} "
+                    f"window={fetch_start}~{fetch_end} api_end={api_end}"
+                )
+            continue
+
+        stored_scores = repository.list_wxindex_word_scores(name)
+        fetch_range = get_supplement_fetch_range(
+            stored_scores,
+            end_ymd=api_end,
+            start_ymd=fetch_start,
+        )
+        if fetch_range is None:
+            summary["no_new_range"] += 1
+            if verbose:
+                print(f"skip complete word={name} window={fetch_start}~{fetch_end}")
+            continue
+
+        summary["words_need_refresh"] += 1
+        start_ymd, range_end = fetch_range
+        if start_ymd > range_end:
+            summary["no_new_range"] += 1
+            if verbose:
+                print(f"skip up-to-date word={name}")
+            continue
+
+        if dry_run:
+            summary["refreshed"] += 1
+            if verbose:
+                print(
+                    f"[dry-run] would fetch word={name} "
+                    f"{start_ymd}->{range_end} "
+                    f"save_window={fetch_start}~{fetch_end}"
+                )
+            continue
+
+        try:
+            api_scores = fetch_wxindex_scores(
+                api_client,
+                api_url,
+                keyword=name,
+                start_ymd=start_ymd,
+                end_ymd=range_end,
+            )
+            window_scores = filter_scores_in_ymd_window(
+                api_scores,
+                start_ymd=fetch_start,
+                end_ymd=fetch_end,
+            )
+            if not window_scores:
+                summary["api_empty"] += 1
+                if verbose:
+                    print(
+                        f"api empty word={name} fetch={start_ymd}->{range_end} "
+                        f"window={fetch_start}~{fetch_end}"
+                    )
+                continue
+
+            inserted, skipped = repository.save_wxindex_daily_scores(
+                name=name,
+                scores=window_scores,
+            )
+        except Exception as exc:
+            summary["fetch_failed"] += 1
+            if verbose:
+                print(f"sync failed word={name}: {exc}")
+            continue
+
+        if inserted <= 0:
+            summary["api_empty"] += 1
+            if verbose:
+                print(
+                    f"no new rows word={name} fetch={start_ymd}->{range_end} "
+                    f"api_rows={len(window_scores)} skipped={skipped}"
+                )
+            continue
+
+        summary["refreshed"] += 1
+        summary["inserted_rows"] += inserted
+        summary["skipped_rows"] += skipped
+        if verbose:
+            print(
+                f"synced word={name} fetch={start_ymd}->{range_end} "
+                f"inserted={inserted} skipped={skipped}"
+            )
+
+    return summary
+
+
 def cleanup_low_max_wxindex_words(
     repository: HotContentRepository,
     *,
@@ -440,11 +679,13 @@ def try_register_wxindex_word_meta(
             return None, "expired"
 
     fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at)
+    fetch_end_ymd = get_fetch_end_ymd_from_event(normalized_event_at)
     if dry_run:
         return {
             "name": name,
             "event_created_at": normalized_event_at,
             "fetch_start_ymd": fetch_start_ymd,
+            "fetch_end_ymd": fetch_end_ymd,
         }, "dry_run"
 
     if existing and update_if_exists:
@@ -452,6 +693,7 @@ def try_register_wxindex_word_meta(
             name=name,
             event_created_at=normalized_event_at,
             fetch_start_ymd=fetch_start_ymd,
+            fetch_end_ymd=fetch_end_ymd,
         )
         meta = repository.get_wxindex_word_meta(name)
         if meta is None:
@@ -465,17 +707,18 @@ def try_register_wxindex_word_meta(
         name=name,
         event_created_at=normalized_event_at,
         fetch_start_ymd=fetch_start_ymd,
+        fetch_end_ymd=fetch_end_ymd,
     )
     return meta, "registered"
 
 
-def fix_wxindex_word_meta_fetch_start_ymd(
+def fix_wxindex_word_meta_fetch_bounds(
     repository: HotContentRepository,
     *,
     dry_run: bool = False,
     verbose: bool = False,
 ) -> dict[str, int]:
-    """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。"""
+    """按 event_created_at 修正 meta.fetch_start_ymd / fetch_end_ymd。"""
     rows = repository.list_all_wxindex_word_meta()
     summary = {
         "total": len(rows),
@@ -486,10 +729,11 @@ def fix_wxindex_word_meta_fetch_start_ymd(
         name = str(row.get("name") or "").strip()
         event_created_at = row.get("event_created_at")
         old_fetch_start = str(row.get("fetch_start_ymd") or "").strip()
+        old_fetch_end = str(row.get("fetch_end_ymd") or "").strip()
         if not name or event_created_at is None:
             continue
-        new_fetch_start = get_fetch_start_ymd_from_event(event_created_at)
-        if new_fetch_start == old_fetch_start:
+        new_fetch_start, new_fetch_end = get_fetch_ymd_bounds_from_event(event_created_at)
+        if new_fetch_start == old_fetch_start and new_fetch_end == old_fetch_end:
             summary["unchanged"] += 1
             continue
         if dry_run:
@@ -497,24 +741,40 @@ def fix_wxindex_word_meta_fetch_start_ymd(
             if verbose:
                 print(
                     f"[dry-run] word={name} "
-                    f"event_created_at={event_created_at} "
-                    f"{old_fetch_start} -> {new_fetch_start}"
+                    f"start {old_fetch_start}->{new_fetch_start} "
+                    f"end {old_fetch_end}->{new_fetch_end}"
                 )
             continue
-        repository.update_wxindex_word_meta_fetch_start(
+        repository.update_wxindex_word_meta(
             name=name,
+            event_created_at=event_created_at,
             fetch_start_ymd=new_fetch_start,
+            fetch_end_ymd=new_fetch_end,
         )
         summary["updated"] += 1
         if verbose:
             print(
                 f"updated word={name} "
-                f"event_created_at={event_created_at} "
-                f"{old_fetch_start} -> {new_fetch_start}"
+                f"start {old_fetch_start}->{new_fetch_start} "
+                f"end {old_fetch_end}->{new_fetch_end}"
             )
     return summary
 
 
+def fix_wxindex_word_meta_fetch_start_ymd(
+    repository: HotContentRepository,
+    *,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。"""
+    return fix_wxindex_word_meta_fetch_bounds(
+        repository,
+        dry_run=dry_run,
+        verbose=verbose,
+    )
+
+
 def cleanup_wxindex_words_outside_event_window(
     repository: HotContentRepository,
     *,
@@ -734,7 +994,8 @@ def ensure_word_full_scores(
     """
     获取词微信指数并入库。
 
-    - 表中已有数据但无 meta:若在 7 天窗口内,自动补 meta 并继续更新
+    - meta 表:窗口内存在指数 > 10 万才写入/更新
+    - wxindex_words:仅保留 [fetch_start_ymd, fetch_end_ymd] 区间内数据
     - 超过 7 天窗口:不再更新
 
     返回 (scores, action)。
@@ -744,17 +1005,86 @@ def ensure_word_full_scores(
         return [], "empty"
 
     target_end = end_ymd or get_wxindex_end_ymd()
-    fetch_start_ymd = get_wxindex_fetch_start_ymd()
     stored_scores = repository.list_wxindex_word_scores(word)
     meta = repository.get_wxindex_word_meta(word)
 
-    should_register_meta = meta is None
+    should_register_meta = meta is None and event_created_at is not None
     should_update_meta = (
         update_meta_if_exists
         and meta is not None
         and event_created_at is not None
     )
+
+    fetch_start_ymd: str | None = None
+    fetch_end_ymd: str | None = None
+
+    if event_created_at is not None:
+        normalized_event_at = normalize_event_created_at(event_created_at)
+        if not include_expired and not is_word_update_active(normalized_event_at):
+            if meta is None:
+                return stored_scores, "expired"
+            if not should_update_meta:
+                return stored_scores, "expired"
+        fetch_start_ymd, fetch_end_ymd = get_fetch_ymd_bounds_from_event(
+            normalized_event_at
+        )
+    elif meta is not None:
+        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
+        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
+        if not fetch_end_ymd:
+            fetch_end_ymd = get_fetch_end_ymd_from_event(meta["event_created_at"])
+        if not include_expired and not is_word_update_active(meta["event_created_at"]):
+            return stored_scores, "expired"
+    else:
+        return stored_scores, "legacy"
+
+    if not fetch_start_ymd or not fetch_end_ymd:
+        return stored_scores, "legacy"
+
+    api_end_ymd = min(fetch_end_ymd, target_end)
+    fetch_range = None if force_refresh else get_supplement_fetch_range(
+        stored_scores,
+        end_ymd=api_end_ymd,
+        start_ymd=fetch_start_ymd,
+    )
+    if fetch_range is None and stored_scores and meta is not None and not should_update_meta:
+        merged_scores = merge_wxindex_score_series(stored_scores)
+        window_scores = filter_scores_in_ymd_window(
+            merged_scores,
+            start_ymd=fetch_start_ymd,
+            end_ymd=fetch_end_ymd,
+        )
+        return window_scores, "cached"
+
+    if dry_run:
+        return [], "dry_run"
+
+    had_data = bool(stored_scores)
+    start_ymd, fetch_end_ymd_api = fetch_range or (fetch_start_ymd, api_end_ymd)
+    api_scores: list[dict[str, Any]] = []
+    if fetch_range is not None or not stored_scores or force_refresh:
+        api_scores = fetch_wxindex_scores(
+            api_client,
+            api_url,
+            keyword=word,
+            start_ymd=start_ymd,
+            end_ymd=fetch_end_ymd_api,
+        )
+
+    merged_scores = merge_wxindex_score_series(stored_scores, api_scores)
+    window_scores = filter_scores_in_ymd_window(
+        merged_scores,
+        start_ymd=fetch_start_ymd,
+        end_ymd=fetch_end_ymd,
+    )
+
     if should_register_meta or should_update_meta:
+        if not word_has_high_score_in_window(
+            window_scores,
+            start_ymd=fetch_start_ymd,
+            end_ymd=fetch_end_ymd,
+        ):
+            return stored_scores, "below_threshold"
         meta, register_reason = try_register_wxindex_word_meta(
             repository,
             word=word,
@@ -767,57 +1097,38 @@ def ensure_word_full_scores(
             if register_reason == "expired":
                 return stored_scores, "expired"
             return stored_scores, "legacy"
-        if dry_run and register_reason == "dry_run":
-            return [], "dry_run"
+    elif meta is None:
+        return stored_scores, "below_threshold"
 
     if meta is None:
         return stored_scores, "legacy"
 
-    if not include_expired and not is_word_update_active(meta["event_created_at"]):
-        return stored_scores, "expired"
-
-    word_start_ymd = str(meta.get("fetch_start_ymd") or fetch_start_ymd)
-    fetch_range = None if force_refresh else get_supplement_fetch_range(
-        stored_scores,
-        end_ymd=target_end,
-        start_ymd=word_start_ymd,
-    )
-    if fetch_range is None and stored_scores:
-        return stored_scores, "cached"
-
-    if dry_run:
-        return [], "dry_run"
-
-    had_data = bool(stored_scores)
-    start_ymd, fetch_end_ymd = fetch_range or (word_start_ymd, target_end)
-    api_scores = fetch_wxindex_scores(
-        api_client,
-        api_url,
-        keyword=word,
-        start_ymd=start_ymd,
-        end_ymd=fetch_end_ymd,
-    )
-    if not api_scores:
+    if not api_scores and not window_scores:
         return stored_scores, "api_empty"
 
-    if not had_data and not word_meets_max_score_threshold(
-        api_scores,
-        min_max_score=WXINDEX_WORDS_MIN_MAX_SCORE,
+    if not had_data and not word_has_high_score_in_window(
+        window_scores,
+        start_ymd=fetch_start_ymd,
+        end_ymd=fetch_end_ymd,
     ):
         return [], "below_threshold"
 
     inserted, _skipped = repository.save_wxindex_daily_scores(
         name=word,
-        scores=api_scores,
+        scores=window_scores,
+    )
+    final_scores = filter_scores_in_ymd_window(
+        repository.list_wxindex_word_scores(word),
+        start_ymd=fetch_start_ymd,
+        end_ymd=fetch_end_ymd,
     )
-    final_scores = repository.list_wxindex_word_scores(word)
     if inserted > 0:
         action = "updated" if had_data else "inserted"
     elif final_scores:
         action = "cached"
     else:
         action = "api_empty"
-    return final_scores or api_scores, action
+    return final_scores or window_scores, action
 
 
 def sync_words_from_trend_json(