entrance.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import traceback
  2. from app.core.database import DatabaseManager
  3. from app.core.observability import LogService
  4. from ._const import I2IRecommendDataSyncConst
  5. from ._mapper import I2IRecommendDataSyncMapper
  6. from ._utils import I2IRecommendDataSyncUtil
  7. class I2IRecommendDataSyncTask(I2IRecommendDataSyncConst):
  8. def __init__(self, pool: DatabaseManager, log_service: LogService):
  9. self.mapper = I2IRecommendDataSyncMapper(pool)
  10. self.tool = I2IRecommendDataSyncUtil()
  11. self.log_service = log_service
  12. async def _sync_single_account(self, gh_id: str, version: str):
  13. """单个账号的版本切换:先激活新版本,再关闭旧版本"""
  14. await self.mapper.activate_new_version(gh_id=gh_id, version=version)
  15. await self.mapper.deactivate_old_versions(gh_id=gh_id, latest_version=version)
  16. async def deal(self):
  17. """任务入口"""
  18. failed_accounts = []
  19. try:
  20. # 1. 查询有待激活数据的账号
  21. accounts = await self.mapper.fetch_all_accounts()
  22. if not accounts:
  23. # 上游数据未更新,无待激活账号
  24. await self.tool.bot(
  25. title="i2i推荐数据同步-上游数据未更新",
  26. detail={
  27. "message": "未查询到status=0的待激活数据,请检查上游大数据任务是否正常"
  28. },
  29. mention=True,
  30. )
  31. return
  32. # 2. 逐账号判断并切换
  33. sync_count = 0
  34. for account in accounts:
  35. gh_id = account["gh_id"]
  36. try:
  37. version_info = await self.mapper.fetch_max_version_for_account(
  38. gh_id
  39. )
  40. if not version_info:
  41. continue
  42. # 最大version的status=0才需要切换
  43. if version_info["status"] != self.VersionStatus.INIT:
  44. continue
  45. sync_count += 1
  46. await self._sync_single_account(
  47. gh_id=gh_id, version=version_info["max_version"]
  48. )
  49. except Exception as e:
  50. failed_accounts.append(gh_id)
  51. await self.log_service.log(
  52. contents={
  53. "task": "i2i_recommend_data_sync",
  54. "status": "fail",
  55. "gh_id": gh_id,
  56. "error": str(e),
  57. "traceback": traceback.format_exc(),
  58. }
  59. )
  60. # 3. 汇总结果
  61. await self.log_service.log(
  62. contents={
  63. "task": "i2i_recommend_data_sync",
  64. "status": "success",
  65. "total": sync_count,
  66. "failed": len(failed_accounts),
  67. "failed_accounts": failed_accounts,
  68. }
  69. )
  70. if failed_accounts:
  71. await self.tool.bot(
  72. title="i2i推荐数据同步-部分账号失败",
  73. detail={
  74. "total": sync_count,
  75. "failed": len(failed_accounts),
  76. "failed_accounts": failed_accounts[:20],
  77. },
  78. mention=True,
  79. )
  80. else:
  81. await self.tool.bot(
  82. title="i2i推荐数据同步成功",
  83. detail={"total": sync_count},
  84. mention=False,
  85. )
  86. except Exception as e:
  87. await self.log_service.log(
  88. contents={
  89. "task": "i2i_recommend_data_sync",
  90. "status": "fail",
  91. "message": "deal 入口异常",
  92. "error": str(e),
  93. "traceback": traceback.format_exc(),
  94. }
  95. )
  96. await self.tool.bot(
  97. title="i2i推荐数据同步-入口异常",
  98. detail={
  99. "error": str(e),
  100. "traceback": traceback.format_exc(),
  101. },
  102. mention=True,
  103. )
  104. raise
  105. __all__ = ["I2IRecommendDataSyncTask"]