activity_scheduler.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. #!/usr/bin/env python3
  2. """
  3. 活跃度计算任务调度器
  4. 使用 APScheduler 实现定时任务调度
  5. """
  6. import asyncio
  7. import argparse
  8. import logging
  9. import sys
  10. from datetime import datetime
  11. from pathlib import Path
  12. import os
  13. # 添加项目根目录到路径
  14. sys.path.insert(0, str(Path(__file__).parent.parent))
  15. from apscheduler.schedulers.blocking import BlockingScheduler
  16. from apscheduler.triggers.date import DateTrigger
  17. from apscheduler.triggers.cron import CronTrigger
  18. from scripts.activity_calculator import ActivityCalculator
  19. async def run_full_update():
  20. """执行全量更新"""
  21. print(f"[{datetime.now()}] 开始执行全量更新...")
  22. calculator = ActivityCalculator(update_mode="full")
  23. try:
  24. await calculator.initialize()
  25. result = await calculator.calculate_and_update()
  26. print(f"[{datetime.now()}] 全量更新完成,处理了 {result} 个用户")
  27. return result
  28. except Exception as e:
  29. print(f"[{datetime.now()}] 全量更新失败: {e}")
  30. raise
  31. finally:
  32. await calculator.close()
  33. async def run_incremental_update():
  34. """执行增量更新"""
  35. print(f"[{datetime.now()}] 开始执行增量更新...")
  36. calculator = ActivityCalculator(update_mode="incremental")
  37. try:
  38. await calculator.initialize()
  39. result = await calculator.calculate_and_update()
  40. print(f"[{datetime.now()}] 增量更新完成,处理了 {result} 个用户")
  41. return result
  42. except Exception as e:
  43. print(f"[{datetime.now()}] 增量更新失败: {e}")
  44. raise
  45. finally:
  46. await calculator.close()
  47. def setup_logging():
  48. """设置日志"""
  49. logging.basicConfig(
  50. level=logging.INFO,
  51. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  52. )
  53. def main():
  54. parser = argparse.ArgumentParser(description='活跃度计算任务调度器')
  55. parser.add_argument('task', choices=['full', 'incremental', 'scheduler'],
  56. help='任务类型: full(全量更新), incremental(增量更新), scheduler(启动调度器)')
  57. parser.add_argument('--run-once', action='store_true',
  58. help='只执行一次(用于测试)')
  59. args = parser.parse_args()
  60. setup_logging()
  61. if args.task == 'full':
  62. print("=" * 50)
  63. print(f"执行全量更新任务 - {datetime.now()}")
  64. print("=" * 50)
  65. result = asyncio.run(run_full_update())
  66. print(f"全量更新结果: {result} 个用户被处理")
  67. elif args.task == 'incremental':
  68. print("=" * 50)
  69. print(f"执行增量更新任务 - {datetime.now()}")
  70. print("=" * 50)
  71. result = asyncio.run(run_incremental_update())
  72. print(f"增量更新结果: {result} 个用户被处理")
  73. elif args.task == 'scheduler':
  74. print("=" * 50)
  75. print(f"启动调度器 - {datetime.now()}")
  76. print("=" * 50)
  77. scheduler = BlockingScheduler()
  78. # 添加一次性全量更新任务(可选,根据需要启用)
  79. # scheduler.add_job(
  80. # func=lambda: asyncio.run(run_full_update()),
  81. # trigger=DateTrigger(run_date=datetime.now()), # 立即执行或指定时间
  82. # id='full_update_once',
  83. # name='一次性全量更新',
  84. # misfire_grace_time=3600 # 1小时宽限时间
  85. # )
  86. # 添加每日凌晨0点的增量更新任务
  87. scheduler.add_job(
  88. func=lambda: asyncio.run(run_incremental_update()),
  89. trigger=CronTrigger(hour=0, minute=0), # 每天凌晨0点执行
  90. id='incremental_update_daily',
  91. name='每日增量更新',
  92. misfire_grace_time=3600 # 1小时宽限时间
  93. )
  94. print("调度器已启动,按 Ctrl+C 停止...")
  95. try:
  96. scheduler.start()
  97. except KeyboardInterrupt:
  98. print("\n调度器已停止")
  99. scheduler.shutdown()
  100. if __name__ == '__main__':
  101. main()