kimi_balance.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334
  1. import traceback
  2. from typing import Dict
  3. from applications.api import feishu_robot
  4. from applications.utils import AsyncHttPClient
  5. # const
  6. BALANCE_LIMIT_THRESHOLD = 100.0
  7. async def check_kimi_balance() -> Dict:
  8. url = "https://api.moonshot.cn/v1/users/me/balance"
  9. headers = {
  10. "Authorization": "Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q",
  11. "Content-Type": "application/json; charset=utf-8",
  12. }
  13. try:
  14. async with AsyncHttPClient() as client:
  15. response = await client.get(url, headers=headers)
  16. balance = response["data"]["available_balance"]
  17. if balance < BALANCE_LIMIT_THRESHOLD:
  18. await feishu_robot.bot(
  19. title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
  20. detail={"balance": balance},
  21. )
  22. return {"code": 2, "data": response}
  23. except Exception as e:
  24. error_stack = traceback.format_exc()
  25. await feishu_robot.bot(
  26. title="kimi余额接口处理失败,数据结构异常",
  27. detail={"error": str(e), "error_msg": error_stack},
  28. )
  29. return {"code": 99, "data": error_stack}