_utils.py 793 B

123456789101112131415161718192021222324252627
  1. from typing import Dict
  2. from app.infra.external import feishu_robot
  3. from ._const import I2IRecommendDataSyncConst
  class I2IRecommendDataSyncUtil(I2IRecommendDataSyncConst):
      """Utility methods layered on top of ``I2IRecommendDataSyncConst``."""

      @staticmethod
      def filter_accounts_to_sync(account_version_list: list[dict]) -> list[dict]:
          """Select the account rows that need to be switched.

          A row is kept when its ``"status"`` value equals
          ``I2IRecommendDataSyncConst.VersionStatus.INIT``. The relative
          order of the input rows is preserved, and a row without a
          ``"status"`` key raises ``KeyError``.

          NOTE(review): the previous docstring described the criterion as
          "the highest version has status OFFLINE(0)", whereas the code
          compares against ``VersionStatus.INIT`` -- confirm which one is
          intended.
          """
          selected = []
          for entry in account_version_list:
              if entry["status"] == I2IRecommendDataSyncConst.VersionStatus.INIT:
                  selected.append(entry)
          return selected

      # Feishu notification
      async def bot(self, title: str, detail: Dict, mention: bool = False):
          """Await ``feishu_robot.bot`` and return whatever it produces.

          ``title``, ``detail`` and ``mention`` are forwarded unchanged; the
          ``env`` argument is read from ``self.RANK_BOT``.
          """
          notify = feishu_robot.bot
          response = await notify(
              title=title,
              detail=detail,
              env=self.RANK_BOT,
              mention=mention,
          )
          return response