jihuaqiang 1 день назад
Родитель
Сommit
6cff4f71dd

+ 1 - 2
examples/content_finder/content_finder.md

@@ -49,7 +49,6 @@ $system$
 7. **入库阶段**:仅在 Schema 校验通过后,调用 `store_results_mysql(trace_id)` 存储到远程数据库
 8. **接入平台阶段**:按 `aigc_platform_plan` 生成 AIGC 爬取计划
 9. **过程摘要阶段(内容策略表格)**:在以上全部完成后,按 `exec_summary_rows` 的要求生成 `summary_json`,并调用 `exec_summary(trace_id, summary_json, log_path)` tool(`log_path` 传入 `{output_dir}/{trace_id}/log.txt`),将**每条入选的选择策略**整理成表格形式的 JSON,写入 `{output_dir}/{trace_id}/process_trace.json`。
-   
 
 ## 强制要求(违反即为错误)
 
@@ -135,7 +134,7 @@ $system$
 - **禁止**:写完库就认为任务结束、不创建爬取计划。若某条创建失败,须在回复中说明原因;仅当入选视频已创建或已说明失败原因时,方可视为本阶段完成。
 
 ### 6.过程摘要是否已写入
-- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`生成了每条视频的过程记录.
+- 是否在 **AIGC 计划阶段完成后** 调用了 `exec_summary`生成了每条视频的过程记录,尤其是case出发的策略,是否对每个内容关联了灵感点.
 
 
 $user$

+ 4 - 0
examples/content_finder/db/__init__.py

@@ -11,6 +11,8 @@ from .open_aigc_pattern_connection import get_open_aigc_pattern_connection
 from .schedule import (
     get_next_unprocessed_demand,
     get_daily_unprocessed_pool,
+    get_first_running_task,
+    get_one_today_unprocessed_demand,
     create_task_record,
     fetch_trace_ids_created_after,
     update_task_status,
@@ -29,6 +31,8 @@ __all__ = [
     "get_open_aigc_pattern_connection",
     "get_next_unprocessed_demand",
     "get_daily_unprocessed_pool",
+    "get_first_running_task",
+    "get_one_today_unprocessed_demand",
     "create_task_record",
     "fetch_trace_ids_created_after",
     "update_task_status",

+ 71 - 0
examples/content_finder/db/schedule.py

@@ -56,6 +56,77 @@ def get_next_unprocessed_demand() -> Optional[Dict[str, Any]]:
             conn.close()
 
 
+def get_first_running_task() -> Optional[Dict[str, Any]]:
+    """
+    查找 demand_find_task 中 status=STATUS_RUNNING(1) 的任务(理论上仅一条)。
+
+    用于服务重启后恢复执行中的任务:联表取出 query(demand_content.name)。
+
+    Returns:
+        {"demand_content_id": int, "query": str, "trace_id": str} 或 None
+    """
+    sql = """
+    SELECT t.demand_content_id,
+           t.trace_id,
+           dc.name AS query
+    FROM demand_find_task t
+    INNER JOIN demand_content dc ON dc.id = t.demand_content_id
+    WHERE t.status = %s
+    ORDER BY t.id ASC
+    LIMIT 1
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (STATUS_RUNNING,))
+            row = cur.fetchone()
+            return dict(row) if row else None
+    except Exception as e:
+        logger.error(f"get_first_running_task 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
+def get_one_today_unprocessed_demand(*, dt: int) -> Optional[Dict[str, Any]]:
+    """
+    从 demand_content 中取「当天 dt」且尚未在 demand_find_task 中出现过的 1 条需求。
+
+    - 不按品类分组(不再使用 merge_leve2)
+    - dt 与表字段一致:一般为 YYYYMMDD 整数(如 20260402)
+
+    Returns:
+        {"demand_content_id": int, "query": str} 或 None
+    """
+    sql = """
+    SELECT dc.id AS demand_content_id,
+           dc.name AS query
+    FROM demand_content dc
+    WHERE dc.dt = %s
+      AND NOT EXISTS (
+        SELECT 1 FROM demand_find_task t
+        WHERE t.demand_content_id = dc.id
+      )
+    ORDER BY dc.score DESC, dc.id DESC
+    LIMIT 1
+    """
+    conn = None
+    try:
+        conn = get_connection()
+        with conn.cursor() as cur:
+            cur.execute(sql, (int(dt),))
+            row = cur.fetchone()
+            return dict(row) if row else None
+    except Exception as e:
+        logger.error(f"get_one_today_unprocessed_demand 失败: {e}", exc_info=True)
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
 def get_daily_unprocessed_pool(
     *,
     total_limit: int = 20,

+ 84 - 41
examples/content_finder/server.py

@@ -3,8 +3,10 @@
 
 提供:
 1. API 接口:POST /api/tasks - 触发内容寻找任务
-2. 定时调度:每 10 分钟从数据库联表查询未处理需求并执行任务
-3. 并发控制:限制最大并发任务数
+2. 定时调度:启动后先恢复 demand_find_task 中 status=执行中 的任务;之后每 10 分钟从
+   demand_content 取当天(dt=YYYYMMDD)且未建任务记录的 1 条需求执行(不区分品类)
+3. 并发控制:限制最大并发任务数;定时侧若已有任务在执行则跳过本次轮询
+4. 单次寻找任务最长执行 15 分钟,超时记为失败并回写 demand_find_task
 """
 
 import asyncio
@@ -27,7 +29,13 @@ from dotenv import load_dotenv
 load_dotenv()
 
 import core
-from db import get_daily_unprocessed_pool, create_task_record, update_task_status, update_task_on_complete
+from db import (
+    create_task_record,
+    get_first_running_task,
+    get_one_today_unprocessed_demand,
+    update_task_status,
+    update_task_on_complete,
+)
 from db.schedule import STATUS_RUNNING, STATUS_SUCCESS, STATUS_FAILED
 
 # 配置日志
@@ -59,6 +67,10 @@ scheduler = AsyncIOScheduler(timezone=ZoneInfo(SCHEDULER_TIMEZONE))
 MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
 task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
 
+# 定时:轮询间隔(分钟)、单次任务超时(秒,默认 15 分钟)
+SCHEDULE_INTERVAL_MINUTES = int(os.getenv("SCHEDULE_INTERVAL_MINUTES", "10"))
+TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "900"))
+
 # 统计信息
 stats = {
     "total_tasks": 0,
@@ -121,8 +133,11 @@ async def execute_task(
                 logger.warning(f"更新任务状态为执行中失败: {e}")
 
         try:
-            result = await core.run_agent(
-                query, demand_id=demand_id, stream_output=False, log_assistant_text=True
+            result = await asyncio.wait_for(
+                core.run_agent(
+                    query, demand_id=demand_id, stream_output=False, log_assistant_text=True
+                ),
+                timeout=float(TASK_TIMEOUT_SECONDS),
             )
             duration = (datetime.now() - start_time).total_seconds()
 
@@ -137,6 +152,15 @@ async def execute_task(
                 if task_type == "scheduled" and demand_id is not None:
                     _update_scheduled_task_complete(demand_id, result.get("trace_id") or "", STATUS_FAILED)
 
+        except asyncio.TimeoutError:
+            stats["failed_tasks"] += 1
+            duration = (datetime.now() - start_time).total_seconds()
+            logger.error(
+                f"任务超时 [{task_type}]: 超过 {TASK_TIMEOUT_SECONDS}s,记为失败, 耗时={duration:.1f}s"
+            )
+            if task_type == "scheduled" and demand_id is not None:
+                _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
+
         except Exception as e:
             stats["failed_tasks"] += 1
             duration = (datetime.now() - start_time).total_seconds()
@@ -145,42 +169,59 @@ async def execute_task(
                 _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
 
 
-async def scheduled_task():
-    """
-    定时任务:每天上午 6 点执行一次(生成当日池子并跑完)
+def _today_dt_int() -> int:
+    """当天 demand_content.dt 约定为 YYYYMMDD 整数(如 20260402),与定时器时区一致。"""
+    return int(datetime.now(ZoneInfo(SCHEDULER_TIMEZONE)).strftime("%Y%m%d"))
 
-    流程:
-    1. 从 demand_content 中按 merge_leve2 品类去重分组,每个品类取 score 最高且未处理过的 5 条
-    2. 全局最多取 20 条,作为当天“池子”
-    3. 为池子中每条 demand_content 创建 demand_find_task 记录,并执行任务(并发受 MAX_CONCURRENT_TASKS 限制)
+
+async def scheduled_tick():
+    """
+    每 10 分钟执行一次:若当前无任务占用并发槽,则从 demand_content 取当天(dt=今日)
+    且尚未出现在 demand_find_task 中的 1 条需求并执行。
     """
-    logger.info("定时任务触发")
+    logger.info("定时任务触发(scheduled_tick)")
 
-    pool = get_daily_unprocessed_pool(total_limit=20, per_category_limit=5)
-    if not pool:
-        logger.info("定时任务跳过:无待处理需求(当日池子为空)")
+    if task_semaphore._value != MAX_CONCURRENT_TASKS:
+        logger.info("定时任务跳过:仍有任务在执行(并发槽已满)")
         return
 
-    logger.info(f"当日任务池生成:count={len(pool)}(每日上限 20,每品类上限 5)")
-
-    tasks: list[asyncio.Task] = []
-    for item in pool:
-        query = (item.get("query") or "").strip()
-        demand_content_id = item.get("demand_content_id")
-        if not query or demand_content_id is None:
-            continue
-        create_task_record(demand_content_id)  # trace_id 初始为空,完成后更新
-        tasks.append(
-            asyncio.create_task(
-                execute_task(query=query, demand_id=demand_content_id, task_type="scheduled")
-            )
-        )
+    dt = _today_dt_int()
+    item = get_one_today_unprocessed_demand(dt=dt)
+    if not item:
+        logger.info(f"定时任务跳过:无待处理需求(dt={dt} 或均已建任务)")
+        return
 
-    if not tasks:
-        logger.info("定时任务跳过:当日池子中无有效 query")
+    demand_content_id = item.get("demand_content_id")
+    query = (item.get("query") or "").strip()
+    if demand_content_id is None or not query:
+        logger.info("定时任务跳过:查询结果无效")
         return
 
-    await asyncio.gather(*tasks, return_exceptions=True)
+    logger.info(f"定时任务领取:demand_content_id={demand_content_id}, dt={dt}")
+    create_task_record(demand_content_id)
+    await execute_task(query=query, demand_id=demand_content_id, task_type="scheduled")
+
+
+async def run_startup_resume():
+    """
+    启动后先执行 demand_find_task 中 status=执行中(1) 的任务(理论上仅一条)。
+    """
+    try:
+        row = get_first_running_task()
+        if not row:
+            logger.info("启动恢复:无执行中(status=1)的 demand_find_task")
+            return
+
+        demand_content_id = row.get("demand_content_id")
+        query = (row.get("query") or "").strip()
+        if demand_content_id is None or not query:
+            logger.warning("启动恢复:执行中任务数据不完整,跳过")
+            return
+
+        logger.info(f"启动恢复:执行 demand_find_task status=1, demand_content_id={demand_content_id}")
+        await execute_task(query=query, demand_id=int(demand_content_id), task_type="scheduled")
+    except Exception as e:
+        logger.error(f"启动恢复失败: {e}", exc_info=True)
 
 
 # ============ API 接口 ============
@@ -352,21 +393,23 @@ async def startup():
     logger.info("内容寻找服务启动中...")
     logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
     logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
+    logger.info(
+        f"定时策略:每 {SCHEDULE_INTERVAL_MINUTES} 分钟轮询当天需求;"
+        f"单次任务超时 {TASK_TIMEOUT_SECONDS}s"
+    )
+
+    asyncio.create_task(run_startup_resume())
 
-    # 配置定时任务:每天上午 6 点触发一次
     job = scheduler.add_job(
-        scheduled_task,
-        "cron",
-        hour="6",
-        minute="0",
-        second="0",
+        scheduled_tick,
+        "interval",
+        minutes=SCHEDULE_INTERVAL_MINUTES,
         misfire_grace_time=300,
         coalesce=True,
+        max_instances=1,
     )
     scheduler.start()
     logger.info(f"定时任务已注册: id={job.id}, next_run_time={job.next_run_time}")
-    # asyncio.create_task(scheduled_task())
-    # logger.info("定时任务已启动:启动后立即执行一次,之后每 10 分钟执行(从数据库获取待处理需求)")
 
     logger.info("服务启动完成")
     logger.info("=" * 60)