kimi_balance.py 1.1 KB

123456789101112131415161718192021222324252627282930313233
  1. import traceback
  2. from typing import Dict
  3. from applications.api import feishu_robot
  4. from applications.utils import AsyncHttPClient
  5. # const
  6. BALANCE_LIMIT_THRESHOLD = 100.0
  7. async def check_kimi_balance() -> Dict:
  8. url = "https://api.moonshot.cn/v1/users/me/balance"
  9. headers = {
  10. "Authorization": "Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q",
  11. "Content-Type": "application/json; charset=utf-8",
  12. }
  13. async with AsyncHttPClient() as client:
  14. response = await client.get(url, headers=headers)
  15. try:
  16. balance = response["data"]["available_balance"]
  17. if balance < BALANCE_LIMIT_THRESHOLD:
  18. await feishu_robot.bot(
  19. title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
  20. detail={"balance": balance},
  21. )
  22. except Exception as e:
  23. error_stack = traceback.format_exc()
  24. await feishu_robot.bot(
  25. title="kimi余额接口处理失败,数据结构异常",
  26. detail={"error": str(e), "error_msg": error_stack},
  27. )
  28. return response