ソースを参照

增加定时任务自动执行

xueyiming 5 日 前
コミット
d9fd8ebe5e
4 ファイル変更176 行追加3 行削除
  1. 1 1
      examples/demand/demand.md
  2. 7 1
      examples/demand/run.py
  3. 167 1
      examples/demand/web_api.py
  4. 1 0
      requirements.txt

+ 1 - 1
examples/demand/demand.md

@@ -107,5 +107,5 @@ $user$
 1. 共现查询的地点必须来自于高权重分类,不能直接从树上寻找分类
 2. 分类的共现组合,必须来自于`get_weight_score_topn`查询到的分类作为起点
 3. 最终结果的保留,必须要有权重分或者支持度进行支持
-4. 尽可能多的产生需求,尽量保证最终产生的需求数量不少于50个
+4. 尽可能多的产生需求,尽量保证最终产生的需求数量在50个左右
 

+ 7 - 1
examples/demand/run.py

@@ -259,6 +259,7 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
         log(f"[mysql] 需求 JSON 非数组,跳过写入:type={type(items)}")
         return 0
 
+    dt_value = datetime.now().strftime("%Y%m%d")
     score_map = _load_name_score_map(execution_id)
     rows: list[dict] = []
     for di in items:
@@ -274,14 +275,19 @@ def write_demand_items_to_mysql(execution_id: int, merge_level2: str) -> int:
             score = _avg_score_for_joined_name(name, score_map)
         reason = di.get("reason")
         desc_value = di.get("desc")
+        suggestion = desc_value
+        # 兼容旧字段:同时保留 ext_data(reason/desc)JSON,便于旧版消费逻辑迁移期继续使用。
         ext_data = {"reason": reason, "desc": desc_value}
 
         rows.append(
             {
                 "merge_leve2": _safe_truncate(merge_level2, 32),
                 "name": _safe_truncate(name, 64),
+                "reason": reason,
+                "suggestion": suggestion,
                 "score": float(score),
                 "ext_data": json.dumps(ext_data, ensure_ascii=False),
+                "dt": dt_value,
             }
         )
 
@@ -367,7 +373,7 @@ async def run_once(execution_id, merge_level2, task_id: Optional[int] = None) ->
             log(f"[cost] total_tokens={total_tokens}, total_cost=${total_cost:.6f}")
 
             # agent 执行完成后:把本地 result JSON 写入 MySQL 表 demand_content
-            # element_names -> name(逗号分隔);reason/desc -> ext_data JSON;merge_leve2 -> demand_content.merge_leve2
+            # element_names -> name(逗号分隔);reason -> demand_content.reason;desc -> demand_content.suggestion;dt -> demand_content.dt
             try:
                 write_demand_items_to_mysql(execution_id=execution_id, merge_level2=merge_level2)
             except Exception as e:

+ 167 - 1
examples/demand/web_api.py

@@ -3,9 +3,12 @@ demand Web API(异步任务:发起 -> 立即返回 task_id -> 另一个接
 """
 
 import asyncio
+import os
+import importlib
 import sys
+from datetime import datetime, timedelta
 from pathlib import Path
-from typing import Literal, Optional
+from typing import Any, Literal, Optional
 
 from fastapi import FastAPI, HTTPException
 from pydantic import BaseModel
@@ -20,12 +23,137 @@ from examples.demand.run import _create_demand_task, main as run_demand
 
 app = FastAPI(title="demand web api")
 
+# APScheduler is loaded dynamically so that this module can still be imported
+# when the package is not installed in the environment.
+try:
+    _aps_asyncio_mod = importlib.import_module("apscheduler.schedulers.asyncio")
+    _aps_cron_mod = importlib.import_module("apscheduler.triggers.cron")
+    AsyncIOScheduler = getattr(_aps_asyncio_mod, "AsyncIOScheduler")
+    CronTrigger = getattr(_aps_cron_mod, "CronTrigger")
+except Exception:  # pragma: no cover
+    # Any failure while loading leaves both names as None; the startup hook
+    # checks for None and skips starting the scheduler in that case.
+    AsyncIOScheduler = None  # type: ignore[assignment]
+    CronTrigger = None  # type: ignore[assignment]
+
 
 # Request body model with two fields: a cluster name and a platform type
 # restricted to "piaoquan" or "changwen".
 # NOTE(review): presumably the body of POST /demand/start (its handler takes
 # `req: DemandStartRequest`) - the handler body is not fully visible here.
 class DemandStartRequest(BaseModel):
     cluster_name: str
     platform_type: Literal["piaoquan", "changwen"]
 
 
+# Scheduled-job configuration: edit / fill in as needed.
+# Each entry is a dict read with the keys "cluster_name" and "platform_type".
+# platform_type decides which platform is used; cluster_name is matched
+# against demand_task.name. An empty list makes the scheduled pass a no-op.
+DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST: list[dict] = []
+
+# Whether the scheduler is enabled (optional override through the environment).
+# Only the exact value "1" (after stripping whitespace) enables it; when the
+# variable is unset the default "1" applies, so the scheduler is on by default.
+DEMAND_SCHEDULER_ENABLED: bool = os.getenv("DEMAND_SCHEDULER_ENABLED", "1").strip() == "1"
+
+# First hour of the "<hour>-23" cron range used by the main scheduled job.
+DEMAND_SCHEDULER_START_HOUR: int = 2
+
+
+def _get_today_time_window(now: datetime) -> tuple[datetime, datetime]:
+    """Return the half-open [start, end) window covering the calendar day of *now*.
+
+    Args:
+        now: Reference moment; only its year, month and day are used.
+
+    Returns:
+        A (start, end) tuple where start is midnight of that day and end is
+        exactly one day later. Both values are built without tzinfo, so they
+        are naive datetimes even if *now* is timezone-aware.
+    """
+    # Rebuilding from year/month/day drops the time-of-day (and any tzinfo).
+    start_of_today = datetime(year=now.year, month=now.month, day=now.day)
+    end_of_today = start_of_today + timedelta(days=1)
+    return start_of_today, end_of_today
+
+
+async def demand_start_sync(cluster_name: str, platform_type: Literal["piaoquan", "changwen"]) -> dict:
+    """
+    Run one demand job end to end and wait for it to finish.
+
+    Follows the same execution chain as /demand/start but does not create a
+    background task: prepare -> create demand_task -> await run_demand serially.
+
+    Args:
+        cluster_name: Cluster to process; its first 32 characters are used as
+            the demand_task name.
+        platform_type: "piaoquan" selects piaoquan_prepare; any other value
+            falls through to changwen_prepare.
+
+    Returns:
+        A dict with the keys ok, message, task_id, execution_id and result,
+        where result is whatever run_demand returned.
+
+    Raises:
+        HTTPException: status 400 when the prepare step yields a falsy
+            execution_id; status 500 when _create_demand_task yields a falsy
+            task_id.
+    """
+    # The prepare stage is synchronous (the current example code is sync); the
+    # synchronous, serial semantics are kept here on purpose.
+    # NOTE(review): these calls run directly inside a coroutine, so they
+    # presumably block the event loop while they execute - confirm acceptable.
+    if platform_type == "piaoquan":
+        execution_id = piaoquan_prepare(cluster_name)
+    else:
+        execution_id = changwen_prepare(cluster_name)
+
+    if not execution_id:
+        raise HTTPException(status_code=400, detail="获取 execution_id 失败")
+
+    # Truncate to 32 characters; an empty cluster_name results in a None name.
+    task_name = cluster_name[:32] if cluster_name else None
+    task_id = _create_demand_task(
+        execution_id=execution_id,
+        name=task_name,
+        platform=platform_type,
+    )
+    if not task_id:
+        raise HTTPException(status_code=500, detail="创建 demand_task 失败")
+
+    # Per the original author: a finally block inside run_once writes the task
+    # status back to MySQL (run_once is not fully visible from here).
+    result = await run_demand(
+        cluster_name,
+        platform_type,
+        execution_id=execution_id,
+        task_id=task_id,
+    )
+    return {"ok": True, "message": "调用成功", "task_id": task_id, "execution_id": execution_id, "result": result}
+
+
+def _today_has_status_0_or_1(cluster_name: str, platform_type: str, now: datetime) -> bool:
+    """
+    Check demand_task for a row created today with status 0 or 1.
+
+    The lookup is:
+    - limited to today (by create_time, using the [start, end) day window of *now*)
+    - an exact match on name and platform
+    - restricted to rows whose status is 0 or 1; callers skip the run when
+      such a row exists
+
+    Args:
+        cluster_name: Compared against demand_task.name after truncation to
+            32 characters.
+        platform_type: Compared against demand_task.platform after truncation
+            to 32 characters.
+        now: Reference moment that determines which day counts as "today".
+
+    Returns:
+        The result of mysql_db.exists for the query above.
+        NOTE(review): presumably truthy when at least one row matches -
+        confirm against the mysql_db helper.
+    """
+    start_of_today, end_of_today = _get_today_time_window(now)
+    return mysql_db.exists(
+        "demand_task",
+        # Values are passed separately through where_params, not formatted
+        # into the SQL text.
+        where=(
+            "name = %s "
+            "AND platform = %s "
+            "AND status IN (0, 1) "
+            "AND create_time >= %s "
+            "AND create_time < %s"
+        ),
+        where_params=(
+            # Same 32-character truncation that demand_start_sync applies to
+            # the task name, so the comparison uses the stored form.
+            str(cluster_name)[:32],
+            str(platform_type)[:32],
+            start_of_today,
+            end_of_today,
+        ),
+    )
+
+
+async def demand_scheduled_run_once() -> None:
+    """
+    Run one serial pass over the configured (cluster, platform) pairs.
+
+    For every entry of DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST: look up today's
+    demand_task rows matching cluster_name/name and platform_type/platform.
+    If a row with status 0 or 1 exists the entry is skipped; otherwise
+    demand_start_sync is awaited once for it.
+
+    Entries with an empty cluster_name or an unknown platform_type are
+    ignored. A failure while checking or running one entry is reported and
+    the pass moves on to the next entry, so a single persistently failing
+    entry cannot block every entry configured after it.
+    """
+    if not DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST:
+        return
+
+    # Captured once so every entry of this pass is checked against the same day.
+    now = datetime.now()
+    for item in DEMAND_SCHEDULE_CLUSTER_PLATFORM_LIST:
+        cluster_name = item.get("cluster_name")
+        platform_type = item.get("platform_type")
+        if not cluster_name or platform_type not in ("piaoquan", "changwen"):
+            continue
+
+        try:
+            if _today_has_status_0_or_1(cluster_name, platform_type, now=now):
+                print(f"[scheduler] skip: cluster={cluster_name}, platform={platform_type} (today has status 0/1)")
+                continue
+
+            print(f"[scheduler] run: cluster={cluster_name}, platform={platform_type}")
+            await demand_start_sync(cluster_name=cluster_name, platform_type=platform_type)  # serial execution
+        except Exception as exc:
+            # Top-level boundary of the batch: demand_start_sync raises
+            # HTTPException when prepare or task creation fails, and the DB
+            # lookup may fail as well. Report and continue with the next
+            # entry. Cancellation is a BaseException and still propagates.
+            print(f"[scheduler] failed: cluster={cluster_name}, platform={platform_type}, error={exc!r}")
+            continue
+
+
+# Scheduler instance created by the startup hook; None until it has started.
+_demand_scheduler: Optional[Any] = None
+# Guards the scheduled job so that only one pass runs at a time.
+# NOTE(review): the lock is created at import time; on Python < 3.10 an
+# asyncio.Lock binds to the event loop current at construction - confirm the
+# runtime version / that the server uses the same loop.
+_demand_scheduler_lock = asyncio.Lock()
+
+
+async def _demand_scheduler_job() -> None:
+    """
+    Scheduled job entry point.
+
+    - Runs serially (prevents concurrent passes).
+    - Walks the configuration -> filters against today's demand_task rows ->
+      skips or runs each entry (delegated to demand_scheduled_run_once).
+
+    If a previous pass still holds the lock, this invocation returns
+    immediately instead of waiting, so the trigger is dropped, not queued.
+    """
+    # There is no await between this check and acquiring the lock below, so
+    # no other coroutine can take the lock in between.
+    if _demand_scheduler_lock.locked():
+        return
+    async with _demand_scheduler_lock:
+        await demand_scheduled_run_once()
+
+
 @app.post("/demand/start")
 async def demand_start(req: DemandStartRequest):
     # 注意:这里会同步计算 execution_id(prepare 阶段),随后 run_once 放到后台异步执行。
@@ -59,6 +187,44 @@ async def demand_start(req: DemandStartRequest):
     return {"ok": True, "message": "调用成功", "task_id": task_id, "execution_id": execution_id}
 
 
+# NOTE(review): FastAPI documents on_event as deprecated in favour of lifespan
+# handlers - confirm against the FastAPI version in use. No matching shutdown
+# hook that stops the scheduler is visible in this view.
+@app.on_event("startup")
+async def _start_demand_scheduler() -> None:
+    """Start the scheduled job (cron-triggered, every 30 minutes from 2:00 to 24:00).
+
+    Does nothing when the scheduler is disabled through
+    DEMAND_SCHEDULER_ENABLED, when a scheduler has already been started, or
+    when APScheduler could not be imported.
+
+    Two jobs are registered that both call _demand_scheduler_job: one firing
+    at minute 0 and 30 of every hour from DEMAND_SCHEDULER_START_HOUR to 23,
+    and one firing daily at 00:00. With the default start hour of 2, the
+    slots 00:30, 01:00 and 01:30 are not covered.
+    """
+    global _demand_scheduler
+    if not DEMAND_SCHEDULER_ENABLED:
+        return
+    # Guard against starting a second scheduler if the hook runs again.
+    if _demand_scheduler is not None:
+        return
+
+    if AsyncIOScheduler is None or CronTrigger is None:
+        # Dependency not installed: skip the scheduled job.
+        print("[scheduler] apscheduler 未安装,跳过定时任务启动")
+        return
+
+    scheduler = AsyncIOScheduler()
+    # 02:00 - 23:30: once every 30 minutes.
+    scheduler.add_job(
+        func=_demand_scheduler_job,
+        trigger=CronTrigger(hour=f"{DEMAND_SCHEDULER_START_HOUR}-23", minute="0,30"),
+        id="demand_scheduler_job_main",
+        replace_existing=True,
+        # At most one running instance of this job; missed runs are merged
+        # into a single run (APScheduler max_instances / coalesce options).
+        max_instances=1,
+        coalesce=True,
+    )
+    # 24:00 (i.e. 00:00 of the next day): once per day.
+    scheduler.add_job(
+        func=_demand_scheduler_job,
+        trigger=CronTrigger(hour="0", minute="0"),
+        id="demand_scheduler_job_midnight",
+        replace_existing=True,
+        max_instances=1,
+        coalesce=True,
+    )
+
+    scheduler.start()
+    # Stored only after a successful start so the guard above sees it.
+    _demand_scheduler = scheduler
+
+
 @app.get("/demand/task/{task_id}/status")
 def demand_task_status(task_id: int, max_log_chars: int = 2000):
     row = mysql_db.select_one(

+ 1 - 0
requirements.txt

@@ -13,6 +13,7 @@ fastapi>=0.115.0
 uvicorn[standard]>=0.32.0
 websockets>=13.0
 pydantic
+apscheduler>=3.10.4
 
 # 飞书
 lark-oapi==1.5.3