# server.py — production server (FastAPI + APScheduler)
  1. """
  2. 生产服务器 - FastAPI + APScheduler
  3. 参考 content_finder 的生产部署模式
  4. """
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
  15. # 添加项目根目录到 Python 路径
  16. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  17. # 导入执行引擎
  18. from execute_once import main as execute_decision_pipeline
  19. logger = logging.getLogger(__name__)
  20. # ═══════════════════════════════════════════
  21. # 全局调度器
  22. # ═══════════════════════════════════════════
  23. scheduler: AsyncIOScheduler = None
  24. # ═══════════════════════════════════════════
  25. # 定时任务
  26. # ═══════════════════════════════════════════
  27. async def scheduled_decision_job():
  28. """定时执行决策流程"""
  29. logger.info(f"[定时任务] 开始执行决策流程 - {datetime.now(timezone.utc)}")
  30. try:
  31. await execute_decision_pipeline()
  32. logger.info(f"[定时任务] 决策流程执行完成")
  33. except Exception as e:
  34. logger.error(f"[定时任务] 执行失败: {e}", exc_info=True)
  35. # ═══════════════════════════════════════════
  36. # FastAPI 应用
  37. # ═══════════════════════════════════════════
  38. app = FastAPI(
  39. title="auto_put_ad_mini",
  40. description="广告智能调控服务",
  41. version="1.0.0",
  42. )
  43. @app.on_event("startup")
  44. async def startup():
  45. """服务启动时初始化"""
  46. global scheduler
  47. logger.info("=" * 60)
  48. logger.info("广告智能调控服务启动中...")
  49. logger.info("启动 APScheduler")
  50. scheduler = AsyncIOScheduler(timezone="UTC")
  51. # 从数据库读取定时调度配置(优先级:数据库 > 环境变量)
  52. cron_expression = "0 2 * * *" # 默认值
  53. try:
  54. from db import get_system_config
  55. cron_expression = get_system_config("cron_schedule", default="0 2 * * *")
  56. logger.info(f"✅ 从数据库读取定时调度:{cron_expression}")
  57. except Exception as e:
  58. logger.warning(f"⚠️ 数据库读取失败,使用环境变量或默认值: {e}")
  59. cron_expression = os.getenv("CRON_SCHEDULE", "0 2 * * *")
  60. scheduler.add_job(
  61. scheduled_decision_job,
  62. trigger=CronTrigger.from_crontab(cron_expression, timezone="UTC"),
  63. id="decision_pipeline",
  64. name="广告决策流程",
  65. replace_existing=True,
  66. max_instances=1, # 不允许并发运行
  67. )
  68. # 可选:启动时立即执行一次(从数据库读取配置)
  69. run_on_startup = False
  70. try:
  71. from db import get_system_config
  72. run_on_startup = get_system_config("run_on_startup", default=False)
  73. except:
  74. run_on_startup = os.getenv("RUN_ON_STARTUP", "false").lower() == "true"
  75. if run_on_startup:
  76. logger.info("启动时立即执行一次决策流程")
  77. scheduler.add_job(
  78. scheduled_decision_job,
  79. id="startup_run",
  80. name="启动时执行",
  81. )
  82. scheduler.start()
  83. logger.info(f"✅ 定时任务已配置:{cron_expression}")
  84. logger.info("服务启动完成")
  85. logger.info("=" * 60)
  86. @app.on_event("shutdown")
  87. async def shutdown():
  88. """服务关闭时清理"""
  89. logger.info("服务关闭中...")
  90. if scheduler and scheduler.running:
  91. scheduler.shutdown()
  92. logger.info("服务已关闭")
  93. # ═══════════════════════════════════════════
  94. # 请求日志中间件
  95. # ═══════════════════════════════════════════
  96. import time
  97. from fastapi import Request
  98. @app.middleware("http")
  99. async def log_requests(request: Request, call_next):
  100. """记录所有HTTP请求"""
  101. start_time = time.time()
  102. response = await call_next(request)
  103. duration = time.time() - start_time
  104. logger.info(
  105. f"{request.method} {request.url.path} "
  106. f"status={response.status_code} duration={duration:.3f}s"
  107. )
  108. return response
  109. # ═══════════════════════════════════════════
  110. # 健康检查端点
  111. # ═══════════════════════════════════════════
  112. @app.get("/health")
  113. async def health_check():
  114. """健康检查(Kubernetes liveness/readiness probe)"""
  115. try:
  116. # 检查输出目录
  117. outputs_dir = Path("/app/outputs")
  118. if not outputs_dir.exists():
  119. raise HTTPException(status_code=500, detail="输出目录不存在")
  120. # 检查调度器状态
  121. if scheduler is None or not scheduler.running:
  122. raise HTTPException(status_code=500, detail="调度器未运行")
  123. # 检查最近执行时间
  124. reports_dir = outputs_dir / "reports"
  125. latest_report = None
  126. if reports_dir.exists():
  127. recent_files = sorted(reports_dir.glob("llm_decisions_*.csv"), reverse=True)
  128. if recent_files:
  129. latest_report = recent_files[0].name
  130. return JSONResponse({
  131. "status": "healthy",
  132. "timestamp": datetime.now(timezone.utc).isoformat(),
  133. "scheduler_running": scheduler.running if scheduler else False,
  134. "latest_report": latest_report,
  135. "jobs": [
  136. {
  137. "id": job.id,
  138. "name": job.name,
  139. "next_run": job.next_run_time.isoformat() if job.next_run_time else None,
  140. }
  141. for job in scheduler.get_jobs()
  142. ] if scheduler else [],
  143. })
  144. except HTTPException:
  145. raise
  146. except Exception as e:
  147. logger.error(f"健康检查失败: {e}", exc_info=True)
  148. raise HTTPException(status_code=500, detail=str(e))
  149. # ═══════════════════════════════════════════
  150. # 手动触发端点(可选)
  151. # ═══════════════════════════════════════════
  152. @app.post("/trigger")
  153. async def manual_trigger():
  154. """手动触发决策流程"""
  155. logger.info("收到手动触发请求")
  156. # 检查是否有任务正在运行
  157. running_jobs = [job for job in scheduler.get_jobs() if job.id == "decision_pipeline"]
  158. if running_jobs and running_jobs[0].next_run_time:
  159. return JSONResponse({
  160. "status": "scheduled",
  161. "message": "任务已在队列中",
  162. "next_run": running_jobs[0].next_run_time.isoformat(),
  163. })
  164. # 添加一次性任务
  165. scheduler.add_job(
  166. scheduled_decision_job,
  167. id=f"manual_trigger_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
  168. name="手动触发",
  169. )
  170. return JSONResponse({
  171. "status": "triggered",
  172. "message": "任务已添加到队列",
  173. })
  174. # ═══════════════════════════════════════════
  175. # 启动服务
  176. # ═══════════════════════════════════════════
  177. if __name__ == "__main__":
  178. import uvicorn
  179. # 配置日志输出到 stdout(Kubernetes 自动收集)
  180. logging.basicConfig(
  181. level=logging.INFO,
  182. format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  183. handlers=[logging.StreamHandler(sys.stdout)]
  184. )
  185. port = int(os.getenv("PORT", 8080))
  186. uvicorn.run(
  187. "server:app",
  188. host="0.0.0.0",
  189. port=port,
  190. log_level="info",
  191. )