lark_alert_for_human_intervention.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from typing import List
  2. from toolkit.base import BaseToolkit
  3. from toolkit.function_tool import FunctionTool
  4. import requests
  5. class LarkAlertForHumanIntervention(BaseToolkit):
  6. r"""A toolkit for Lark alert for human intervention."""
  7. def __init__(self):
  8. self.webhook_url = 'https://open.feishu.cn/open-apis/bot/v2/hook/c316b559-1c6a-4c4e-97c9-50b44e4c2a9d'
  9. super().__init__()
  10. def send_lark_alert_for_human_intervention(
  11. self, message: str
  12. ) -> str:
  13. r"""Sends a Lark alert for human intervention.
  14. Args:
  15. message (str): The message to send.
  16. Returns:
  17. str: A confirmation message.
  18. """
  19. req_body = {
  20. "msg_type": "text",
  21. "content": {
  22. "text": f'[Agent告警]{message}'
  23. }
  24. }
  25. try:
  26. response = requests.post(
  27. url=self.webhook_url,
  28. json=req_body
  29. )
  30. return f"Alert sent successfully: {response.status_code}"
  31. except Exception as e:
  32. return f"Failed to send alert: {e}"
  33. def get_tools(self) -> List[FunctionTool]:
  34. return [FunctionTool(self.send_lark_alert_for_human_intervention)]
  35. if __name__ == '__main__':
  36. toolkit = LarkAlertForHumanIntervention()
  37. tools = toolkit.get_tools()
  38. for tool in tools:
  39. print(f"Tool schema: {tool.get_openai_tool_schema()}")
  40. resp = toolkit.send_lark_alert_for_human_intervention('测试')
  41. print(resp)