فهرست منبع

Revert "微信指数更新"

This reverts commit 91d220403c04d90bb23fd2e8d5a80d0f367b6881.
xueyiming 1 هفته پیش
والد
کامیت
fa6d2ebcb2
5فایلهای تغییر یافته به همراه26 افزوده شده و 986 حذف شده
  1. 2 2
      app/core/config.py
  2. 0 7
      app/hot_content/postprocess_service.py
  3. 13 436
      app/hot_content/repository.py
  4. 11 539
      app/hot_content/wxindex_words.py
  5. 0 2
      docker-compose.yml

+ 2 - 2
app/core/config.py

@@ -155,8 +155,8 @@ class Settings:
     wxindex_llm_max_tokens: int = 4000
     wxindex_api_url: str = "http://crawapi.piaoquantv.com/crawler/wei_xin/wxindex"
     wxindex_lookback_days: int = 7
-    wxindex_words_cron_hour: int = 10
-    wxindex_words_cron_minute: int = 0
+    wxindex_words_cron_hour: int = 7
+    wxindex_words_cron_minute: int = 30
     demand_event_sense_threshold: float = 6.0
     demand_senior_fit_threshold: float = 6.0
     demand_quality_llm_model: str = "anthropic/claude-haiku-4-5"

+ 0 - 7
app/hot_content/postprocess_service.py

@@ -238,7 +238,6 @@ class ContributionPostprocessService:
                 self.sync_wxindex_words(
                     record_id=record_id,
                     trend_result=trend_result,
-                    event_created_at=record.get("created_at"),
                 )
                 event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                     record=record,
@@ -311,7 +310,6 @@ class ContributionPostprocessService:
         *,
         record_id: int,
         trend_result: dict[str, Any],
-        event_created_at: datetime | None = None,
         verbose: bool = False,
     ) -> dict[str, int]:
         return sync_words_from_trend_json(
@@ -320,9 +318,7 @@ class ContributionPostprocessService:
             self.config.wxindex_api_url,
             trend_json=trend_result,
             record_id=record_id,
-            event_created_at=event_created_at,
             verbose=verbose,
-            update_meta_if_exists=True,
         )
 
     def _save_empty_demand_quality(self, *, record_id: int) -> None:
@@ -649,15 +645,12 @@ class ContributionPostprocessService:
         threshold = float(self.config.wxindex_score_threshold)
 
         wxindex_searches: list[dict[str, Any]] = []
-        event_created_at = record.get("created_at")
         for keyword in selected_words:
             full_scores, _action = ensure_word_full_scores(
                 self.repository,
                 self.api_client,
                 self.config.wxindex_api_url,
                 keyword=keyword,
-                event_created_at=event_created_at,
-                update_meta_if_exists=True,
             )
             series, start_ymd, end_ymd = slice_scores_lookback(
                 full_scores,

+ 13 - 436
app/hot_content/repository.py

@@ -4,7 +4,7 @@ from __future__ import annotations
 
 import hashlib
 import json
-from datetime import date, datetime, timedelta
+from datetime import datetime, timedelta
 from typing import Any
 
 try:
@@ -471,7 +471,6 @@ class HotContentRepository:
                 unique_key,
                 source,
                 title,
-                created_at,
                 article_title,
                 article_body,
                 demand_cache_run_id,
@@ -501,7 +500,6 @@ class HotContentRepository:
                 "unique_key": str(row["unique_key"]),
                 "source": str(row.get("source") or ""),
                 "title": str(row.get("title") or ""),
-                "created_at": row.get("created_at"),
                 "article_title": row.get("article_title"),
                 "article_body": row.get("article_body"),
                 "demand_cache_run_id": row.get("demand_cache_run_id"),
@@ -1114,298 +1112,39 @@ class HotContentRepository:
         self,
         *,
         end_ymd: str,
-        update_window_days: int = 7,
-        today: date | None = None,
+        start_ymd: str = "20260601",
     ) -> list[dict[str, Any]]:
-        """返回更新窗口内、仍缺近 7 日区间数据的词。"""
+        """返回已存在但缺最新日期,或未从 start_ymd 补齐的词。"""
         target_end = str(end_ymd or "").strip()
-        if not target_end:
+        target_start = str(start_ymd or "").strip()
+        if not target_end or not target_start:
             return []
-        current = today or datetime.now(SHANGHAI_TZ).date()
-        active_since = current - timedelta(days=max(update_window_days, 0))
-        self._ensure_wxindex_word_meta_table()
         self._ensure_wxindex_words_table()
         sql = """
-            SELECT
-                m.name,
-                m.event_created_at,
-                m.fetch_start_ymd,
-                MIN(w.dt) AS earliest_dt,
-                MAX(w.dt) AS latest_dt
-            FROM hot_content_wxindex_word_meta m
-            INNER JOIN hot_content_wxindex_words w ON w.name = m.name
-            WHERE DATE(m.event_created_at) >= %s
-            GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
-            HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
-            ORDER BY m.name ASC
+            SELECT name, MIN(dt) AS earliest_dt, MAX(dt) AS latest_dt
+            FROM hot_content_wxindex_words
+            GROUP BY name
+            HAVING MAX(dt) < %s OR MIN(dt) > %s
+            ORDER BY name ASC
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (active_since, target_end))
+            cursor.execute(sql, (target_end, target_start))
             rows = cursor.fetchall()
         stale_words: list[dict[str, Any]] = []
         for row in rows:
             name = str(row.get("name") or "").strip()
-            fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
             earliest_dt = str(row.get("earliest_dt") or "").strip()
             latest_dt = str(row.get("latest_dt") or "").strip()
-            event_created_at = row.get("event_created_at")
-            if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
+            if name and earliest_dt and latest_dt:
                 stale_words.append(
                     {
                         "name": name,
-                        "event_created_at": event_created_at,
-                        "fetch_start_ymd": fetch_start_ymd,
                         "earliest_dt": earliest_dt,
                         "latest_dt": latest_dt,
                     }
                 )
         return stale_words
 
-    def list_word_earliest_event_times(
-        self,
-        *,
-        since_dt: datetime,
-    ) -> dict[str, datetime]:
-        """从 wxindex_trend_json 汇总近期间每个检索词的最早事件时间。"""
-        self._ensure_record_quality_columns()
-        sql = """
-            SELECT
-                word_name,
-                MIN(event_created_at) AS event_created_at
-            FROM (
-                SELECT
-                    TRIM(searches.keyword) AS word_name,
-                    r.created_at AS event_created_at
-                FROM hot_content_records r
-                JOIN JSON_TABLE(
-                    r.wxindex_trend_json,
-                    '$.wxindex_searches[*]' COLUMNS (
-                        keyword VARCHAR(256) PATH '$.keyword'
-                    )
-                ) AS searches
-                WHERE r.created_at >= %s
-                  AND r.wxindex_trend_json IS NOT NULL
-                  AND TRIM(searches.keyword) <> ''
-
-                UNION ALL
-
-                SELECT
-                    TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
-                    r.created_at AS event_created_at
-                FROM hot_content_records r
-                WHERE r.created_at >= %s
-                  AND r.wxindex_trend_json IS NOT NULL
-                  AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
-            ) AS word_events
-            WHERE word_name IS NOT NULL
-              AND word_name <> ''
-            GROUP BY word_name
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (since_dt, since_dt))
-            rows = cursor.fetchall()
-
-        event_map: dict[str, datetime] = {}
-        for row in rows:
-            name = str(row.get("word_name") or "").strip()
-            event_created_at = row.get("event_created_at")
-            if name and isinstance(event_created_at, datetime):
-                event_map[name] = event_created_at
-        return event_map
-
-    def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT
-                w.name,
-                MIN(w.dt) AS earliest_dt,
-                MIN(w.created_at) AS first_created_at
-            FROM hot_content_wxindex_words w
-            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE m.name IS NULL
-            GROUP BY w.name
-            ORDER BY w.name ASC
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            rows = cursor.fetchall()
-        bounds: list[dict[str, Any]] = []
-        for row in rows:
-            name = str(row.get("name") or "").strip()
-            earliest_dt = str(row.get("earliest_dt") or "").strip()
-            first_created_at = row.get("first_created_at")
-            if name and earliest_dt:
-                bounds.append(
-                    {
-                        "name": name,
-                        "earliest_dt": earliest_dt,
-                        "first_created_at": first_created_at,
-                    }
-                )
-        return bounds
-
-    def list_wxindex_word_names_without_meta(self) -> list[str]:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT DISTINCT w.name
-            FROM hot_content_wxindex_words w
-            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE m.name IS NULL
-            ORDER BY w.name ASC
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            rows = cursor.fetchall()
-        return [
-            str(row.get("name") or "").strip()
-            for row in rows
-            if str(row.get("name") or "").strip()
-        ]
-
-    def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
-        word = str(name or "").strip()
-        if not word:
-            return None
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT MIN(created_at) AS first_created_at
-            FROM hot_content_wxindex_words
-            WHERE name = %s
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (word,))
-            row = cursor.fetchone() or {}
-        first_created_at = row.get("first_created_at")
-        return first_created_at if isinstance(first_created_at, datetime) else None
-
-    def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
-        self._ensure_wxindex_word_meta_table()
-        sql = """
-            SELECT name, event_created_at, fetch_start_ymd
-            FROM hot_content_wxindex_word_meta
-            ORDER BY name ASC
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            rows = cursor.fetchall()
-        result: list[dict[str, Any]] = []
-        for row in rows:
-            name = str(row.get("name") or "").strip()
-            fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
-            event_created_at = row.get("event_created_at")
-            if name and fetch_start_ymd and event_created_at is not None:
-                result.append(
-                    {
-                        "name": name,
-                        "event_created_at": event_created_at,
-                        "fetch_start_ymd": fetch_start_ymd,
-                    }
-                )
-        return result
-
-    def update_wxindex_word_meta_fetch_start(
-        self,
-        *,
-        name: str,
-        fetch_start_ymd: str,
-    ) -> None:
-        word = str(name or "").strip()
-        target_start = str(fetch_start_ymd or "").strip()
-        if not word or not target_start:
-            raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
-        self._ensure_wxindex_word_meta_table()
-        sql = """
-            UPDATE hot_content_wxindex_word_meta
-            SET fetch_start_ymd = %s
-            WHERE name = %s
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (target_start, word))
-
-    def update_wxindex_word_meta(
-        self,
-        *,
-        name: str,
-        event_created_at: datetime,
-        fetch_start_ymd: str,
-    ) -> None:
-        word = str(name or "").strip()
-        target_start = str(fetch_start_ymd or "").strip()
-        if not word or not target_start:
-            raise HotContentFlowError("invalid wxindex word meta payload")
-        self._ensure_wxindex_word_meta_table()
-        event_at = event_created_at
-        if event_at.tzinfo is not None:
-            event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
-        sql = """
-            UPDATE hot_content_wxindex_word_meta
-            SET event_created_at = %s,
-                fetch_start_ymd = %s
-            WHERE name = %s
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (event_at, target_start, word))
-
-    def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
-        word = str(name or "").strip()
-        if not word:
-            return None
-        self._ensure_wxindex_word_meta_table()
-        sql = """
-            SELECT name, event_created_at, fetch_start_ymd
-            FROM hot_content_wxindex_word_meta
-            WHERE name = %s
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (word,))
-            row = cursor.fetchone()
-        if not row:
-            return None
-        fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
-        event_created_at = row.get("event_created_at")
-        if not fetch_start_ymd or event_created_at is None:
-            return None
-        return {
-            "name": str(row.get("name") or "").strip(),
-            "event_created_at": event_created_at,
-            "fetch_start_ymd": fetch_start_ymd,
-        }
-
-    def ensure_wxindex_word_meta(
-        self,
-        *,
-        name: str,
-        event_created_at: datetime,
-        fetch_start_ymd: str,
-    ) -> dict[str, Any]:
-        word = str(name or "").strip()
-        target_start = str(fetch_start_ymd or "").strip()
-        if not word or not target_start:
-            raise HotContentFlowError("invalid wxindex word meta payload")
-        self._ensure_wxindex_word_meta_table()
-        event_at = event_created_at
-        if event_at.tzinfo is not None:
-            event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
-        sql = """
-            INSERT INTO hot_content_wxindex_word_meta (
-                name,
-                event_created_at,
-                fetch_start_ymd
-            )
-            VALUES (%s, %s, %s)
-            ON DUPLICATE KEY UPDATE
-                event_created_at = VALUES(event_created_at),
-                fetch_start_ymd = VALUES(fetch_start_ymd)
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (word, event_at, target_start))
-        meta = self.get_wxindex_word_meta(word)
-        if meta is None:
-            raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
-        return meta
-
     def list_low_max_wxindex_words(
         self,
         *,
@@ -1446,122 +1185,6 @@ class HotContentRepository:
             )
         return low_words
 
-    def count_wxindex_words_outside_event_window(
-        self,
-        *,
-        window_days: int = 7,
-    ) -> int:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT COUNT(*) AS row_count
-            FROM hot_content_wxindex_words w
-            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days))
-            row = cursor.fetchone() or {}
-        return int(row.get("row_count") or 0)
-
-    def list_wxindex_words_outside_event_window_samples(
-        self,
-        *,
-        window_days: int = 7,
-        limit: int = 20,
-    ) -> list[dict[str, Any]]:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT
-                w.name,
-                w.dt,
-                m.event_created_at,
-                DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                ) AS start_ymd,
-                DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                ) AS end_ymd
-            FROM hot_content_wxindex_words w
-            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-            ORDER BY w.name ASC, w.dt ASC
-            LIMIT %s
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days, window_days, window_days, limit))
-            rows = cursor.fetchall()
-        samples: list[dict[str, Any]] = []
-        for row in rows:
-            name = str(row.get("name") or "").strip()
-            dt = str(row.get("dt") or "").strip()
-            if name and dt:
-                samples.append(
-                    {
-                        "name": name,
-                        "dt": dt,
-                        "event_created_at": row.get("event_created_at"),
-                        "start_ymd": str(row.get("start_ymd") or "").strip(),
-                        "end_ymd": str(row.get("end_ymd") or "").strip(),
-                    }
-                )
-        return samples
-
-    def delete_wxindex_words_outside_event_window(
-        self,
-        *,
-        window_days: int = 7,
-    ) -> int:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            DELETE w
-            FROM hot_content_wxindex_words w
-            INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE w.dt < DATE_FORMAT(
-                    DATE_SUB(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-               OR w.dt > DATE_FORMAT(
-                    DATE_ADD(DATE(m.event_created_at), INTERVAL %s DAY),
-                    '%%Y%%m%%d'
-                )
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (window_days, window_days))
-            return int(cursor.rowcount or 0)
-
-    def count_wxindex_words_without_meta(self) -> int:
-        self._ensure_wxindex_word_meta_table()
-        self._ensure_wxindex_words_table()
-        sql = """
-            SELECT COUNT(*) AS row_count
-            FROM hot_content_wxindex_words w
-            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
-            WHERE m.name IS NULL
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-            row = cursor.fetchone() or {}
-        return int(row.get("row_count") or 0)
-
     def delete_wxindex_words_by_names(self, names: list[str]) -> int:
         cleaned = [str(name or "").strip() for name in names if str(name or "").strip()]
         if not cleaned:
@@ -1576,9 +1199,6 @@ class HotContentRepository:
             cursor.execute(sql, tuple(cleaned))
             return int(cursor.rowcount or 0)
 
-    def has_wxindex_word(self, name: str) -> bool:
-        return self.get_wxindex_word_latest_dt(name) is not None
-
     def get_wxindex_word_latest_dt(self, name: str) -> str | None:
         word = str(name or "").strip()
         if not word:
@@ -1642,41 +1262,13 @@ class HotContentRepository:
         skipped = len(rows) - inserted
         return inserted, skipped
 
-    def list_records_with_wxindex_trend_after(
-        self,
-        *,
-        after_created_at: datetime,
-    ) -> list[dict[str, Any]]:
-        sql = """
-            SELECT id, created_at, wxindex_trend_json
-            FROM hot_content_records
-            WHERE created_at > %s
-              AND wxindex_trend_json IS NOT NULL
-              AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
-            ORDER BY id ASC
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql, (after_created_at,))
-            rows = cursor.fetchall()
-
-        records: list[dict[str, Any]] = []
-        for row in rows:
-            records.append(
-                {
-                    "id": int(row["id"]),
-                    "created_at": row.get("created_at"),
-                    "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
-                }
-            )
-        return records
-
     def list_records_with_wxindex_trend(
         self,
         *,
         since_dt: datetime,
     ) -> list[dict[str, Any]]:
         sql = """
-            SELECT id, created_at, wxindex_trend_json
+            SELECT id, wxindex_trend_json
             FROM hot_content_records
             WHERE created_at >= %s
               AND wxindex_trend_json IS NOT NULL
@@ -1692,26 +1284,11 @@ class HotContentRepository:
             records.append(
                 {
                     "id": int(row["id"]),
-                    "created_at": row.get("created_at"),
                     "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
                 }
             )
         return records
 
-    def _ensure_wxindex_word_meta_table(self) -> None:
-        sql = """
-            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
-                name VARCHAR(256) NOT NULL COMMENT '词',
-                event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
-                fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
-                meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
-                PRIMARY KEY (name),
-                KEY idx_event_created_at (event_created_at)
-            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
-        """
-        with self.conn.cursor() as cursor:
-            cursor.execute(sql)
-
     def _ensure_wxindex_words_table(self) -> None:
         sql = """
             CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (

+ 11 - 539
app/hot_content/wxindex_words.py

@@ -10,65 +10,11 @@ from app.hot_content.demand_export import get_wxindex_keywords
 from app.hot_content.repository import HotContentRepository
 from app.hot_content.timezone import SHANGHAI_TZ
 
-WXINDEX_WORDS_LOOKBACK_DAYS = 7
-WXINDEX_WORDS_UPDATE_WINDOW_DAYS = 7
+WXINDEX_WORDS_START_YMD = "20260601"
 WXINDEX_WORDS_RECORD_SINCE = date(2026, 6, 11)
 WXINDEX_WORDS_MIN_MAX_SCORE = 100_000.0
 
 
-def get_fetch_start_ymd_from_event(
-    event_created_at: datetime,
-    *,
-    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
-) -> str:
-    """数据窗口左边界:事件创建日往前 N 天(yyyymmdd)。"""
-    event_date = normalize_event_created_at(event_created_at).date()
-    start_date = event_date - timedelta(days=lookback_days)
-    return start_date.strftime("%Y%m%d")
-
-
-def get_word_data_window_ymd_bounds(
-    event_created_at: datetime,
-    *,
-    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
-) -> tuple[str, str]:
-    """事件创建日前后 N 天的数据窗口 [start_ymd, end_ymd]。"""
-    event_date = normalize_event_created_at(event_created_at).date()
-    start_date = event_date - timedelta(days=window_days)
-    end_date = event_date + timedelta(days=window_days)
-    return start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d")
-
-
-def get_wxindex_fetch_start_ymd(
-    *,
-    today: date | None = None,
-    lookback_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
-) -> str:
-    """首次拉取起始日期:近 N 日(截至昨日)。"""
-    start_ymd, _end_ymd = get_lookback_range(lookback_days, today=today)
-    return start_ymd
-
-
-def normalize_event_created_at(value: datetime | None) -> datetime:
-    if value is None:
-        return datetime.now(SHANGHAI_TZ)
-    if value.tzinfo is None:
-        return value.replace(tzinfo=SHANGHAI_TZ)
-    return value.astimezone(SHANGHAI_TZ)
-
-
-def is_word_update_active(
-    event_created_at: datetime,
-    *,
-    today: date | None = None,
-    window_days: int = WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
-) -> bool:
-    """事件创建后 window_days 天内继续更新,超出则停止。"""
-    current = today or datetime.now(SHANGHAI_TZ).date()
-    event_date = normalize_event_created_at(event_created_at).date()
-    return (current - event_date).days <= window_days
-
-
 def get_wxindex_end_ymd(*, today: date | None = None) -> str:
     current = today or datetime.now(SHANGHAI_TZ).date()
     return (current - timedelta(days=1)).strftime("%Y%m%d")
@@ -128,7 +74,7 @@ def fetch_wxindex_scores(
     api_url: str,
     *,
     keyword: str,
-    start_ymd: str,
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
     end_ymd: str | None = None,
 ) -> list[dict[str, Any]]:
     payload = {
@@ -184,7 +130,7 @@ def word_scores_need_supplement(
     scores: list[dict[str, Any]],
     *,
     end_ymd: str | None = None,
-    start_ymd: str,
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
 ) -> tuple[bool, str]:
     """判断词是否需要补数:缺起始段、缺最新日期,或完全无数据。"""
     if not scores:
@@ -204,7 +150,7 @@ def get_supplement_fetch_range(
     scores: list[dict[str, Any]],
     *,
     end_ymd: str | None = None,
-    start_ymd: str,
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
 ) -> tuple[str, str] | None:
     """计算补数 API 查询区间;无需补数时返回 None。"""
     need_supplement, reason = word_scores_need_supplement(
@@ -261,7 +207,7 @@ def refresh_stale_wxindex_words(
     dry_run: bool = False,
     verbose: bool = False,
 ) -> dict[str, int]:
-    """补全已存在但缺少最新日期数据的词(仅含 meta 的新词)。"""
+    """补全已存在但缺少最新日期数据的词。"""
     target_end = end_ymd or get_wxindex_end_ymd()
     summary = {
         "target_end_ymd": target_end,
@@ -276,7 +222,7 @@ def refresh_stale_wxindex_words(
 
     stale_words = repository.list_stale_wxindex_words(
         end_ymd=target_end,
-        update_window_days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS,
+        start_ymd=WXINDEX_WORDS_START_YMD,
     )
     summary["stale_words"] = len(stale_words)
     if not stale_words:
@@ -287,15 +233,10 @@ def refresh_stale_wxindex_words(
         if not name:
             continue
 
-        word_start_ymd = str(item.get("fetch_start_ymd") or "").strip()
-        if not word_start_ymd:
-            word_start_ymd = get_wxindex_fetch_start_ymd()
-
         stored_scores = repository.list_wxindex_word_scores(name)
         fetch_range = get_supplement_fetch_range(
             stored_scores,
             end_ymd=target_end,
-            start_ymd=word_start_ymd,
         )
         if fetch_range is None:
             summary["no_new_range"] += 1
@@ -401,285 +342,6 @@ def cleanup_low_max_wxindex_words(
     return summary
 
 
-def try_register_wxindex_word_meta(
-    repository: HotContentRepository,
-    *,
-    word: str,
-    event_created_at: datetime | None = None,
-    event_map: dict[str, datetime] | None = None,
-    first_row_created_at: datetime | None = None,
-    include_expired: bool = False,
-    dry_run: bool = False,
-    update_if_exists: bool = False,
-) -> tuple[dict[str, Any] | None, str]:
-    """补写 meta;返回 (meta, reason)。"""
-    name = str(word or "").strip()
-    if not name:
-        return None, "empty"
-
-    existing = repository.get_wxindex_word_meta(name)
-
-    resolved_event_at = event_created_at
-    if resolved_event_at is None and event_map is not None:
-        resolved_event_at = event_map.get(name)
-    if resolved_event_at is None:
-        resolved_event_at = first_row_created_at
-    if resolved_event_at is None:
-        resolved_event_at = repository.get_wxindex_word_first_row_created_at(name)
-    if resolved_event_at is None:
-        if existing:
-            return existing, "exists"
-        return None, "no_event"
-
-    normalized_event_at = normalize_event_created_at(resolved_event_at)
-    if not include_expired and not is_word_update_active(normalized_event_at):
-        if existing and not update_if_exists:
-            return existing, "exists"
-        if not existing:
-            return None, "expired"
-
-    fetch_start_ymd = get_fetch_start_ymd_from_event(normalized_event_at)
-    if dry_run:
-        return {
-            "name": name,
-            "event_created_at": normalized_event_at,
-            "fetch_start_ymd": fetch_start_ymd,
-        }, "dry_run"
-
-    if existing and update_if_exists:
-        repository.update_wxindex_word_meta(
-            name=name,
-            event_created_at=normalized_event_at,
-            fetch_start_ymd=fetch_start_ymd,
-        )
-        meta = repository.get_wxindex_word_meta(name)
-        if meta is None:
-            raise HotContentFlowError(f"failed to update wxindex word meta: {name}")
-        return meta, "updated"
-
-    if existing:
-        return existing, "exists"
-
-    meta = repository.ensure_wxindex_word_meta(
-        name=name,
-        event_created_at=normalized_event_at,
-        fetch_start_ymd=fetch_start_ymd,
-    )
-    return meta, "registered"
-
-
-def fix_wxindex_word_meta_fetch_start_ymd(
-    repository: HotContentRepository,
-    *,
-    dry_run: bool = False,
-    verbose: bool = False,
-) -> dict[str, int]:
-    """按 event_created_at 往前 7 天,批量修正 meta.fetch_start_ymd。"""
-    rows = repository.list_all_wxindex_word_meta()
-    summary = {
-        "total": len(rows),
-        "updated": 0,
-        "unchanged": 0,
-    }
-    for row in rows:
-        name = str(row.get("name") or "").strip()
-        event_created_at = row.get("event_created_at")
-        old_fetch_start = str(row.get("fetch_start_ymd") or "").strip()
-        if not name or event_created_at is None:
-            continue
-        new_fetch_start = get_fetch_start_ymd_from_event(event_created_at)
-        if new_fetch_start == old_fetch_start:
-            summary["unchanged"] += 1
-            continue
-        if dry_run:
-            summary["updated"] += 1
-            if verbose:
-                print(
-                    f"[dry-run] word={name} "
-                    f"event_created_at={event_created_at} "
-                    f"{old_fetch_start} -> {new_fetch_start}"
-                )
-            continue
-        repository.update_wxindex_word_meta_fetch_start(
-            name=name,
-            fetch_start_ymd=new_fetch_start,
-        )
-        summary["updated"] += 1
-        if verbose:
-            print(
-                f"updated word={name} "
-                f"event_created_at={event_created_at} "
-                f"{old_fetch_start} -> {new_fetch_start}"
-            )
-    return summary
-
-
-def cleanup_wxindex_words_outside_event_window(
-    repository: HotContentRepository,
-    *,
-    window_days: int = WXINDEX_WORDS_LOOKBACK_DAYS,
-    dry_run: bool = False,
-    verbose: bool = False,
-) -> dict[str, int]:
-    """删除 hot_content_wxindex_words 中超出 event_created_at 前后 window_days 的数据。"""
-    to_delete = repository.count_wxindex_words_outside_event_window(
-        window_days=window_days,
-    )
-    summary = {
-        "window_days": window_days,
-        "rows_to_delete": to_delete,
-        "deleted_rows": 0,
-        "words_without_meta_rows": repository.count_wxindex_words_without_meta(),
-    }
-    if to_delete <= 0:
-        return summary
-
-    if dry_run:
-        if verbose:
-            samples = repository.list_wxindex_words_outside_event_window_samples(
-                window_days=window_days,
-                limit=20,
-            )
-            for item in samples:
-                print(
-                    f"[dry-run] word={item['name']} dt={item['dt']} "
-                    f"window={item['start_ymd']}~{item['end_ymd']} "
-                    f"event_created_at={item['event_created_at']}"
-                )
-        return summary
-
-    summary["deleted_rows"] = repository.delete_wxindex_words_outside_event_window(
-        window_days=window_days,
-    )
-    if verbose:
-        print(f"deleted_rows={summary['deleted_rows']}")
-    return summary
-
-
-def build_word_earliest_event_map(
-    repository: HotContentRepository,
-    *,
-    since_dt: datetime,
-) -> dict[str, datetime]:
-    """从热点记录中汇总每个检索词对应的最早事件创建时间。"""
-    return repository.list_word_earliest_event_times(since_dt=since_dt)
-
-
-def backfill_wxindex_word_meta(
-    repository: HotContentRepository,
-    *,
-    since_date: date = WXINDEX_WORDS_RECORD_SINCE,
-    include_expired: bool = True,
-    fix_fetch_start: bool = True,
-    dry_run: bool = False,
-    verbose: bool = False,
-) -> dict[str, Any]:
-    """为 hot_content_wxindex_words 中缺 meta 的词补登记,并修正 fetch_start_ymd。"""
-    since_dt = datetime.combine(since_date, datetime.min.time()).replace(tzinfo=SHANGHAI_TZ)
-    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
-    candidates = repository.list_wxindex_word_bounds_without_meta()
-    register_summary: dict[str, int | str | bool] = {
-        "since_date": since_date.isoformat(),
-        "include_expired": include_expired,
-        "candidates": len(candidates),
-        "registered": 0,
-        "skipped_expired": 0,
-        "skipped_no_event": 0,
-    }
-
-    for item in candidates:
-        name = str(item.get("name") or "").strip()
-        if not name:
-            continue
-        meta, reason = try_register_wxindex_word_meta(
-            repository,
-            word=name,
-            event_map=event_map,
-            first_row_created_at=item.get("first_created_at"),
-            include_expired=include_expired,
-            dry_run=dry_run,
-        )
-        if reason in {"registered", "dry_run"}:
-            register_summary["registered"] += 1
-            if verbose:
-                label = "[dry-run] would register" if dry_run else "registered"
-                print(f"{label} meta word={name} event_at={meta['event_created_at']}")
-        elif reason == "expired":
-            register_summary["skipped_expired"] += 1
-            if verbose:
-                print(f"skip expired word={name}")
-        elif reason == "no_event":
-            register_summary["skipped_no_event"] += 1
-            if verbose:
-                print(f"skip no_event word={name}")
-
-    fetch_start_summary: dict[str, int] | None = None
-    if fix_fetch_start:
-        fetch_start_summary = fix_wxindex_word_meta_fetch_start_ymd(
-            repository,
-            dry_run=dry_run,
-            verbose=verbose,
-        )
-
-    return {
-        "register": register_summary,
-        "fetch_start_fix": fetch_start_summary,
-    }
-
-
-def backfill_active_wxindex_word_meta(
-    repository: HotContentRepository,
-    *,
-    dry_run: bool = False,
-    verbose: bool = False,
-) -> dict[str, int | str]:
-    """为表中仍处 7 天窗口内、但缺少 meta 的词补登记。"""
-    current = datetime.now(SHANGHAI_TZ).date()
-    since_dt = datetime.combine(
-        current - timedelta(days=WXINDEX_WORDS_UPDATE_WINDOW_DAYS),
-        datetime.min.time(),
-    ).replace(tzinfo=SHANGHAI_TZ)
-    event_map = build_word_earliest_event_map(repository, since_dt=since_dt)
-    candidates = repository.list_wxindex_word_bounds_without_meta()
-    summary: dict[str, int | str] = {
-        "active_since": since_dt.date().isoformat(),
-        "candidates": len(candidates),
-        "registered": 0,
-        "skipped_expired": 0,
-        "skipped_no_event": 0,
-    }
-
-    for item in candidates:
-        name = str(item.get("name") or "").strip()
-        if not name:
-            continue
-        meta, reason = try_register_wxindex_word_meta(
-            repository,
-            word=name,
-            event_map=event_map,
-            first_row_created_at=item.get("first_created_at"),
-            dry_run=dry_run,
-        )
-        if reason == "registered":
-            summary["registered"] += 1
-            if verbose:
-                print(f"registered meta word={name} event_at={meta['event_created_at']}")
-        elif reason == "dry_run":
-            summary["registered"] += 1
-            if verbose:
-                print(f"[dry-run] would register meta word={name}")
-        elif reason == "expired":
-            summary["skipped_expired"] += 1
-            if verbose:
-                print(f"skip expired word={name}")
-        elif reason == "no_event":
-            summary["skipped_no_event"] += 1
-            if verbose:
-                print(f"skip no_event word={name}")
-
-    return summary
-
-
 def run_wxindex_words_daily_job(
     repository: HotContentRepository,
     api_client: JsonApiClient,
@@ -690,12 +352,7 @@ def run_wxindex_words_daily_job(
     dry_run: bool = False,
     verbose: bool = False,
 ) -> dict[str, Any]:
-    """定时任务:补 meta、补全缺失日期、清理低最大值词。"""
-    meta_summary = backfill_active_wxindex_word_meta(
-        repository,
-        dry_run=dry_run,
-        verbose=verbose,
-    )
+    """定时任务:先补全缺失日期,再清理低最大值词。"""
     refresh_summary = refresh_stale_wxindex_words(
         repository,
         api_client,
@@ -711,7 +368,6 @@ def run_wxindex_words_daily_job(
         verbose=verbose,
     )
     return {
-        "meta_backfill": meta_summary,
         "refresh": refresh_summary,
         "cleanup": cleanup_summary,
     }
@@ -724,62 +380,23 @@ def ensure_word_full_scores(
     *,
     keyword: str,
     end_ymd: str | None = None,
-    event_created_at: datetime | None = None,
-    include_expired: bool = False,
     force_refresh: bool = False,
     dry_run: bool = False,
-    update_meta_if_exists: bool = False,
 ) -> tuple[list[dict[str, Any]], str]:
     """
-    获取词微信指数并入库。
-
-    - 表中已有数据但无 meta:若在 7 天窗口内,自动补 meta 并继续更新
-    - 超过 7 天窗口:不再更新
+    获取词的全量微信指数(20260601 起至昨日),按词+日期逐行入库。
 
-    返回 (scores, action)。
+    返回 (scores, action),action 为 inserted / updated / cached / dry_run。
     """
     word = str(keyword or "").strip()
     if not word:
         return [], "empty"
 
     target_end = end_ymd or get_wxindex_end_ymd()
-    fetch_start_ymd = get_wxindex_fetch_start_ymd()
     stored_scores = repository.list_wxindex_word_scores(word)
-    meta = repository.get_wxindex_word_meta(word)
-
-    should_register_meta = meta is None
-    should_update_meta = (
-        update_meta_if_exists
-        and meta is not None
-        and event_created_at is not None
-    )
-    if should_register_meta or should_update_meta:
-        meta, register_reason = try_register_wxindex_word_meta(
-            repository,
-            word=word,
-            event_created_at=event_created_at,
-            include_expired=include_expired,
-            dry_run=dry_run,
-            update_if_exists=should_update_meta,
-        )
-        if meta is None:
-            if register_reason == "expired":
-                return stored_scores, "expired"
-            return stored_scores, "legacy"
-        if dry_run and register_reason == "dry_run":
-            return [], "dry_run"
-
-    if meta is None:
-        return stored_scores, "legacy"
-
-    if not include_expired and not is_word_update_active(meta["event_created_at"]):
-        return stored_scores, "expired"
-
-    word_start_ymd = str(meta.get("fetch_start_ymd") or fetch_start_ymd)
     fetch_range = None if force_refresh else get_supplement_fetch_range(
         stored_scores,
         end_ymd=target_end,
-        start_ymd=word_start_ymd,
     )
     if fetch_range is None and stored_scores:
         return stored_scores, "cached"
@@ -788,7 +405,7 @@ def ensure_word_full_scores(
         return [], "dry_run"
 
     had_data = bool(stored_scores)
-    start_ymd, fetch_end_ymd = fetch_range or (word_start_ymd, target_end)
+    start_ymd, fetch_end_ymd = fetch_range or (WXINDEX_WORDS_START_YMD, target_end)
     api_scores = fetch_wxindex_scores(
         api_client,
         api_url,
@@ -826,19 +443,15 @@ def sync_words_from_trend_json(
     *,
     trend_json: dict[str, Any],
     record_id: int,
-    event_created_at: datetime | None = None,
     dry_run: bool = False,
     verbose: bool = False,
-    update_meta_if_exists: bool = False,
 ) -> dict[str, int]:
-    """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(近 7 日数据)。"""
+    """将单条记录的 wxindex_trend_json 中检索词写入/刷新汇总表(全量数据)。"""
     summary = {
         "words_found": 0,
         "inserted": 0,
         "updated": 0,
         "cached": 0,
-        "legacy": 0,
-        "expired": 0,
         "api_empty": 0,
         "below_threshold": 0,
         "fetch_failed": 0,
@@ -855,9 +468,7 @@ def sync_words_from_trend_json(
                 api_client,
                 api_url,
                 keyword=name,
-                event_created_at=event_created_at,
                 dry_run=dry_run,
-                update_meta_if_exists=update_meta_if_exists,
             )
         except Exception as exc:
             summary["fetch_failed"] += 1
@@ -871,10 +482,6 @@ def sync_words_from_trend_json(
             summary["updated"] += 1
         elif action == "cached":
             summary["cached"] += 1
-        elif action == "legacy":
-            summary["legacy"] += 1
-        elif action == "expired":
-            summary["expired"] += 1
         elif action == "api_empty":
             summary["api_empty"] += 1
         elif action == "below_threshold":
@@ -905,8 +512,6 @@ def backfill_wxindex_words(
         "inserted": 0,
         "updated": 0,
         "cached": 0,
-        "legacy": 0,
-        "expired": 0,
         "api_empty": 0,
         "below_threshold": 0,
         "fetch_failed": 0,
@@ -942,7 +547,6 @@ def backfill_wxindex_words(
             api_url,
             trend_json=trend_json,
             record_id=record_id,
-            event_created_at=row.get("created_at"),
             dry_run=dry_run,
             verbose=verbose,
         )
@@ -951,8 +555,6 @@ def backfill_wxindex_words(
             "inserted",
             "updated",
             "cached",
-            "legacy",
-            "expired",
             "api_empty",
             "below_threshold",
             "fetch_failed",
@@ -960,133 +562,3 @@ def backfill_wxindex_words(
             summary[key] += result[key]
 
     return summary
-
-
-def build_word_event_map_from_records(
-    records: list[dict[str, Any]],
-) -> dict[str, datetime]:
-    word_events: dict[str, datetime] = {}
-    for row in records:
-        created_at = row.get("created_at")
-        if not isinstance(created_at, datetime):
-            continue
-        for word in extract_searched_words(row.get("wxindex_trend_json")):
-            existing = word_events.get(word)
-            if existing is None or created_at < existing:
-                word_events[word] = created_at
-    return word_events
-
-
-def audit_wxindex_words_from_records(
-    repository: HotContentRepository,
-    *,
-    after_created_at: datetime,
-) -> dict[str, Any]:
-    """检查指定时间后的热点记录,其微信指数词是否已在汇总表和 meta 表。"""
-    records = repository.list_records_with_wxindex_trend_after(
-        after_created_at=after_created_at,
-    )
-    word_events = build_word_event_map_from_records(records)
-    missing_words: list[str] = []
-    missing_meta: list[str] = []
-    for word in sorted(word_events):
-        if not repository.has_wxindex_word(word):
-            missing_words.append(word)
-        elif repository.get_wxindex_word_meta(word) is None:
-            missing_meta.append(word)
-    return {
-        "after_created_at": after_created_at.isoformat(sep=" ", timespec="seconds"),
-        "records_scanned": len(records),
-        "words_found": len(word_events),
-        "missing_words_count": len(missing_words),
-        "missing_meta_count": len(missing_meta),
-        "missing_words": missing_words,
-        "missing_meta": missing_meta,
-    }
-
-
-def supplement_wxindex_words_from_records(
-    repository: HotContentRepository,
-    api_client: JsonApiClient,
-    api_url: str,
-    *,
-    after_created_at: datetime,
-    dry_run: bool = False,
-    verbose: bool = False,
-) -> dict[str, Any]:
-    """补全指定时间后热点记录涉及、但缺失的 wxindex 词与 meta。"""
-    audit = audit_wxindex_words_from_records(
-        repository,
-        after_created_at=after_created_at,
-    )
-    records = repository.list_records_with_wxindex_trend_after(
-        after_created_at=after_created_at,
-    )
-    word_events = build_word_event_map_from_records(records)
-    summary: dict[str, Any] = {
-        "audit_before": {
-            "records_scanned": audit["records_scanned"],
-            "words_found": audit["words_found"],
-            "missing_words_count": audit["missing_words_count"],
-            "missing_meta_count": audit["missing_meta_count"],
-        },
-        "supplemented_words": 0,
-        "inserted": 0,
-        "updated": 0,
-        "cached": 0,
-        "meta_registered": 0,
-        "api_empty": 0,
-        "below_threshold": 0,
-        "fetch_failed": 0,
-    }
-
-    for word, event_at in sorted(word_events.items()):
-        had_meta = repository.get_wxindex_word_meta(word) is not None
-        try:
-            _, action = ensure_word_full_scores(
-                repository,
-                api_client,
-                api_url,
-                keyword=word,
-                event_created_at=event_at,
-                include_expired=True,
-                dry_run=dry_run,
-            )
-        except Exception as exc:
-            summary["fetch_failed"] += 1
-            if verbose:
-                print(f"fetch failed word={word}: {exc}")
-            continue
-
-        summary["supplemented_words"] += 1
-        if action == "inserted":
-            summary["inserted"] += 1
-        elif action == "updated":
-            summary["updated"] += 1
-        elif action == "cached":
-            summary["cached"] += 1
-        elif action == "api_empty":
-            summary["api_empty"] += 1
-        elif action == "below_threshold":
-            summary["below_threshold"] += 1
-        elif action == "dry_run":
-            summary["inserted"] += 1
-
-        if not had_meta:
-            if dry_run or repository.get_wxindex_word_meta(word) is not None:
-                summary["meta_registered"] += 1
-
-        if verbose:
-            print(f"word={word} event_at={event_at} action={action}")
-
-    audit_after = audit_wxindex_words_from_records(
-        repository,
-        after_created_at=after_created_at,
-    )
-    summary["audit_after"] = {
-        "missing_words_count": audit_after["missing_words_count"],
-        "missing_meta_count": audit_after["missing_meta_count"],
-        "missing_words": audit_after["missing_words"],
-        "missing_meta": audit_after["missing_meta"],
-    }
-    return summary

+ 0 - 2
docker-compose.yml

@@ -26,8 +26,6 @@ services:
       HOT_FLOW_CRON_HOURS: ${HOT_FLOW_CRON_HOURS:-6,12,18}
       HOT_FLOW_CRON_MINUTE: ${HOT_FLOW_CRON_MINUTE:-0}
       DECODE_RESULT_FLOW_INTERVAL_SECONDS: ${DECODE_RESULT_FLOW_INTERVAL_SECONDS:-1800}
-      WXINDEX_WORDS_CRON_HOUR: ${WXINDEX_WORDS_CRON_HOUR:-10}
-      WXINDEX_WORDS_CRON_MINUTE: ${WXINDEX_WORDS_CRON_MINUTE:-0}
       # 业务阈值
       WXINDEX_SCORE_THRESHOLD: ${WXINDEX_SCORE_THRESHOLD:-1000000}
       DEMAND_POOL_SOURCE_TABLE: ${DEMAND_POOL_SOURCE_TABLE:-dwd_multi_demand_pool_di}