#!/usr/bin/env python3 """ 活跃度计算任务调度器 使用 APScheduler 实现定时任务调度 """ import asyncio import argparse import logging import sys from datetime import datetime from pathlib import Path import os # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.cron import CronTrigger from scripts.activity_calculator import ActivityCalculator async def run_full_update(): """执行全量更新""" print(f"[{datetime.now()}] 开始执行全量更新...") calculator = ActivityCalculator(update_mode="full") try: await calculator.initialize() result = await calculator.calculate_and_update() print(f"[{datetime.now()}] 全量更新完成,处理了 {result} 个用户") return result except Exception as e: print(f"[{datetime.now()}] 全量更新失败: {e}") raise finally: await calculator.close() async def run_incremental_update(): """执行增量更新""" print(f"[{datetime.now()}] 开始执行增量更新...") calculator = ActivityCalculator(update_mode="incremental") try: await calculator.initialize() result = await calculator.calculate_and_update() print(f"[{datetime.now()}] 增量更新完成,处理了 {result} 个用户") return result except Exception as e: print(f"[{datetime.now()}] 增量更新失败: {e}") raise finally: await calculator.close() def setup_logging(): """设置日志""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) def main(): parser = argparse.ArgumentParser(description='活跃度计算任务调度器') parser.add_argument('task', choices=['full', 'incremental', 'scheduler'], help='任务类型: full(全量更新), incremental(增量更新), scheduler(启动调度器)') parser.add_argument('--run-once', action='store_true', help='只执行一次(用于测试)') args = parser.parse_args() setup_logging() if args.task == 'full': print("=" * 50) print(f"执行全量更新任务 - {datetime.now()}") print("=" * 50) result = asyncio.run(run_full_update()) print(f"全量更新结果: {result} 个用户被处理") elif args.task == 'incremental': print("=" * 50) print(f"执行增量更新任务 - {datetime.now()}") print("=" * 50) result = asyncio.run(run_incremental_update()) print(f"增量更新结果: {result} 个用户被处理") elif args.task == 'scheduler': print("=" * 50) print(f"启动调度器 - {datetime.now()}") print("=" * 50) scheduler = BlockingScheduler() # 添加一次性全量更新任务(可选,根据需要启用) # scheduler.add_job( # func=lambda: asyncio.run(run_full_update()), # trigger=DateTrigger(run_date=datetime.now()), # 立即执行或指定时间 # id='full_update_once', # name='一次性全量更新', # misfire_grace_time=3600 # 1小时宽限时间 # ) # 添加每日凌晨0点的增量更新任务 scheduler.add_job( func=lambda: asyncio.run(run_incremental_update()), trigger=CronTrigger(hour=0, minute=0), # 每天凌晨0点执行 id='incremental_update_daily', name='每日增量更新', misfire_grace_time=3600 # 1小时宽限时间 ) print("调度器已启动,按 Ctrl+C 停止...") try: scheduler.start() except KeyboardInterrupt: print("\n调度器已停止") scheduler.shutdown() if __name__ == '__main__': main()