| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import traceback
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from ._const import I2IRecommendDataSyncConst
- from ._mapper import I2IRecommendDataSyncMapper
- from ._utils import I2IRecommendDataSyncUtil
- class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- self.mapper = I2IRecommendDataSyncMapper(pool)
- self.tool = I2IRecommendDataSyncUtil()
- self.log_service = log_service
- async def _sync_single_account(self, gh_id: str, version: str):
- """单个账号的版本切换:先激活新版本,再关闭旧版本"""
- await self.mapper.activate_new_version(gh_id=gh_id, version=version)
- await self.mapper.deactivate_old_versions(gh_id=gh_id, latest_version=version)
- async def deal(self):
- """任务入口"""
- failed_accounts = []
- try:
- # 1. 查询有待激活数据的账号
- accounts = await self.mapper.fetch_all_accounts()
- if not accounts:
- # 上游数据未更新,无待激活账号
- await self.tool.bot(
- title="i2i推荐数据同步-上游数据未更新",
- detail={
- "message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"
- },
- mention=True,
- )
- return
- # 2. 逐账号判断并切换
- sync_count = 0
- for account in accounts:
- gh_id = account["gh_id"]
- try:
- version_info = await self.mapper.fetch_max_version_for_account(
- gh_id
- )
- if not version_info:
- continue
- # 最大version的status=0才需要切换
- if version_info["status"] != self.VersionStatus.INIT:
- continue
- sync_count += 1
- await self._sync_single_account(
- gh_id=gh_id, version=version_info["max_version"]
- )
- except Exception as e:
- failed_accounts.append(gh_id)
- await self.log_service.log(
- contents={
- "task": "i2i_recommend_data_sync",
- "status": "fail",
- "gh_id": gh_id,
- "error": str(e),
- "traceback": traceback.format_exc(),
- }
- )
- # 3. 汇总结果
- await self.log_service.log(
- contents={
- "task": "i2i_recommend_data_sync",
- "status": "success",
- "total": sync_count,
- "failed": len(failed_accounts),
- "failed_accounts": failed_accounts,
- }
- )
- if failed_accounts:
- await self.tool.bot(
- title="i2i推荐数据同步-部分账号失败",
- detail={
- "total": sync_count,
- "failed": len(failed_accounts),
- "failed_accounts": failed_accounts[:20],
- },
- mention=True,
- )
- else:
- await self.tool.bot(
- title="i2i推荐数据同步成功",
- detail={"total": sync_count},
- mention=False,
- )
- except Exception as e:
- await self.log_service.log(
- contents={
- "task": "i2i_recommend_data_sync",
- "status": "fail",
- "message": "deal 入口异常",
- "error": str(e),
- "traceback": traceback.format_exc(),
- }
- )
- await self.tool.bot(
- title="i2i推荐数据同步-入口异常",
- detail={
- "error": str(e),
- "traceback": traceback.format_exc(),
- },
- mention=True,
- )
- raise
- __all__ = ["I2IRecommendDataSyncTask"]
|