jihuaqiang 6 дней назад
Родитель
Сommit
5440f5e8a8
3 измененных файлов с 56 добавлено и 7 удалено
  1. 42 3
      scheduler/decode_dispatch_job.py
  2. 12 2
      scheduler/decode_hourly_stats_job.py
  3. 2 2
      scheduler/odps_fetch.py

+ 42 - 3
scheduler/decode_dispatch_job.py

@@ -103,6 +103,27 @@ def _count_today_non_terminal(dt: str) -> int:
     return int((result or {}).get("total", 0))
 
 
+def _count_today_total(dt: str) -> int:
+    sql = """
+        SELECT COUNT(1) AS total
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s
+    """
+    result = mysql.fetchone(sql, (dt,))
+    return int((result or {}).get("total", 0))
+
+
+def _fetch_decode_daily_limit() -> int:
+    sql = """
+        SELECT `max` AS daily_limit
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    row = mysql.fetchone(sql)
+    return int((row or {}).get("daily_limit") or 0)
+
+
 def _submit_decode_result_chunk(
     channel_content_ids: List[str],
 ) -> Tuple[bool, str, Dict[str, Any]]:
@@ -428,8 +449,8 @@ def run_decode_dispatch_job() -> None:
             logger.info("解码调度任务结束(开关关闭:不发起新任务)")
             return
 
-        need = max(0, BATCH_SIZE - after_poll_non_terminal)
-        if need == 0:
+        window_need = max(0, BATCH_SIZE - after_poll_non_terminal)
+        if window_need == 0:
             logger.info(
                 "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
                 BATCH_SIZE,
@@ -439,11 +460,29 @@ def run_decode_dispatch_job() -> None:
             logger.info("解码调度任务结束(窗口已满)")
             return
 
+        daily_limit = _fetch_decode_daily_limit()
+        today_total = _count_today_total(dt)
+        daily_remaining = max(0, daily_limit - today_total)
+        if daily_remaining == 0:
+            logger.info(
+                "当日解构已达到上限,不再发起新任务 dt={} daily_limit={} today_total={}",
+                dt,
+                daily_limit,
+                today_total,
+            )
+            logger.info("解码调度任务结束(达到当日上限)")
+            return
+
+        need = min(window_need, daily_remaining)
         logger.info(
-            "动态窗口补充 dt={} 当前解构中={} 目标={} 本次补充={}",
+            "动态窗口补充 dt={} 当前解构中={} 目标={} 窗口缺口={} 日上限={} 当日已发起={} 当日剩余额度={} 本次补充={}",
             dt,
             after_poll_non_terminal,
             BATCH_SIZE,
+            window_need,
+            daily_limit,
+            today_total,
+            daily_remaining,
             need,
         )
         records = _pick_candidate_records(dt=dt, batch_size=need)

+ 12 - 2
scheduler/decode_hourly_stats_job.py

@@ -17,7 +17,6 @@ if str(_PROJECT_ROOT) not in sys.path:
 
 from utils.scheduler_logger import get_scheduler_logger
 from utils.sync_mysql_help import mysql
-from scheduler.odps_fetch import count_priority_posts
 
 
 logger = get_scheduler_logger()
@@ -78,7 +77,7 @@ def _fetch_today_stats(now: datetime) -> Dict[str, int]:
         ),
     )
     executed_total = int((row or {}).get("executed_count") or 0)
-    demand_total = int(count_priority_posts(now.strftime("%Y%m%d")) or 0)
+    demand_total = _fetch_decode_daily_limit()
     pending_total = max(demand_total - executed_total, 0)
     return {
         "executed_total": executed_total,
@@ -86,6 +85,17 @@ def _fetch_today_stats(now: datetime) -> Dict[str, int]:
     }
 
 
+def _fetch_decode_daily_limit() -> int:
+    sql = """
+        SELECT `max` AS daily_limit
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    row = mysql.fetchone(sql)
+    return int((row or {}).get("daily_limit") or 0)
+
+
 def _has_overdue_pending_task(now: datetime) -> bool:
     overdue_before = now - timedelta(minutes=30)
     sql = """

+ 2 - 2
scheduler/odps_fetch.py

@@ -46,7 +46,7 @@ def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = N
             level, reason, count, extend, dt
         FROM {ODPS_TABLE}
         WHERE dt = '{target_dt}'
-          AND level IN (0, 1, 2)
+          AND level IN (0, 1, 2, 3)
         ORDER BY count DESC, level ASC
         LIMIT {offset}, {limit}
     """
@@ -89,7 +89,7 @@ def count_priority_posts(dt: Optional[str] = None) -> int:
         SELECT COUNT(1) AS total_count
         FROM {ODPS_TABLE}
         WHERE dt = '{target_dt}'
-          AND level IN (0, 1, 2)
+          AND level IN (0, 1, 2, 3)
     """
     logger.info("开始执行ODPS计数 dt={}", target_dt)
     odps = _build_odps_client()