#!/usr/bin/env python3 """ 执行一次性全量更新的脚本 用于初始化数据或定期校准 """ import asyncio import sys from pathlib import Path # 添加项目根目录到路径 sys.path.insert(0, str(Path(__file__).parent.parent)) from scripts.activity_calculator import ActivityCalculator from core.utils.log.logger_manager import LoggerManager async def run_full_update(): """执行全量更新""" logger = LoggerManager.get_logger() logger.info("开始执行全量更新...") calculator = ActivityCalculator(update_mode="full") try: await calculator.initialize() result = await calculator.calculate_and_update() logger.info(f"全量更新完成,处理了 {result} 个用户") return result except Exception as e: logger.error(f"全量更新失败: {e}") raise finally: await calculator.close() def main(): import logging logging.basicConfig(level=logging.INFO) print("=" * 50) print("执行一次性全量更新") print("=" * 50) result = asyncio.run(run_full_update()) print(f"全量更新结果: {result} 个用户被处理") print("全量更新任务完成!") if __name__ == '__main__': main()