Przeglądaj źródła

增加新流程

xueyiming 1 tydzień temu
rodzic
commit
815d26f7db

+ 10 - 0
app/core/config.py

@@ -157,6 +157,8 @@ class Settings:
     wxindex_lookback_days: int = 7
     wxindex_words_cron_hour: int = 10
     wxindex_words_cron_minute: int = 0
+    wxindex_heat_pattern_cron_hour: int = 11
+    wxindex_heat_pattern_cron_minute: int = 0
     demand_event_sense_threshold: float = 6.0
     demand_senior_fit_threshold: float = 6.0
     demand_quality_llm_model: str = "anthropic/claude-haiku-4-5"
@@ -347,6 +349,14 @@ class Settings:
                 "WXINDEX_WORDS_CRON_MINUTE",
                 defaults.wxindex_words_cron_minute,
             ),
+            wxindex_heat_pattern_cron_hour=_env_int(
+                "WXINDEX_HEAT_PATTERN_CRON_HOUR",
+                defaults.wxindex_heat_pattern_cron_hour,
+            ),
+            wxindex_heat_pattern_cron_minute=_env_int(
+                "WXINDEX_HEAT_PATTERN_CRON_MINUTE",
+                defaults.wxindex_heat_pattern_cron_minute,
+            ),
             demand_event_sense_threshold=_env_float(
                 "DEMAND_EVENT_SENSE_THRESHOLD",
                 defaults.demand_event_sense_threshold,

+ 8 - 0
app/hot_content/config.py

@@ -272,6 +272,14 @@ def load_flow_config(interval_override: int | None = None) -> FlowConfig:
             "WXINDEX_WORDS_CRON_MINUTE",
             settings.wxindex_words_cron_minute,
         ),
+        wxindex_heat_pattern_cron_hour=_get_env_int(
+            "WXINDEX_HEAT_PATTERN_CRON_HOUR",
+            settings.wxindex_heat_pattern_cron_hour,
+        ),
+        wxindex_heat_pattern_cron_minute=_get_env_int(
+            "WXINDEX_HEAT_PATTERN_CRON_MINUTE",
+            settings.wxindex_heat_pattern_cron_minute,
+        ),
         demand_event_sense_threshold=_get_env_float(
             "DEMAND_EVENT_SENSE_THRESHOLD",
             settings.demand_event_sense_threshold,

+ 110 - 16
app/hot_content/demand_pool_writer.py

@@ -84,6 +84,38 @@ def apply_odps_daily_write_limit(
     return rows_to_write, limit_skipped, limit_meta
 
 
+def filter_odps_rows_skip_synced_demand_ids(
+    repository: HotContentRepository,
+    writer: HotDemandPoolWriter,
+    *,
+    hive_rows: list[dict[str, Any]],
+    partition_dt: str,
+    strategy: str,
+) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
+    """跳过 hot_content_odps_sync_log 当天已有及 ODPS 分区已有的 demand_id。"""
+    synced_demand_ids = repository.list_synced_odps_demand_ids(
+        partition_dt=partition_dt,
+    )
+    odps_existing_demand_ids = writer._list_odps_partition_demand_ids(
+        partition_dt=partition_dt,
+        strategy=strategy,
+    )
+    skip_demand_ids = synced_demand_ids | odps_existing_demand_ids
+    rows_to_write: list[dict[str, Any]] = []
+    skipped_rows: list[dict[str, Any]] = []
+    for row in hive_rows:
+        demand_id = str(row.get("demand_id") or "").strip()
+        if demand_id in skip_demand_ids:
+            skipped_rows.append(row)
+            continue
+        rows_to_write.append(row)
+    return rows_to_write, skipped_rows, {
+        "sync_log_demand_id_count": len(synced_demand_ids),
+        "odps_existing_demand_id_count": len(odps_existing_demand_ids),
+        "skip_demand_id_count": len(skip_demand_ids),
+    }
+
+
 class HotDemandPoolWriter:
     def __init__(self, config: FlowConfig, repository: HotContentRepository):
         self.config = config
@@ -102,25 +134,16 @@ class HotDemandPoolWriter:
             event_threshold=self.config.demand_event_sense_threshold,
             senior_threshold=self.config.demand_senior_fit_threshold,
         )
-        synced_demand_ids = self.repository.list_synced_odps_demand_ids(
+        pending_rows, skipped_rows, dedupe_meta = filter_odps_rows_skip_synced_demand_ids(
+            self.repository,
+            self,
+            hive_rows=hive_rows,
             partition_dt=partition_dt,
             strategy=strategy,
         )
-        odps_existing_demand_ids = self._list_odps_partition_demand_ids(
+        daily_written_count = self.repository.count_odps_sync_log_rows(
             partition_dt=partition_dt,
-            strategy=strategy,
         )
-        skip_demand_ids = synced_demand_ids | odps_existing_demand_ids
-        daily_written_count = len(skip_demand_ids)
-
-        pending_rows: list[dict[str, Any]] = []
-        skipped_rows: list[dict[str, Any]] = []
-        for row in hive_rows:
-            demand_id = str(row.get("demand_id") or "").strip()
-            if demand_id in skip_demand_ids:
-                skipped_rows.append(row)
-                continue
-            pending_rows.append(row)
 
         rows_to_write, limit_skipped_rows, limit_meta = apply_odps_daily_write_limit(
             pending_rows,
@@ -140,6 +163,7 @@ class HotDemandPoolWriter:
             "skipped_rows": skipped_rows,
             "limit_skipped_rows": limit_skipped_rows,
             "target_table": self.config.demand_pool_source_table,
+            **dedupe_meta,
             **limit_meta,
         }
 
@@ -220,8 +244,10 @@ class HotDemandPoolWriter:
                     if demand_id:
                         demand_ids.add(demand_id)
             return demand_ids
-        except Exception:
-            return set()
+        except Exception as exc:
+            raise HotContentFlowError(
+                f"failed to list odps partition demand ids dt={partition_dt}: {exc}"
+            ) from exc
 
     def _insert_partition_rows(
         self,
@@ -272,3 +298,71 @@ def sync_hot_demands_to_hive(
 ) -> dict[str, Any]:
     writer = HotDemandPoolWriter(config, repository)
     return writer.sync_today()
+
+
+def sync_wxindex_word_rows_to_odps(
+    config: FlowConfig,
+    repository: HotContentRepository,
+    *,
+    hive_rows: list[dict[str, Any]],
+    partition_dt: str,
+    strategy: str,
+) -> dict[str, Any]:
+    """将微信指数最终保留词写入 ODPS 需求池表及 hot_content_odps_sync_log。"""
+    if not hive_rows:
+        return {
+            "partition_dt": partition_dt,
+            "strategy": strategy,
+            "candidate_row_count": 0,
+            "written_count": 0,
+            "odps_synced": 0,
+            "target_table": config.demand_pool_source_table,
+        }
+
+    writer = HotDemandPoolWriter(config, repository)
+    pending_rows, skipped_rows, dedupe_meta = filter_odps_rows_skip_synced_demand_ids(
+        repository,
+        writer,
+        hive_rows=hive_rows,
+        partition_dt=partition_dt,
+        strategy=strategy,
+    )
+    daily_written_count = repository.count_odps_sync_log_rows(partition_dt=partition_dt)
+    rows_to_write, limit_skipped_rows, limit_meta = apply_odps_daily_write_limit(
+        pending_rows,
+        existing_count=daily_written_count,
+        daily_limit=config.odps_daily_write_limit,
+    )
+    written_count = writer._insert_partition_rows(
+        hive_rows=rows_to_write,
+        partition_dt=partition_dt,
+    )
+    odps_synced = 0
+    if written_count:
+        odps_synced = repository.save_odps_sync_logs(
+            [
+                {
+                    "partition_dt": partition_dt,
+                    "strategy": strategy,
+                    "demand_id": row["demand_id"],
+                    "demand_name": row["demand_name"],
+                    "demand_type": row["type"],
+                    "record_id": row.get("record_id") or 0,
+                    "weight": row.get("weight"),
+                }
+                for row in rows_to_write
+            ]
+        )
+    return {
+        "partition_dt": partition_dt,
+        "strategy": strategy,
+        "candidate_row_count": len(hive_rows),
+        "pending_row_count": len(rows_to_write),
+        "skipped_row_count": len(skipped_rows),
+        "limit_skipped_row_count": len(limit_skipped_rows),
+        "written_count": written_count,
+        "odps_synced": odps_synced,
+        "target_table": config.demand_pool_source_table,
+        **dedupe_meta,
+        **limit_meta,
+    }

+ 106 - 17
app/hot_content/demand_quality.py

@@ -64,27 +64,111 @@ def passes_wxindex_gate(
     return _record_wxindex_score(export_rows) >= wxindex_threshold
 
 
-def _extract_json_object(text: str) -> dict[str, Any]:
+def _repair_json_text(text: str) -> str:
+    repaired = text.strip()
+    repaired = re.sub(r",\s*([}\]])", r"\1", repaired)
+    repaired = repaired.replace(""", '"').replace(""", '"')
+    repaired = repaired.replace("'", "'").replace("'", "'")
+    return repaired
+
+
+def _extract_score_demands_fallback(
+    raw: str,
+    candidates: list[dict[str, str]],
+) -> dict[str, Any] | None:
+    """标准 json.loads 失败时,按候选词宽松提取 score/items。"""
+    items: list[dict[str, Any]] = []
+    for candidate in candidates:
+        demand_type = str(candidate.get("demand_type") or "").strip()
+        demand_text = str(candidate.get("demand_text") or "").strip()
+        if not demand_type or not demand_text:
+            continue
+        escaped_text = re.escape(demand_text)
+        escaped_type = re.escape(demand_type)
+        score_patterns = [
+            (
+                rf'"demand_type"\s*:\s*"{escaped_type}"\s*,\s*'
+                rf'"demand_text"\s*:\s*"{escaped_text}"\s*,\s*'
+                rf'"score"\s*:\s*([0-9]+(?:\.[0-9]+)?)'
+            ),
+            (
+                rf'"demand_text"\s*:\s*"{escaped_text}"\s*,\s*'
+                rf'"demand_type"\s*:\s*"{escaped_type}"\s*,\s*'
+                rf'"score"\s*:\s*([0-9]+(?:\.[0-9]+)?)'
+            ),
+            (
+                rf'"demand_text"\s*:\s*"{escaped_text}"'
+                rf'[\s\S]{{0,400}}?"score"\s*:\s*([0-9]+(?:\.[0-9]+)?)'
+            ),
+        ]
+        score_value: float | None = None
+        for pattern in score_patterns:
+            match = re.search(pattern, raw)
+            if match:
+                score_value = _normalize_score(match.group(1))
+                break
+        if score_value is None:
+            continue
+        reason = ""
+        reason_match = re.search(
+            rf'"demand_text"\s*:\s*"{escaped_text}"'
+            rf'[\s\S]{{0,600}}?"reason"\s*:\s*"((?:[^"\\]|\\.)*)"',
+            raw,
+        )
+        if reason_match:
+            reason = (
+                reason_match.group(1)
+                .replace('\\"', '"')
+                .replace("\\n", "\n")
+                .replace("\\t", "\t")
+            )
+        items.append(
+            {
+                "demand_type": demand_type,
+                "demand_text": demand_text,
+                "score": score_value,
+                "reason": reason,
+            }
+        )
+    if not items:
+        return None
+    source_match = re.search(r'"source"\s*:\s*"((?:[^"\\]|\\.)*)"', raw)
+    return {
+        "source": source_match.group(1) if source_match else "",
+        "items": items,
+    }
+
+
+def _extract_json_object(
+    text: str,
+    *,
+    candidates: list[dict[str, str]] | None = None,
+) -> dict[str, Any]:
     raw = text.strip()
     if raw.startswith("```"):
         raw = re.sub(r"^```(?:json)?\s*", "", raw)
         raw = re.sub(r"\s*```$", "", raw)
-    try:
-        parsed = json.loads(raw)
-        if isinstance(parsed, dict):
-            return parsed
-    except json.JSONDecodeError:
-        pass
+    blocks = [raw]
     match = re.search(r"\{[\s\S]*\}", raw)
-    if not match:
-        raise HotContentFlowError("llm output is not json object")
-    try:
-        parsed = json.loads(match.group(0))
-    except json.JSONDecodeError as exc:
-        raise HotContentFlowError(f"llm output invalid json: {exc}") from exc
-    if not isinstance(parsed, dict):
-        raise HotContentFlowError("llm output is not json object")
-    return parsed
+    if match:
+        blocks.append(match.group(0))
+
+    for block in blocks:
+        for candidate_text in (block, _repair_json_text(block)):
+            try:
+                parsed = json.loads(candidate_text)
+                if isinstance(parsed, dict):
+                    return parsed
+            except json.JSONDecodeError:
+                continue
+
+    if candidates:
+        for block in blocks:
+            fallback = _extract_score_demands_fallback(block, candidates)
+            if fallback:
+                return fallback
+
+    raise HotContentFlowError("llm output is not json object")
 
 
 def _candidate_key(demand_type: str, demand_text: str) -> tuple[str, str]:
@@ -341,6 +425,7 @@ def _llm_score_demands(
             "仅对给定 demands 逐项评分,不得新增或遗漏",
             "score 为 0-10 的数字,越大表示越符合判断标准",
             "demand_type 与 demand_text 必须与输入完全一致",
+            "reason 字段请用中文表述,不要使用英文双引号 \"",
             "仅输出 JSON 对象,不要 markdown 代码块",
         ],
     }
@@ -360,7 +445,10 @@ def _llm_score_demands(
                 temperature=0,
                 max_tokens=max(max_tokens, 1),
             )
-            parsed = _extract_json_object(str(resp.get("content") or ""))
+            parsed = _extract_json_object(
+                str(resp.get("content") or ""),
+                candidates=candidates,
+            )
             parsed.setdefault("source", channel_content_id)
             items = _normalize_llm_items(parsed, candidates)
             return {
@@ -583,6 +671,7 @@ def llm_score_senior_fit(
 # 五、输出规则
 
 严格输出 JSON 对象(含 items 数组),禁止输出 JSON 之外的任何内容(无前缀、无解释、无markdown格式)。
+reason 字段请用中文表述,不要使用英文双引号 "。
     """
     return _llm_score_demands(
         channel_content_id=channel_content_id,

+ 193 - 21
app/hot_content/postprocess_service.py

@@ -183,13 +183,19 @@ class ContributionPostprocessService:
         wxindex_count = 0
         quality_count = 0
         exported_count = 0
+        export_failed_count = 0
         skipped_count = 0
         failed_count = 0
         for record in records:
             record_id = int(record["id"])
+            quality_saved = False
+            match_result: dict[str, Any] | None = None
             try:
                 match_result = record.get("contribution_demand_match_json")
-                if not isinstance(match_result, dict):
+                if (
+                    not isinstance(match_result, dict)
+                    or int(record.get("demand_cache_run_id") or 0) != demand_cache_run_id
+                ):
                     match_result = self.match_record(
                         record=record,
                         demand_name_set=demand_name_set,
@@ -215,7 +221,15 @@ class ContributionPostprocessService:
                         match_json=match_result,
                     )
 
-                trend_result = self.build_wxindex_trend(record, match_result)
+                postprocess_status = int(record.get("postprocess_status") or 0)
+                existing_trend = record.get("wxindex_trend_json")
+                if (
+                    postprocess_status == PostprocessStatus.WXINDEX_DONE
+                    and isinstance(existing_trend, dict)
+                ):
+                    trend_result = existing_trend
+                else:
+                    trend_result = self.build_wxindex_trend(record, match_result)
                 if trend_result is None:
                     self.repository.update_postprocess_status(
                         record_id=record_id,
@@ -223,23 +237,29 @@ class ContributionPostprocessService:
                         error_message="no matched demand words",
                     )
                     self._save_empty_demand_quality(record_id=record_id)
-                    exported_count += self.export_demand_terms_if_needed(
+                    export_rows_count, export_error = self._export_demand_terms_if_needed(
                         record=record,
                         match_result=match_result,
                         trend_result=None,
                     )
+                    if export_error:
+                        export_failed_count += 1
+                    else:
+                        exported_count += export_rows_count
                     skipped_count += 1
                     continue
 
-                self.repository.save_wxindex_trend(
-                    record_id=record_id,
-                    trend_json=trend_result,
-                )
-                self.sync_wxindex_words(
-                    record_id=record_id,
-                    trend_result=trend_result,
-                    event_created_at=record.get("created_at"),
-                )
+                if postprocess_status != PostprocessStatus.WXINDEX_DONE:
+                    self.repository.save_wxindex_trend(
+                        record_id=record_id,
+                        trend_json=trend_result,
+                    )
+                    self.sync_wxindex_words(
+                        record_id=record_id,
+                        trend_result=trend_result,
+                        event_created_at=record.get("created_at"),
+                    )
+                    wxindex_count += 1
                 event_sense_json, senior_fit_json = self.run_demand_quality_judgment(
                     record=record,
                     match_result=match_result,
@@ -250,15 +270,19 @@ class ContributionPostprocessService:
                     event_sense_json=event_sense_json,
                     senior_fit_json=senior_fit_json,
                 )
-                exported_count += self.export_demand_terms_if_needed(
+                quality_saved = True
+                quality_count += 1
+                export_rows_count, export_error = self._export_demand_terms_if_needed(
                     record=record,
                     match_result=match_result,
                     trend_result=trend_result,
                     event_sense_json=event_sense_json,
                     senior_fit_json=senior_fit_json,
                 )
-                wxindex_count += 1
-                quality_count += 1
+                if export_error:
+                    export_failed_count += 1
+                else:
+                    exported_count += export_rows_count
             except WxindexSelectionSkipped as exc:
                 self.repository.update_postprocess_status(
                     record_id=record_id,
@@ -267,18 +291,23 @@ class ContributionPostprocessService:
                 )
                 if isinstance(match_result, dict):
                     self._save_empty_demand_quality(record_id=record_id)
-                    exported_count += self.export_demand_terms_if_needed(
+                    export_rows_count, export_error = self._export_demand_terms_if_needed(
                         record=record,
                         match_result=match_result,
                         trend_result=None,
                     )
+                    if export_error:
+                        export_failed_count += 1
+                    else:
+                        exported_count += export_rows_count
                 skipped_count += 1
             except Exception as exc:
-                self.repository.update_postprocess_status(
-                    record_id=record_id,
-                    status=PostprocessStatus.FAILED,
-                    error_message=str(exc),
-                )
+                if not quality_saved:
+                    self.repository.update_postprocess_status(
+                        record_id=record_id,
+                        status=PostprocessStatus.FAILED,
+                        error_message=str(exc),
+                    )
                 failed_count += 1
 
         return self._finalize_run_result(
@@ -294,16 +323,44 @@ class ContributionPostprocessService:
                 "wxindex_count": wxindex_count,
                 "quality_count": quality_count,
                 "exported_count": exported_count,
+                "export_failed_count": export_failed_count,
                 "skipped_count": skipped_count,
                 "failed_count": failed_count,
             }
         )
 
+    def _export_demand_terms_if_needed(
+        self,
+        *,
+        record: dict[str, Any],
+        match_result: dict[str, Any],
+        trend_result: dict[str, Any] | None,
+        event_sense_json: dict[str, Any] | None = None,
+        senior_fit_json: dict[str, Any] | None = None,
+    ) -> tuple[int, str | None]:
+        """导出需求词,失败时返回错误信息且不改变 postprocess_status。"""
+        try:
+            exported_rows = self.export_demand_terms_if_needed(
+                record=record,
+                match_result=match_result,
+                trend_result=trend_result,
+                event_sense_json=event_sense_json,
+                senior_fit_json=senior_fit_json,
+            )
+        except Exception as exc:
+            return 0, str(exc)
+        return exported_rows, None
+
     def _finalize_run_result(self, result: dict[str, Any]) -> dict[str, Any]:
         try:
             result["hive_sync"] = sync_hot_demands_to_hive(self.config, self.repository)
         except Exception as exc:
             result["hive_sync_error"] = str(exc)
+        if result.get("hive_sync_error"):
+            if result.get("status") == "success":
+                result["status"] = "partial_failure"
+        if int(result.get("export_failed_count") or 0) > 0 and result.get("status") == "success":
+            result["status"] = "partial_failure"
         return result
 
     def sync_wxindex_words(
@@ -615,6 +672,121 @@ class ContributionPostprocessService:
             f"llm match failed for channelContentId={channel_content_id}: {last_error}"
         ) from last_error
 
+    @staticmethod
+    def _build_word_demand_match_result(
+        *,
+        word: str,
+        llm_result: dict[str, Any],
+        demand_lookup: dict[str, str],
+    ) -> dict[str, Any]:
+        target_word = str(word or "").strip()
+        match_list: list[dict[str, str]] = []
+        seen: set[tuple[str, str]] = set()
+        for item in llm_result.get("matched") or []:
+            if not isinstance(item, dict):
+                continue
+            matched_word = str(
+                item.get("title") or item.get("word") or item.get("词") or ""
+            ).strip()
+            demand_name = str(item.get("demand_name") or "").strip()
+            reason = str(item.get("reason") or "").strip()
+            if matched_word != target_word:
+                continue
+            canonical_demand_name = _resolve_demand_name(demand_name, demand_lookup)
+            if canonical_demand_name is None:
+                continue
+            dedupe_key = (canonical_demand_name, reason)
+            if dedupe_key in seen:
+                continue
+            seen.add(dedupe_key)
+            match_list.append(
+                {
+                    "demand_name": canonical_demand_name,
+                    "reason": reason,
+                }
+            )
+
+        matched_demand_names: list[str] = []
+        matched_seen: set[str] = set()
+        for item in match_list:
+            demand_name = str(item.get("demand_name") or "").strip()
+            if demand_name and demand_name not in matched_seen:
+                matched_seen.add(demand_name)
+                matched_demand_names.append(demand_name)
+
+        return {
+            "word": target_word,
+            "matched": bool(match_list),
+            "matched_demand": " ".join(matched_demand_names),
+            "match_list": match_list,
+        }
+
+    def match_words_to_demand_pool(
+        self,
+        *,
+        words: list[str],
+        demand_name_set: list[str],
+        source_id: str | None = None,
+    ) -> dict[str, dict[str, Any]]:
+        """批量热词与票圈内部需求池匹配,返回 word -> match_result。"""
+        cleaned: list[str] = []
+        seen: set[str] = set()
+        for raw in words:
+            word = str(raw or "").strip()
+            if not word or word in seen:
+                continue
+            seen.add(word)
+            cleaned.append(word)
+        if not cleaned:
+            return {}
+
+        llm_result = self.llm_match_single_article(
+            channel_content_id=source_id or f"wxindex_words:{','.join(cleaned[:3])}",
+            words=cleaned,
+            demand_name_set=demand_name_set,
+        )
+        demand_lookup = _build_demand_lookup(demand_name_set)
+        return {
+            word: self._build_word_demand_match_result(
+                word=word,
+                llm_result=llm_result,
+                demand_lookup=demand_lookup,
+            )
+            for word in cleaned
+        }
+
+    def match_single_word_to_demand_pool(
+        self,
+        *,
+        word: str,
+        demand_name_set: list[str],
+        source_id: str | None = None,
+    ) -> dict[str, Any]:
+        """单热词与票圈内部需求池匹配(复用主流程 LLM 规则)。"""
+        target_word = str(word or "").strip()
+        if not target_word:
+            return {
+                "word": "",
+                "matched": False,
+                "matched_demand": "",
+                "match_list": [],
+            }
+
+        batch_result = self.match_words_to_demand_pool(
+            words=[target_word],
+            demand_name_set=demand_name_set,
+            source_id=source_id,
+        )
+        return batch_result.get(
+            target_word,
+            {
+                "word": target_word,
+                "matched": False,
+                "matched_demand": "",
+                "match_list": [],
+            },
+        )
+
     def build_wxindex_trend(
         self,
         record: dict[str, Any],

+ 693 - 8
app/hot_content/repository.py

@@ -24,6 +24,197 @@ def _json_dumps(data: Any) -> str:
     return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
 
 
+def _nullable_bool(value: Any) -> int | None:
+    if value is None:
+        return None
+    return 1 if value else 0
+
+
+def _nullable_str(value: Any) -> str | None:
+    if value is None:
+        return None
+    text = str(value).strip()
+    return text if text else None
+
+
+def _nullable_json_dumps(value: Any) -> str | None:
+    if value is None:
+        return None
+    return _json_dumps(value)
+
+
+def _wxindex_word_record_row(payload: dict[str, Any]) -> tuple[Any, ...]:
+    name = str(payload.get("name") or "").strip()
+    analyze_ymd = str(payload.get("analyze_ymd") or "").strip()
+    if not name or not analyze_ymd:
+        raise HotContentFlowError("invalid wxindex word record payload")
+    return (
+        name,
+        payload.get("meta_id"),
+        analyze_ymd,
+        payload.get("fetch_start_ymd"),
+        payload.get("fetch_end_ymd"),
+        payload.get("data_start_ymd"),
+        payload.get("data_end_ymd"),
+        payload.get("data_days"),
+        _nullable_bool(payload.get("is_sustained_high")),
+        _nullable_bool(payload.get("is_rising")),
+        _nullable_bool(payload.get("is_spike")),
+        payload.get("retain_reason"),
+        _nullable_bool(payload.get("is_internal_demand_matched")),
+        _nullable_str(payload.get("matched_demand")),
+        payload.get("demand_cache_run_id"),
+        _nullable_json_dumps(payload.get("internal_demand_match_json")),
+        payload.get("senior_fit_score"),
+        _nullable_json_dumps(payload.get("demand_senior_fit_json")),
+        _nullable_bool(payload.get("is_final_retained")),
+        payload.get("min_score"),
+        payload.get("max_score"),
+        payload.get("avg_score"),
+        _nullable_json_dumps(payload.get("detail_json")),
+    )
+
+
+_WXINDEX_WORD_RECORD_UPSERT_SQL = """
+    INSERT INTO hot_content_wxindex_word_records (
+        name,
+        meta_id,
+        analyze_ymd,
+        fetch_start_ymd,
+        fetch_end_ymd,
+        data_start_ymd,
+        data_end_ymd,
+        data_days,
+        is_sustained_high,
+        is_rising,
+        is_spike,
+        retain_reason,
+        is_internal_demand_matched,
+        matched_demand,
+        demand_cache_run_id,
+        internal_demand_match_json,
+        senior_fit_score,
+        demand_senior_fit_json,
+        is_final_retained,
+        min_score,
+        max_score,
+        avg_score,
+        detail_json
+    )
+    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    ON DUPLICATE KEY UPDATE
+        meta_id = VALUES(meta_id),
+        fetch_start_ymd = VALUES(fetch_start_ymd),
+        fetch_end_ymd = VALUES(fetch_end_ymd),
+        data_start_ymd = VALUES(data_start_ymd),
+        data_end_ymd = VALUES(data_end_ymd),
+        data_days = VALUES(data_days),
+        is_sustained_high = VALUES(is_sustained_high),
+        is_rising = VALUES(is_rising),
+        is_spike = VALUES(is_spike),
+        retain_reason = VALUES(retain_reason),
+        is_internal_demand_matched = VALUES(is_internal_demand_matched),
+        matched_demand = VALUES(matched_demand),
+        demand_cache_run_id = VALUES(demand_cache_run_id),
+        internal_demand_match_json = VALUES(internal_demand_match_json),
+        senior_fit_score = VALUES(senior_fit_score),
+        demand_senior_fit_json = VALUES(demand_senior_fit_json),
+        is_final_retained = VALUES(is_final_retained),
+        min_score = VALUES(min_score),
+        max_score = VALUES(max_score),
+        avg_score = VALUES(avg_score),
+        detail_json = VALUES(detail_json)
+"""
+
+_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL = """
+    INSERT INTO hot_content_wxindex_word_records (
+        name,
+        meta_id,
+        analyze_ymd,
+        fetch_start_ymd,
+        fetch_end_ymd,
+        data_start_ymd,
+        data_end_ymd,
+        data_days,
+        is_sustained_high,
+        is_rising,
+        is_spike,
+        retain_reason,
+        is_internal_demand_matched,
+        matched_demand,
+        demand_cache_run_id,
+        internal_demand_match_json,
+        senior_fit_score,
+        demand_senior_fit_json,
+        is_final_retained,
+        min_score,
+        max_score,
+        avg_score,
+        detail_json
+    )
+    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    ON DUPLICATE KEY UPDATE
+        meta_id = VALUES(meta_id),
+        fetch_start_ymd = VALUES(fetch_start_ymd),
+        fetch_end_ymd = VALUES(fetch_end_ymd),
+        demand_cache_run_id = VALUES(demand_cache_run_id)
+"""
+
+
+def _wxindex_word_stats_row(payload: dict[str, Any]) -> tuple[Any, ...]:
+    name = str(payload.get("name") or "").strip()
+    analyze_ymd = str(payload.get("analyze_ymd") or "").strip()
+    if not name or not analyze_ymd:
+        raise HotContentFlowError("invalid wxindex word stats payload")
+    return (
+        name,
+        payload.get("meta_id"),
+        analyze_ymd,
+        payload.get("wxindex_word_record_id"),
+        payload.get("retain_reason"),
+        payload.get("senior_fit_score"),
+        payload.get("data_start_ymd"),
+        payload.get("data_end_ymd"),
+        payload.get("data_days"),
+        payload.get("min_score"),
+        payload.get("max_score"),
+        payload.get("avg_score"),
+        _nullable_json_dumps(payload.get("detail_json")),
+    )
+
+
+_WXINDEX_WORD_STATS_UPSERT_SQL = """
+    INSERT INTO hot_content_wxindex_word_stats (
+        name,
+        meta_id,
+        analyze_ymd,
+        wxindex_word_record_id,
+        retain_reason,
+        senior_fit_score,
+        data_start_ymd,
+        data_end_ymd,
+        data_days,
+        min_score,
+        max_score,
+        avg_score,
+        detail_json
+    )
+    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    ON DUPLICATE KEY UPDATE
+        meta_id = VALUES(meta_id),
+        wxindex_word_record_id = VALUES(wxindex_word_record_id),
+        retain_reason = VALUES(retain_reason),
+        senior_fit_score = VALUES(senior_fit_score),
+        data_start_ymd = VALUES(data_start_ymd),
+        data_end_ymd = VALUES(data_end_ymd),
+        data_days = VALUES(data_days),
+        min_score = VALUES(min_score),
+        max_score = VALUES(max_score),
+        avg_score = VALUES(avg_score),
+        detail_json = VALUES(detail_json)
+"""
+
+
 def _json_loads(value: Any) -> Any:
     if value is None:
         return None
@@ -475,12 +666,16 @@ class HotContentRepository:
                 article_title,
                 article_body,
                 demand_cache_run_id,
+                postprocess_status,
                 decode_result_json,
                 contribution_points_json,
-                contribution_demand_match_json
+                contribution_demand_match_json,
+                wxindex_trend_json,
+                demand_event_sense_json,
+                demand_senior_fit_json
             FROM hot_content_records
             WHERE contribution_points_json IS NOT NULL
-              AND postprocess_status IN (%s, %s, %s)
+              AND postprocess_status IN (%s, %s, %s, %s)
             ORDER BY updated_at ASC, id ASC
             LIMIT %s
         """
@@ -490,6 +685,7 @@ class HotContentRepository:
                 (
                     PostprocessStatus.PENDING,
                     PostprocessStatus.DEMAND_MATCHED,
+                    PostprocessStatus.WXINDEX_DONE,
                     PostprocessStatus.FAILED,
                     limit,
                 ),
@@ -505,11 +701,17 @@ class HotContentRepository:
                 "article_title": row.get("article_title"),
                 "article_body": row.get("article_body"),
                 "demand_cache_run_id": row.get("demand_cache_run_id"),
+                "postprocess_status": int(row.get("postprocess_status") or 0),
                 "decode_result_json": _json_loads(row.get("decode_result_json")),
                 "contribution_points_json": _json_loads(row.get("contribution_points_json")),
                 "contribution_demand_match_json": _json_loads(
                     row.get("contribution_demand_match_json")
                 ),
+                "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
+                "demand_event_sense_json": _json_loads(
+                    row.get("demand_event_sense_json")
+                ),
+                "demand_senior_fit_json": _json_loads(row.get("demand_senior_fit_json")),
             }
             for row in rows
         ]
@@ -686,7 +888,7 @@ class HotContentRepository:
                 cursor.executemany(insert_sql, insert_rows)
 
     def list_odps_sync_records(self) -> list[dict[str, Any]]:
-        """读取当天创建且已完成质量判断的记录,供 ODPS 同步(不处理历史数据)。"""
+        """读取当天完成 postprocess 且质量判断完成的记录,供 ODPS 同步。"""
         self._ensure_record_quality_columns()
         today_start = datetime.now(SHANGHAI_TZ).replace(
             hour=0,
@@ -705,8 +907,8 @@ class HotContentRepository:
                 demand_event_sense_json,
                 demand_senior_fit_json
             FROM hot_content_records
-            WHERE created_at >= %s
-              AND created_at < %s
+            WHERE updated_at >= %s
+              AND updated_at < %s
               AND postprocess_status = %s
               AND contribution_demand_match_json IS NOT NULL
               AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
@@ -1007,17 +1209,19 @@ class HotContentRepository:
         self,
         *,
         partition_dt: str,
-        strategy: str,
     ) -> set[str]:
+        """返回 hot_content_odps_sync_log 中指定分区日已有的 demand_id(跨流程去重用)。"""
+        normalized_partition_dt = str(partition_dt or "").strip()
+        if not normalized_partition_dt:
+            return set()
         self._ensure_odps_sync_log_table()
         sql = """
             SELECT demand_id
             FROM hot_content_odps_sync_log
             WHERE partition_dt = %s
-              AND strategy = %s
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (partition_dt, strategy))
+            cursor.execute(sql, (normalized_partition_dt,))
             rows = cursor.fetchall()
         return {
             str(row.get("demand_id") or "").strip()
@@ -1025,6 +1229,22 @@ class HotContentRepository:
             if str(row.get("demand_id") or "").strip()
         }
 
+    def count_odps_sync_log_rows(self, *, partition_dt: str) -> int:
+        """统计 hot_content_odps_sync_log 指定分区日已写入条数(供日限额计算)。"""
+        normalized_partition_dt = str(partition_dt or "").strip()
+        if not normalized_partition_dt:
+            return 0
+        self._ensure_odps_sync_log_table()
+        sql = """
+            SELECT COUNT(*) AS cnt
+            FROM hot_content_odps_sync_log
+            WHERE partition_dt = %s
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (normalized_partition_dt,))
+            row = cursor.fetchone() or {}
+        return int(row.get("cnt") or 0)
+
     def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
         if not rows:
             return 0
@@ -1110,6 +1330,471 @@ class HotContentRepository:
             scores.append({"ymd": dt, "total_score": total_score})
         return scores
 
+    def list_wxindex_word_scores_in_range(
+        self,
+        name: str,
+        *,
+        start_ymd: str,
+        end_ymd: str,
+    ) -> list[dict[str, Any]]:
+        word = str(name or "").strip()
+        start = str(start_ymd or "").strip()
+        end = str(end_ymd or "").strip()
+        if not word or not start or not end:
+            return []
+        self._ensure_wxindex_words_table()
+        sql = """
+            SELECT dt, total_score
+            FROM hot_content_wxindex_words
+            WHERE name = %s
+              AND dt >= %s
+              AND dt <= %s
+            ORDER BY dt ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (word, start, end))
+            rows = cursor.fetchall()
+        scores: list[dict[str, Any]] = []
+        for row in rows:
+            dt = str(row.get("dt") or "").strip()
+            if not dt:
+                continue
+            try:
+                total_score = float(row["total_score"])
+            except (TypeError, ValueError, KeyError):
+                continue
+            scores.append({"ymd": dt, "total_score": total_score})
+        return scores
+
+    def list_active_wxindex_word_meta(
+        self,
+        *,
+        today: date | None = None,
+    ) -> list[dict[str, Any]]:
+        """返回当天仍在抓取窗口内的 meta(today < fetch_end_ymd)。"""
+        current = today or datetime.now(SHANGHAI_TZ).date()
+        today_ymd = current.strftime("%Y%m%d")
+        self._ensure_wxindex_word_meta_table()
+        sql = """
+            SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
+            FROM hot_content_wxindex_word_meta
+            WHERE fetch_end_ymd > %s
+            ORDER BY id ASC
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, (today_ymd,))
+            rows = cursor.fetchall()
+        result: list[dict[str, Any]] = []
+        for row in rows:
+            meta = self._normalize_wxindex_word_meta_row(row)
+            if meta is not None:
+                result.append(meta)
+        return result
+
+    def save_wxindex_word_record(self, payload: dict[str, Any]) -> int:
+        row = _wxindex_word_record_row(payload)
+        name = str(row[0])
+        analyze_ymd = str(row[2])
+        self._ensure_wxindex_word_records_table()
+        with self.conn.cursor() as cursor:
+            cursor.execute(_WXINDEX_WORD_RECORD_UPSERT_SQL, row)
+            cursor.execute(
+                """
+                SELECT id
+                FROM hot_content_wxindex_word_records
+                WHERE name = %s
+                  AND analyze_ymd = %s
+                """,
+                (name, analyze_ymd),
+            )
+            record_row = cursor.fetchone() or {}
+        return int(record_row.get("id") or 0)
+
+    def init_wxindex_word_records(
+        self,
+        payloads: list[dict[str, Any]],
+    ) -> dict[str, int]:
+        """批量 init 追溯记录:一次 executemany,重复跑只更新 meta/抓取窗口。"""
+        if not payloads:
+            return {}
+        rows: list[tuple[Any, ...]] = []
+        analyze_ymd = ""
+        names: list[str] = []
+        for payload in payloads:
+            row = _wxindex_word_record_row(payload)
+            rows.append(row)
+            analyze_ymd = str(row[2])
+            names.append(str(row[0]))
+        self._ensure_wxindex_word_records_table()
+        with self.conn.cursor() as cursor:
+            cursor.executemany(_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL, rows)
+        return self.map_wxindex_word_record_ids(analyze_ymd=analyze_ymd, names=names)
+
+    def map_wxindex_word_record_ids(
+        self,
+        *,
+        analyze_ymd: str,
+        names: list[str],
+    ) -> dict[str, int]:
+        normalized_analyze_ymd = str(analyze_ymd or "").strip()
+        normalized_names = [
+            str(name or "").strip() for name in names if str(name or "").strip()
+        ]
+        if not normalized_analyze_ymd or not normalized_names:
+            return {}
+        placeholders = ", ".join(["%s"] * len(normalized_names))
+        sql = f"""
+            SELECT id, name
+            FROM hot_content_wxindex_word_records
+            WHERE analyze_ymd = %s
+              AND name IN ({placeholders})
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, [normalized_analyze_ymd, *normalized_names])
+            rows = cursor.fetchall()
+        return {
+            str(row.get("name") or "").strip(): int(row.get("id") or 0)
+            for row in rows
+            if str(row.get("name") or "").strip()
+        }
+
+    def list_wxindex_word_records_by_analyze_ymd(
+        self,
+        *,
+        analyze_ymd: str,
+        names: list[str] | None = None,
+    ) -> dict[str, dict[str, Any]]:
+        """按 analyze_ymd 批量读取追溯记录,供同日重跑跳过已完成阶段。"""
+        normalized_analyze_ymd = str(analyze_ymd or "").strip()
+        if not normalized_analyze_ymd:
+            return {}
+        normalized_names = [
+            str(name or "").strip() for name in (names or []) if str(name or "").strip()
+        ]
+        self._ensure_wxindex_word_records_table()
+        params: list[Any] = [normalized_analyze_ymd]
+        name_filter = ""
+        if normalized_names:
+            placeholders = ", ".join(["%s"] * len(normalized_names))
+            name_filter = f" AND name IN ({placeholders})"
+            params.extend(normalized_names)
+        sql = f"""
+            SELECT
+                id,
+                name,
+                meta_id,
+                analyze_ymd,
+                fetch_start_ymd,
+                fetch_end_ymd,
+                data_start_ymd,
+                data_end_ymd,
+                data_days,
+                is_sustained_high,
+                is_rising,
+                is_spike,
+                retain_reason,
+                is_internal_demand_matched,
+                matched_demand,
+                demand_cache_run_id,
+                internal_demand_match_json,
+                senior_fit_score,
+                demand_senior_fit_json,
+                is_final_retained,
+                min_score,
+                max_score,
+                avg_score,
+                detail_json
+            FROM hot_content_wxindex_word_records
+            WHERE analyze_ymd = %s{name_filter}
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, params)
+            rows = cursor.fetchall()
+        result: dict[str, dict[str, Any]] = {}
+        for row in rows or []:
+            name = str(row.get("name") or "").strip()
+            if not name:
+                continue
+            record = dict(row)
+            record["internal_demand_match_json"] = _json_loads(
+                record.get("internal_demand_match_json")
+            )
+            record["demand_senior_fit_json"] = _json_loads(
+                record.get("demand_senior_fit_json")
+            )
+            record["detail_json"] = _json_loads(record.get("detail_json"))
+            result[name] = record
+        return result
+
+    def list_wxindex_word_stats_names(
+        self,
+        *,
+        analyze_ymd: str,
+        names: list[str] | None = None,
+    ) -> set[str]:
+        """返回指定 analyze_ymd 下已写入 stats 表的词名集合。"""
+        normalized_analyze_ymd = str(analyze_ymd or "").strip()
+        if not normalized_analyze_ymd:
+            return set()
+        normalized_names = [
+            str(name or "").strip() for name in (names or []) if str(name or "").strip()
+        ]
+        self._ensure_wxindex_word_stats_table()
+        params: list[Any] = [normalized_analyze_ymd]
+        name_filter = ""
+        if normalized_names:
+            placeholders = ", ".join(["%s"] * len(normalized_names))
+            name_filter = f" AND name IN ({placeholders})"
+            params.extend(normalized_names)
+        sql = f"""
+            SELECT name
+            FROM hot_content_wxindex_word_stats
+            WHERE analyze_ymd = %s{name_filter}
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, params)
+            rows = cursor.fetchall()
+        return {
+            str(row.get("name") or "").strip()
+            for row in rows or []
+            if str(row.get("name") or "").strip()
+        }
+
+    def save_wxindex_word_stats_batch(
+        self,
+        payloads: list[dict[str, Any]],
+    ) -> int:
+        """批量写入通过热度+老年性筛选的词统计。"""
+        if not payloads:
+            return 0
+        rows = [_wxindex_word_stats_row(payload) for payload in payloads]
+        self._ensure_wxindex_word_stats_table()
+        with self.conn.cursor() as cursor:
+            cursor.executemany(_WXINDEX_WORD_STATS_UPSERT_SQL, rows)
+        return len(rows)
+
+    def _ensure_wxindex_word_stats_table(self) -> None:
+        sql = """
+            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_stats (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
+                name VARCHAR(256) NOT NULL COMMENT '词',
+                meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
+                analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
+                wxindex_word_record_id BIGINT UNSIGNED NULL COMMENT '关联 records.id',
+                retain_reason VARCHAR(64) NULL COMMENT '热度保留原因',
+                senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
+                data_start_ymd VARCHAR(8) NULL COMMENT '分析数据起始日',
+                data_end_ymd VARCHAR(8) NULL COMMENT '分析数据结束日',
+                data_days INT NULL COMMENT '分析天数',
+                min_score DOUBLE NULL COMMENT '区间最低热度',
+                max_score DOUBLE NULL COMMENT '区间最高热度',
+                avg_score DOUBLE NULL COMMENT '区间平均热度',
+                detail_json JSON NULL COMMENT '扩展详情',
+                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+                    ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+                PRIMARY KEY (id),
+                UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
+                KEY idx_analyze_ymd (analyze_ymd),
+                KEY idx_retain_reason (retain_reason)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+
+    def _ensure_wxindex_word_records_table(self) -> None:
+        sql = """
+            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_records (
+                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
+                name VARCHAR(256) NOT NULL COMMENT '词',
+                meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
+                analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
+                fetch_start_ymd VARCHAR(8) NULL COMMENT 'meta 抓取起始日',
+                fetch_end_ymd VARCHAR(8) NULL COMMENT 'meta 抓取结束日',
+                data_start_ymd VARCHAR(8) NULL COMMENT '实际分析数据起始日',
+                data_end_ymd VARCHAR(8) NULL COMMENT '实际分析数据结束日',
+                data_days INT NULL COMMENT '实际分析天数',
+                is_sustained_high TINYINT(1) NULL COMMENT '持续热度>1000万',
+                is_rising TINYINT(1) NULL COMMENT '热度持续上涨',
+                is_spike TINYINT(1) NULL COMMENT '最近3天突然暴涨',
+                retain_reason VARCHAR(64) NULL COMMENT '保留原因(按2->3->1优先级)',
+                is_internal_demand_matched TINYINT(1) NULL COMMENT '是否匹配票圈内部需求',
+                matched_demand VARCHAR(1024) NULL COMMENT '匹配到的内部需求',
+                demand_cache_run_id BIGINT UNSIGNED NULL COMMENT '需求池缓存ID',
+                internal_demand_match_json JSON NULL COMMENT '内部需求匹配详情',
+                senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
+                demand_senior_fit_json JSON NULL COMMENT '老年性 LLM 评分结果',
+                is_final_retained TINYINT(1) NULL COMMENT '老年性达标且最终保留',
+                min_score DOUBLE NULL COMMENT '区间最低热度',
+                max_score DOUBLE NULL COMMENT '区间最高热度',
+                avg_score DOUBLE NULL COMMENT '区间平均热度',
+                detail_json JSON NULL COMMENT '分析详情',
+                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
+                    ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+                PRIMARY KEY (id),
+                UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
+                KEY idx_analyze_ymd (analyze_ymd),
+                KEY idx_patterns (is_sustained_high, is_rising, is_spike),
+                KEY idx_retain_reason (retain_reason)
+            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql)
+            self._ensure_wxindex_word_records_retain_columns(cursor)
+            self._ensure_wxindex_word_records_senior_columns(cursor)
+            self._ensure_wxindex_word_records_nullable_columns(cursor)
+
+    def _ensure_wxindex_word_records_nullable_columns(self, cursor: Any) -> None:
+        nullable_specs = {
+            "fetch_start_ymd": (
+                "MODIFY COLUMN fetch_start_ymd VARCHAR(8) NULL "
+                "COMMENT 'meta 抓取起始日'"
+            ),
+            "fetch_end_ymd": (
+                "MODIFY COLUMN fetch_end_ymd VARCHAR(8) NULL "
+                "COMMENT 'meta 抓取结束日'"
+            ),
+            "data_start_ymd": (
+                "MODIFY COLUMN data_start_ymd VARCHAR(8) NULL "
+                "COMMENT '实际分析数据起始日'"
+            ),
+            "data_end_ymd": (
+                "MODIFY COLUMN data_end_ymd VARCHAR(8) NULL "
+                "COMMENT '实际分析数据结束日'"
+            ),
+            "data_days": (
+                "MODIFY COLUMN data_days INT NULL COMMENT '实际分析天数'"
+            ),
+            "is_sustained_high": (
+                "MODIFY COLUMN is_sustained_high TINYINT(1) NULL "
+                "COMMENT '持续热度>1000万'"
+            ),
+            "is_rising": (
+                "MODIFY COLUMN is_rising TINYINT(1) NULL "
+                "COMMENT '热度持续上涨'"
+            ),
+            "is_spike": (
+                "MODIFY COLUMN is_spike TINYINT(1) NULL "
+                "COMMENT '最近3天突然暴涨'"
+            ),
+            "matched_demand": (
+                "MODIFY COLUMN matched_demand VARCHAR(1024) NULL "
+                "COMMENT '匹配到的内部需求'"
+            ),
+            "is_final_retained": (
+                "MODIFY COLUMN is_final_retained TINYINT(1) NULL "
+                "COMMENT '老年性达标且最终保留'"
+            ),
+        }
+        column_names = list(nullable_specs.keys())
+        placeholders = ", ".join(["%s"] * len(column_names))
+        cursor.execute(
+            f"""
+            SELECT COLUMN_NAME, IS_NULLABLE
+            FROM information_schema.COLUMNS
+            WHERE TABLE_SCHEMA = DATABASE()
+              AND TABLE_NAME = 'hot_content_wxindex_word_records'
+              AND COLUMN_NAME IN ({placeholders})
+            """,
+            column_names,
+        )
+        nullable_map = {
+            str(row.get("COLUMN_NAME") or ""): str(row.get("IS_NULLABLE") or "").upper()
+            for row in (cursor.fetchall() or [])
+        }
+        alters = [
+            sql
+            for column_name, sql in nullable_specs.items()
+            if nullable_map.get(column_name) == "NO"
+        ]
+        if alters:
+            cursor.execute(
+                "ALTER TABLE hot_content_wxindex_word_records " + ", ".join(alters)
+            )
+
+    def _ensure_wxindex_word_records_senior_columns(self, cursor: Any) -> None:
+        columns = {
+            "senior_fit_score": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN senior_fit_score DOUBLE NULL "
+                "COMMENT '老年性得分 0-10' "
+                "AFTER internal_demand_match_json"
+            ),
+            "demand_senior_fit_json": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN demand_senior_fit_json JSON NULL "
+                "COMMENT '老年性 LLM 评分结果' "
+                "AFTER senior_fit_score"
+            ),
+            "is_final_retained": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN is_final_retained TINYINT(1) NULL "
+                "COMMENT '老年性达标且最终保留' "
+                "AFTER demand_senior_fit_json"
+            ),
+        }
+        for column_name, alter_sql in columns.items():
+            cursor.execute(
+                """
+                SELECT COUNT(*) AS cnt
+                FROM information_schema.COLUMNS
+                WHERE TABLE_SCHEMA = DATABASE()
+                  AND TABLE_NAME = 'hot_content_wxindex_word_records'
+                  AND COLUMN_NAME = %s
+                """,
+                (column_name,),
+            )
+            if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+                cursor.execute(alter_sql)
+
+    def _ensure_wxindex_word_records_retain_columns(self, cursor: Any) -> None:
+        columns = {
+            "retain_reason": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN retain_reason VARCHAR(64) NULL "
+                "COMMENT '保留原因(按2->3->1优先级)' "
+                "AFTER is_spike"
+            ),
+            "is_internal_demand_matched": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN is_internal_demand_matched TINYINT(1) NULL "
+                "COMMENT '是否匹配票圈内部需求' "
+                "AFTER retain_reason"
+            ),
+            "matched_demand": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN matched_demand VARCHAR(1024) NULL "
+                "COMMENT '匹配到的内部需求' "
+                "AFTER is_internal_demand_matched"
+            ),
+            "demand_cache_run_id": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN demand_cache_run_id BIGINT UNSIGNED NULL "
+                "COMMENT '需求池缓存ID' "
+                "AFTER matched_demand"
+            ),
+            "internal_demand_match_json": (
+                "ALTER TABLE hot_content_wxindex_word_records "
+                "ADD COLUMN internal_demand_match_json JSON NULL "
+                "COMMENT '内部需求匹配详情' "
+                "AFTER demand_cache_run_id"
+            ),
+        }
+        for column_name, alter_sql in columns.items():
+            cursor.execute(
+                """
+                SELECT COUNT(*) AS cnt
+                FROM information_schema.COLUMNS
+                WHERE TABLE_SCHEMA = DATABASE()
+                  AND TABLE_NAME = 'hot_content_wxindex_word_records'
+                  AND COLUMN_NAME = %s
+                """,
+                (column_name,),
+            )
+            if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
+                cursor.execute(alter_sql)
+
     def list_stale_wxindex_words(
         self,
         *,

+ 2 - 0
app/hot_content/types.py

@@ -58,6 +58,8 @@ class FlowConfig:
     wxindex_lookback_days: int
     wxindex_words_cron_hour: int
     wxindex_words_cron_minute: int
+    wxindex_heat_pattern_cron_hour: int
+    wxindex_heat_pattern_cron_minute: int
     demand_event_sense_threshold: float
     demand_senior_fit_threshold: float
     demand_quality_llm_model: str

+ 1603 - 0
app/hot_content/wxindex_heat_pattern.py

@@ -0,0 +1,1603 @@
+"""微信指数热度模式分析:持续高热、持续上涨、突然暴涨。"""
+
+from __future__ import annotations
+
+import csv
+import json
+from datetime import date, datetime
+from pathlib import Path
+from typing import Any
+
+from app.hot_content.client import JsonApiClient
+from app.hot_content.demand_cache_service import DemandCacheService
+from app.hot_content.demand_hive_export import WEIGHT_DIVISOR, build_demand_id
+from app.hot_content.demand_quality import (
+    TYPE_PHRASE,
+    llm_score_senior_fit,
+    lookup_quality_scores,
+)
+from app.hot_content.demand_pool_writer import sync_wxindex_word_rows_to_odps
+from app.hot_content.exceptions import HotContentFlowError
+from app.hot_content.postprocess_service import ContributionPostprocessService
+from app.hot_content.repository import HotContentRepository
+from app.hot_content.timezone import SHANGHAI_TZ
+from app.hot_content.types import FlowConfig
+from app.hot_content.wxindex_trend import (
+    HEAT_RISING_ADJACENT_UP_RATIO,
+    HEAT_RISING_OVERALL_CHANGE_RATE,
+    HEAT_RISING_WINDOW_CHANGE_RATE,
+    HEAT_SPIKE_BASELINE_FLOOR,
+    HEAT_SPIKE_RATIO,
+    extract_sorted_scores,
+    is_wxindex_heat_rising_scores,
+    is_wxindex_spike_scores,
+)
+from app.hot_content.wxindex_words import filter_scores_in_ymd_window
+
+WXINDEX_HEAT_MIN_DAYS = 7
+WXINDEX_HEAT_MAX_DAYS = 14
+WXINDEX_SUSTAINED_HIGH_THRESHOLD = 10_000_000.0
+WXINDEX_SPIKE_LOOKBACK_DAYS = 3
+WXINDEX_SPIKE_RATIO = HEAT_SPIKE_RATIO
+WXINDEX_SPIKE_BASELINE_FLOOR = HEAT_SPIKE_BASELINE_FLOOR
+WXINDEX_RISING_OVERALL_CHANGE_RATE = HEAT_RISING_OVERALL_CHANGE_RATE
+WXINDEX_RISING_WINDOW_CHANGE_RATE = HEAT_RISING_WINDOW_CHANGE_RATE
+WXINDEX_RISING_ADJACENT_UP_RATIO = HEAT_RISING_ADJACENT_UP_RATIO
+WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD = 0.6
+WXINDEX_WORD_LLM_BATCH_SIZE = 10
+
+PATTERN_SUSTAINED_HIGH = "sustained_high"
+PATTERN_RISING = "rising"
+PATTERN_SPIKE = "spike"
+
+RETAIN_REASON_SUSTAINED_HIGH = "持续高热度"
+RETAIN_REASON_RISING = "热度持续上涨"
+RETAIN_REASON_SPIKE = "热度突然暴涨"
+
+PHASE_ANALYSIS_SKIPPED = "analysis_skipped"
+PHASE_HEAT_ANALYZED = "heat_analyzed"
+PHASE_DEMAND_MATCHED = "demand_matched"
+PHASE_SENIOR_FIT_SCORED = "senior_fit_scored"
+PHASE_FINALIZED = "finalized"
+
+_HEAT_DONE_PHASES = frozenset(
+    {
+        PHASE_ANALYSIS_SKIPPED,
+        PHASE_HEAT_ANALYZED,
+        PHASE_DEMAND_MATCHED,
+        PHASE_SENIOR_FIT_SCORED,
+        PHASE_FINALIZED,
+    }
+)
+
+_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS: dict[str, Any] = {
+    "data_start_ymd": None,
+    "data_end_ymd": None,
+    "data_days": None,
+    "is_sustained_high": None,
+    "is_rising": None,
+    "is_spike": None,
+    "retain_reason": None,
+    "is_internal_demand_matched": None,
+    "matched_demand": None,
+    "internal_demand_match_json": None,
+    "senior_fit_score": None,
+    "demand_senior_fit_json": None,
+    "is_final_retained": None,
+    "min_score": None,
+    "max_score": None,
+    "avg_score": None,
+    "detail_json": None,
+}
+
+
+def _parse_record_detail_json(record: dict[str, Any]) -> dict[str, Any]:
+    detail = record.get("detail_json")
+    if isinstance(detail, dict):
+        return detail
+    if isinstance(detail, str) and detail.strip():
+        try:
+            parsed = json.loads(detail)
+            return parsed if isinstance(parsed, dict) else {}
+        except json.JSONDecodeError:
+            return {}
+    return {}
+
+
+def _record_phase(record: dict[str, Any]) -> str:
+    return str(_parse_record_detail_json(record).get("phase") or "").strip()
+
+
+def _nullable_bool_from_record(value: Any) -> bool | None:
+    if value is None:
+        return None
+    return bool(value)
+
+
+def _parse_record_json_field(record: dict[str, Any], field: str) -> Any:
+    value = record.get(field)
+    if value is None:
+        return None
+    if isinstance(value, (dict, list)):
+        return value
+    if isinstance(value, (bytes, bytearray)):
+        value = value.decode("utf-8")
+    if isinstance(value, str):
+        try:
+            return json.loads(value)
+        except json.JSONDecodeError:
+            return None
+    return value
+
+
+def _is_heat_analysis_done(record: dict[str, Any] | None) -> bool:
+    if not record:
+        return False
+    return _record_phase(record) in _HEAT_DONE_PHASES
+
+
+def _is_analysis_skipped_record(record: dict[str, Any]) -> bool:
+    return _record_phase(record) == PHASE_ANALYSIS_SKIPPED
+
+
+def _is_senior_fit_needed(
+    *,
+    retain_reason: str | None,
+    is_internal_demand_matched: bool | None,
+) -> bool:
+    if not retain_reason:
+        return False
+    if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
+        return bool(is_internal_demand_matched)
+    return True
+
+
+def _is_finalized_record(record: dict[str, Any] | None) -> bool:
+    if not record:
+        return False
+    return _record_phase(record) == PHASE_FINALIZED
+
+
+def _latest_score_ymd(scores: list[dict[str, Any]]) -> str | None:
+    ymds = [_score_row_ymd(row) for row in scores if _score_row_ymd(row)]
+    return max(ymds) if ymds else None
+
+
+def _should_rerun_heat_analysis(
+    existing_record: dict[str, Any] | None,
+    *,
+    scores: list[dict[str, Any]],
+    fetch_start_ymd: str,
+    fetch_end_ymd: str,
+    min_days: int,
+    max_days: int,
+) -> bool:
+    if not _is_heat_analysis_done(existing_record):
+        return True
+    assert existing_record is not None
+    latest_ymd = _latest_score_ymd(scores)
+    record_end = str(existing_record.get("data_end_ymd") or "").strip()
+    if latest_ymd and record_end and latest_ymd > record_end:
+        return True
+    if _is_analysis_skipped_record(existing_record):
+        _, skip_reason = prepare_analysis_scores(
+            scores,
+            start_ymd=fetch_start_ymd,
+            end_ymd=fetch_end_ymd,
+            min_days=min_days,
+            max_days=max_days,
+        )
+        if skip_reason is None:
+            return True
+    return False
+
+
+def _is_senior_fit_attempt_done(
+    item: dict[str, Any],
+    existing_record: dict[str, Any] | None,
+) -> bool:
+    if existing_record:
+        phase = _record_phase(existing_record)
+        if phase in (PHASE_SENIOR_FIT_SCORED, PHASE_FINALIZED):
+            return True
+    return item.get("senior_fit_score") is not None
+
+
+def _senior_fit_passed_from_score(score: Any, *, senior_threshold: float) -> bool:
+    if score is None:
+        return False
+    try:
+        return float(score) / 10.0 > senior_threshold
+    except (TypeError, ValueError):
+        return False
+
+
+def _rehydrate_result_from_record(record: dict[str, Any]) -> dict[str, Any]:
+    detail = _parse_record_detail_json(record)
+    retain_reason_raw = record.get("retain_reason")
+    retain_reason = (
+        str(retain_reason_raw).strip() if retain_reason_raw is not None else None
+    ) or None
+
+    def _bool_field(key: str) -> bool | None:
+        value = record.get(key)
+        if value is None:
+            return None
+        return bool(value)
+
+    return {
+        "skipped": False,
+        "skip_reason": None,
+        "data_days": record.get("data_days"),
+        "data_start_ymd": record.get("data_start_ymd"),
+        "data_end_ymd": record.get("data_end_ymd"),
+        "fetch_start_ymd": record.get("fetch_start_ymd"),
+        "fetch_end_ymd": record.get("fetch_end_ymd"),
+        "min_score": record.get("min_score"),
+        "max_score": record.get("max_score"),
+        "avg_score": record.get("avg_score"),
+        "is_sustained_high": _bool_field("is_sustained_high"),
+        "is_rising": _bool_field("is_rising"),
+        "is_spike": _bool_field("is_spike"),
+        "retain_reason": retain_reason,
+        "patterns": list(detail.get("patterns") or []),
+    }
+
+
+def _rehydrate_pending_item_from_record(
+    item: dict[str, Any],
+    record: dict[str, Any],
+    *,
+    senior_threshold: float,
+) -> dict[str, Any]:
+    result = _rehydrate_result_from_record(record)
+    retain_reason = result.get("retain_reason")
+    is_internal_demand_matched = _nullable_bool_from_record(
+        record.get("is_internal_demand_matched")
+    )
+    matched_demand_raw = record.get("matched_demand")
+    matched_demand = (
+        str(matched_demand_raw).strip() if matched_demand_raw is not None else None
+    ) or None
+    senior_fit_score = record.get("senior_fit_score")
+    senior_fit_passed: bool | None = None
+    is_final_retained = _nullable_bool_from_record(record.get("is_final_retained"))
+    if senior_fit_score is not None:
+        senior_fit_passed = _senior_fit_passed_from_score(
+            senior_fit_score,
+            senior_threshold=senior_threshold,
+        )
+    elif not _is_senior_fit_needed(
+        retain_reason=retain_reason,
+        is_internal_demand_matched=is_internal_demand_matched,
+    ):
+        senior_fit_passed = False
+        if is_final_retained is None:
+            is_final_retained = False
+
+    return {
+        "meta": item["meta"],
+        "name": item["name"],
+        "fetch_start_ymd": item["fetch_start_ymd"],
+        "fetch_end_ymd": item["fetch_end_ymd"],
+        "result": result,
+        "retain_reason": retain_reason,
+        "record_id": int(record.get("id") or item.get("record_id") or 0),
+        "is_internal_demand_matched": is_internal_demand_matched,
+        "matched_demand": matched_demand,
+        "internal_demand_match_json": _parse_record_json_field(
+            record,
+            "internal_demand_match_json",
+        ),
+        "senior_fit_score": senior_fit_score,
+        "demand_senior_fit_json": _parse_record_json_field(
+            record,
+            "demand_senior_fit_json",
+        ),
+        "senior_fit_passed": senior_fit_passed,
+        "is_final_retained": is_final_retained,
+    }
+
+
+def _build_skipped_export_row_from_record(
+    *,
+    record: dict[str, Any],
+    meta: dict[str, Any],
+    name: str,
+    analyze_ymd: str,
+    fetch_start_ymd: str,
+    fetch_end_ymd: str,
+) -> dict[str, Any]:
+    detail = _parse_record_detail_json(record)
+    return {
+        "analyze_ymd": analyze_ymd,
+        "name": name,
+        "meta_id": meta.get("id"),
+        "fetch_start_ymd": fetch_start_ymd,
+        "fetch_end_ymd": fetch_end_ymd,
+        "analysis_skipped": True,
+        "skip_reason": detail.get("skip_reason") or "",
+        "data_days": record.get("data_days") or "",
+        "retain_reason": "",
+        "is_sustained_high": False,
+        "is_rising": False,
+        "is_spike": False,
+        "is_internal_demand_matched": "",
+        "matched_demand": "",
+        "senior_fit_score": "",
+        "is_final_retained": False,
+        "min_score": "",
+        "max_score": "",
+        "avg_score": "",
+    }
+
+
+def _refresh_wxindex_heat_job_summary(
+    summary: dict[str, Any],
+    *,
+    pending_items: list[dict[str, Any]],
+    export_rows: list[dict[str, Any]],
+) -> None:
+    summary["analyzed"] = len(pending_items)
+    summary["skipped"] = sum(1 for row in export_rows if row.get("analysis_skipped"))
+    summary["retained"] = sum(1 for item in pending_items if item.get("retain_reason"))
+    summary["sustained_high"] = sum(
+        1
+        for item in pending_items
+        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
+    )
+    summary["rising"] = sum(
+        1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_RISING
+    )
+    summary["spike"] = sum(
+        1 for item in pending_items if item.get("retain_reason") == RETAIN_REASON_SPIKE
+    )
+    summary["internal_demand_matched"] = sum(
+        1 for item in pending_items if item.get("is_internal_demand_matched")
+    )
+    summary["senior_fit_candidates"] = sum(
+        1
+        for item in pending_items
+        if _is_senior_fit_needed(
+            retain_reason=item.get("retain_reason"),
+            is_internal_demand_matched=item.get("is_internal_demand_matched"),
+        )
+    )
+    summary["senior_scored"] = sum(
+        1 for item in pending_items if item.get("senior_fit_score") is not None
+    )
+    summary["senior_fit_passed"] = sum(
+        1 for item in pending_items if item.get("senior_fit_passed")
+    )
+    summary["final_retained"] = sum(
+        1 for item in pending_items if item.get("is_final_retained")
+    )
+
+
+def resolve_retain_reason(
+    *,
+    is_sustained_high: bool,
+    is_rising: bool,
+    is_spike: bool,
+) -> str | None:
+    """按 2->3->1 优先级确定保留原因(上涨 > 暴涨 > 持续高热度)。"""
+    if is_rising:
+        return RETAIN_REASON_RISING
+    if is_spike:
+        return RETAIN_REASON_SPIKE
+    if is_sustained_high:
+        return RETAIN_REASON_SUSTAINED_HIGH
+    return None
+
+
+def _chunk_words(words: list[str], batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE) -> list[list[str]]:
+    size = max(batch_size, 1)
+    return [words[index : index + size] for index in range(0, len(words), size)]
+
+
+def _empty_senior_fit_result() -> dict[str, Any]:
+    return {
+        "senior_fit_score": None,
+        "demand_senior_fit_json": {"source": "", "items": []},
+        "passed": False,
+    }
+
+
+def score_wxindex_words_senior_fit(
+    *,
+    words: list[str],
+    config: FlowConfig,
+    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
+    batch_size: int = WXINDEX_WORD_LLM_BATCH_SIZE,
+) -> dict[str, dict[str, Any]]:
+    """批量对微信指数热词执行老年性 LLM 评分(每批最多 batch_size 个词)。"""
+    cleaned: list[str] = []
+    seen: set[str] = set()
+    for raw in words:
+        word = str(raw or "").strip()
+        if not word or word in seen:
+            continue
+        seen.add(word)
+        cleaned.append(word)
+
+    results: dict[str, dict[str, Any]] = {
+        word: _empty_senior_fit_result() for word in cleaned
+    }
+    for batch in _chunk_words(cleaned, batch_size=batch_size):
+        candidates = [{"demand_type": TYPE_PHRASE, "demand_text": word} for word in batch]
+        senior_fit_json = llm_score_senior_fit(
+            channel_content_id=f"wxindex_words:{','.join(batch[:3])}",
+            candidates=candidates,
+            model=config.demand_quality_llm_model,
+            max_attempts=config.demand_quality_llm_max_attempts,
+            retry_sleep_seconds=config.demand_quality_llm_retry_sleep_seconds,
+            max_tokens=config.demand_quality_llm_max_tokens,
+        )
+        senior_fit_json["threshold"] = senior_threshold * 10.0
+        for word in batch:
+            _, senior_score = lookup_quality_scores(
+                demand_type=TYPE_PHRASE,
+                demand_text=word,
+                event_sense_json=None,
+                senior_fit_json=senior_fit_json,
+            )
+            passed = (
+                senior_score is not None
+                and senior_score / 10.0 > senior_threshold
+            )
+            results[word] = {
+                "senior_fit_score": senior_score,
+                "demand_senior_fit_json": senior_fit_json,
+                "passed": passed,
+            }
+    return results
+
+
+def score_wxindex_word_senior_fit(
+    *,
+    word: str,
+    config: FlowConfig,
+    senior_threshold: float = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD,
+) -> dict[str, Any]:
+    """对单个微信指数热词执行老年性 LLM 评分。"""
+    target_word = str(word or "").strip()
+    if not target_word:
+        return _empty_senior_fit_result()
+
+    batch_results = score_wxindex_words_senior_fit(
+        words=[target_word],
+        config=config,
+        senior_threshold=senior_threshold,
+        batch_size=1,
+    )
+    return batch_results.get(target_word, _empty_senior_fit_result())
+
+
+def build_wxindex_word_hive_row(
+    *,
+    wxindex_word_record_id: int,
+    word: str,
+    strategy: str,
+    partition_dt: str,
+    max_score: float | None,
+) -> dict[str, Any]:
+    normalized_name = str(word or "").strip()
+    weight = 0.0
+    if max_score is not None:
+        weight = float(max_score) / WEIGHT_DIVISOR
+    return {
+        "record_id": wxindex_word_record_id,
+        "strategy": strategy,
+        "demand_id": build_demand_id(
+            strategy=strategy,
+            demand_name=normalized_name,
+            partition_dt=partition_dt,
+        ),
+        "demand_name": normalized_name,
+        "weight": weight,
+        "type": TYPE_PHRASE,
+        "video_count": None,
+        "video_list": [],
+        "extend": "{}",
+        "dt": partition_dt,
+    }
+
+
+def build_wxindex_word_odps_sync_row(
+    *,
+    wxindex_word_record_id: int,
+    word: str,
+    strategy: str,
+    partition_dt: str,
+    max_score: float | None,
+) -> dict[str, Any]:
+    normalized_name = str(word or "").strip()
+    weight = None
+    if max_score is not None:
+        weight = float(max_score) / WEIGHT_DIVISOR
+    return {
+        "partition_dt": partition_dt,
+        "strategy": strategy,
+        "demand_id": build_demand_id(
+            strategy=strategy,
+            demand_name=normalized_name,
+            partition_dt=partition_dt,
+        ),
+        "demand_name": normalized_name,
+        "demand_type": TYPE_PHRASE,
+        "record_id": wxindex_word_record_id,
+        "weight": weight,
+    }
+
+
+def prepare_analysis_scores(
+    scores: list[dict[str, Any]],
+    *,
+    start_ymd: str,
+    end_ymd: str,
+    min_days: int = WXINDEX_HEAT_MIN_DAYS,
+    max_days: int = WXINDEX_HEAT_MAX_DAYS,
+) -> tuple[list[dict[str, Any]], str | None]:
+    """截取目标区间内可用数据,不足 min_days 时返回 skip 原因。"""
+    window_scores = filter_scores_in_ymd_window(
+        scores,
+        start_ymd=start_ymd,
+        end_ymd=end_ymd,
+    )
+    if len(window_scores) > max_days:
+        window_scores = window_scores[-max_days:]
+    if len(window_scores) < min_days:
+        return window_scores, "insufficient_days"
+    return window_scores, None
+
+
+def _score_row_ymd(row: dict[str, Any]) -> str:
+    return str(row.get("ymd") or row.get("dt") or "").strip()
+
+
+def _window_ymd_bounds(
+    window_scores: list[dict[str, Any]],
+) -> tuple[str | None, str | None]:
+    if not window_scores:
+        return None, None
+    start_ymd = _score_row_ymd(window_scores[0]) or None
+    end_ymd = _score_row_ymd(window_scores[-1]) or None
+    return start_ymd, end_ymd
+
+
+def analyze_wxindex_heat_patterns(
+    scores: list[dict[str, Any]],
+    *,
+    start_ymd: str,
+    end_ymd: str,
+    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
+    min_days: int = WXINDEX_HEAT_MIN_DAYS,
+    max_days: int = WXINDEX_HEAT_MAX_DAYS,
+    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
+    spike_ratio: float = WXINDEX_SPIKE_RATIO,
+    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
+    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
+    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
+    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
+) -> dict[str, Any]:
+    """对目标区间数据判断三种热度模式。"""
+    window_scores, skip_reason = prepare_analysis_scores(
+        scores,
+        start_ymd=start_ymd,
+        end_ymd=end_ymd,
+        min_days=min_days,
+        max_days=max_days,
+    )
+    if skip_reason:
+        data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
+        return {
+            "skipped": True,
+            "skip_reason": skip_reason,
+            "data_days": len(window_scores) if window_scores else None,
+            "data_start_ymd": data_start_ymd,
+            "data_end_ymd": data_end_ymd,
+            "patterns": [],
+        }
+
+    numeric_scores = extract_sorted_scores(window_scores, max_points=max_days)
+    data_start_ymd, data_end_ymd = _window_ymd_bounds(window_scores)
+
+    is_sustained_high = all(score > sustained_threshold for score in numeric_scores)
+    is_rising = is_wxindex_heat_rising_scores(
+        numeric_scores,
+        min_points=min_days,
+        overall_change_rate=rising_overall_change_rate,
+        window_change_rate_threshold=rising_window_change_rate,
+        adjacent_up_ratio=rising_adjacent_up_ratio,
+    )
+    is_spike = is_wxindex_spike_scores(
+        numeric_scores,
+        spike_days=spike_days,
+        min_points=min_days,
+        spike_ratio=spike_ratio,
+        baseline_floor=spike_baseline_floor,
+    )
+
+    retain_reason = resolve_retain_reason(
+        is_sustained_high=is_sustained_high,
+        is_rising=is_rising,
+        is_spike=is_spike,
+    )
+
+    patterns: list[str] = []
+    if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
+        patterns.append(PATTERN_SUSTAINED_HIGH)
+    elif retain_reason == RETAIN_REASON_RISING:
+        patterns.append(PATTERN_RISING)
+    elif retain_reason == RETAIN_REASON_SPIKE:
+        patterns.append(PATTERN_SPIKE)
+
+    return {
+        "skipped": False,
+        "skip_reason": None,
+        "data_days": len(window_scores),
+        "data_start_ymd": data_start_ymd,
+        "data_end_ymd": data_end_ymd,
+        "fetch_start_ymd": start_ymd,
+        "fetch_end_ymd": end_ymd,
+        "min_score": min(numeric_scores),
+        "max_score": max(numeric_scores),
+        "avg_score": sum(numeric_scores) / len(numeric_scores),
+        "is_sustained_high": is_sustained_high,
+        "is_rising": is_rising,
+        "is_spike": is_spike,
+        "retain_reason": retain_reason,
+        "patterns": patterns,
+        "scores": window_scores,
+    }
+
+
+def build_wxindex_word_record_init_payload(
+    *,
+    meta: dict[str, Any],
+    name: str,
+    analyze_ymd: str,
+    fetch_start_ymd: str,
+    fetch_end_ymd: str,
+    demand_cache_run_id: int | None = None,
+) -> dict[str, Any]:
+    """分析前写入追溯记录:仅含 meta 与抓取窗口,分析字段待后续更新。"""
+    return {
+        "name": name,
+        "meta_id": meta.get("id"),
+        "analyze_ymd": analyze_ymd,
+        "fetch_start_ymd": fetch_start_ymd,
+        "fetch_end_ymd": fetch_end_ymd,
+        "demand_cache_run_id": demand_cache_run_id,
+        **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
+    }
+
+
+def build_wxindex_word_record_skipped_payload(
+    *,
+    meta: dict[str, Any],
+    name: str,
+    analyze_ymd: str,
+    fetch_start_ymd: str,
+    fetch_end_ymd: str,
+    result: dict[str, Any],
+    demand_cache_run_id: int | None = None,
+) -> dict[str, Any]:
+    """分析被跳过时更新追溯记录。"""
+    payload = {
+        "name": name,
+        "meta_id": meta.get("id"),
+        "analyze_ymd": analyze_ymd,
+        "fetch_start_ymd": fetch_start_ymd,
+        "fetch_end_ymd": fetch_end_ymd,
+        "demand_cache_run_id": demand_cache_run_id,
+        **_WXINDEX_RECORD_UNSET_ANALYSIS_FIELDS,
+    }
+    payload.update(
+        {
+            "data_start_ymd": result.get("data_start_ymd"),
+            "data_end_ymd": result.get("data_end_ymd"),
+            "data_days": result.get("data_days"),
+            "detail_json": {
+                "phase": "analysis_skipped",
+                "skip_reason": result.get("skip_reason"),
+                "data_days": result.get("data_days"),
+            },
+        }
+    )
+    return payload
+
+
+def build_wxindex_word_record_payload(
+    item: dict[str, Any],
+    *,
+    analyze_ymd: str,
+    demand_cache_run_id: int | None,
+    phase: str,
+    senior_threshold: float,
+    sustained_threshold: float,
+    spike_days: int,
+    spike_ratio: float,
+    spike_baseline_floor: float,
+    rising_overall_change_rate: float,
+    rising_window_change_rate: float,
+    rising_adjacent_up_ratio: float,
+) -> dict[str, Any]:
+    """根据 pending item 当前状态构建 records 表 payload。"""
+    meta = item["meta"]
+    name = item["name"]
+    fetch_start_ymd = item["fetch_start_ymd"]
+    fetch_end_ymd = item["fetch_end_ymd"]
+    result = item["result"]
+    retain_reason = item.get("retain_reason")
+
+    return {
+        "name": name,
+        "meta_id": meta.get("id"),
+        "analyze_ymd": analyze_ymd,
+        "fetch_start_ymd": fetch_start_ymd,
+        "fetch_end_ymd": fetch_end_ymd,
+        "data_start_ymd": result.get("data_start_ymd"),
+        "data_end_ymd": result.get("data_end_ymd"),
+        "data_days": result.get("data_days"),
+        "is_sustained_high": result.get("is_sustained_high"),
+        "is_rising": result.get("is_rising"),
+        "is_spike": result.get("is_spike"),
+        "retain_reason": retain_reason,
+        "is_internal_demand_matched": item.get("is_internal_demand_matched"),
+        "matched_demand": item.get("matched_demand"),
+        "demand_cache_run_id": demand_cache_run_id,
+        "internal_demand_match_json": item.get("internal_demand_match_json"),
+        "senior_fit_score": item.get("senior_fit_score"),
+        "demand_senior_fit_json": item.get("demand_senior_fit_json"),
+        "is_final_retained": item.get("is_final_retained"),
+        "min_score": result.get("min_score"),
+        "max_score": result.get("max_score"),
+        "avg_score": result.get("avg_score"),
+        "detail_json": {
+            "phase": phase,
+            "patterns": list(result.get("patterns") or []),
+            "retain_reason": retain_reason,
+            "retain_reason_priority": "2->3->1",
+            "senior_fit_threshold": senior_threshold,
+            "senior_fit_threshold_score": senior_threshold * 10.0,
+            "is_final_retained": item.get("is_final_retained"),
+            "sustained_threshold": sustained_threshold,
+            "spike_days": spike_days,
+            "spike_ratio": spike_ratio,
+            "spike_baseline_floor": spike_baseline_floor,
+            "rising_overall_change_rate": rising_overall_change_rate,
+            "rising_window_change_rate": rising_window_change_rate,
+            "rising_adjacent_up_ratio": rising_adjacent_up_ratio,
+        },
+    }
+
+
+def _persist_wxindex_word_record(
+    repository: HotContentRepository,
+    payload: dict[str, Any],
+    *,
+    dry_run: bool,
+    skip_db_save: bool,
+    verbose: bool,
+    action: str,
+) -> int:
+    name = str(payload.get("name") or "").strip()
+    if dry_run or skip_db_save:
+        if verbose:
+            label = "dry-run" if dry_run else "skip-db-save"
+            print(f"[{label}] would {action} wxindex word record word={name}")
+        return 0
+    return repository.save_wxindex_word_record(payload)
+
+
+def _persist_pending_item_record(
+    repository: HotContentRepository,
+    item: dict[str, Any],
+    *,
+    analyze_ymd: str,
+    demand_cache_run_id: int | None,
+    phase: str,
+    senior_threshold: float,
+    sustained_threshold: float,
+    spike_days: int,
+    spike_ratio: float,
+    spike_baseline_floor: float,
+    rising_overall_change_rate: float,
+    rising_window_change_rate: float,
+    rising_adjacent_up_ratio: float,
+    dry_run: bool,
+    skip_db_save: bool,
+    verbose: bool,
+) -> int:
+    payload = build_wxindex_word_record_payload(
+        item,
+        analyze_ymd=analyze_ymd,
+        demand_cache_run_id=demand_cache_run_id,
+        phase=phase,
+        senior_threshold=senior_threshold,
+        sustained_threshold=sustained_threshold,
+        spike_days=spike_days,
+        spike_ratio=spike_ratio,
+        spike_baseline_floor=spike_baseline_floor,
+        rising_overall_change_rate=rising_overall_change_rate,
+        rising_window_change_rate=rising_window_change_rate,
+        rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+    )
+    record_id = _persist_wxindex_word_record(
+        repository,
+        payload,
+        dry_run=dry_run,
+        skip_db_save=skip_db_save,
+        verbose=verbose,
+        action=f"{phase} word={item['name']}",
+    )
+    if record_id:
+        item["record_id"] = record_id
+    return int(item.get("record_id") or 0)
+
+
+def _init_candidate_wxindex_word_records(
+    repository: HotContentRepository,
+    candidate_items: list[dict[str, Any]],
+    *,
+    analyze_ymd: str,
+    demand_cache_run_id: int | None,
+    dry_run: bool,
+    skip_db_save: bool,
+    verbose: bool,
+) -> None:
+    if not candidate_items:
+        return
+    if dry_run or skip_db_save:
+        if verbose:
+            label = "dry-run" if dry_run else "skip-db-save"
+            print(
+                f"[{label}] would batch init wxindex word records "
+                f"count={len(candidate_items)} analyze_ymd={analyze_ymd}"
+            )
+        return
+
+    init_payloads = [
+        build_wxindex_word_record_init_payload(
+            meta=item["meta"],
+            name=item["name"],
+            analyze_ymd=analyze_ymd,
+            fetch_start_ymd=item["fetch_start_ymd"],
+            fetch_end_ymd=item["fetch_end_ymd"],
+            demand_cache_run_id=demand_cache_run_id,
+        )
+        for item in candidate_items
+    ]
+    record_id_map = repository.init_wxindex_word_records(init_payloads)
+    for item in candidate_items:
+        item["record_id"] = int(record_id_map.get(item["name"]) or 0)
+    if verbose:
+        print(
+            f"init wxindex word records count={len(candidate_items)} "
+            f"analyze_ymd={analyze_ymd}"
+        )
+
+
+def _apply_demand_match_to_item(
+    item: dict[str, Any],
+    match_result: dict[str, Any],
+) -> None:
+    item["internal_demand_match_json"] = match_result
+    item["is_internal_demand_matched"] = bool(match_result.get("matched"))
+    matched_demand = str(match_result.get("matched_demand") or "").strip()
+    item["matched_demand"] = matched_demand or None
+    if not item.get("is_internal_demand_matched"):
+        item["is_final_retained"] = False
+
+
+def _apply_senior_fit_to_item(
+    item: dict[str, Any],
+    senior_result: dict[str, Any],
+) -> None:
+    passed = bool(senior_result.get("passed"))
+    item["senior_fit_score"] = senior_result.get("senior_fit_score")
+    item["demand_senior_fit_json"] = senior_result.get("demand_senior_fit_json")
+    item["senior_fit_passed"] = passed
+    item["is_final_retained"] = passed
+
+
+def build_wxindex_word_stats_payload(
+    item: dict[str, Any],
+    *,
+    analyze_ymd: str,
+) -> dict[str, Any]:
+    """构建通过热度+老年性筛选的词统计 payload。"""
+    meta = item["meta"]
+    result = item["result"]
+    return {
+        "name": item["name"],
+        "meta_id": meta.get("id"),
+        "analyze_ymd": analyze_ymd,
+        "wxindex_word_record_id": item.get("record_id"),
+        "retain_reason": item.get("retain_reason"),
+        "senior_fit_score": item.get("senior_fit_score"),
+        "data_start_ymd": result.get("data_start_ymd"),
+        "data_end_ymd": result.get("data_end_ymd"),
+        "data_days": result.get("data_days"),
+        "min_score": result.get("min_score"),
+        "max_score": result.get("max_score"),
+        "avg_score": result.get("avg_score"),
+        "detail_json": {
+            "demand_senior_fit_json": item.get("demand_senior_fit_json"),
+            "is_sustained_high": result.get("is_sustained_high"),
+            "is_rising": result.get("is_rising"),
+            "is_spike": result.get("is_spike"),
+        },
+    }
+
+
+def _save_senior_fit_passed_stats(
+    repository: HotContentRepository,
+    items: list[dict[str, Any]],
+    *,
+    analyze_ymd: str,
+    existing_stats_names: set[str] | None = None,
+    dry_run: bool,
+    skip_db_save: bool,
+    verbose: bool,
+) -> tuple[int, int]:
+    existing = existing_stats_names or set()
+    all_payloads = [
+        build_wxindex_word_stats_payload(item, analyze_ymd=analyze_ymd)
+        for item in items
+        if item.get("senior_fit_passed")
+    ]
+    resumed = sum(1 for payload in all_payloads if payload["name"] in existing)
+    payloads = [payload for payload in all_payloads if payload["name"] not in existing]
+    if not payloads:
+        return 0, resumed
+    if dry_run or skip_db_save:
+        if verbose:
+            label = "dry-run" if dry_run else "skip-db-save"
+            print(f"[{label}] would save wxindex word stats count={len(payloads)}")
+        return len(payloads), resumed
+    saved = repository.save_wxindex_word_stats_batch(payloads)
+    if verbose:
+        print(f"saved wxindex word stats count={saved} analyze_ymd={analyze_ymd}")
+    return saved, resumed
+
+
+def _build_export_row_from_item(
+    item: dict[str, Any],
+    *,
+    analyze_ymd: str,
+    strategy: str,
+) -> dict[str, Any]:
+    meta = item["meta"]
+    name = item["name"]
+    result = item["result"]
+    retain_reason = item.get("retain_reason")
+    is_internal_demand_matched = item.get("is_internal_demand_matched")
+    matched_demand = str(item.get("matched_demand") or "")
+    senior_fit_score = item.get("senior_fit_score")
+    is_final_retained = bool(item.get("is_final_retained"))
+
+    return {
+        "analyze_ymd": analyze_ymd,
+        "name": name,
+        "meta_id": meta.get("id"),
+        "fetch_start_ymd": item["fetch_start_ymd"],
+        "fetch_end_ymd": item["fetch_end_ymd"],
+        "data_start_ymd": result.get("data_start_ymd"),
+        "data_end_ymd": result.get("data_end_ymd"),
+        "data_days": result.get("data_days"),
+        "analysis_skipped": False,
+        "skip_reason": "",
+        "is_sustained_high": bool(result.get("is_sustained_high")),
+        "is_rising": bool(result.get("is_rising")),
+        "is_spike": bool(result.get("is_spike")),
+        "retain_reason": retain_reason or "",
+        "is_internal_demand_matched": (
+            "" if is_internal_demand_matched is None else is_internal_demand_matched
+        ),
+        "matched_demand": matched_demand,
+        "senior_fit_score": senior_fit_score if senior_fit_score is not None else "",
+        "is_final_retained": is_final_retained,
+        "min_score": result.get("min_score"),
+        "max_score": result.get("max_score"),
+        "avg_score": result.get("avg_score"),
+        "demand_id": (
+            build_demand_id(
+                strategy=strategy,
+                demand_name=name,
+                partition_dt=analyze_ymd,
+            )
+            if is_final_retained and strategy
+            else ""
+        ),
+        "weight": (
+            float(result.get("max_score") or 0) / WEIGHT_DIVISOR
+            if is_final_retained and result.get("max_score") is not None
+            else ""
+        ),
+    }
+
+
+def run_wxindex_heat_pattern_daily_job(
+    repository: HotContentRepository,
+    *,
+    config: FlowConfig | None = None,
+    api_client: JsonApiClient | None = None,
+    today: date | None = None,
+    sustained_threshold: float = WXINDEX_SUSTAINED_HIGH_THRESHOLD,
+    min_days: int = WXINDEX_HEAT_MIN_DAYS,
+    max_days: int = WXINDEX_HEAT_MAX_DAYS,
+    spike_days: int = WXINDEX_SPIKE_LOOKBACK_DAYS,
+    spike_ratio: float = WXINDEX_SPIKE_RATIO,
+    spike_baseline_floor: float = WXINDEX_SPIKE_BASELINE_FLOOR,
+    rising_overall_change_rate: float = WXINDEX_RISING_OVERALL_CHANGE_RATE,
+    rising_window_change_rate: float = WXINDEX_RISING_WINDOW_CHANGE_RATE,
+    rising_adjacent_up_ratio: float = WXINDEX_RISING_ADJACENT_UP_RATIO,
+    dry_run: bool = False,
+    skip_odps: bool = False,
+    skip_db_save: bool = False,
+    verbose: bool = False,
+) -> dict[str, Any]:
+    """定时任务:分析仍在抓取窗口内的词,并写入热度模式结果表。"""
+    current = today or datetime.now(SHANGHAI_TZ).date()
+    analyze_ymd = current.strftime("%Y%m%d")
+    meta_rows = repository.list_active_wxindex_word_meta(today=current)
+
+    demand_name_set: list[str] = []
+    demand_cache_run_id: int | None = None
+    demand_cache_error: str | None = None
+    postprocess_service: ContributionPostprocessService | None = None
+    if config is not None:
+        try:
+            cache = DemandCacheService(config, repository).get_or_create_current_hour_cache()
+            demand_name_set = list(cache.get("demand_name_set") or [])
+            demand_cache_run_id = int(cache["id"])
+            if demand_name_set:
+                postprocess_service = ContributionPostprocessService(
+                    config,
+                    repository,
+                    api_client
+                    or JsonApiClient(
+                        timeout_seconds=config.request_timeout_seconds,
+                        verify_ssl=config.https_verify_ssl,
+                    ),
+                )
+        except HotContentFlowError as exc:
+            demand_cache_error = str(exc)
+            if verbose:
+                print(f"demand cache unavailable, skip demand match: {exc}")
+
+    summary: dict[str, Any] = {
+        "analyze_ymd": analyze_ymd,
+        "meta_count": len(meta_rows),
+        "analyzed": 0,
+        "skipped": 0,
+        "retained": 0,
+        "sustained_high": 0,
+        "rising": 0,
+        "spike": 0,
+        "internal_demand_matched": 0,
+        "senior_scored": 0,
+        "senior_fit_candidates": 0,
+        "senior_fit_passed": 0,
+        "stats_saved": 0,
+        "final_retained": 0,
+        "odps_synced": 0,
+        "odps_written": 0,
+        "demand_cache_run_id": demand_cache_run_id,
+        "demand_cache_error": demand_cache_error,
+        "demand_name_count": len(demand_name_set),
+        "llm_batch_size": WXINDEX_WORD_LLM_BATCH_SIZE,
+        "demand_match_batches": 0,
+        "senior_fit_batches": 0,
+        "records_initialized": 0,
+        "heat_resumed": 0,
+        "demand_match_resumed": 0,
+        "senior_fit_resumed": 0,
+        "stats_resumed": 0,
+        "finalized_resumed": 0,
+        "dry_run": dry_run,
+        "skip_odps": skip_odps,
+        "skip_db_save": skip_db_save,
+    }
+    retained_words: list[dict[str, Any]] = []
+    export_rows: list[dict[str, Any]] = []
+    final_hive_rows: list[dict[str, Any]] = []
+    senior_threshold = WXINDEX_WORD_SENIOR_FIT_NORMALIZED_THRESHOLD
+    strategy = str(config.hot_demand_pool_strategy or "").strip() if config else ""
+    pending_items: list[dict[str, Any]] = []
+    candidate_items: list[dict[str, Any]] = []
+
+    for meta in meta_rows:
+        name = str(meta.get("name") or "").strip()
+        fetch_start_ymd = str(meta.get("fetch_start_ymd") or "").strip()
+        fetch_end_ymd = str(meta.get("fetch_end_ymd") or "").strip()
+        if not name or not fetch_start_ymd or not fetch_end_ymd:
+            summary["skipped"] += 1
+            continue
+
+        candidate_items.append(
+            {
+                "meta": meta,
+                "name": name,
+                "fetch_start_ymd": fetch_start_ymd,
+                "fetch_end_ymd": fetch_end_ymd,
+            }
+        )
+
+    _init_candidate_wxindex_word_records(
+        repository,
+        candidate_items,
+        analyze_ymd=analyze_ymd,
+        demand_cache_run_id=demand_cache_run_id,
+        dry_run=dry_run,
+        skip_db_save=skip_db_save,
+        verbose=verbose,
+    )
+    summary["records_initialized"] = len(candidate_items)
+
+    existing_records: dict[str, dict[str, Any]] = {}
+    existing_stats_names: set[str] = set()
+    if not dry_run and not skip_db_save:
+        candidate_names = [item["name"] for item in candidate_items]
+        existing_records = repository.list_wxindex_word_records_by_analyze_ymd(
+            analyze_ymd=analyze_ymd,
+            names=candidate_names,
+        )
+        existing_stats_names = repository.list_wxindex_word_stats_names(
+            analyze_ymd=analyze_ymd,
+            names=candidate_names,
+        )
+    for item in candidate_items:
+        if not item.get("record_id"):
+            existing = existing_records.get(item["name"])
+            if existing:
+                item["record_id"] = int(existing.get("id") or 0)
+
+    for item in candidate_items:
+        meta = item["meta"]
+        name = item["name"]
+        fetch_start_ymd = item["fetch_start_ymd"]
+        fetch_end_ymd = item["fetch_end_ymd"]
+        existing_record = existing_records.get(name)
+        scores = repository.list_wxindex_word_scores_in_range(
+            name,
+            start_ymd=fetch_start_ymd,
+            end_ymd=fetch_end_ymd,
+        )
+
+        if (
+            _is_heat_analysis_done(existing_record)
+            and not _should_rerun_heat_analysis(
+                existing_record,
+                scores=scores,
+                fetch_start_ymd=fetch_start_ymd,
+                fetch_end_ymd=fetch_end_ymd,
+                min_days=min_days,
+                max_days=max_days,
+            )
+        ):
+            summary["heat_resumed"] += 1
+            if _is_analysis_skipped_record(existing_record):
+                if verbose:
+                    detail = _parse_record_detail_json(existing_record)
+                    print(
+                        f"resume skip word={name} reason={detail.get('skip_reason')} "
+                        f"days={existing_record.get('data_days')}"
+                    )
+                export_rows.append(
+                    _build_skipped_export_row_from_record(
+                        record=existing_record,
+                        meta=meta,
+                        name=name,
+                        analyze_ymd=analyze_ymd,
+                        fetch_start_ymd=fetch_start_ymd,
+                        fetch_end_ymd=fetch_end_ymd,
+                    )
+                )
+                continue
+
+            pending_item = _rehydrate_pending_item_from_record(
+                item,
+                existing_record,
+                senior_threshold=senior_threshold,
+            )
+            pending_items.append(pending_item)
+            if verbose:
+                print(
+                    f"resume heat analyzed word={name} "
+                    f"retain_reason={pending_item.get('retain_reason') or ''} "
+                    f"data_days={pending_item['result'].get('data_days')}"
+                )
+            continue
+
+        result = analyze_wxindex_heat_patterns(
+            scores,
+            start_ymd=fetch_start_ymd,
+            end_ymd=fetch_end_ymd,
+            sustained_threshold=sustained_threshold,
+            min_days=min_days,
+            max_days=max_days,
+            spike_days=spike_days,
+            spike_ratio=spike_ratio,
+            spike_baseline_floor=spike_baseline_floor,
+            rising_overall_change_rate=rising_overall_change_rate,
+            rising_window_change_rate=rising_window_change_rate,
+            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+        )
+        if result.get("skipped"):
+            if verbose:
+                print(
+                    f"skip word={name} reason={result.get('skip_reason')} "
+                    f"days={result.get('data_days')}"
+                )
+            skipped_payload = build_wxindex_word_record_skipped_payload(
+                meta=meta,
+                name=name,
+                analyze_ymd=analyze_ymd,
+                fetch_start_ymd=fetch_start_ymd,
+                fetch_end_ymd=fetch_end_ymd,
+                result=result,
+                demand_cache_run_id=demand_cache_run_id,
+            )
+            _persist_wxindex_word_record(
+                repository,
+                skipped_payload,
+                dry_run=dry_run,
+                skip_db_save=skip_db_save,
+                verbose=verbose,
+                action="update skipped",
+            )
+            export_rows.append(
+                {
+                    "analyze_ymd": analyze_ymd,
+                    "name": name,
+                    "meta_id": meta.get("id"),
+                    "fetch_start_ymd": fetch_start_ymd,
+                    "fetch_end_ymd": fetch_end_ymd,
+                    "analysis_skipped": True,
+                    "skip_reason": result.get("skip_reason"),
+                    "data_days": result.get("data_days"),
+                    "retain_reason": "",
+                    "is_sustained_high": False,
+                    "is_rising": False,
+                    "is_spike": False,
+                    "is_internal_demand_matched": "",
+                    "matched_demand": "",
+                    "senior_fit_score": "",
+                    "is_final_retained": False,
+                    "min_score": "",
+                    "max_score": "",
+                    "avg_score": "",
+                }
+            )
+            continue
+
+        retain_reason = str(result.get("retain_reason") or "").strip() or None
+
+        pending_item = {
+            "meta": meta,
+            "name": name,
+            "fetch_start_ymd": fetch_start_ymd,
+            "fetch_end_ymd": fetch_end_ymd,
+            "result": result,
+            "retain_reason": retain_reason,
+            "record_id": item.get("record_id", 0),
+            "is_internal_demand_matched": None,
+            "matched_demand": None,
+            "internal_demand_match_json": None,
+            "senior_fit_score": None,
+            "demand_senior_fit_json": None,
+            "senior_fit_passed": None,
+            "is_final_retained": None,
+        }
+        pending_items.append(pending_item)
+        _persist_pending_item_record(
+            repository,
+            pending_item,
+            analyze_ymd=analyze_ymd,
+            demand_cache_run_id=demand_cache_run_id,
+            phase="heat_analyzed",
+            senior_threshold=senior_threshold,
+            sustained_threshold=sustained_threshold,
+            spike_days=spike_days,
+            spike_ratio=spike_ratio,
+            spike_baseline_floor=spike_baseline_floor,
+            rising_overall_change_rate=rising_overall_change_rate,
+            rising_window_change_rate=rising_window_change_rate,
+            rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+            dry_run=dry_run,
+            skip_db_save=skip_db_save,
+            verbose=verbose,
+        )
+        if verbose:
+            print(
+                f"heat analyzed word={name} retain_reason={retain_reason or ''} "
+                f"data_days={result.get('data_days')}"
+            )
+
+    demand_match_results: dict[str, dict[str, Any]] = {}
+    pending_by_name = {item["name"]: item for item in pending_items}
+    sustained_high_words = [
+        item["name"]
+        for item in pending_items
+        if item.get("retain_reason") == RETAIN_REASON_SUSTAINED_HIGH
+        and item.get("is_internal_demand_matched") is None
+    ]
+    for item in pending_items:
+        if item.get("retain_reason") != RETAIN_REASON_SUSTAINED_HIGH:
+            continue
+        if item.get("is_internal_demand_matched") is not None:
+            summary["demand_match_resumed"] += 1
+
+    def _persist_demand_match_updates(words: list[str]) -> None:
+        for word in words:
+            pending_item = pending_by_name.get(word)
+            if pending_item is None:
+                continue
+            match_result = demand_match_results.get(word) or {
+                "word": word,
+                "matched": False,
+                "matched_demand": "",
+                "match_list": [],
+            }
+            _apply_demand_match_to_item(pending_item, match_result)
+            _persist_pending_item_record(
+                repository,
+                pending_item,
+                analyze_ymd=analyze_ymd,
+                demand_cache_run_id=demand_cache_run_id,
+                phase="demand_matched",
+                senior_threshold=senior_threshold,
+                sustained_threshold=sustained_threshold,
+                spike_days=spike_days,
+                spike_ratio=spike_ratio,
+                spike_baseline_floor=spike_baseline_floor,
+                rising_overall_change_rate=rising_overall_change_rate,
+                rising_window_change_rate=rising_window_change_rate,
+                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+                dry_run=dry_run,
+                skip_db_save=skip_db_save,
+                verbose=verbose,
+            )
+
+    if sustained_high_words:
+        if postprocess_service is not None and demand_name_set:
+            for batch in _chunk_words(sustained_high_words):
+                summary["demand_match_batches"] += 1
+                if verbose:
+                    print(f"demand match batch size={len(batch)} words={batch}")
+                demand_match_results.update(
+                    postprocess_service.match_words_to_demand_pool(
+                        words=batch,
+                        demand_name_set=demand_name_set,
+                    )
+                )
+                _persist_demand_match_updates(batch)
+        else:
+            for word in sustained_high_words:
+                demand_match_results[word] = {
+                    "word": word,
+                    "matched": False,
+                    "matched_demand": "",
+                    "match_list": [],
+                    "skip_reason": "empty_demand_cache",
+                }
+            _persist_demand_match_updates(sustained_high_words)
+
+    senior_fit_candidate_names: list[str] = []
+    for item in pending_items:
+        retain_reason = item.get("retain_reason")
+        name = str(item.get("name") or "").strip()
+        if not retain_reason or not name:
+            continue
+        if _is_senior_fit_attempt_done(item, existing_records.get(name)):
+            summary["senior_fit_resumed"] += 1
+            continue
+        if retain_reason == RETAIN_REASON_SUSTAINED_HIGH:
+            if not item.get("is_internal_demand_matched"):
+                continue
+        senior_fit_candidate_names.append(name)
+
+    senior_fit_results: dict[str, dict[str, Any]] = {}
+
+    def _persist_senior_fit_updates(words: list[str]) -> None:
+        for word in words:
+            pending_item = pending_by_name.get(word)
+            senior_result = senior_fit_results.get(word)
+            if pending_item is None or senior_result is None:
+                continue
+            _apply_senior_fit_to_item(pending_item, senior_result)
+            _persist_pending_item_record(
+                repository,
+                pending_item,
+                analyze_ymd=analyze_ymd,
+                demand_cache_run_id=demand_cache_run_id,
+                phase="senior_fit_scored",
+                senior_threshold=senior_threshold,
+                sustained_threshold=sustained_threshold,
+                spike_days=spike_days,
+                spike_ratio=spike_ratio,
+                spike_baseline_floor=spike_baseline_floor,
+                rising_overall_change_rate=rising_overall_change_rate,
+                rising_window_change_rate=rising_window_change_rate,
+                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+                dry_run=dry_run,
+                skip_db_save=skip_db_save,
+                verbose=verbose,
+            )
+            if verbose and not pending_item.get("senior_fit_passed"):
+                print(
+                    f"senior fit rejected word={word} "
+                    f"score={pending_item.get('senior_fit_score')}"
+                )
+
+    if config is not None and senior_fit_candidate_names:
+        for batch in _chunk_words(senior_fit_candidate_names):
+            summary["senior_fit_batches"] += 1
+            if verbose:
+                print(f"senior fit batch size={len(batch)} words={batch}")
+            senior_fit_results.update(
+                score_wxindex_words_senior_fit(
+                    words=batch,
+                    config=config,
+                    senior_threshold=senior_threshold,
+                    batch_size=WXINDEX_WORD_LLM_BATCH_SIZE,
+                )
+            )
+            _persist_senior_fit_updates(batch)
+
+    stats_saved, stats_resumed = _save_senior_fit_passed_stats(
+        repository,
+        pending_items,
+        analyze_ymd=analyze_ymd,
+        existing_stats_names=existing_stats_names,
+        dry_run=dry_run,
+        skip_db_save=skip_db_save,
+        verbose=verbose,
+    )
+    summary["stats_saved"] = stats_saved
+    summary["stats_resumed"] = stats_resumed
+
+    for item in pending_items:
+        name = item["name"]
+        result = item["result"]
+        retain_reason = item.get("retain_reason")
+
+        if retain_reason:
+            retained_words.append(
+                {
+                    "name": name,
+                    "retain_reason": retain_reason,
+                    "data_days": result.get("data_days"),
+                    "data_start_ymd": result.get("data_start_ymd"),
+                    "data_end_ymd": result.get("data_end_ymd"),
+                    "is_internal_demand_matched": item.get("is_internal_demand_matched"),
+                    "matched_demand": str(item.get("matched_demand") or ""),
+                    "senior_fit_score": item.get("senior_fit_score"),
+                    "is_final_retained": bool(item.get("is_final_retained")),
+                }
+            )
+
+        existing_record = existing_records.get(name)
+        if _is_finalized_record(existing_record):
+            summary["finalized_resumed"] += 1
+            record_id = int(item.get("record_id") or existing_record.get("id") or 0)
+            if verbose:
+                print(f"resume finalized word={name}")
+        else:
+            record_id = _persist_pending_item_record(
+                repository,
+                item,
+                analyze_ymd=analyze_ymd,
+                demand_cache_run_id=demand_cache_run_id,
+                phase="finalized",
+                senior_threshold=senior_threshold,
+                sustained_threshold=sustained_threshold,
+                spike_days=spike_days,
+                spike_ratio=spike_ratio,
+                spike_baseline_floor=spike_baseline_floor,
+                rising_overall_change_rate=rising_overall_change_rate,
+                rising_window_change_rate=rising_window_change_rate,
+                rising_adjacent_up_ratio=rising_adjacent_up_ratio,
+                dry_run=dry_run,
+                skip_db_save=skip_db_save,
+                verbose=verbose,
+            )
+
+        export_rows.append(
+            _build_export_row_from_item(
+                item,
+                analyze_ymd=analyze_ymd,
+                strategy=strategy,
+            )
+        )
+
+        if item.get("is_final_retained") and strategy:
+            final_hive_rows.append(
+                build_wxindex_word_hive_row(
+                    wxindex_word_record_id=record_id,
+                    word=name,
+                    strategy=strategy,
+                    partition_dt=analyze_ymd,
+                    max_score=result.get("max_score"),
+                )
+            )
+
+    if (
+        final_hive_rows
+        and strategy
+        and config is not None
+        and not dry_run
+        and not skip_odps
+    ):
+        odps_summary = sync_wxindex_word_rows_to_odps(
+            config,
+            repository,
+            hive_rows=final_hive_rows,
+            partition_dt=analyze_ymd,
+            strategy=strategy,
+        )
+        summary["odps_written"] = odps_summary.get("written_count", 0)
+        summary["odps_synced"] = odps_summary.get("odps_synced", 0)
+        summary["odps_sync"] = odps_summary
+    elif dry_run or skip_odps:
+        summary["odps_written"] = len(final_hive_rows)
+        summary["odps_synced"] = len(final_hive_rows)
+
+    _refresh_wxindex_heat_job_summary(
+        summary,
+        pending_items=pending_items,
+        export_rows=export_rows,
+    )
+    summary["retained_words"] = retained_words
+    summary["export_rows"] = export_rows
+    return summary
+
+
+WXINDEX_HEAT_PATTERN_EXPORT_FIELDS = [
+    "analyze_ymd",
+    "name",
+    "meta_id",
+    "fetch_start_ymd",
+    "fetch_end_ymd",
+    "data_start_ymd",
+    "data_end_ymd",
+    "data_days",
+    "analysis_skipped",
+    "skip_reason",
+    "is_sustained_high",
+    "is_rising",
+    "is_spike",
+    "retain_reason",
+    "is_internal_demand_matched",
+    "matched_demand",
+    "senior_fit_score",
+    "is_final_retained",
+    "min_score",
+    "max_score",
+    "avg_score",
+    "demand_id",
+    "weight",
+]
+
+
+def write_wxindex_heat_pattern_csv(
+    rows: list[dict[str, Any]],
+    output_path: str | Path,
+    *,
+    fieldnames: list[str] | None = None,
+) -> Path:
+    """将热度分析明细写入本地 CSV。"""
+    path = Path(output_path).expanduser()
+    if not path.is_absolute():
+        path = Path.cwd() / path
+    path.parent.mkdir(parents=True, exist_ok=True)
+    columns = fieldnames or WXINDEX_HEAT_PATTERN_EXPORT_FIELDS
+    with path.open("w", encoding="utf-8-sig", newline="") as handle:
+        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction="ignore")
+        writer.writeheader()
+        for row in rows:
+            writer.writerow({column: row.get(column, "") for column in columns})
+    return path

+ 110 - 4
app/hot_content/wxindex_trend.py

@@ -12,6 +12,11 @@ UP_FIT_CHANGE_RATE = 0.04
 UP_WINDOW_CHANGE_RATE = 0.02
 DOWN_FIT_CHANGE_RATE = -0.04
 DOWN_WINDOW_CHANGE_RATE = -0.02
+HEAT_RISING_OVERALL_CHANGE_RATE = 0.15
+HEAT_RISING_WINDOW_CHANGE_RATE = 0.12
+HEAT_RISING_ADJACENT_UP_RATIO = 0.65
+HEAT_SPIKE_BASELINE_FLOOR = 50_000.0
+HEAT_SPIKE_RATIO = 2.5
 
 
 def _median(values: list[float]) -> float:
@@ -24,7 +29,11 @@ def _median(values: list[float]) -> float:
     return (ordered[mid - 1] + ordered[mid]) / 2
 
 
-def _extract_recent_scores(series: list[dict[str, Any]]) -> list[float]:
+def _extract_recent_scores(
+    series: list[dict[str, Any]],
+    *,
+    max_points: int = MAX_TREND_POINTS,
+) -> list[float]:
     scored_rows: list[tuple[str, int, float]] = []
     for index, row in enumerate(series):
         if not isinstance(row, dict):
@@ -39,7 +48,8 @@ def _extract_recent_scores(series: list[dict[str, Any]]) -> list[float]:
         scored_rows.append((ymd, index, score))
 
     scored_rows.sort(key=lambda item: (item[0], item[1]))
-    return [score for _, _, score in scored_rows[-MAX_TREND_POINTS:]]
+    point_limit = max(max_points, 1)
+    return [score for _, _, score in scored_rows[-point_limit:]]
 
 
 def _theil_sen_slope(values: list[float]) -> float:
@@ -50,12 +60,110 @@ def _theil_sen_slope(values: list[float]) -> float:
     return _median(slopes)
 
 
+def extract_sorted_scores(
+    series: list[dict[str, Any]],
+    *,
+    max_points: int | None = None,
+) -> list[float]:
+    point_limit = (
+        max_points if max_points is not None and max_points > 0 else MAX_TREND_POINTS
+    )
+    return _extract_recent_scores(series, max_points=point_limit)
+
+
+def is_wxindex_rising_scores(
+    scores: list[float],
+    *,
+    min_points: int = MIN_TREND_POINTS,
+) -> bool:
+    """判断热度序列是否呈持续上涨趋势(原流程 Theil-Sen 规则)。"""
+    if len(scores) < min_points:
+        return False
+
+    log_scores = [math.log1p(score) for score in scores]
+    slope = _theil_sen_slope(log_scores)
+    fit_change_rate = math.expm1(slope * (len(log_scores) - 1))
+
+    early_count = min(3, len(scores))
+    late_count = min(3, len(scores))
+    early_avg = sum(scores[:early_count]) / early_count
+    late_avg = sum(scores[-late_count:]) / late_count
+    window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
+
+    return (
+        fit_change_rate >= UP_FIT_CHANGE_RATE
+        and window_change_rate >= UP_WINDOW_CHANGE_RATE
+    )
+
+
+def is_wxindex_heat_rising_scores(
+    scores: list[float],
+    *,
+    min_points: int = 7,
+    overall_change_rate: float = HEAT_RISING_OVERALL_CHANGE_RATE,
+    window_change_rate_threshold: float = HEAT_RISING_WINDOW_CHANGE_RATE,
+    adjacent_up_ratio: float = HEAT_RISING_ADJACENT_UP_RATIO,
+) -> bool:
+    """热度模式任务:判断区间内是否持续上涨。"""
+    if len(scores) < min_points:
+        return False
+
+    first_score = scores[0]
+    last_score = scores[-1]
+    overall_change_rate_value = (last_score - first_score) / max(first_score, 1.0)
+    if overall_change_rate_value < overall_change_rate:
+        return False
+
+    early_avg = sum(scores[:3]) / 3
+    late_avg = sum(scores[-3:]) / 3
+    window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
+    if window_change_rate < window_change_rate_threshold:
+        return False
+
+    if last_score <= first_score:
+        return False
+
+    adjacent_total = len(scores) - 1
+    if adjacent_total <= 0:
+        return False
+    adjacent_up_count = sum(
+        1 for index in range(adjacent_total) if scores[index + 1] > scores[index]
+    )
+    return adjacent_up_count / adjacent_total >= adjacent_up_ratio
+
+
+def is_wxindex_spike_scores(
+    scores: list[float],
+    *,
+    spike_days: int = 3,
+    min_points: int = 7,
+    spike_ratio: float = HEAT_SPIKE_RATIO,
+    baseline_floor: float = HEAT_SPIKE_BASELINE_FLOOR,
+) -> bool:
+    """判断最近 N 天热度是否相对前期突然暴涨。"""
+    if len(scores) < min_points or spike_days <= 0 or len(scores) <= spike_days:
+        return False
+
+    baseline = scores[:-spike_days]
+    recent = scores[-spike_days:]
+    baseline_avg = sum(baseline) / len(baseline)
+    recent_avg = sum(recent) / len(recent)
+    effective_baseline = max(baseline_avg, baseline_floor)
+    if recent_avg / effective_baseline < spike_ratio:
+        return False
+
+    baseline_max = max(baseline)
+    return recent[-1] > baseline_max and recent_avg > baseline_avg
+
+
 def calc_wxindex_trend(series: list[dict[str, Any]]) -> str:
     """按最近 7 天整体走势计算趋势,避免被最后一天波动误导。"""
     scores = _extract_recent_scores(series)
     if len(scores) < MIN_TREND_POINTS:
         return "未知"
 
+    if is_wxindex_rising_scores(scores, min_points=MIN_TREND_POINTS):
+        return "上升"
     log_scores = [math.log1p(score) for score in scores]
     slope = _theil_sen_slope(log_scores)
     fit_change_rate = math.expm1(slope * (len(log_scores) - 1))
@@ -64,8 +172,6 @@ def calc_wxindex_trend(series: list[dict[str, Any]]) -> str:
     late_avg = sum(scores[-3:]) / 3
     window_change_rate = (late_avg - early_avg) / max(early_avg, 1.0)
 
-    if fit_change_rate >= UP_FIT_CHANGE_RATE and window_change_rate >= UP_WINDOW_CHANGE_RATE:
-        return "上升"
     if (
         fit_change_rate <= DOWN_FIT_CHANGE_RATE
         and window_change_rate <= DOWN_WINDOW_CHANGE_RATE

+ 51 - 3
app/scheduler.py

@@ -22,6 +22,7 @@ from app.hot_content.service import run_once
 from app.hot_content.timezone import SHANGHAI_TZ
 from app.hot_content.types import FlowConfig
 from app.hot_content.wxindex_words import run_wxindex_words_daily_job
+from app.hot_content.wxindex_heat_pattern import run_wxindex_heat_pattern_daily_job
 
 
 def _import_blocking_scheduler() -> Any:
@@ -89,6 +90,31 @@ def run_wxindex_words_refresh_job(config: FlowConfig) -> None:
         repository.close()
 
 
+def run_wxindex_heat_pattern_job(config: FlowConfig) -> None:
+    repository = HotContentRepository(config.mysql)
+    api_client = JsonApiClient(
+        timeout_seconds=config.request_timeout_seconds,
+        verify_ssl=config.https_verify_ssl,
+    )
+    try:
+        summary = run_wxindex_heat_pattern_daily_job(
+            repository,
+            config=config,
+            api_client=api_client,
+        )
+        print(
+            json.dumps(
+                {"job": "wxindex_heat_pattern", "summary": summary},
+                ensure_ascii=False,
+                indent=2,
+            )
+        )
+    except Exception as exc:
+        print(f"wxindex heat pattern failed: {exc}", file=sys.stderr)
+    finally:
+        repository.close()
+
+
 def register_hot_content_job(scheduler: Any, config: FlowConfig) -> None:
     scheduler.add_job(
         run_hot_content_job,
@@ -137,6 +163,22 @@ def register_wxindex_words_refresh_job(scheduler: Any, config: FlowConfig) -> No
     )
 
 
+def register_wxindex_heat_pattern_job(scheduler: Any, config: FlowConfig) -> None:
+    scheduler.add_job(
+        run_wxindex_heat_pattern_job,
+        trigger="cron",
+        hour=config.wxindex_heat_pattern_cron_hour,
+        minute=config.wxindex_heat_pattern_cron_minute,
+        timezone=SHANGHAI_TZ,
+        args=[config],
+        id="wxindex_heat_pattern",
+        name="微信指数热度模式分析(持续高热/上涨/暴涨)",
+        replace_existing=True,
+        coalesce=True,
+        max_instances=1,
+    )
+
+
 def start_scheduler() -> None:
     BlockingScheduler = _import_blocking_scheduler()
     scheduler = BlockingScheduler(timezone=SHANGHAI_TZ)
@@ -144,12 +186,16 @@ def start_scheduler() -> None:
     register_hot_content_job(scheduler, config)
     register_decode_result_job(scheduler, config)
     register_wxindex_words_refresh_job(scheduler, config)
+    register_wxindex_heat_pattern_job(scheduler, config)
     print(
         "scheduler started, timezone=Asia/Shanghai, "
-        "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh'], "
+        "jobs=['hot_content_flow', 'decode_result_flow', 'wxindex_words_refresh', "
+        "'wxindex_heat_pattern'], "
         f"hot_cron={config.hot_flow_cron_hours}:{config.hot_flow_cron_minute:02d}, "
         f"decode_result_interval={config.decode_result_interval_seconds}s, "
-        f"wxindex_words_cron={config.wxindex_words_cron_hour}:{config.wxindex_words_cron_minute:02d}"
+        f"wxindex_words_cron={config.wxindex_words_cron_hour}:{config.wxindex_words_cron_minute:02d}, "
+        f"wxindex_heat_pattern_cron="
+        f"{config.wxindex_heat_pattern_cron_hour}:{config.wxindex_heat_pattern_cron_minute:02d}"
     )
     scheduler.start()
 
@@ -159,7 +205,7 @@ def parse_args() -> argparse.Namespace:
     parser.add_argument("--once", action="store_true", help="执行一次,不启动调度器")
     parser.add_argument(
         "--job",
-        choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh"),
+        choices=("all", "hot-content", "decode-result", "postprocess", "wxindex-refresh", "wxindex-heat-pattern"),
         default="all",
         help="--once 时选择执行哪个任务",
     )
@@ -207,6 +253,8 @@ def main() -> None:
             )
         if args.job in {"wxindex-refresh"}:
             run_wxindex_words_refresh_job(config)
+        if args.job in {"wxindex-heat-pattern"}:
+            run_wxindex_heat_pattern_job(config)
         return
     start_scheduler()
 

+ 2 - 0
docker-compose.yml

@@ -28,6 +28,8 @@ services:
       DECODE_RESULT_FLOW_INTERVAL_SECONDS: ${DECODE_RESULT_FLOW_INTERVAL_SECONDS:-1800}
       WXINDEX_WORDS_CRON_HOUR: ${WXINDEX_WORDS_CRON_HOUR:-10}
       WXINDEX_WORDS_CRON_MINUTE: ${WXINDEX_WORDS_CRON_MINUTE:-0}
+      WXINDEX_HEAT_PATTERN_CRON_HOUR: ${WXINDEX_HEAT_PATTERN_CRON_HOUR:-11}
+      WXINDEX_HEAT_PATTERN_CRON_MINUTE: ${WXINDEX_HEAT_PATTERN_CRON_MINUTE:-0}
       # 业务阈值
       WXINDEX_SCORE_THRESHOLD: ${WXINDEX_SCORE_THRESHOLD:-1000000}
       DEMAND_POOL_SOURCE_TABLE: ${DEMAND_POOL_SOURCE_TABLE:-dwd_multi_demand_pool_di}