""" 生产服务器 - FastAPI + APScheduler 参考 content_finder 的生产部署模式 """ import asyncio import logging import os import sys from datetime import datetime, timezone from pathlib import Path from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse # 添加项目根目录到 Python 路径 sys.path.insert(0, str(Path(__file__).parent.parent.parent)) # 导入执行引擎 from execute_once import main as execute_decision_pipeline logger = logging.getLogger(__name__) # ═══════════════════════════════════════════ # 全局调度器 # ═══════════════════════════════════════════ scheduler: AsyncIOScheduler = None # ═══════════════════════════════════════════ # 定时任务 # ═══════════════════════════════════════════ async def scheduled_decision_job(): """定时执行决策流程""" logger.info(f"[定时任务] 开始执行决策流程 - {datetime.now(timezone.utc)}") try: await execute_decision_pipeline() logger.info(f"[定时任务] 决策流程执行完成") except Exception as e: logger.error(f"[定时任务] 执行失败: {e}", exc_info=True) # ═══════════════════════════════════════════ # FastAPI 应用 # ═══════════════════════════════════════════ app = FastAPI( title="auto_put_ad_mini", description="广告智能调控服务", version="1.0.0", ) @app.on_event("startup") async def startup(): """服务启动时初始化""" global scheduler logger.info("=" * 60) logger.info("广告智能调控服务启动中...") logger.info("启动 APScheduler") scheduler = AsyncIOScheduler(timezone="UTC") # 从数据库读取定时调度配置(优先级:数据库 > 环境变量) cron_expression = "0 2 * * *" # 默认值 try: from db import get_system_config cron_expression = get_system_config("cron_schedule", default="0 2 * * *") logger.info(f"✅ 从数据库读取定时调度:{cron_expression}") except Exception as e: logger.warning(f"⚠️ 数据库读取失败,使用环境变量或默认值: {e}") cron_expression = os.getenv("CRON_SCHEDULE", "0 2 * * *") scheduler.add_job( scheduled_decision_job, trigger=CronTrigger.from_crontab(cron_expression, timezone="UTC"), id="decision_pipeline", name="广告决策流程", replace_existing=True, max_instances=1, # 不允许并发运行 ) # 可选:启动时立即执行一次(从数据库读取配置) run_on_startup = False try: from db import get_system_config run_on_startup = get_system_config("run_on_startup", default=False) except: run_on_startup = os.getenv("RUN_ON_STARTUP", "false").lower() == "true" if run_on_startup: logger.info("启动时立即执行一次决策流程") scheduler.add_job( scheduled_decision_job, id="startup_run", name="启动时执行", ) scheduler.start() logger.info(f"✅ 定时任务已配置:{cron_expression}") logger.info("服务启动完成") logger.info("=" * 60) @app.on_event("shutdown") async def shutdown(): """服务关闭时清理""" logger.info("服务关闭中...") if scheduler and scheduler.running: scheduler.shutdown() logger.info("服务已关闭") # ═══════════════════════════════════════════ # 请求日志中间件 # ═══════════════════════════════════════════ import time from fastapi import Request @app.middleware("http") async def log_requests(request: Request, call_next): """记录所有HTTP请求""" start_time = time.time() response = await call_next(request) duration = time.time() - start_time logger.info( f"{request.method} {request.url.path} " f"status={response.status_code} duration={duration:.3f}s" ) return response # ═══════════════════════════════════════════ # 健康检查端点 # ═══════════════════════════════════════════ @app.get("/health") async def health_check(): """健康检查(Kubernetes liveness/readiness probe)""" try: # 检查输出目录 outputs_dir = Path("/app/outputs") if not outputs_dir.exists(): raise HTTPException(status_code=500, detail="输出目录不存在") # 检查调度器状态 if scheduler is None or not scheduler.running: raise HTTPException(status_code=500, detail="调度器未运行") # 检查最近执行时间 reports_dir = outputs_dir / "reports" latest_report = None if reports_dir.exists(): recent_files = sorted(reports_dir.glob("llm_decisions_*.csv"), reverse=True) if recent_files: latest_report = recent_files[0].name return JSONResponse({ "status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat(), "scheduler_running": scheduler.running if scheduler else False, "latest_report": latest_report, "jobs": [ { "id": job.id, "name": job.name, "next_run": job.next_run_time.isoformat() if job.next_run_time else None, } for job in scheduler.get_jobs() ] if scheduler else [], }) except HTTPException: raise except Exception as e: logger.error(f"健康检查失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=str(e)) # ═══════════════════════════════════════════ # 手动触发端点(可选) # ═══════════════════════════════════════════ @app.post("/trigger") async def manual_trigger(): """手动触发决策流程""" logger.info("收到手动触发请求") # 检查是否有任务正在运行 running_jobs = [job for job in scheduler.get_jobs() if job.id == "decision_pipeline"] if running_jobs and running_jobs[0].next_run_time: return JSONResponse({ "status": "scheduled", "message": "任务已在队列中", "next_run": running_jobs[0].next_run_time.isoformat(), }) # 添加一次性任务 scheduler.add_job( scheduled_decision_job, id=f"manual_trigger_{datetime.now().strftime('%Y%m%d_%H%M%S')}", name="手动触发", ) return JSONResponse({ "status": "triggered", "message": "任务已添加到队列", }) # ═══════════════════════════════════════════ # 启动服务 # ═══════════════════════════════════════════ if __name__ == "__main__": import uvicorn # 配置日志输出到 stdout(Kubernetes 自动收集) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout)] ) port = int(os.getenv("PORT", 8080)) uvicorn.run( "server:app", host="0.0.0.0", port=port, log_level="info", )