import traceback

from app.core.database import DatabaseManager
from app.core.observability import LogService

from ._const import I2IRecommendDataSyncConst
from ._mapper import I2IRecommendDataSyncMapper
from ._utils import I2IRecommendDataSyncUtil


class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
    """Task that switches accounts to their newest i2i recommendation data version.

    For every account returned by the mapper, the task looks up the account's
    max version and, when that version is still in the ``INIT`` status,
    activates it and deactivates the older versions. Outcomes are written to
    the log service and reported through the utility helper's ``bot`` call.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Build the mapper on top of ``pool`` and keep the helper and log service."""
        self.mapper = I2IRecommendDataSyncMapper(pool)
        self.tool = I2IRecommendDataSyncUtil()
        self.log_service = log_service

    async def _sync_single_account(self, gh_id: str, version: str):
        """Switch one account's version: activate the new one, then deactivate the old ones."""
        await self.mapper.activate_new_version(gh_id=gh_id, version=version)
        await self.mapper.deactivate_old_versions(
            gh_id=gh_id, latest_version=version
        )

    async def deal(self):
        """Task entry point.

        Steps:
            1. Fetch the accounts that have data pending activation; when there
               are none, send an alert and return.
            2. For each account, switch versions if its max version is still in
               the ``INIT`` status. A failure on one account is logged and
               recorded, and the loop moves on to the next account.
            3. Log a summary and send a notification (with a mention only when
               some accounts failed).

        Raises:
            Exception: anything that escapes the per-account handling is
                logged, reported through the bot, and re-raised unchanged.
        """
        task_name = "i2i_recommend_data_sync"
        failures = []
        try:
            # 1. Fetch the accounts that have data pending activation.
            pending = await self.mapper.fetch_all_accounts()
            if not pending:
                # Upstream data has not been refreshed: nothing is pending activation.
                await self.tool.bot(
                    title="i2i推荐数据同步-上游数据未更新",
                    detail={
                        "message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"
                    },
                    mention=True,
                )
                return

            # 2. Check each account and switch it when required.
            attempted = 0
            for row in pending:
                gh_id = row["gh_id"]
                try:
                    latest = await self.mapper.fetch_max_version_for_account(gh_id)
                    # Skip accounts with no version info, and accounts whose max
                    # version is not in the INIT status (status=0 per the
                    # original comment) -- only those still need switching.
                    if not latest or latest["status"] != self.VersionStatus.INIT:
                        continue

                    # Counted before the switch, so the total covers attempts
                    # that subsequently fail as well.
                    attempted += 1
                    await self._sync_single_account(
                        gh_id=gh_id, version=latest["max_version"]
                    )
                except Exception as exc:
                    failures.append(gh_id)
                    await self.log_service.log(
                        contents={
                            "task": task_name,
                            "status": "fail",
                            "gh_id": gh_id,
                            "error": str(exc),
                            "traceback": traceback.format_exc(),
                        }
                    )

            # 3. Summarise the run.
            failed_total = len(failures)
            await self.log_service.log(
                contents={
                    "task": task_name,
                    "status": "success",
                    "total": attempted,
                    "failed": failed_total,
                    "failed_accounts": failures,
                }
            )

            if failures:
                notice = {
                    "title": "i2i推荐数据同步-部分账号失败",
                    "detail": {
                        "total": attempted,
                        "failed": failed_total,
                        # Cap the list so the notification stays short.
                        "failed_accounts": failures[:20],
                    },
                    "mention": True,
                }
            else:
                notice = {
                    "title": "i2i推荐数据同步成功",
                    "detail": {"total": attempted},
                    "mention": False,
                }
            await self.tool.bot(**notice)
        except Exception as exc:
            # Capture once; the same text goes to both the log and the bot.
            error_text = str(exc)
            trace_text = traceback.format_exc()
            await self.log_service.log(
                contents={
                    "task": task_name,
                    "status": "fail",
                    "message": "deal 入口异常",
                    "error": error_text,
                    "traceback": trace_text,
                }
            )
            await self.tool.bot(
                title="i2i推荐数据同步-入口异常",
                detail={
                    "error": error_text,
                    "traceback": trace_text,
                },
                mention=True,
            )
            raise


__all__ = ["I2IRecommendDataSyncTask"]