Bladeren bron

增加微信数据补充

xueyiming 1 week geleden
bovenliggende
commit
354ea86977
2 gewijzigde bestanden met toevoegingen van 132 en 32 verwijderingen
  1. 25 10
      app/hot_content/repository.py
  2. 107 22
      app/hot_content/wxindex_words.py

+ 25 - 10
app/hot_content/repository.py

@@ -1108,28 +1108,41 @@ class HotContentRepository:
             scores.append({"ymd": dt, "total_score": total_score})
         return scores
 
-    def list_stale_wxindex_words(self, *, end_ymd: str) -> list[dict[str, Any]]:
-        """返回已存在但最新日期早于 end_ymd 的词。"""
+    def list_stale_wxindex_words(
+        self,
+        *,
+        end_ymd: str,
+        start_ymd: str = "20260601",
+    ) -> list[dict[str, Any]]:
+        """返回已存在但缺最新日期,或未从 start_ymd 补齐的词。"""
         target_end = str(end_ymd or "").strip()
-        if not target_end:
+        target_start = str(start_ymd or "").strip()
+        if not target_end or not target_start:
             return []
         self._ensure_wxindex_words_table()
         sql = """
-            SELECT name, MAX(dt) AS latest_dt
+            SELECT name, MIN(dt) AS earliest_dt, MAX(dt) AS latest_dt
             FROM hot_content_wxindex_words
             GROUP BY name
-            HAVING MAX(dt) < %s
+            HAVING MAX(dt) < %s OR MIN(dt) > %s
             ORDER BY name ASC
         """
         with self.conn.cursor() as cursor:
-            cursor.execute(sql, (target_end,))
+            cursor.execute(sql, (target_end, target_start))
             rows = cursor.fetchall()
         stale_words: list[dict[str, Any]] = []
         for row in rows:
             name = str(row.get("name") or "").strip()
+            earliest_dt = str(row.get("earliest_dt") or "").strip()
             latest_dt = str(row.get("latest_dt") or "").strip()
-            if name and latest_dt:
-                stale_words.append({"name": name, "latest_dt": latest_dt})
+            if name and earliest_dt and latest_dt:
+                stale_words.append(
+                    {
+                        "name": name,
+                        "earliest_dt": earliest_dt,
+                        "latest_dt": latest_dt,
+                    }
+                )
         return stale_words
 
     def list_low_avg_wxindex_words(
@@ -1241,9 +1254,11 @@ class HotContentRepository:
         if not rows:
             return 0, 0
 
+        inserted = 0
         with self.conn.cursor() as cursor:
-            cursor.executemany(sql, rows)
-            inserted = int(cursor.rowcount or 0)
+            for row in rows:
+                cursor.execute(sql, row)
+                inserted += int(cursor.rowcount or 0)
         skipped = len(rows) - inserted
         return inserted, skipped
 

+ 107 - 22
app/hot_content/wxindex_words.py

@@ -86,20 +86,64 @@ def fetch_wxindex_scores(
     return parse_wxindex_total_scores(wx_resp)
 
 
-def scores_need_refresh(
+def get_word_score_bounds(
+    scores: list[dict[str, Any]],
+) -> tuple[str | None, str | None]:
+    ymds = [
+        str(item.get("ymd") or item.get("dt") or "").strip()
+        for item in scores
+        if isinstance(item, dict) and str(item.get("ymd") or item.get("dt") or "").strip()
+    ]
+    if not ymds:
+        return None, None
+    return min(ymds), max(ymds)
+
+
+def word_scores_need_supplement(
     scores: list[dict[str, Any]],
     *,
     end_ymd: str | None = None,
-) -> bool:
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
+) -> tuple[bool, str]:
+    """判断词是否需要补数:缺起始段、缺最新日期,或完全无数据。"""
     if not scores:
-        return True
+        return True, "empty"
     target_end = end_ymd or get_wxindex_end_ymd()
-    latest_ymd = max(
-        str(item.get("ymd") or "")
-        for item in scores
-        if isinstance(item, dict) and str(item.get("ymd") or "").strip()
+    earliest_ymd, latest_ymd = get_word_score_bounds(scores)
+    if earliest_ymd is None or latest_ymd is None:
+        return True, "empty"
+    if earliest_ymd > start_ymd:
+        return True, "missing_start"
+    if latest_ymd < target_end:
+        return True, "missing_end"
+    return False, "complete"
+
+
+def get_supplement_fetch_range(
+    scores: list[dict[str, Any]],
+    *,
+    end_ymd: str | None = None,
+    start_ymd: str = WXINDEX_WORDS_START_YMD,
+) -> tuple[str, str] | None:
+    """计算补数 API 查询区间;无需补数时返回 None。"""
+    need_supplement, reason = word_scores_need_supplement(
+        scores,
+        end_ymd=end_ymd,
+        start_ymd=start_ymd,
     )
-    return latest_ymd < target_end
+    if not need_supplement:
+        return None
+
+    target_end = end_ymd or get_wxindex_end_ymd()
+    if reason == "empty":
+        return start_ymd, target_end
+
+    earliest_ymd, latest_ymd = get_word_score_bounds(scores)
+    if earliest_ymd is None or latest_ymd is None:
+        return start_ymd, target_end
+    if earliest_ymd > start_ymd:
+        return start_ymd, target_end
+    return next_ymd(latest_ymd), target_end
 
 
 def slice_scores_lookback(
@@ -145,31 +189,45 @@ def refresh_stale_wxindex_words(
         "inserted_rows": 0,
         "skipped_rows": 0,
         "fetch_failed": 0,
+        "api_empty": 0,
         "no_new_range": 0,
     }
 
-    stale_words = repository.list_stale_wxindex_words(end_ymd=target_end)
+    stale_words = repository.list_stale_wxindex_words(
+        end_ymd=target_end,
+        start_ymd=WXINDEX_WORDS_START_YMD,
+    )
     summary["stale_words"] = len(stale_words)
     if not stale_words:
         return summary
 
     for item in stale_words:
         name = str(item.get("name") or "").strip()
-        latest_dt = str(item.get("latest_dt") or "").strip()
-        if not name or not latest_dt:
+        if not name:
             continue
 
-        start_ymd = next_ymd(latest_dt)
-        if start_ymd > target_end:
+        stored_scores = repository.list_wxindex_word_scores(name)
+        fetch_range = get_supplement_fetch_range(
+            stored_scores,
+            end_ymd=target_end,
+        )
+        if fetch_range is None:
             summary["no_new_range"] += 1
             if verbose:
-                print(f"skip up-to-date word={name} latest_dt={latest_dt}")
+                print(f"skip complete word={name}")
+            continue
+
+        start_ymd, end_ymd = fetch_range
+        if start_ymd > end_ymd:
+            summary["no_new_range"] += 1
+            if verbose:
+                print(f"skip up-to-date word={name}")
             continue
 
         if dry_run:
             summary["refreshed"] += 1
             if verbose:
-                print(f"[dry-run] would refresh word={name} {start_ymd}->{target_end}")
+                print(f"[dry-run] would refresh word={name} {start_ymd}->{end_ymd}")
             continue
 
         try:
@@ -178,8 +236,14 @@ def refresh_stale_wxindex_words(
                 api_url,
                 keyword=name,
                 start_ymd=start_ymd,
-                end_ymd=target_end,
+                end_ymd=end_ymd,
             )
+            if not api_scores:
+                summary["api_empty"] += 1
+                if verbose:
+                    print(f"api empty word={name} range={start_ymd}->{end_ymd}")
+                continue
+
             inserted, skipped = repository.save_wxindex_daily_scores(
                 name=name,
                 scores=api_scores,
@@ -190,12 +254,21 @@ def refresh_stale_wxindex_words(
                 print(f"refresh failed word={name}: {exc}")
             continue
 
+        if inserted <= 0:
+            summary["api_empty"] += 1
+            if verbose:
+                print(
+                    f"no new rows word={name} range={start_ymd}->{end_ymd} "
+                    f"api_rows={len(api_scores)} skipped={skipped}"
+                )
+            continue
+
         summary["refreshed"] += 1
         summary["inserted_rows"] += inserted
         summary["skipped_rows"] += skipped
         if verbose:
             print(
-                f"refreshed word={name} range={start_ymd}->{target_end} "
+                f"refreshed word={name} range={start_ymd}->{end_ymd} "
                 f"inserted={inserted} skipped={skipped}"
             )
 
@@ -294,22 +367,28 @@ def ensure_word_full_scores(
 
     target_end = end_ymd or get_wxindex_end_ymd()
     stored_scores = repository.list_wxindex_word_scores(word)
-    if stored_scores and not force_refresh and not scores_need_refresh(
+    fetch_range = None if force_refresh else get_supplement_fetch_range(
         stored_scores,
         end_ymd=target_end,
-    ):
+    )
+    if fetch_range is None and stored_scores:
         return stored_scores, "cached"
 
     if dry_run:
         return [], "dry_run"
 
     had_data = bool(stored_scores)
+    start_ymd, fetch_end_ymd = fetch_range or (WXINDEX_WORDS_START_YMD, target_end)
     api_scores = fetch_wxindex_scores(
         api_client,
         api_url,
         keyword=word,
-        end_ymd=target_end,
+        start_ymd=start_ymd,
+        end_ymd=fetch_end_ymd,
     )
+    if not api_scores:
+        return stored_scores, "api_empty"
+
     inserted, _skipped = repository.save_wxindex_daily_scores(
         name=word,
         scores=api_scores,
@@ -317,8 +396,10 @@ def ensure_word_full_scores(
     final_scores = repository.list_wxindex_word_scores(word)
     if inserted > 0:
         action = "updated" if had_data else "inserted"
-    else:
+    elif final_scores:
         action = "cached"
+    else:
+        action = "api_empty"
     return final_scores or api_scores, action
 
 
@@ -338,6 +419,7 @@ def sync_words_from_trend_json(
         "inserted": 0,
         "updated": 0,
         "cached": 0,
+        "api_empty": 0,
         "fetch_failed": 0,
     }
     words = extract_searched_words(trend_json)
@@ -366,6 +448,8 @@ def sync_words_from_trend_json(
             summary["updated"] += 1
         elif action == "cached":
             summary["cached"] += 1
+        elif action == "api_empty":
+            summary["api_empty"] += 1
         elif action == "dry_run":
             summary["inserted"] += 1
 
@@ -392,6 +476,7 @@ def backfill_wxindex_words(
         "inserted": 0,
         "updated": 0,
         "cached": 0,
+        "api_empty": 0,
         "fetch_failed": 0,
         "invalid_json": 0,
     }
@@ -428,7 +513,7 @@ def backfill_wxindex_words(
             dry_run=dry_run,
             verbose=verbose,
         )
-        for key in ("words_found", "inserted", "updated", "cached", "fetch_failed"):
+        for key in ("words_found", "inserted", "updated", "cached", "api_empty", "fetch_failed"):
             summary[key] += result[key]
 
     return summary