run_full_update.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. #!/usr/bin/env python3
  2. """
  3. 执行一次性全量更新的脚本
  4. 用于初始化数据或定期校准
  5. """
  6. import asyncio
  7. import sys
  8. from pathlib import Path
  9. # 添加项目根目录到路径
  10. sys.path.insert(0, str(Path(__file__).parent.parent))
  11. from scripts.activity_calculator import ActivityCalculator
  12. from core.utils.log.logger_manager import LoggerManager
  13. async def run_full_update():
  14. """执行全量更新"""
  15. logger = LoggerManager.get_logger()
  16. logger.info("开始执行全量更新...")
  17. calculator = ActivityCalculator(update_mode="full")
  18. try:
  19. await calculator.initialize()
  20. result = await calculator.calculate_and_update()
  21. logger.info(f"全量更新完成,处理了 {result} 个用户")
  22. return result
  23. except Exception as e:
  24. logger.error(f"全量更新失败: {e}")
  25. raise
  26. finally:
  27. await calculator.close()
  28. def main():
  29. import logging
  30. logging.basicConfig(level=logging.INFO)
  31. print("=" * 50)
  32. print("执行一次性全量更新")
  33. print("=" * 50)
  34. result = asyncio.run(run_full_update())
  35. print(f"全量更新结果: {result} 个用户被处理")
  36. print("全量更新任务完成!")
  37. if __name__ == '__main__':
  38. main()