Przeglądaj źródła

bugfix: 跨天级任务状态更新

jihuaqiang 1 dzień temu
rodzic
commit
c7c0b1d8d1

+ 4 - 1
scheduler/change_history_status.py

@@ -23,7 +23,10 @@ def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
     sql = """
         SELECT DISTINCT dt, vid
         FROM aigc_topic_decode_task_result
-        WHERE status IN (0, 1) AND vid IS NOT NULL AND vid != ''
+        WHERE status IN (0, 1)
+          AND dt < DATE_FORMAT(CURDATE(), '%Y%m%d')
+          AND vid IS NOT NULL
+          AND vid != ''
         ORDER BY dt, vid
     """
     rows = mysql.fetchall(sql)

+ 43 - 0
scheduler/decode_dispatch_job.py

@@ -93,6 +93,27 @@ def _fetch_today_pending_vids(dt: str) -> List[str]:
     return [str(row["vid"]) for row in rows if row.get("vid")]
 
 
+def _fetch_history_pending_vids_by_dt(today_dt: str) -> Dict[str, List[str]]:
+    sql = """
+        SELECT DISTINCT dt, vid
+        FROM aigc_topic_decode_task_result
+        WHERE status IN (0, 1)
+          AND dt < %s
+          AND vid IS NOT NULL
+          AND vid != ''
+        ORDER BY dt, vid
+    """
+    rows = mysql.fetchall(sql, (today_dt,))
+    grouped: Dict[str, List[str]] = {}
+    for row in rows:
+        dt = str(row.get("dt") or "").strip()
+        vid = str(row.get("vid") or "").strip()
+        if not dt or not vid:
+            continue
+        grouped.setdefault(dt, []).append(vid)
+    return grouped
+
+
 def _count_today_non_terminal(dt: str) -> int:
     sql = """
         SELECT COUNT(1) AS total
@@ -247,6 +268,27 @@ def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
     )
 
 
+def _poll_decode_results_for_history(today_dt: str) -> None:
+    vids_by_dt = _fetch_history_pending_vids_by_dt(today_dt)
+    if not vids_by_dt:
+        return
+    total = sum(len(v) for v in vids_by_dt.values())
+    logger.info(
+        "开始查询历史未完成解码结果 today_dt={} 涉及dt数={} 总vid数={}",
+        today_dt,
+        len(vids_by_dt),
+        total,
+    )
+    for dt, vids in vids_by_dt.items():
+        _poll_decode_results_for_today(dt, vids)
+    logger.info(
+        "历史未完成解码结果查询结束 today_dt={} 涉及dt数={} 总vid数={}",
+        today_dt,
+        len(vids_by_dt),
+        total,
+    )
+
+
 def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     posts: List[Dict[str, Any]] = []
     for item in records:
@@ -416,6 +458,7 @@ def run_decode_dispatch_job() -> None:
     logger.info("解码调度任务开始执行")
     try:
         dt = _today_dt()
+        _poll_decode_results_for_history(dt)
 
         before_poll_non_terminal = _count_today_non_terminal(dt)
         if before_poll_non_terminal > 0: