# server.py — content-finding service (extraction artifacts removed: file-size banner and
# concatenated line-number residue from the original paste were not valid Python).
  1. """
  2. 内容寻找服务
  3. 提供:
  4. 1. API 接口:POST /api/tasks - 触发内容寻找任务
  5. 2. 定时调度:启动后先恢复 demand_find_task 中 status=执行中 的任务;之后按间隔轮询;
  6. 若当前无任务在执行,则从 demand_content 取当天(dt=YYYYMMDD)、未建任务记录且 score 最高的一条执行(不区分品类)。
  7. 本文件常量 SCHEDULE_DISPATCH_NOT_BEFORE_HOUR:仅在该本地时刻(SCHEDULER_TIMEZONE)及之后才派发;与按 UTC 计日的上游日限额对齐时常用 8(北京 08:00 = UTC 换日)。
  8. 3. 并发控制:限制最大并发任务数;定时侧若已有任务在执行则跳过本次轮询
  9. 4. 单次寻找任务最长执行 25 分钟,超时记为失败并回写 demand_find_task
  10. """
  11. import asyncio
  12. import logging
  13. import os
  14. import uuid
  15. from datetime import datetime
  16. from pathlib import Path
  17. from typing import Optional
  18. import sys
  19. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  20. from fastapi import FastAPI, HTTPException
  21. from pydantic import BaseModel
  22. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  23. from zoneinfo import ZoneInfo
  24. from dotenv import load_dotenv
  25. load_dotenv()
  26. import core
  27. from db import (
  28. create_task_record,
  29. get_first_running_task,
  30. get_latest_demand_task_oprate_is_open,
  31. get_one_today_unprocessed_demand,
  32. update_task_status,
  33. update_task_on_complete,
  34. )
  35. from db.schedule import STATUS_RUNNING, STATUS_SUCCESS, STATUS_FAILED
  36. # 配置日志
  37. log_dir = Path(__file__).parent / '.cache'
  38. log_dir.mkdir(exist_ok=True)
  39. logging.basicConfig(
  40. level=logging.INFO,
  41. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
  42. handlers=[
  43. logging.FileHandler(log_dir / 'server.log'),
  44. logging.StreamHandler()
  45. ]
  46. )
  47. logger = logging.getLogger(__name__)
  48. # FastAPI 应用
  49. app = FastAPI(
  50. title="内容寻找服务",
  51. version="1.0.0",
  52. description="抖音内容寻找 Agent 服务"
  53. )
  54. # 定时调度器(默认用中国时区,避免容器 UTC 导致错过预期时间点)
  55. SCHEDULER_TIMEZONE = os.getenv("SCHEDULER_TIMEZONE", os.getenv("TZ", "Asia/Shanghai"))
  56. SCHEDULER_TZ = ZoneInfo(SCHEDULER_TIMEZONE)
  57. scheduler = AsyncIOScheduler(timezone=SCHEDULER_TZ)
  58. # 并发控制
  59. MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
  60. task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
  61. # 定时:派发间隔(秒)、单次任务超时(秒,默认 15 分钟)
  62. # - 为避免启动时同时派发多个任务导致潜在重复处理,默认每 30s 只派发 1 条;
  63. # 通过持续派发逐步填满并发槽,直到达到 MAX_CONCURRENT_TASKS。
  64. SCHEDULE_DISPATCH_INTERVAL_SECONDS = int(os.getenv("SCHEDULE_DISPATCH_INTERVAL_SECONDS", "30"))
  65. TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1500"))
  66. # 定时派发最早本地整点(0-23,含该小时起至当日结束;SCHEDULER_TIMEZONE)。None=不限制。
  67. # 上游按 UTC 计日(如 OpenRouter 日限额)时与 UTC 换日对齐可改为 8。
  68. SCHEDULE_DISPATCH_NOT_BEFORE_HOUR: Optional[int] = 8
  69. # 统计信息
  70. stats = {
  71. "total_tasks": 0,
  72. "completed_tasks": 0,
  73. "failed_tasks": 0,
  74. "scheduled_tasks": 0
  75. }
# ============ Data models ============
class TaskRequest(BaseModel):
    """Request body for POST /api/tasks."""
    # Query text; when omitted the endpoint falls back to core.DEFAULT_QUERY.
    query: Optional[str] = None
    # demand_content.id to associate the run with (optional).
    demand_id: Optional[int] = None
    # Extra hint text forwarded to the agent prompt (optional).
    suggestion: Optional[str] = None
class TaskResponse(BaseModel):
    """Response body for POST /api/tasks."""
    # Trace identifier of the started run.
    trace_id: str
    # The endpoint returns "started" on success.
    status: str
    # The query that was actually used (after default substitution).
    query: str
    # Human-readable hint pointing at the trace output directory.
    message: str
  86. # ============ 核心函数 ============
  87. def _update_scheduled_task_complete(demand_id: int, trace_id: str, status: int) -> None:
  88. """定时任务完成时更新 trace_id 和 status,静默处理异常"""
  89. try:
  90. update_task_on_complete(demand_id, trace_id, status)
  91. except Exception as e:
  92. logger.warning(f"更新任务状态失败: {e}")
  93. async def execute_task(
  94. query: str,
  95. demand_id: Optional[int] = None,
  96. suggestion: str = "",
  97. task_type: str = "api",
  98. ):
  99. """
  100. 执行任务(带并发控制)
  101. Args:
  102. query: 查询内容
  103. demand_id: 需求 id(demand_content.id,关联 demand_content 表)
  104. suggestion: 补充信息(定时任务与 demand_content.suggestion 一致)
  105. task_type: 任务类型("api" 或 "scheduled")
  106. """
  107. async with task_semaphore:
  108. current_concurrent = MAX_CONCURRENT_TASKS - task_semaphore._value + 1
  109. logger.info(f"任务开始 [{task_type}]: query={query[:50]}..., 当前并发={current_concurrent}/{MAX_CONCURRENT_TASKS}")
  110. start_time = datetime.now(SCHEDULER_TZ)
  111. stats["total_tasks"] += 1
  112. if task_type == "scheduled":
  113. stats["scheduled_tasks"] += 1
  114. if task_type == "scheduled" and demand_id is not None:
  115. try:
  116. update_task_status("", demand_id, STATUS_RUNNING)
  117. except Exception as e:
  118. logger.warning(f"更新任务状态为执行中失败: {e}")
  119. try:
  120. result = await asyncio.wait_for(
  121. core.run_agent(
  122. query,
  123. demand_id=demand_id,
  124. suggestion=suggestion or None,
  125. stream_output=False,
  126. log_assistant_text=True,
  127. ),
  128. timeout=float(TASK_TIMEOUT_SECONDS),
  129. )
  130. duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
  131. if result["status"] == "completed":
  132. stats["completed_tasks"] += 1
  133. logger.info(f"任务完成 [{task_type}]: trace_id={result['trace_id']}, 耗时={duration:.1f}s")
  134. if task_type == "scheduled" and demand_id is not None:
  135. _update_scheduled_task_complete(demand_id, result["trace_id"], STATUS_SUCCESS)
  136. else:
  137. stats["failed_tasks"] += 1
  138. logger.error(f"任务失败 [{task_type}]: trace_id={result.get('trace_id')}, 错误={result.get('error')}, 耗时={duration:.1f}s")
  139. if task_type == "scheduled" and demand_id is not None:
  140. _update_scheduled_task_complete(demand_id, result.get("trace_id") or "", STATUS_FAILED)
  141. except asyncio.TimeoutError:
  142. stats["failed_tasks"] += 1
  143. duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
  144. logger.error(
  145. f"任务超时 [{task_type}]: 超过 {TASK_TIMEOUT_SECONDS}s,记为失败, 耗时={duration:.1f}s"
  146. )
  147. if task_type == "scheduled" and demand_id is not None:
  148. _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
  149. except Exception as e:
  150. stats["failed_tasks"] += 1
  151. duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
  152. logger.error(f"任务异常 [{task_type}]: {e}, 耗时={duration:.1f}s", exc_info=True)
  153. if task_type == "scheduled" and demand_id is not None:
  154. _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
  155. def _today_dt_int() -> int:
  156. """当天 demand_content.dt 约定为 YYYYMMDD 整数(如 20260402),与定时器时区一致。"""
  157. return int(datetime.now(SCHEDULER_TZ).strftime("%Y%m%d"))
  158. def _has_running_content_task() -> bool:
  159. """
  160. 本进程内是否有内容寻找任务正在执行(占用并发槽)。
  161. 与 execute_task 共用 task_semaphore,含 API 触发与定时触发。
  162. """
  163. return task_semaphore._value != MAX_CONCURRENT_TASKS
async def scheduled_tick():
    """
    Dispatch-loop body, fired every SCHEDULE_DISPATCH_INTERVAL_SECONDS: if a
    concurrency slot is free, pick today's (dt == today) highest-score
    demand_content row that has no demand_find_task record yet and run it.
    """
    # Optional local-time gate: only dispatch at/after the configured hour.
    if SCHEDULE_DISPATCH_NOT_BEFORE_HOUR is not None:
        now = datetime.now(SCHEDULER_TZ)
        if now.hour < SCHEDULE_DISPATCH_NOT_BEFORE_HOUR:
            logger.info(
                "定时任务跳过:未到本地派发窗口(需 %02d:00 %s 及之后,当前 %s)",
                SCHEDULE_DISPATCH_NOT_BEFORE_HOUR,
                SCHEDULER_TIMEZONE,
                now.strftime("%H:%M"),
            )
            return
    logger.info("定时任务触发(scheduled_tick)")
    # demand_task_oprate: a latest row with is_open=0 turns dispatching off;
    # no row at all means dispatching stays enabled (backward compatible).
    is_open = get_latest_demand_task_oprate_is_open()
    if is_open == 0:
        logger.info("定时任务跳过:demand_task_oprate 最新记录 is_open=0")
        return
    # No free concurrency slot -> skip. Peeking _value keeps the tick fast and
    # non-blocking; the slot is actually acquired inside execute_task.
    if task_semaphore._value <= 0:
        logger.info("定时任务跳过:无空闲并发槽")
        return
    dt = _today_dt_int()
    item = get_one_today_unprocessed_demand(dt=dt)
    if not item:
        logger.info(f"定时任务跳过:无待处理需求(dt={dt} 或均已建任务)")
        return
    demand_content_id = item.get("demand_content_id")
    query = (item.get("query") or "").strip()
    suggestion = (item.get("suggestion") or "").strip()
    if demand_content_id is None or not query:
        logger.info("定时任务跳过:查询结果无效")
        return
    score = item.get("score")
    logger.info(
        f"定时任务领取(当天 score 最高):demand_content_id={demand_content_id}, "
        f"dt={dt}, score={score}"
    )
    # Record the claim first so the same demand is not picked again next tick.
    create_task_record(demand_content_id)
    # Fire-and-forget: execute_task's internal semaphore accounts for the slot.
    asyncio.create_task(
        execute_task(
            query=query,
            demand_id=demand_content_id,
            suggestion=suggestion,
            task_type="scheduled",
        )
    )
  215. async def run_startup_resume():
  216. """
  217. 启动后先执行 demand_find_task 中 status=执行中(1) 的任务(理论上仅一条)。
  218. """
  219. try:
  220. row = get_first_running_task()
  221. if not row:
  222. logger.info("启动恢复:无执行中(status=1)的 demand_find_task")
  223. return
  224. demand_content_id = row.get("demand_content_id")
  225. query = (row.get("query") or "").strip()
  226. suggestion = (row.get("suggestion") or "").strip()
  227. if demand_content_id is None or not query:
  228. logger.warning("启动恢复:执行中任务数据不完整,跳过")
  229. return
  230. logger.info(f"启动恢复:执行 demand_find_task status=1, demand_content_id={demand_content_id}")
  231. await execute_task(
  232. query=query,
  233. demand_id=int(demand_content_id),
  234. suggestion=suggestion,
  235. task_type="scheduled",
  236. )
  237. except Exception as e:
  238. logger.error(f"启动恢复失败: {e}", exc_info=True)
  239. # ============ API 接口 ============
  240. @app.post("/api/tasks", response_model=TaskResponse)
  241. async def create_task(request: TaskRequest):
  242. """
  243. 创建内容寻找任务
  244. Args:
  245. request.query: 查询内容(可选,不传则使用默认值)
  246. Returns:
  247. {
  248. "trace_id": "20260317_103046_xyz789",
  249. "status": "started",
  250. "query": "...",
  251. "message": "任务已启动,结果将保存到 .cache/traces/xxx/"
  252. }
  253. """
  254. # 获取 query、demand_id、suggestion(API 显式传入;与库表字段同名便于对齐)
  255. query = request.query or core.DEFAULT_QUERY
  256. demand_id = request.demand_id
  257. suggestion_str = (request.suggestion or "").strip()
  258. # 用 Event 等待 trace_id
  259. trace_id_ready = asyncio.Event()
  260. trace_id_holder = {"id": None}
  261. async def run_and_capture():
  262. try:
  263. # 获取第一个 Trace 对象来获取 trace_id
  264. from agent import Trace
  265. async with task_semaphore:
  266. # 重新构建 runner 来获取 trace_id
  267. from agent import AgentRunner, RunConfig, FileSystemTraceStore
  268. from agent.llm import create_openrouter_llm_call
  269. from agent.llm.prompts import SimplePrompt
  270. from agent.tools.builtin.knowledge import KnowledgeConfig
  271. prompt_path = Path(__file__).parent / "content_finder.md"
  272. prompt = SimplePrompt(prompt_path)
  273. trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
  274. demand_id_str = str(demand_id) if demand_id is not None else ""
  275. messages = prompt.build_messages(
  276. query=query,
  277. suggestion=suggestion_str,
  278. trace_dir=trace_dir,
  279. demand_id=demand_id_str,
  280. )
  281. api_key = os.getenv("OPEN_ROUTER_API_KEY")
  282. model_name = prompt.config.get("model", "sonnet-4.6")
  283. model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
  284. temperature = float(prompt.config.get("temperature", 0.3))
  285. max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
  286. trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
  287. skills_dir = str(Path(__file__).parent / "skills")
  288. Path(trace_dir).mkdir(parents=True, exist_ok=True)
  289. store = FileSystemTraceStore(base_path=trace_dir)
  290. allowed_tools = [
  291. "douyin_search",
  292. "douyin_search_tikhub",
  293. "douyin_user_videos",
  294. "get_content_fans_portrait",
  295. "get_account_fans_portrait",
  296. "batch_fetch_portraits",
  297. "store_results_mysql",
  298. "exec_summary",
  299. ]
  300. runner = AgentRunner(
  301. llm_call=create_openrouter_llm_call(model=model),
  302. trace_store=store,
  303. skills_dir=skills_dir,
  304. )
  305. config = RunConfig(
  306. name="内容寻找",
  307. model=model,
  308. temperature=temperature,
  309. max_iterations=max_iterations,
  310. tools=allowed_tools,
  311. extra_llm_params={"max_tokens": 8192},
  312. knowledge=KnowledgeConfig(
  313. enable_extraction=True,
  314. enable_completion_extraction=True,
  315. enable_injection=True,
  316. owner="content_finder_agent",
  317. default_tags={"project": "content_finder"},
  318. default_scopes=["com.piaoquantv.supply"],
  319. default_search_types=["tool", "usecase", "definition"],
  320. default_search_owner="content_finder_agent"
  321. )
  322. )
  323. async for item in runner.run(messages=messages, config=config):
  324. if isinstance(item, Trace):
  325. if not trace_id_holder["id"]:
  326. trace_id_holder["id"] = item.trace_id
  327. trace_id_ready.set()
  328. logger.info(f"任务启动 [api]: trace_id={item.trace_id}")
  329. if item.status == "completed":
  330. stats["completed_tasks"] += 1
  331. logger.info(f"任务完成 [api]: trace_id={item.trace_id}")
  332. break
  333. elif item.status == "failed":
  334. stats["failed_tasks"] += 1
  335. logger.error(f"任务失败 [api]: trace_id={item.trace_id}, 错误={item.error_message}")
  336. break
  337. except Exception as e:
  338. stats["failed_tasks"] += 1
  339. logger.error(f"任务异常 [api]: {e}", exc_info=True)
  340. if not trace_id_holder["id"]:
  341. trace_id_holder["id"] = f"error_{datetime.now(SCHEDULER_TZ).strftime('%Y%m%d_%H%M%S')}"
  342. trace_id_ready.set()
  343. # 启动后台任务
  344. stats["total_tasks"] += 1
  345. asyncio.create_task(run_and_capture())
  346. # 等待 trace_id(最多 5 秒)
  347. try:
  348. await asyncio.wait_for(trace_id_ready.wait(), timeout=5.0)
  349. except asyncio.TimeoutError:
  350. logger.error("获取 trace_id 超时")
  351. raise HTTPException(status_code=500, detail="任务启动超时")
  352. trace_id = trace_id_holder["id"]
  353. return TaskResponse(
  354. trace_id=trace_id,
  355. status="started",
  356. query=query,
  357. message=f"任务已启动,结果将保存到 .cache/traces/{trace_id}/"
  358. )
  359. @app.get("/health")
  360. async def health_check():
  361. """健康检查"""
  362. return {
  363. "status": "ok",
  364. "max_concurrent_tasks": MAX_CONCURRENT_TASKS,
  365. "current_tasks": MAX_CONCURRENT_TASKS - task_semaphore._value,
  366. "scheduler_running": scheduler.running,
  367. "stats": stats
  368. }
  369. @app.get("/")
  370. async def root():
  371. """根路径"""
  372. return {
  373. "service": "内容寻找服务",
  374. "version": "1.0.0",
  375. "endpoints": {
  376. "create_task": "POST /api/tasks",
  377. "health": "GET /health"
  378. }
  379. }
# ============ Startup event ============
@app.on_event("startup")
async def startup():
    """Initialize the service: log the config banner, kick off resume, register the tick job."""
    logger.info("=" * 60)
    logger.info("内容寻找服务启动中...")
    logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
    logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
    # Describe the optional dispatch window for the startup banner.
    window_desc = (
        f";本地派发不早于 {SCHEDULE_DISPATCH_NOT_BEFORE_HOUR:02d}:00({SCHEDULER_TIMEZONE})"
        if SCHEDULE_DISPATCH_NOT_BEFORE_HOUR is not None
        else ""
    )
    logger.info(
        f"定时策略:每 {SCHEDULE_DISPATCH_INTERVAL_SECONDS} 秒尝试派发 1 条(有并发空槽才派发)"
        f"{window_desc};单次任务超时 {TASK_TIMEOUT_SECONDS}s"
    )
    # Resume any task that was left running, without blocking startup.
    asyncio.create_task(run_startup_resume())
    job = scheduler.add_job(
        scheduled_tick,
        "interval",
        seconds=SCHEDULE_DISPATCH_INTERVAL_SECONDS,
        misfire_grace_time=300,  # still run a missed tick if within 5 minutes
        coalesce=True,           # collapse piled-up missed runs into one
        max_instances=1,         # never overlap two ticks
    )
    scheduler.start()
    logger.info(f"定时任务已注册: id={job.id}, next_run_time={job.next_run_time}")
    logger.info("服务启动完成")
    logger.info("=" * 60)
@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler cleanly when the application exits."""
    logger.info("服务关闭中...")
    if scheduler.running:
        scheduler.shutdown()
    logger.info("服务已关闭")
  417. # ============ 主函数 ============
  418. if __name__ == "__main__":
  419. import uvicorn
  420. port = int(os.getenv("PORT", "8080"))
  421. host = os.getenv("HOST", "0.0.0.0")
  422. logger.info(f"启动服务: http://{host}:{port}")
  423. uvicorn.run(app, host=host, port=port)