jihuaqiang 15 часов назад
Родитель
Сommit
40593547a5
1 измененных файлов с 18 добавлено и 12 удалено
  1. 18 12
      examples/content_finder/server.py

+ 18 - 12
examples/content_finder/server.py

@@ -68,8 +68,10 @@ scheduler = AsyncIOScheduler(timezone=SCHEDULER_TZ)
 MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "2"))
 MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "2"))
 task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
 task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
 
 
-# 定时:轮询间隔(分钟)、单次任务超时(秒,默认 15 分钟)
-SCHEDULE_INTERVAL_MINUTES = int(os.getenv("SCHEDULE_INTERVAL_MINUTES", "2"))
+# 定时:派发间隔(秒)、单次任务超时(秒,默认 15 分钟)
+# - 为避免启动时同时派发多个任务导致潜在重复处理,默认每 30s 只派发 1 条;
+#   通过持续派发逐步填满并发槽,直到达到 MAX_CONCURRENT_TASKS。
+SCHEDULE_DISPATCH_INTERVAL_SECONDS = int(os.getenv("SCHEDULE_DISPATCH_INTERVAL_SECONDS", "30"))
 TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1500"))
 TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1500"))
 
 
 # 统计信息
 # 统计信息
@@ -193,13 +195,14 @@ def _has_running_content_task() -> bool:
 
 
 async def scheduled_tick():
 async def scheduled_tick():
     """
     """
-    按 SCHEDULE_INTERVAL_MINUTES 轮询:若当前无任务在执行,则从 demand_content 取
+    按 SCHEDULE_DISPATCH_INTERVAL_SECONDS 派发:若当前并发有空槽,则从 demand_content 取
     当天(dt=今日)、尚未出现在 demand_find_task 中且 score 最高的一条需求并执行。
     当天(dt=今日)、尚未出现在 demand_find_task 中且 score 最高的一条需求并执行。
     """
     """
     logger.info("定时任务触发(scheduled_tick)")
     logger.info("定时任务触发(scheduled_tick)")
 
 
-    if _has_running_content_task():
-        logger.info("定时任务跳过:仍有任务在执行(并发槽占用中)")
+    # 无空闲并发槽则不派发;保持 tick 很快返回,避免阻塞调度器。
+    if task_semaphore._value <= 0:
+        logger.info("定时任务跳过:无空闲并发槽")
         return
         return
 
 
     dt = _today_dt_int()
     dt = _today_dt_int()
@@ -221,11 +224,14 @@ async def scheduled_tick():
         f"dt={dt}, score={score}"
         f"dt={dt}, score={score}"
     )
     )
     create_task_record(demand_content_id)
     create_task_record(demand_content_id)
-    await execute_task(
-        query=query,
-        demand_id=demand_content_id,
-        suggestion=suggestion,
-        task_type="scheduled",
+    # 后台执行:由 execute_task 内部 semaphore 控制并发占用
+    asyncio.create_task(
+        execute_task(
+            query=query,
+            demand_id=demand_content_id,
+            suggestion=suggestion,
+            task_type="scheduled",
+        )
     )
     )
 
 
 
 
@@ -433,7 +439,7 @@ async def startup():
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
     logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
     logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
     logger.info(
     logger.info(
-        f"定时策略:每 {SCHEDULE_INTERVAL_MINUTES} 分钟检查是否空闲,空闲则取当天 score 最高的一条;"
+        f"定时策略:每 {SCHEDULE_DISPATCH_INTERVAL_SECONDS} 秒尝试派发 1 条(有并发空槽才派发);"
         f"单次任务超时 {TASK_TIMEOUT_SECONDS}s"
         f"单次任务超时 {TASK_TIMEOUT_SECONDS}s"
     )
     )
 
 
@@ -442,7 +448,7 @@ async def startup():
     job = scheduler.add_job(
     job = scheduler.add_job(
         scheduled_tick,
         scheduled_tick,
         "interval",
         "interval",
-        minutes=SCHEDULE_INTERVAL_MINUTES,
+        seconds=SCHEDULE_DISPATCH_INTERVAL_SECONDS,
         misfire_grace_time=300,
         misfire_grace_time=300,
         coalesce=True,
         coalesce=True,
         max_instances=1,
         max_instances=1,