Forráskód Böngészése

微信指数更新

xueyiming 1 hete
szülő
commit
91d220403c

+ 2 - 2
app/core/config.py

@@ -155,8 +155,8 @@ class Settings:
     wxindex_llm_max_tokens: int = 4000
     wxindex_api_url: str = "http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex"
     wxindex_lookback_days: int = 7
-    wxindex_words_cron_hour: int = 7
-    wxindex_words_cron_minute: int = 30
+    wxindex_words_cron_hour: int = 10
+    wxindex_words_cron_minute: int = 0
     demand_event_sense_threshold: float = 6.0
     demand_senior_fit_threshold: float = 6.0
     demand_quality_llm_model: str = "anthropic/claude-haiku-4-5"

+ 7 - 0
app/hot_content/postprocess_service.py

@@ -238,6 +238,7 @@ class ContributionPostprocessService:
                 self.sync_wxindex_words(
                     record_id=record_id,
                     trend_result=trend_result,
+                    event_created_at=record.get("created_at"),
                 )
                 event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                     record=record,
@@ -310,6 +311,7 @@ class ContributionPostprocessService:
         *,
         record_id: int,
         trend_result: dict[str, Any],
+        event_created_at: datetime | None = None,
         verbose: bool = False,
     ) -> dict[str, int]:
         return sync_words_from_trend_json(
@@ -318,7 +320,9 @@ class ContributionPostprocessService:
             self.config.wxindex_api_url,
             trend_json=trend_result,
             record_id=record_id,
+            event_created_at=event_created_at,
             verbose=verbose,
+            update_meta_if_exists=True,
         )
 
     def _save_empty_demand_quality(self, *, record_id: int) -> None:
@@ -645,12 +649,15 @@ class ContributionPostprocessService:
         threshold = float(self.config.wxindex_score_threshold)
 
         wxindex_searches: list[dict[str, Any]] = []
+        event_created_at = record.get("created_at")
         for keyword in selected_words:
             full_scores, _action = ensure_word_full_scores(
                 self.repository,
                 self.api_client,
                 self.config.wxindex_api_url,
                 keyword=keyword,
+                event_created_at=event_created_at,
+                update_meta_if_exists=True,
             )
             series, start_ymd, end_ymd = slice_scores_lookback(
                 full_scores,

+ 436 - 13
app/hot_content/repository.py

@@ -4,7 +4,7 @@ from __future__ import annotations
 
 import hashlib
 import json
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from typing import Any
 
 try:
@@ -471,6 +471,7 @@ class HotContentRepository:
                 unique_key,
                 source,
                 title,
+                created_at,
                 article_title,
                 article_body,
                 demand_cache_run_id,
@@ -500,6 +501,7 @@ class HotContentRepository:
                 "unique_key": str(row["unique_key"]),
                 "source": str(row.get("source") or ""),
                 "title": str(row.get("title") or ""),
+                "created_at": row.get("created_at"),
                 "article_title": row.get("article_title"),
                 "article_body": row.get("article_body"),
                 "demand_cache_run_id": row.get("demand_cache_run_id"),
@@ -1112,39 +1114,298 @@ class HotContentRepository:
         self,
         *,
         end_ymd: str,
-        start_ymd: str = "20260601",
+        update_window_days: int = 7,
+        today: date | None = None,
     ) -> list[dict[str, Any]]:
-        """返回已存在但缺最新日期,或未从 start_ymd 补齐的词。"""
+        """返回更新窗口内、仍缺近 7 日区间数据的词。"""
         target_end = str(end_ymd or "").strip()
-        target_start = str(start_ymd or "").strip()
-        if not target_end or not target_start:
+        if not target_end:
             return []
+        current = today or datetime.now(SHANGHAI_TZ).date()
+        active_since = current - timedelta(days=max(update_window_days, 0))
+        self._ensure_wxindex_word_meta_table()
         self._ensure_wxindex_words_table()
         sql = """
-            SELECT name, MIN(dt) AS earliest_dt, MAX(dt) AS latest_dt
-            FROM hot_content_wxindex_words
-            GROUP BY name
-            HAVING MAX(dt) < %s OR MIN(dt) > %s
-            ORDER BY name ASC
+            SELECT
+                m.name,
+                m.event_created_at,
+                m.fetch_start_ymd,
+                MIN(w.dt) AS earliest_dt,
+                MAX(w.dt) AS latest_dt
+            FROM hot_content_wxindex_word_meta m
+            INNER JOIN hot_content_wxindex_words w ON w.name = m.name
+            WHERE DATE(m.event_created_at) >= %s
+            GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
+            HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
+            ORDER BY m.name ASC
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (target_end, target_start))
+            cursor.execute(sql, (active_since, target_end))
             rows = cursor.fetchall()
         stale_words: list[dict[str, Any]] = []
         for row in rows:
             name = str(row.get("name") or "").strip()
+            fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
             earliest_dt = str(row.get("earliest_dt") or "").strip()
             latest_dt = str(row.get("latest_dt") or "").strip()
-            if name and earliest_dt and latest_dt:
+            event_created_at = row.get("event_created_at")
+            if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
                 stale_words.append(
                     {
                         "name": name,
+                        "event_created_at": event_created_at,
+                        "fetch_start_ymd": fetch_start_ymd,
                         "earliest_dt": earliest_dt,
                         "latest_dt": latest_dt,
                     }
                 )
         return stale_words
 
+    def list_word_earliest_event_times(
+        self,
+        *,
+        since_dt: datetime,
+    ) -> dict[str, datetime]:
+        """从 wxindex_trend_json 汇总近期间每个检索词的最早事件时间。"""
+        self._ensure_record_quality_columns()
+        sql = """
+            SELECT
+                word_name,
+                MIN(event_created_at) AS event_created_at
+            FROM (
+                SELECT
+                    TRIM(searches.keyword) AS word_name,
+                    r.created_at AS event_created_at
+                FROM hot_content_records r
+                JOIN JSON_TABLE(
+                    r.wxindex_trend_json,
+                    '$.wxindex_searches[*]' COLUMNS (
+                        keyword VARCHAR(256) PATH '$.keyword'
+                    )
+                ) AS searches
+                WHERE r.created_at >= %s
+                  AND r.wxindex_trend_json IS NOT NULL
+                  AND TRIM(searches.keyword) <> ''
+
+                UNION ALL
+
+                SELECT
+                    TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
+                    r.created_at AS event_created_at
+                FROM hot_content_records r
+                WHERE r.created_at >= %s
+                  AND r.wxindex_trend_json IS NOT NULL
+                  AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
+            ) AS word_events
+            WHERE word_name IS NOT NULL
+              AND word_name <> ''
+            GROUP BY word_name
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (since_dt, since_dt))
+            rows = cursor.fetchall()
+
+        event_map: dict[str, datetime] = {}
+        for row in rows:
+            name = str(row.get("word_name") or "").strip()
+            event_created_at = row.get("event_created_at")
+            if name and isinstance(event_created_at, datetime):
+                event_map[name] = event_created_at
+        return event_map
+
+    def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT
+                w.name,
+                MIN(w.dt) AS earliest_dt,
+                MIN(w.created_at) AS first_created_at
+            FROM hot_content_wxindex_words w
+            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE m.name IS NULL
+            GROUP BY w.name
+            ORDER BY w.name ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+        bounds: list[dict[str, Any]] = []
+        for row in rows:
+            name = str(row.get("name") or "").strip()
+            earliest_dt = str(row.get("earliest_dt") or "").strip()
+            first_created_at = row.get("first_created_at")
+            if name and earliest_dt:
+                bounds.append(
+                    {
+                        "name": name,
+                        "earliest_dt": earliest_dt,
+                        "first_created_at": first_created_at,
+                    }
+                )
+        return bounds
+
+    def list_wxindex_word_names_without_meta(self) -> list[str]:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT DISTINCT w.name
+            FROM hot_content_wxindex_words w
+            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE m.name IS NULL
+            ORDER BY w.name ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+        return [
+            str(row.get("name") or "").strip()
+            for row in rows
+            if str(row.get("name") or "").strip()
+        ]
+
+    def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
+        word = str(name or "").strip()
+        if not word:
+            return None
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT MIN(created_at) AS first_created_at
+            FROM hot_content_wxindex_words
+            WHERE name = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word,))
+            row = cursor.fetchone() or {}
+        first_created_at = row.get("first_created_at")
+        return first_created_at if isinstance(first_created_at, datetime) else None
+
+    def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
+        self._ensure_wxindex_word_meta_table()
+        sql = """
+            SELECT name, event_created_at, fetch_start_ymd
+            FROM hot_content_wxindex_word_meta
+            ORDER BY name ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+        result: list[dict[str, Any]] = []
+        for row in rows:
+            name = str(row.get("name") or "").strip()
+            fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
+            event_created_at = row.get("event_created_at")
+            if name and fetch_start_ymd and event_created_at is not None:
+                result.append(
+                    {
+                        "name": name,
+                        "event_created_at": event_created_at,
+                        "fetch_start_ymd": fetch_start_ymd,
+                    }
+                )
+        return result
+
+    def update_wxindex_word_meta_fetch_start(
+        self,
+        *,
+        name: str,
+        fetch_start_ymd: str,
+    ) -> None:
+        word = str(name or "").strip()
+        target_start = str(fetch_start_ymd or "").strip()
+        if not word or not target_start:
+            raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
+        self._ensure_wxindex_word_meta_table()
+        sql = """
+            UPDATE hot_content_wxindex_word_meta
+            SET fetch_start_ymd = %s
+            WHERE name = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (target_start, word))
+
+    def update_wxindex_word_meta(
+        self,
+        *,
+        name: str,
+        event_created_at: datetime,
+        fetch_start_ymd: str,
+    ) -> None:
+        word = str(name or "").strip()
+        target_start = str(fetch_start_ymd or "").strip()
+        if not word or not target_start:
+            raise HotContentFlowError("invalid wxindex word meta payload")
+        self._ensure_wxindex_word_meta_table()
+        event_at = event_created_at
+        if event_at.tzinfo is not None:
+            event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
+        sql = """
+            UPDATE hot_content_wxindex_word_meta
+            SET event_created_at = %s,
+                fetch_start_ymd = %s
+            WHERE name = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (event_at, target_start, word))
+
+    def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
+        word = str(name or "").strip()
+        if not word:
+            return None
+        self._ensure_wxindex_word_meta_table()
+        sql = """
+            SELECT name, event_created_at, fetch_start_ymd
+            FROM hot_content_wxindex_word_meta
+            WHERE name = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word,))
+            row = cursor.fetchone()
+        if not row:
+            return None
+        fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
+        event_created_at = row.get("event_created_at")
+        if not fetch_start_ymd or event_created_at is None:
+            return None
+        return {
+            "name": str(row.get("name") or "").strip(),
+            "event_created_at": event_created_at,
+            "fetch_start_ymd": fetch_start_ymd,
+        }
+
+    def ensure_wxindex_word_meta(
+        self,
+        *,
+        name: str,
+        event_created_at: datetime,
+        fetch_start_ymd: str,
+    ) -> dict[str, Any]:
+        word = str(name or "").strip()
+        target_start = str(fetch_start_ymd or "").strip()
+        if not word or not target_start:
+            raise HotContentFlowError("invalid wxindex word meta payload")
+        self._ensure_wxindex_word_meta_table()
+        event_at = event_created_at
+        if event_at.tzinfo is not None:
+            event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
+        sql = """
+            INSERT INTO hot_content_wxindex_word_meta (
+                name,
+                event_created_at,
+                fetch_start_ymd
+            )
+            VALUES (%s, %s, %s)
+            ON DUPLICATE KEY UPDATE
+                event_created_at = VALUES(event_created_at),
+                fetch_start_ymd = VALUES(fetch_start_ymd)
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word, event_at, target_start))
+        meta = self.get_wxindex_word_meta(word)
+        if meta is None:
+            raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
+        return meta
+
     def list_low_max_wxindex_words(
         self,
         *,
@@ -1185,6 +1446,122 @@ class HotContentRepository:
             )
         return low_words
 
+    def count_wxindex_words_outside_event_window(
+        self,
+        *,
+        window_days: int = 7,
+    ) -> int:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT COUNT(*) AS row_count
+            FROM hot_content_wxindex_words w
+            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE w.dt < DATE_FORMAT(
+                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+               OR w.dt > DATE_FORMAT(
+                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (window_days, window_days))
+            row = cursor.fetchone() or {}
+        return int(row.get("row_count") or 0)
+
+    def list_wxindex_words_outside_event_window_samples(
+        self,
+        *,
+        window_days: int = 7,
+        limit: int = 20,
+    ) -> list[dict[str, Any]]:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT
+                w.name,
+                w.dt,
+                m.event_created_at,
+                DATE_FORMAT(
+                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                ) AS start_ymd,
+                DATE_FORMAT(
+                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                ) AS end_ymd
+            FROM hot_content_wxindex_words w
+            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE w.dt < DATE_FORMAT(
+                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+               OR w.dt > DATE_FORMAT(
+                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+            ORDER BY w.name ASC, w.dt ASC
+            LIMIT %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (window_days, window_days, window_days, window_days, limit))
+            rows = cursor.fetchall()
+        samples: list[dict[str, Any]] = []
+        for row in rows:
+            name = str(row.get("name") or "").strip()
+            dt = str(row.get("dt") or "").strip()
+            if name and dt:
+                samples.append(
+                    {
+                        "name": name,
+                        "dt": dt,
+                        "event_created_at": row.get("event_created_at"),
+                        "start_ymd": str(row.get("start_ymd") or "").strip(),
+                        "end_ymd": str(row.get("end_ymd") or "").strip(),
+                    }
+                )
+        return samples
+
+    def delete_wxindex_words_outside_event_window(
+        self,
+        *,
+        window_days: int = 7,
+    ) -> int:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            DELETE w
+            FROM hot_content_wxindex_words w
+            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE w.dt < DATE_FORMAT(
+                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+               OR w.dt > DATE_FORMAT(
+                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
+                    '%%Y%%m%%d'
+                )
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (window_days, window_days))
+            return int(cursor.rowcount or 0)
+
+    def count_wxindex_words_without_meta(self) -> int:
+        self._ensure_wxindex_word_meta_table()
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT COUNT(*) AS row_count
+            FROM hot_content_wxindex_words w
+            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
+            WHERE m.name IS NULL
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            row = cursor.fetchone() or {}
+        return int(row.get("row_count") or 0)
+
     def delete_wxindex_words_by_names(self, names: list[str]) -> int:
         cleaned = [str(name or "").strip() for name in names if str(name or "").strip()]
         if not cleaned:
@@ -1199,6 +1576,9 @@ class HotContentRepository:
             cursor.execute(sql, tuple(cleaned))
             return int(cursor.rowcount or 0)
 
+    def has_wxindex_word(self, name: str) -> bool:
+        return self.get_wxindex_word_latest_dt(name) is not None
+
     def get_wxindex_word_latest_dt(self, name: str) -> str | None:
         word = str(name or "").strip()
         if not word:
@@ -1262,13 +1642,41 @@ class HotContentRepository:
         skipped = len(rows) - inserted
         return inserted, skipped
 
+    def list_records_with_wxindex_trend_after(
+        self,
+        *,
+        after_created_at: datetime,
+    ) -> list[dict[str, Any]]:
+        sql = """
+            SELECT id, created_at, wxindex_trend_json
+            FROM hot_content_records
+            WHERE created_at > %s
+              AND wxindex_trend_json IS NOT NULL
+              AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
+            ORDER BY id ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (after_created_at,))
+            rows = cursor.fetchall()
+
+        records: list[dict[str, Any]] = []
+        for row in rows:
+            records.append(
+                {
+                    "id": int(row["id"]),
+                    "created_at": row.get("created_at"),
+                    "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
+                }
+            )
+        return records
+
     def list_records_with_wxindex_trend(
         self,
         *,
         since_dt: datetime,
     ) -> list[dict[str, Any]]:
         sql = """
-            SELECT id, wxindex_trend_json
+            SELECT id, created_at, wxindex_trend_json
             FROM hot_content_records
             WHERE created_at >= %s
               AND wxindex_trend_json IS NOT NULL
@@ -1284,11 +1692,26 @@ class HotContentRepository:
             records.append(
                 {
                     "id": int(row["id"]),
+                    "created_at": row.get("created_at"),
                     "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
                 }
             )
         return records
 
+    def _ensure_wxindex_word_meta_table(self) -> None:
+        sql = """
+            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
+                name VARCHAR(256) NOT NULL COMMENT '词',
+                event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
+                fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
+                meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
+                PRIMARY KEY (name),
+                KEY idx_event_created_at (event_created_at)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+
     def _ensure_wxindex_words_table(self) -> None:
         sql = """
             CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (

+ 539 - 11
app/hot_content/wxindex_words.py

@@ -10,11 +10,65 @@ from app.hot_content.demand_export import get_wxindex_keywords
 from app.hot_content.repository import HotContentRepository
 from app.hot_content.timezone import SHANGHAI_TZ
 
-WXINDEX_WORDS_START_YMD = "20260601"
+WXINDEX_WORDS_LOOKBACK_DAYS = 7
+WXINDEX_WORDS_UPDATE_WINDOW_DAYS = 7
 WXINDEX_WORDS_RECORD_SINCE = date(2026, 6, 11)
 WXINDEX_WORDS_MIN_MAX_SCORE = 100_000.0
 
 
+def get_fetch_start_ymd_from_event(
+    event_created_at: datetime,
+    *,
+    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
+) -> str:
+    """数据窗口左边界:事件创建日往前 N 天(yyyymmdd)。"""
+    event_date = normalize_event_created_at(event_created_at).date()
+    start_date = event_date - timedelta(days=lookback_days)
+    return start_date.strftime("%Y%m%d")
+
+
+def get_word_data_window_ymd_bounds(
+    event_created_at: datetime,
+    *,
+    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
+) -> tuple[str, str]:
+    """事件创建日前后 N 天的数据窗口 [start_ymd, end_ymd]。"""
+    event_date = normalize_event_created_at(event_created_at).date()
+    start_date = event_date - timedelta(days=window_days)
+    end_date = event_date + timedelta(days=window_days)
+    return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
+
+
+def get_wxindex_fetch_start_ymd(
+    *,
+    today: date | None = None,
+    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
+) -> str:
+    """首次拉取起始日期:近 N 日(截至昨日)。"""
+    start_ymd, _end_ymd = get_lookback_range(lookback_days, today=today)
+    return start_ymd
+
+
+def normalize_event_created_at(value: datetime | None) -> datetime:
+    if value is None:
+        return datetime.now(SHANGHAI_TZ)
+    if value.tzinfo is None:
+        return value.replace(tzinfo=SHANGHAI_TZ)
+    return value.astimezone(SHANGHAI_TZ)
+
+
+def is_word_update_active(
+    event_created_at: datetime,
+    *,
+    today: date | None = None,
+    window_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
+) -> bool:
+    """事件创建后 window_days 天内继续更新,超出则停止。"""
+    current = today or datetime.now(SHANGHAI_TZ).date()
+    event_date = normalize_event_created_at(event_created_at).date()
+    return (current - event_date).days <= window_days
+
+
 def get_wxindex_end_ymd(*, today: date | None = None) -> str:
     current = today or datetime.now(SHANGHAI_TZ).date()
     return (current - timedelta(days=1)).strftime("%Y%m%d")
@@ -74,7 +128,7 @@ def fetch_wxindex_scores(
     api_url: str,
     *,
     keyword: str,
-    start_ymd: str = WXINDEX_WORDS_START_YMD,
+    start_ymd: str,
     end_ymd: str | None = None,
 ) -> list[dict[str, Any]]:
     payload = {
@@ -130,7 +184,7 @@ def word_scores_need_supplement(
     scores: list[dict[str, Any]],
     *,
     end_ymd: str | None = None,
-    start_ymd: str = WXINDEX_WORDS_START_YMD,
+    start_ymd: str,
 ) -> tuple[bool, str]:
     """判断词是否需要补数:缺起始段、缺最新日期,或完全无数据。"""
     if not scores:
@@ -150,7 +204,7 @@ def get_supplement_fetch_range(
     scores: list[dict[str, Any]],
     *,
     end_ymd: str | None = None,
-    start_ymd: str = WXINDEX_WORDS_START_YMD,
+    start_ymd: str,
 ) -> tuple[str, str] | None:
     """计算补数 API 查询区间;无需补数时返回 None。"""
     need_supplement, reason = word_scores_need_supplement(
@@ -207,7 +261,7 @@ def refresh_stale_wxindex_words(
     dry_run: bool = False,
     verbose: bool = False,
 ) -> dict[str, int]:
-    """补全已存在但缺少最新日期数据的词。"""
+    """补全已存在但缺少最新日期数据的词(仅含 meta 的新词)。"""
     target_end = end_ymd or get_wxindex_end_ymd()
     summary = {
         "target_end_ymd": target_end,
@@ -222,7 +276,7 @@ def refresh_stale_wxindex_words(
 
     stale_words = repository.list_stale_wxindex_words(
         end_ymd=target_end,
-        start_ymd=WXINDEX_WORDS_START_YMD,
+        update_window_days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
     )
     summary["stale_words"] = len(stale_words)
     if not stale_words:
@@ -233,10 +287,15 @@ def refresh_stale_wxindex_words(
         if not name:
             continue
 
+        word_start_ymd = str(item.get("fetch_start_ymd") or "").strip()
+        if not word_start_ymd:
+            word_start_ymd = get_wxindex_fetch_start_ymd()
+
         stored_scores = repository.list_wxindex_word_scores(name)
         fetch_range = get_supplement_fetch_range(
             stored_scores,
             end_ymd=target_end,
+            start_ymd=word_start_ymd,
         )
         if fetch_range is None:
             summary["no_new_range"] += 1
@@ -342,6 +401,285 @@ def cleanup_low_max_wxindex_words(
     return summary
 
 
+def try_register_wxindex_word_meta(
+    repository: HotContentRepository,
+    *,
+    word: str,
+    event_created_at: datetime | None = None,
+    event_map: dict[str, datetime] | None = None,
+    first_row_created_at: datetime | None = None,
+    include_expired: bool = False,
+    dry_run: bool = False,
+    update_if_exists: bool = False,
+) -> tuple[dict[str, Any] | None, str]:
+    """补写 meta;返回 (meta, reason)。"""
+    name = str(word or "").strip()
+    if not name:
+        return None, "empty"
+
+    existing = repository.get_wxindex_word_meta(name)
+
+    resolved_event_at = event_created_at
+    if resolved_event_at is None and event_map is not None:
+        resolved_event_at = event_map.get(name)
+    if resolved_event_at is None:
+        resolved_event_at = first_row_created_at
+    if resolved_event_at is None:
+        resolved_event_at = repository.get_wxindex_word_first_row_created_at(name)
+    if resolved_event_at is None:
+        if existing:
+            return existing, "exists"
+        return None, "no_event"
+
+    normalized_event_at = normalize_event_created_at(resolved_event_at)
+    if not include_expired and not is_word_update_active(normalized_event_at):
+        if existing and not update_if_exists:
+            return existing, "exists"
+        if not existing:
+            return None, "expired"
+
+    fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at)
+    if dry_run:
+        return {
+            "name": name,
+            "event_created_at": normalized_event_at,
+            "fetch_start_ymd": fetch_start_ymd,
+        }, "dry_run"
+
+    if existing and update_if_exists:
+        repository.update_wxindex_word_meta(
+            name=name,
+            event_created_at=normalized_event_at,
+            fetch_start_ymd=fetch_start_ymd,
+        )
+        meta = repository.get_wxindex_word_meta(name)
+        if meta is None:
+            raise HotContentFlowError(f"failed to update wxindex word meta: {name}")
+        return meta, "updated"
+
+    if existing:
+        return existing, "exists"
+
+    meta = repository.ensure_wxindex_word_meta(
+        name=name,
+        event_created_at=normalized_event_at,
+        fetch_start_ymd=fetch_start_ymd,
+    )
+    return meta, "registered"
+
+
+def fix_wxindex_word_meta_fetch_start_ymd(
+    repository: HotContentRepository,
+    *,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。"""
+    rows = repository.list_all_wxindex_word_meta()
+    summary = {
+        "total": len(rows),
+        "updated": 0,
+        "unchanged": 0,
+    }
+    for row in rows:
+        name = str(row.get("name") or "").strip()
+        event_created_at = row.get("event_created_at")
+        old_fetch_start = str(row.get("fetch_start_ymd") or "").strip()
+        if not name or event_created_at is None:
+            continue
+        new_fetch_start = get_fetch_start_ymd_from_event(event_created_at)
+        if new_fetch_start == old_fetch_start:
+            summary["unchanged"] += 1
+            continue
+        if dry_run:
+            summary["updated"] += 1
+            if verbose:
+                print(
+                    f"[dry-run] word={name} "
+                    f"event_created_at={event_created_at} "
+                    f"{old_fetch_start} -> {new_fetch_start}"
+                )
+            continue
+        repository.update_wxindex_word_meta_fetch_start(
+            name=name,
+            fetch_start_ymd=new_fetch_start,
+        )
+        summary["updated"] += 1
+        if verbose:
+            print(
+                f"updated word={name} "
+                f"event_created_at={event_created_at} "
+                f"{old_fetch_start} -> {new_fetch_start}"
+            )
+    return summary
+
+
+def cleanup_wxindex_words_outside_event_window(
+    repository: HotContentRepository,
+    *,
+    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int]:
+    """删除 hot_content_wxindex_words 中超出 event_created_at 前后 window_days 的数据。"""
+    to_delete = repository.count_wxindex_words_outside_event_window(
+        window_days=window_days,
+    )
+    summary = {
+        "window_days": window_days,
+        "rows_to_delete": to_delete,
+        "deleted_rows": 0,
+        "words_without_meta_rows": repository.count_wxindex_words_without_meta(),
+    }
+    if to_delete <= 0:
+        return summary
+
+    if dry_run:
+        if verbose:
+            samples = repository.list_wxindex_words_outside_event_window_samples(
+                window_days=window_days,
+                limit=20,
+            )
+            for item in samples:
+                print(
+                    f"[dry-run] word={item['name']} dt={item['dt']} "
+                    f"window={item['start_ymd']}~{item['end_ymd']} "
+                    f"event_created_at={item['event_created_at']}"
+                )
+        return summary
+
+    summary["deleted_rows"] = repository.delete_wxindex_words_outside_event_window(
+        window_days=window_days,
+    )
+    if verbose:
+        print(f"deleted_rows={summary['deleted_rows']}")
+    return summary
+
+
+def build_word_earliest_event_map(
+    repository: HotContentRepository,
+    *,
+    since_dt: datetime,
+) -> dict[str, datetime]:
+    """从热点记录中汇总每个检索词对应的最早事件创建时间。"""
+    return repository.list_word_earliest_event_times(since_dt=since_dt)
+
+
+def backfill_wxindex_word_meta(
+    repository: HotContentRepository,
+    *,
+    since_date: date = WXINDEX_WORDS_RECORD_SINCE,
+    include_expired: bool = True,
+    fix_fetch_start: bool = True,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, Any]:
+    """为 hot_content_wxindex_words 中缺 meta 的词补登记,并修正 fetch_start_ymd。"""
+    since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
+    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
+    candidates = repository.list_wxindex_word_bounds_without_meta()
+    register_summary: dict[str, int | str | bool] = {
+        "since_date": since_date.isoformat(),
+        "include_expired": include_expired,
+        "candidates": len(candidates),
+        "registered": 0,
+        "skipped_expired": 0,
+        "skipped_no_event": 0,
+    }
+
+    for item in candidates:
+        name = str(item.get("name") or "").strip()
+        if not name:
+            continue
+        meta, reason = try_register_wxindex_word_meta(
+            repository,
+            word=name,
+            event_map=event_map,
+            first_row_created_at=item.get("first_created_at"),
+            include_expired=include_expired,
+            dry_run=dry_run,
+        )
+        if reason in {"registered", "dry_run"}:
+            register_summary["registered"] += 1
+            if verbose:
+                label = "[dry-run] would register" if dry_run else "registered"
+                print(f"{label} meta word={name} event_at={meta['event_created_at']}")
+        elif reason == "expired":
+            register_summary["skipped_expired"] += 1
+            if verbose:
+                print(f"skip expired word={name}")
+        elif reason == "no_event":
+            register_summary["skipped_no_event"] += 1
+            if verbose:
+                print(f"skip no_event word={name}")
+
+    fetch_start_summary: dict[str, int] | None = None
+    if fix_fetch_start:
+        fetch_start_summary = fix_wxindex_word_meta_fetch_start_ymd(
+            repository,
+            dry_run=dry_run,
+            verbose=verbose,
+        )
+
+    return {
+        "register": register_summary,
+        "fetch_start_fix": fetch_start_summary,
+    }
+
+
+def backfill_active_wxindex_word_meta(
+    repository: HotContentRepository,
+    *,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, int | str]:
+    """为表中仍处 7 天窗口内、但缺少 meta 的词补登记。"""
+    current = datetime.now(SHANGHAI_TZ).date()
+    since_dt = datetime.combine(
+        current - timedelta(days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS),
+        datetime.min.time(),
+    ).replace(tzinfo=SHANGHAI_TZ)
+    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
+    candidates = repository.list_wxindex_word_bounds_without_meta()
+    summary: dict[str, int | str] = {
+        "active_since": since_dt.date().isoformat(),
+        "candidates": len(candidates),
+        "registered": 0,
+        "skipped_expired": 0,
+        "skipped_no_event": 0,
+    }
+
+    for item in candidates:
+        name = str(item.get("name") or "").strip()
+        if not name:
+            continue
+        meta, reason = try_register_wxindex_word_meta(
+            repository,
+            word=name,
+            event_map=event_map,
+            first_row_created_at=item.get("first_created_at"),
+            dry_run=dry_run,
+        )
+        if reason == "registered":
+            summary["registered"] += 1
+            if verbose:
+                print(f"registered meta word={name} event_at={meta['event_created_at']}")
+        elif reason == "dry_run":
+            summary["registered"] += 1
+            if verbose:
+                print(f"[dry-run] would register meta word={name}")
+        elif reason == "expired":
+            summary["skipped_expired"] += 1
+            if verbose:
+                print(f"skip expired word={name}")
+        elif reason == "no_event":
+            summary["skipped_no_event"] += 1
+            if verbose:
+                print(f"skip no_event word={name}")
+
+    return summary
+
+
 def run_wxindex_words_daily_job(
     repository: HotContentRepository,
     api_client: JsonApiClient,
@@ -352,7 +690,12 @@ def run_wxindex_words_daily_job(
     dry_run: bool = False,
     verbose: bool = False,
 ) -> dict[str, Any]:
-    """定时任务:先补全缺失日期,再清理低最大值词。"""
+    """定时任务:补 meta、补全缺失日期、清理低最大值词。"""
+    meta_summary = backfill_active_wxindex_word_meta(
+        repository,
+        dry_run=dry_run,
+        verbose=verbose,
+    )
     refresh_summary = refresh_stale_wxindex_words(
         repository,
         api_client,
@@ -368,6 +711,7 @@ def run_wxindex_words_daily_job(
         verbose=verbose,
     )
     return {
+        "meta_backfill": meta_summary,
         "refresh": refresh_summary,
         "cleanup": cleanup_summary,
     }
@@ -380,23 +724,62 @@ def ensure_word_full_scores(
     *,
     keyword: str,
     end_ymd: str | None = None,
+    event_created_at: datetime | None = None,
+    include_expired: bool = False,
     force_refresh: bool = False,
     dry_run: bool = False,
+    update_meta_if_exists: bool = False,
 ) -> tuple[list[dict[str, Any]], str]:
     """
-    获取词的全量微信指数(20260601 起至昨日),按词+日期逐行入库。
+    获取词微信指数并入库。
+
+    - 表中已有数据但无 meta:若在 7 天窗口内,自动补 meta 并继续更新
+    - 超过 7 天窗口:不再更新
 
-    返回 (scores, action),action 为 inserted / updated / cached / dry_run。
+    返回 (scores, action)。
     """
     word = str(keyword or "").strip()
     if not word:
         return [], "empty"
 
     target_end = end_ymd or get_wxindex_end_ymd()
+    fetch_start_ymd = get_wxindex_fetch_start_ymd()
     stored_scores = repository.list_wxindex_word_scores(word)
+    meta = repository.get_wxindex_word_meta(word)
+
+    should_register_meta = meta is None
+    should_update_meta = (
+        update_meta_if_exists
+        and meta is not None
+        and event_created_at is not None
+    )
+    if should_register_meta or should_update_meta:
+        meta, register_reason = try_register_wxindex_word_meta(
+            repository,
+            word=word,
+            event_created_at=event_created_at,
+            include_expired=include_expired,
+            dry_run=dry_run,
+            update_if_exists=should_update_meta,
+        )
+        if meta is None:
+            if register_reason == "expired":
+                return stored_scores, "expired"
+            return stored_scores, "legacy"
+        if dry_run and register_reason == "dry_run":
+            return [], "dry_run"
+
+    if meta is None:
+        return stored_scores, "legacy"
+
+    if not include_expired and not is_word_update_active(meta["event_created_at"]):
+        return stored_scores, "expired"
+
+    word_start_ymd = str(meta.get("fetch_start_ymd") or fetch_start_ymd)
     fetch_range = None if force_refresh else get_supplement_fetch_range(
         stored_scores,
         end_ymd=target_end,
+        start_ymd=word_start_ymd,
     )
     if fetch_range is None and stored_scores:
         return stored_scores, "cached"
@@ -405,7 +788,7 @@ def ensure_word_full_scores(
         return [], "dry_run"
 
     had_data = bool(stored_scores)
-    start_ymd, fetch_end_ymd = fetch_range or (WXINDEX_WORDS_START_YMD, target_end)
+    start_ymd, fetch_end_ymd = fetch_range or (word_start_ymd, target_end)
     api_scores = fetch_wxindex_scores(
         api_client,
         api_url,
@@ -443,15 +826,19 @@ def sync_words_from_trend_json(
     *,
     trend_json: dict[str, Any],
     record_id: int,
+    event_created_at: datetime | None = None,
     dry_run: bool = False,
     verbose: bool = False,
+    update_meta_if_exists: bool = False,
 ) -> dict[str, int]:
-    """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(全量数据)。"""
+    """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(近 7 日数据)。"""
     summary = {
         "words_found": 0,
         "inserted": 0,
         "updated": 0,
         "cached": 0,
+        "legacy": 0,
+        "expired": 0,
         "api_empty": 0,
         "below_threshold": 0,
         "fetch_failed": 0,
@@ -468,7 +855,9 @@ def sync_words_from_trend_json(
                 api_client,
                 api_url,
                 keyword=name,
+                event_created_at=event_created_at,
                 dry_run=dry_run,
+                update_meta_if_exists=update_meta_if_exists,
             )
         except Exception as exc:
             summary["fetch_failed"] += 1
@@ -482,6 +871,10 @@ def sync_words_from_trend_json(
             summary["updated"] += 1
         elif action == "cached":
             summary["cached"] += 1
+        elif action == "legacy":
+            summary["legacy"] += 1
+        elif action == "expired":
+            summary["expired"] += 1
         elif action == "api_empty":
             summary["api_empty"] += 1
         elif action == "below_threshold":
@@ -512,6 +905,8 @@ def backfill_wxindex_words(
         "inserted": 0,
         "updated": 0,
         "cached": 0,
+        "legacy": 0,
+        "expired": 0,
         "api_empty": 0,
         "below_threshold": 0,
         "fetch_failed": 0,
@@ -547,6 +942,7 @@ def backfill_wxindex_words(
             api_url,
             trend_json=trend_json,
             record_id=record_id,
+            event_created_at=row.get("created_at"),
             dry_run=dry_run,
             verbose=verbose,
         )
@@ -555,6 +951,8 @@ def backfill_wxindex_words(
             "inserted",
             "updated",
             "cached",
+            "legacy",
+            "expired",
             "api_empty",
             "below_threshold",
             "fetch_failed",
@@ -562,3 +960,133 @@ def backfill_wxindex_words(
             summary[key] += result[key]
 
     return summary
+
+
+def build_word_event_map_from_records(
+    records: list[dict[str, Any]],
+) -> dict[str, datetime]:
+    word_events: dict[str, datetime] = {}
+    for row in records:
+        created_at = row.get("created_at")
+        if not isinstance(created_at, datetime):
+            continue
+        for word in extract_searched_words(row.get("wxindex_trend_json")):
+            existing = word_events.get(word)
+            if existing is None or created_at < existing:
+                word_events[word] = created_at
+    return word_events
+
+
+def audit_wxindex_words_from_records(
+    repository: HotContentRepository,
+    *,
+    after_created_at: datetime,
+) -> dict[str, Any]:
+    """检查指定时间后的热点记录,其微信指数词是否已在汇总表和 meta 表。"""
+    records = repository.list_records_with_wxindex_trend_after(
+        after_created_at=after_created_at,
+    )
+    word_events = build_word_event_map_from_records(records)
+    missing_words: list[str] = []
+    missing_meta: list[str] = []
+    for word in sorted(word_events):
+        if not repository.has_wxindex_word(word):
+            missing_words.append(word)
+        elif repository.get_wxindex_word_meta(word) is None:
+            missing_meta.append(word)
+    return {
+        "after_created_at": after_created_at.isoformat(sep=" ", timespec="seconds"),
+        "records_scanned": len(records),
+        "words_found": len(word_events),
+        "missing_words_count": len(missing_words),
+        "missing_meta_count": len(missing_meta),
+        "missing_words": missing_words,
+        "missing_meta": missing_meta,
+    }
+
+
+def supplement_wxindex_words_from_records(
+    repository: HotContentRepository,
+    api_client: JsonApiClient,
+    api_url: str,
+    *,
+    after_created_at: datetime,
+    dry_run: bool = False,
+    verbose: bool = False,
+) -> dict[str, Any]:
+    """补全指定时间后热点记录涉及、但缺失的 wxindex 词与 meta。"""
+    audit = audit_wxindex_words_from_records(
+        repository,
+        after_created_at=after_created_at,
+    )
+    records = repository.list_records_with_wxindex_trend_after(
+        after_created_at=after_created_at,
+    )
+    word_events = build_word_event_map_from_records(records)
+    summary: dict[str, Any] = {
+        "audit_before": {
+            "records_scanned": audit["records_scanned"],
+            "words_found": audit["words_found"],
+            "missing_words_count": audit["missing_words_count"],
+            "missing_meta_count": audit["missing_meta_count"],
+        },
+        "supplemented_words": 0,
+        "inserted": 0,
+        "updated": 0,
+        "cached": 0,
+        "meta_registered": 0,
+        "api_empty": 0,
+        "below_threshold": 0,
+        "fetch_failed": 0,
+    }
+
+    for word, event_at in sorted(word_events.items()):
+        had_meta = repository.get_wxindex_word_meta(word) is not None
+        try:
+            _, action = ensure_word_full_scores(
+                repository,
+                api_client,
+                api_url,
+                keyword=word,
+                event_created_at=event_at,
+                include_expired=True,
+                dry_run=dry_run,
+            )
+        except Exception as exc:
+            summary["fetch_failed"] += 1
+            if verbose:
+                print(f"fetch failed word={word}: {exc}")
+            continue
+
+        summary["supplemented_words"] += 1
+        if action == "inserted":
+            summary["inserted"] += 1
+        elif action == "updated":
+            summary["updated"] += 1
+        elif action == "cached":
+            summary["cached"] += 1
+        elif action == "api_empty":
+            summary["api_empty"] += 1
+        elif action == "below_threshold":
+            summary["below_threshold"] += 1
+        elif action == "dry_run":
+            summary["inserted"] += 1
+
+        if not had_meta:
+            if dry_run or repository.get_wxindex_word_meta(word) is not None:
+                summary["meta_registered"] += 1
+
+        if verbose:
+            print(f"word={word} event_at={event_at} action={action}")
+
+    audit_after = audit_wxindex_words_from_records(
+        repository,
+        after_created_at=after_created_at,
+    )
+    summary["audit_after"] = {
+        "missing_words_count": audit_after["missing_words_count"],
+        "missing_meta_count": audit_after["missing_meta_count"],
+        "missing_words": audit_after["missing_words"],
+        "missing_meta": audit_after["missing_meta"],
+    }
+    return summary

+ 2 - 0
docker-compose.yml

@@ -26,6 +26,8 @@ services:
       HOT_FLOW_CRON_HOURS: ${HOT_FLOW_CRON_HOURS:-6,12,18}
       HOT_FLOW_CRON_MINUTE: ${HOT_FLOW_CRON_MINUTE:-0}
       DECODE_RESULT_FLOW_INTERVAL_SECONDS: ${DECODE_RESULT_FLOW_INTERVAL_SECONDS:-1800}
+      WXINDEX_WORDS_CRON_HOUR: ${WXINDEX_WORDS_CRON_HOUR:-10}
+      WXINDEX_WORDS_CRON_MINUTE: ${WXINDEX_WORDS_CRON_MINUTE:-0}
       # 业务阈值
       WXINDEX_SCORE_THRESHOLD: ${WXINDEX_SCORE_THRESHOLD:-1000000}
       DEMAND_POOL_SOURCE_TABLE: ${DEMAND_POOL_SOURCE_TABLE:-dwd_multi_demand_pool_di}