Explorar o código

少日期需要跳过流程

xueyiming hai 6 días
pai
achega
537e17b449
Modificáronse 2 ficheiros con 82 adicións e 1 borrados
  1. 30 0
      app/hot_content/repository.py
  2. 52 1
      app/hot_content/wxindex_heat_pattern.py

+ 30 - 0
app/hot_content/repository.py

@@ -2253,6 +2253,36 @@ class HotContentRepository:
             cursor.execute(sql, tuple(cleaned))
             return int(cursor.rowcount or 0)
 
+    def list_wxindex_word_names_with_dt(
+        self,
+        names: list[str],
+        *,
+        dt: str,
+    ) -> set[str]:
+        """Return the set of word names that have data for the given date in hot_content_wxindex_words."""
+        target_dt = str(dt or "").strip()
+        normalized_names = [
+            str(name or "").strip() for name in names if str(name or "").strip()
+        ]
+        if not target_dt or not normalized_names:
+            return set()  # blank dt or no usable names: skip the query entirely
+        self._ensure_wxindex_words_table()
+        placeholders = ", ".join(["%s"] * len(normalized_names))  # one %s per name for the IN list
+        sql = f"""
+            SELECT DISTINCT name
+            FROM hot_content_wxindex_words
+            WHERE dt = %s
+              AND name IN ({placeholders})
+        """
+        with self.conn.cursor() as cursor:
+            cursor.execute(sql, [target_dt, *normalized_names])  # dt binds first, then each name
+            rows = cursor.fetchall()
+        return {
+            str(row.get("name") or "").strip()  # rows are read via .get("name"), i.e. mapping-style rows
+            for row in rows or []
+            if str(row.get("name") or "").strip()
+        }
+
     def has_wxindex_word(self, name: str) -> bool:
         return self.get_wxindex_word_latest_dt(name) is not None  # True when a latest dt is reported for name
 

+ 52 - 1
app/hot_content/wxindex_heat_pattern.py

@@ -4,7 +4,7 @@ from __future__ import annotations
 
 import csv
 import json
-from datetime import date, datetime
+from datetime import date, datetime, timedelta
 from pathlib import Path
 from typing import Any
 
@@ -837,6 +837,45 @@ def _persist_pending_item_record(
     return int(item.get("record_id") or 0)
 
 
+def _filter_candidates_awaiting_yesterday_score(
+    repository: HotContentRepository,
+    candidate_items: list[dict[str, Any]],
+    *,
+    yesterday_ymd: str,
+    existing_records: dict[str, dict[str, Any]],
+    verbose: bool,
+) -> tuple[list[dict[str, Any]], int]:
+    """After initialization: defer words whose heat analysis is unfinished and that lack yesterday's index data; return (kept items, deferred count)."""
+    names_to_check = [
+        item["name"]
+        for item in candidate_items
+        if not _is_heat_analysis_done(existing_records.get(item["name"]))
+    ]
+    if not names_to_check:
+        return candidate_items, 0  # no unfinished words, so no repository lookup is needed
+
+    names_with_yesterday = repository.list_wxindex_word_names_with_dt(
+        names_to_check,
+        dt=yesterday_ymd,
+    )
+    ready_items: list[dict[str, Any]] = []
+    awaiting_count = 0
+    for item in candidate_items:
+        name = item["name"]
+        if _is_heat_analysis_done(existing_records.get(name)):
+            ready_items.append(item)  # already analyzed: kept regardless of yesterday's data
+            continue
+        if name in names_with_yesterday:
+            ready_items.append(item)  # unfinished, but yesterday's data is present
+            continue
+        awaiting_count += 1  # unfinished and no data for yesterday_ymd: dropped from this run
+        if verbose:
+            print(
+                f"await yesterday score word={name} dt={yesterday_ymd}, skip this run"
+            )
+    return ready_items, awaiting_count
+
+
 def _init_candidate_wxindex_word_records(
     repository: HotContentRepository,
     candidate_items: list[dict[str, Any]],
@@ -1092,6 +1131,7 @@ def run_wxindex_heat_pattern_daily_job(
         "demand_match_batches": 0,
         "senior_fit_batches": 0,
         "records_initialized": 0,
+        "awaiting_yesterday_score": 0,
         "heat_resumed": 0,
         "demand_match_resumed": 0,
         "senior_fit_resumed": 0,
@@ -1149,6 +1189,17 @@ def run_wxindex_heat_pattern_daily_job(
             analyze_ymd=analyze_ymd,
             names=candidate_names,
         )
+
+    yesterday_ymd = (current - timedelta(days=1)).strftime("%Y%m%d")
+    candidate_items, awaiting_yesterday = _filter_candidates_awaiting_yesterday_score(
+        repository,
+        candidate_items,
+        yesterday_ymd=yesterday_ymd,
+        existing_records=existing_records,
+        verbose=verbose,
+    )
+    summary["awaiting_yesterday_score"] = awaiting_yesterday
+
     for item in candidate_items:
         if not item.get("record_id"):
             existing = existing_records.get(item["name"])