| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- #!/usr/bin/env python3
- """
- 执行一次性全量更新的脚本
- 用于初始化数据或定期校准
- """
- import asyncio
- import sys
- from pathlib import Path
- # 添加项目根目录到路径
- sys.path.insert(0, str(Path(__file__).parent.parent))
- from scripts.activity_calculator import ActivityCalculator
- from core.utils.log.logger_manager import LoggerManager
- async def run_full_update():
- """执行全量更新"""
- logger = LoggerManager.get_logger()
- logger.info("开始执行全量更新...")
-
- calculator = ActivityCalculator(update_mode="full")
- try:
- await calculator.initialize()
- result = await calculator.calculate_and_update()
- logger.info(f"全量更新完成,处理了 {result} 个用户")
- return result
- except Exception as e:
- logger.error(f"全量更新失败: {e}")
- raise
- finally:
- await calculator.close()
- def main():
- import logging
- logging.basicConfig(level=logging.INFO)
-
- print("=" * 50)
- print("执行一次性全量更新")
- print("=" * 50)
-
- result = asyncio.run(run_full_update())
- print(f"全量更新结果: {result} 个用户被处理")
- print("全量更新任务完成!")
- if __name__ == '__main__':
- main()
|