chat_test.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. # 将项目根目录添加到 python 路径,确保可以正确导入 agent 包
  6. PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
  7. if PROJECT_ROOT not in sys.path:
  8. sys.path.append(PROJECT_ROOT)
  9. from agent.tools.builtin.feishu.chat import (
  10. feishu_get_contact_list,
  11. feishu_send_message_to_contact,
  12. feishu_get_contact_replies,
  13. feishu_get_chat_history
  14. )
  15. async def feishu_tools():
  16. print("开始测试飞书工具...\n")
  17. # # 1. 测试获取联系人列表
  18. # print("--- 测试: feishu_get_contact_list ---")
  19. # result_list = await feishu_get_contact_list()
  20. # print(f"标题: {result_list.title}")
  21. # print(f"输出: {result_list.output}")
  22. # print("-" * 30 + "\n")
  23. #
  24. # # 2. 测试发送消息 (以 '谭景玉' 为例,请确保 contacts.json 中有此人且信息正确)
  25. contact_name = "谭景玉"
  26. # print(f"--- 测试: feishu_send_message_to_contact (对象: {contact_name}) ---")
  27. #
  28. # 测试发送纯文本
  29. text_content = "干活"
  30. print(f"正在发送文本: {text_content}")
  31. result_send_text = await feishu_send_message_to_contact(contact_name, text_content)
  32. print(f"标题: {result_send_text.title}")
  33. print(f"输出: {result_send_text.output}")
  34. if result_send_text.error:
  35. print(f"错误: {result_send_text.error}")
  36. # 测试发送多模态消息 (文本 + 图片)
  37. # 注意:这里的图片 URL 需要是一个可访问的地址,或者你可以使用 base64 格式
  38. # multimodal_content = [
  39. # {"type": "text", "text": "这是一条多模态测试消息:"},
  40. # {"type": "image_url", "image_url": {"url": "https://www.baidu.com/img/flexible/logo/pc/result.png"}}
  41. # ]
  42. # print(f"\n正在发送多模态消息...")
  43. # result_send_multi = await feishu_send_message_to_contact(contact_name, multimodal_content)
  44. # print(f"标题: {result_send_multi.title}")
  45. # # print(f"输出: {result_send_multi.output}")
  46. # if result_send_multi.error:
  47. # print(f"错误: {result_send_multi.error}")
  48. # print("-" * 30 + "\n")
  49. # # 3. 测试获取回复
  50. # print(f"--- 测试: feishu_get_contact_replies (对象: {contact_name}) ---")
  51. # result_replies = await feishu_get_contact_replies(contact_name)
  52. # print(f"标题: {result_replies.title}")
  53. # print(f"消息详情: {result_replies.output}")
  54. # print("-" * 30 + "\n")
  55. # # 4. 测试获取历史记录
  56. # print(f"--- 测试: feishu_get_chat_history (对象: {contact_name}) ---")
  57. # result_history = await feishu_get_chat_history(contact_name, page_size=5, page_token="4cXSlmN7uFAnWWU5yfIGMNvUNrBPLlXZREzLcnvUtOcmK2QFKfwEqfbui_UDsR-y8ne0BkzXABiYTAQASh-n7my_3zQp6o3ERRz0bZ4LB5zMvahf8x7OQoso1rjrMaKM")
  58. # print(f"标题: {result_history.title}")
  59. # print(f"历史记录输出: {result_history.output}")
  60. # print("-" * 30 + "\n")
  61. if __name__ == "__main__":
  62. # 模拟环境变量 (如果在系统环境变量中已设置,此处可省略)
  63. os.environ["FEISHU_APP_ID"] = "cli_a90fe317987a9cc9"
  64. os.environ["FEISHU_APP_SECRET"] = "nn2dWuXTiRA2N6xodbm4g0qz1AfM2ayi"
  65. try:
  66. asyncio.run(feishu_tools())
  67. except KeyboardInterrupt:
  68. pass
  69. except Exception as e:
  70. print(f"测试过程中出现异常: {e}")