_utils.py 780 B

123456789101112131415161718192021222324252627
  1. from typing import Dict
  2. from app.infra.external import feishu_robot
  3. from ._const import I2IRecommendDataSyncConst
  class I2IRecommendDataSyncUtil(I2IRecommendDataSyncConst):
      """Utility helpers built on top of ``I2IRecommendDataSyncConst``.

      Provides a row filter for account/version records and a thin async
      wrapper around the Feishu robot notifier.
      """

      @staticmethod
      def filter_accounts_to_sync(account_version_list: list[dict]) -> list[dict]:
          """Return the rows whose ``"status"`` equals ``VersionStatus.INIT``.

          NOTE(review): the previous docstring described this filter as
          "accounts whose max-version status is OFFLINE(0)", but the code
          compares against ``VersionStatus.INIT`` -- confirm which status is
          intended. Presumably each row already represents an account's
          highest version; verify against the caller.

          Args:
              account_version_list: Rows to inspect; each row is indexed with
                  the ``"status"`` key.

          Returns:
              A new list holding the matching rows, in their original order.

          Raises:
              KeyError: If a dict row has no ``"status"`` key.
          """
          return [
              row
              for row in account_version_list
              if row["status"] == I2IRecommendDataSyncConst.VersionStatus.INIT
          ]

      # Feishu notification
      async def bot(self, title: str, detail: Dict, mention: bool = False):
          """Send a notification via ``feishu_robot.bot``.

          ``self.RANK_BOT`` is passed as the ``env`` argument (presumably
          defined on ``I2IRecommendDataSyncConst`` -- TODO confirm).

          Args:
              title: Forwarded unchanged as ``title``.
              detail: Forwarded unchanged as ``detail``.
              mention: Forwarded unchanged as ``mention``; defaults to False.

          Returns:
              Whatever the awaited ``feishu_robot.bot`` call returns.
          """
          return await feishu_robot.bot(
              title=title,
              detail=detail,
              env=self.RANK_BOT,
              mention=mention,
          )