jihuaqiang пре 23 часа
родитељ
комит
ab3a64a4f8
2 измењених фајлова са 26 додато и 11 уклоњено
  1. 4 2
      examples/content_finder/db/schedule.py
  2. 22 9
      examples/content_finder/server.py

+ 4 - 2
examples/content_finder/db/schedule.py

@@ -96,13 +96,15 @@ def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
 
     - 不按品类分组(不再使用 merge_leve2)
     - dt 与表字段一致:一般为 YYYYMMDD 整数(如 20260402)
+    - 同 dt 下按 score 降序取第一条(最高分优先)
 
     Returns:
-        {"demand_content_id": int, "query": str} 或 None
+        {"demand_content_id": int, "query": str, "score": Any} 或 None
     """
     sql = """
     SELECT dc.id AS demand_content_id,
-           dc.name AS query
+           dc.name AS query,
+           dc.score AS score
     FROM demand_content dc
     WHERE dc.dt = %s
       AND NOT EXISTS (

+ 22 - 9
examples/content_finder/server.py

@@ -3,8 +3,8 @@
 
 提供:
 1. API 接口:POST /api/tasks - 触发内容寻找任务
-2. 定时调度:启动后先恢复 demand_find_task 中 status=执行中 的任务;之后每 10 分钟从
-   demand_content 取当天(dt=YYYYMMDD)且未建任务记录的 1 条需求执行(不区分品类)
+2. 定时调度:启动后先恢复 demand_find_task 中 status=执行中 的任务;之后每 2 分钟轮询一次,
+   若当前无任务在执行,则从 demand_content 取当天(dt=YYYYMMDD)、未建任务记录且 score 最高的一条执行(不区分品类)
 3. 并发控制:限制最大并发任务数;定时侧若已有任务在执行则跳过本次轮询
 4. 单次寻找任务最长执行 15 分钟,超时记为失败并回写 demand_find_task
 """
@@ -68,7 +68,7 @@ MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
 task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
 
 # 定时:轮询间隔(分钟)、单次任务超时(秒,默认 15 分钟)
-SCHEDULE_INTERVAL_MINUTES = int(os.getenv("SCHEDULE_INTERVAL_MINUTES", "10"))
+SCHEDULE_INTERVAL_MINUTES = int(os.getenv("SCHEDULE_INTERVAL_MINUTES", "2"))
 TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "900"))
 
 # 统计信息
@@ -174,15 +174,24 @@ def _today_dt_int() -> int:
     return int(datetime.now(ZoneInfo(SCHEDULER_TIMEZONE)).strftime("%Y%m%d"))
 
 
+def _has_running_content_task() -> bool:
+    """
+    本进程内是否有内容寻找任务正在执行(占用并发槽)。
+
+    与 execute_task 共用 task_semaphore,含 API 触发与定时触发。
+    """
+    return task_semaphore._value != MAX_CONCURRENT_TASKS
+
+
 async def scheduled_tick():
     """
-    每 10 分钟执行一次:若当前无任务占用并发槽,则从 demand_content 取当天(dt=今日)
-    且尚未出现在 demand_find_task 中的 1 条需求并执行。
+    按 SCHEDULE_INTERVAL_MINUTES 轮询:若当前无任务在执行,则从 demand_content 取
+    当天(dt=今日)、尚未出现在 demand_find_task 中且 score 最高的一条需求并执行。
     """
     logger.info("定时任务触发(scheduled_tick)")
 
-    if task_semaphore._value != MAX_CONCURRENT_TASKS:
-        logger.info("定时任务跳过:仍有任务在执行(并发槽已满)")
+    if _has_running_content_task():
+        logger.info("定时任务跳过:仍有任务在执行(并发槽占用中)")
         return
 
     dt = _today_dt_int()
@@ -197,7 +206,11 @@ async def scheduled_tick():
         logger.info("定时任务跳过:查询结果无效")
         return
 
-    logger.info(f"定时任务领取:demand_content_id={demand_content_id}, dt={dt}")
+    score = item.get("score")
+    logger.info(
+        f"定时任务领取(当天 score 最高):demand_content_id={demand_content_id}, "
+        f"dt={dt}, score={score}"
+    )
     create_task_record(demand_content_id)
     await execute_task(query=query, demand_id=demand_content_id, task_type="scheduled")
 
@@ -394,7 +407,7 @@ async def startup():
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
     logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
     logger.info(
-        f"定时策略:每 {SCHEDULE_INTERVAL_MINUTES} 分钟轮询当天需求;"
+        f"定时策略:每 {SCHEDULE_INTERVAL_MINUTES} 分钟检查是否空闲,空闲则取当天 score 最高的一条;"
         f"单次任务超时 {TASK_TIMEOUT_SECONDS}s"
     )