"""
Production server - FastAPI + APScheduler.

Modeled on the production deployment pattern of content_finder.
"""
import asyncio
import logging
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse

# Make the project root importable (this file lives three directory levels deep).
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

# Decision-pipeline entry point (project-local module; must come after the
# sys.path insertion above).
from execute_once import main as execute_decision_pipeline

logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════
# Global scheduler
# ═══════════════════════════════════════════
# Created in the startup hook; None until the service has started.
scheduler: Optional[AsyncIOScheduler] = None
async def scheduled_decision_job():
    """Run the decision pipeline once; invoked by APScheduler on the cron trigger.

    Catches all exceptions so a failed run never propagates into the
    scheduler and future scheduled runs stay alive.
    """
    logger.info(f"[定时任务] 开始执行决策流程 - {datetime.now(timezone.utc)}")
    try:
        await execute_decision_pipeline()
        # (was an f-string with no placeholders — plain literal suffices)
        logger.info("[定时任务] 决策流程执行完成")
    except Exception as e:
        logger.error(f"[定时任务] 执行失败: {e}", exc_info=True)
# ═══════════════════════════════════════════
# FastAPI application
# ═══════════════════════════════════════════
app = FastAPI(
    title="auto_put_ad_mini",
    description="广告智能调控服务",
    version="1.0.0",
)
@app.on_event("startup")
async def startup():
    """Initialize the scheduler and register the decision-pipeline jobs.

    Configuration priority for the cron schedule and the run-on-startup
    flag: database system config > environment variable > hard-coded default.
    """
    global scheduler

    logger.info("=" * 60)
    logger.info("广告智能调控服务启动中...")
    logger.info("启动 APScheduler")
    scheduler = AsyncIOScheduler(timezone="UTC")

    # Cron schedule: database first, then env var, then the default (02:00 UTC daily).
    cron_expression = "0 2 * * *"  # default
    try:
        from db import get_system_config
        cron_expression = get_system_config("cron_schedule", default="0 2 * * *")
        logger.info(f"✅ 从数据库读取定时调度:{cron_expression}")
    except Exception as e:
        logger.warning(f"⚠️ 数据库读取失败,使用环境变量或默认值: {e}")
        cron_expression = os.getenv("CRON_SCHEDULE", "0 2 * * *")

    scheduler.add_job(
        scheduled_decision_job,
        trigger=CronTrigger.from_crontab(cron_expression, timezone="UTC"),
        id="decision_pipeline",
        name="广告决策流程",
        replace_existing=True,
        max_instances=1,  # never run the pipeline concurrently
    )

    # Optional: run the pipeline once immediately at startup.
    run_on_startup = False
    try:
        from db import get_system_config
        run_on_startup = get_system_config("run_on_startup", default=False)
    # FIX: was a bare `except:` — that would also swallow SystemExit and
    # KeyboardInterrupt; only config/DB errors should fall back to the env var.
    except Exception:
        run_on_startup = os.getenv("RUN_ON_STARTUP", "false").lower() == "true"

    if run_on_startup:
        logger.info("启动时立即执行一次决策流程")
        scheduler.add_job(
            scheduled_decision_job,
            id="startup_run",
            name="启动时执行",
        )

    scheduler.start()
    logger.info(f"✅ 定时任务已配置:{cron_expression}")
    logger.info("服务启动完成")
    logger.info("=" * 60)
@app.on_event("shutdown")
async def shutdown():
    """Stop the scheduler cleanly when the service shuts down."""
    logger.info("服务关闭中...")
    if scheduler is not None and scheduler.running:
        scheduler.shutdown()
    logger.info("服务已关闭")
# ═══════════════════════════════════════════
# Request-logging middleware
# ═══════════════════════════════════════════
import time

from fastapi import Request


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code and duration for every HTTP request."""
    # FIX: use the monotonic perf_counter instead of time.time() — wall-clock
    # time can jump (NTP sync, DST), which would skew or negate the duration.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    logger.info(
        f"{request.method} {request.url.path} "
        f"status={response.status_code} duration={duration:.3f}s"
    )
    return response
# ═══════════════════════════════════════════
# Health-check endpoint
# ═══════════════════════════════════════════
@app.get("/health")
async def health_check():
    """Health check (Kubernetes liveness/readiness probe)."""
    try:
        # The outputs directory must exist for the pipeline to write reports.
        outputs_dir = Path("/app/outputs")
        if not outputs_dir.exists():
            raise HTTPException(status_code=500, detail="输出目录不存在")

        # The scheduler must have been started and still be running.
        if scheduler is None or not scheduler.running:
            raise HTTPException(status_code=500, detail="调度器未运行")

        # Surface the newest decision report, if any have been produced yet.
        reports_dir = outputs_dir / "reports"
        latest_report = None
        if reports_dir.exists():
            candidates = sorted(reports_dir.glob("llm_decisions_*.csv"), reverse=True)
            if candidates:
                latest_report = candidates[0].name

        jobs_info = []
        if scheduler:
            for job in scheduler.get_jobs():
                next_run = job.next_run_time
                jobs_info.append({
                    "id": job.id,
                    "name": job.name,
                    "next_run": next_run.isoformat() if next_run else None,
                })

        return JSONResponse({
            "status": "healthy",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "scheduler_running": scheduler.running if scheduler else False,
            "latest_report": latest_report,
            "jobs": jobs_info,
        })
    except HTTPException:
        # Re-raise probe failures untouched so the status code is preserved.
        raise
    except Exception as e:
        logger.error(f"健康检查失败: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
# ═══════════════════════════════════════════
# Manual trigger endpoint (optional)
# ═══════════════════════════════════════════
@app.post("/trigger")
async def manual_trigger():
    """Manually enqueue one run of the decision pipeline.

    BUG FIX: the previous guard inspected the *cron* job's ``next_run_time``,
    which is always set while the cron job is scheduled — so the endpoint
    always answered "scheduled" and could never actually queue a manual run.
    It now only refuses when an earlier manual one-shot is still pending.
    """
    logger.info("收到手动触发请求")

    # Refuse only if a previously queued manual one-shot has not run yet.
    pending_manual = [
        job for job in scheduler.get_jobs() if job.id.startswith("manual_trigger_")
    ]
    if pending_manual:
        next_run = pending_manual[0].next_run_time
        return JSONResponse({
            "status": "scheduled",
            "message": "任务已在队列中",
            "next_run": next_run.isoformat() if next_run else None,
        })

    # One-shot job. Timezone-aware timestamp keeps the id consistent with the
    # UTC-configured scheduler (the naive datetime.now() was the odd one out).
    scheduler.add_job(
        scheduled_decision_job,
        id=f"manual_trigger_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}",
        name="手动触发",
    )
    return JSONResponse({
        "status": "triggered",
        "message": "任务已添加到队列",
    })
# ═══════════════════════════════════════════
# Service entry point
# ═══════════════════════════════════════════
if __name__ == "__main__":
    import uvicorn

    # Log to stdout so Kubernetes collects the output automatically.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    # Default as a string so int() always receives the same type.
    port = int(os.getenv("PORT", "8080"))
    uvicorn.run(
        "server:app",
        host="0.0.0.0",
        port=port,
        log_level="info",
    )