| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- #!/usr/bin/env python3
- """
- 活跃度计算任务调度器
- 使用 APScheduler 实现定时任务调度
- """
- import asyncio
- import argparse
- import logging
- import sys
- from datetime import datetime
- from pathlib import Path
- import os
- # 添加项目根目录到路径
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from apscheduler.schedulers.blocking import BlockingScheduler
- from apscheduler.triggers.date import DateTrigger
- from apscheduler.triggers.cron import CronTrigger
- from scripts.activity_calculator import ActivityCalculator
- async def run_full_update():
- """执行全量更新"""
- print(f"[{datetime.now()}] 开始执行全量更新...")
- calculator = ActivityCalculator(update_mode="full")
- try:
- await calculator.initialize()
- result = await calculator.calculate_and_update()
- print(f"[{datetime.now()}] 全量更新完成,处理了 {result} 个用户")
- return result
- except Exception as e:
- print(f"[{datetime.now()}] 全量更新失败: {e}")
- raise
- finally:
- await calculator.close()
- async def run_incremental_update():
- """执行增量更新"""
- print(f"[{datetime.now()}] 开始执行增量更新...")
- calculator = ActivityCalculator(update_mode="incremental")
- try:
- await calculator.initialize()
- result = await calculator.calculate_and_update()
- print(f"[{datetime.now()}] 增量更新完成,处理了 {result} 个用户")
- return result
- except Exception as e:
- print(f"[{datetime.now()}] 增量更新失败: {e}")
- raise
- finally:
- await calculator.close()
- def setup_logging():
- """设置日志"""
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
- )
- def main():
- parser = argparse.ArgumentParser(description='活跃度计算任务调度器')
- parser.add_argument('task', choices=['full', 'incremental', 'scheduler'],
- help='任务类型: full(全量更新), incremental(增量更新), scheduler(启动调度器)')
- parser.add_argument('--run-once', action='store_true',
- help='只执行一次(用于测试)')
-
- args = parser.parse_args()
- setup_logging()
-
- if args.task == 'full':
- print("=" * 50)
- print(f"执行全量更新任务 - {datetime.now()}")
- print("=" * 50)
- result = asyncio.run(run_full_update())
- print(f"全量更新结果: {result} 个用户被处理")
-
- elif args.task == 'incremental':
- print("=" * 50)
- print(f"执行增量更新任务 - {datetime.now()}")
- print("=" * 50)
- result = asyncio.run(run_incremental_update())
- print(f"增量更新结果: {result} 个用户被处理")
-
- elif args.task == 'scheduler':
- print("=" * 50)
- print(f"启动调度器 - {datetime.now()}")
- print("=" * 50)
-
- scheduler = BlockingScheduler()
-
- # 添加一次性全量更新任务(可选,根据需要启用)
- # scheduler.add_job(
- # func=lambda: asyncio.run(run_full_update()),
- # trigger=DateTrigger(run_date=datetime.now()), # 立即执行或指定时间
- # id='full_update_once',
- # name='一次性全量更新',
- # misfire_grace_time=3600 # 1小时宽限时间
- # )
-
- # 添加每日凌晨0点的增量更新任务
- scheduler.add_job(
- func=lambda: asyncio.run(run_incremental_update()),
- trigger=CronTrigger(hour=0, minute=0), # 每天凌晨0点执行
- id='incremental_update_daily',
- name='每日增量更新',
- misfire_grace_time=3600 # 1小时宽限时间
- )
-
- print("调度器已启动,按 Ctrl+C 停止...")
- try:
- scheduler.start()
- except KeyboardInterrupt:
- print("\n调度器已停止")
- scheduler.shutdown()
- if __name__ == '__main__':
- main()
|