Kaynağa Gözat

始终保持20个解构并发

jihuaqiang 1 hafta önce
ebeveyn
işleme
2a2e0336cb
1 değiştirilmiş dosya ile 43 ekleme ve 33 silme
  1. 43 33
      scheduler/decode_dispatch_job.py

+ 43 - 33
scheduler/decode_dispatch_job.py

@@ -15,6 +15,7 @@ logger = get_scheduler_logger()
 CONFIG_ID = "57"
 DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
 DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
+# 动态窗口:尽量保持当天 status IN (0,1) 的条数为此值(补充时按缺口取数)。
 BATCH_SIZE = 20
 ODPS_PAGE_SIZE = 200
 RESULT_POLL_CHUNK = 50
@@ -395,50 +396,59 @@ def run_decode_dispatch_job() -> None:
     try:
         dt = _today_dt()
 
-        # Startup guard: if there are in-flight tasks today, poll only in this run.
-        # New batch submit will wait for next scheduler cycle after all are terminal.
-        initial_non_terminal = _count_today_non_terminal(dt)
-        if initial_non_terminal > 0:
+        before_poll_non_terminal = _count_today_non_terminal(dt)
+        if before_poll_non_terminal > 0:
             logger.info(
-                "启动时发现当天存在进行中任务,本轮仅查询不发起新批次 dt={} count={}",
+                "当天存在待执行/执行中任务,先拉取解码结果 dt={} count={}",
                 dt,
-                initial_non_terminal,
+                before_poll_non_terminal,
+            )
+        pending_vids = _fetch_today_pending_vids(dt)
+        if pending_vids:
+            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
+            _poll_decode_results_for_today(dt, pending_vids)
+        elif before_poll_non_terminal > 0:
+            logger.warning(
+                "存在非终态记录但未获取到可查询vid dt={} count={}",
+                dt,
+                before_poll_non_terminal,
             )
-            pending_vids = _fetch_today_pending_vids(dt)
-            if pending_vids:
-                logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
-                _poll_decode_results_for_today(dt, pending_vids)
-            else:
-                logger.warning(
-                    "存在非终态记录但未获取到可查询vid dt={} count={}",
-                    dt,
-                    initial_non_terminal,
-                )
 
-            remaining_non_terminal = _count_today_non_terminal(dt)
-            if remaining_non_terminal > 0:
-                logger.info(
-                    "查询后仍有待执行/执行中任务,跳过新批次发起 dt={} count={}",
-                    dt,
-                    remaining_non_terminal,
-                )
-                logger.info("解码调度任务结束(启动保护:仅查询)")
-                return
-            else:
-                logger.info(
-                    "查询后当天进行中任务已清空,立即发起新批次 dt={}",
-                    dt,
-                )
-                # fallthrough: submit new batch in the same run
+        after_poll_non_terminal = _count_today_non_terminal(dt)
+        if pending_vids or before_poll_non_terminal > 0:
+            logger.info(
+                "解码结果查询阶段结束 dt={} 查询前非终态={} 查询后非终态={}",
+                dt,
+                before_poll_non_terminal,
+                after_poll_non_terminal,
+            )
 
         if not _is_decode_submit_open():
             logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
             logger.info("解码调度任务结束(开关关闭:不发起新任务)")
             return
 
-        records = _pick_candidate_records(dt=dt, batch_size=BATCH_SIZE)
+        need = max(0, BATCH_SIZE - after_poll_non_terminal)
+        if need == 0:
+            logger.info(
+                "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
+                BATCH_SIZE,
+                dt,
+                after_poll_non_terminal,
+            )
+            logger.info("解码调度任务结束(窗口已满)")
+            return
+
+        logger.info(
+            "动态窗口补充 dt={} 当前解构中={} 目标={} 本次补充={}",
+            dt,
+            after_poll_non_terminal,
+            BATCH_SIZE,
+            need,
+        )
+        records = _pick_candidate_records(dt=dt, batch_size=need)
         if not records:
-            logger.info("无可发起的新批次候选数据 dt={}", dt)
+            logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
             logger.info("解码调度任务结束(无新增任务)")
             return
         logger.info("解码提交接口执行开始 records={}", records)