chat.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. import json
  2. import os
  3. import base64
  4. import httpx
  5. import asyncio
  6. from typing import Optional, List, Dict, Any, Union
  7. from .feishu_client import FeishuClient, FeishuDomain
  8. from agent.tools import tool, ToolResult, ToolContext
  9. from agent.trace.models import MessageContent
  10. # 从环境变量获取飞书配置
  11. # 也可以在此设置硬编码的默认值,但推荐使用环境变量
  12. FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "cli_a90fe317987a9cc9")
  13. FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "nn2dWuXTiRA2N6xodbm4g0qz1AfM2ayi")
  14. CONTACTS_FILE = os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..")), "config", "feishu_contacts.json")
  15. CHAT_HISTORY_DIR = os.path.join(os.path.dirname(__file__), "chat_history")
  16. UNREAD_SUMMARY_FILE = os.path.join(CHAT_HISTORY_DIR, "chat_summary.json")
  17. # ==================== 一、文件内使用的功能函数 ====================
  18. def load_contacts() -> List[Dict[str, Any]]:
  19. """读取 contacts.json 中的所有联系人"""
  20. if not os.path.exists(CONTACTS_FILE):
  21. return []
  22. try:
  23. with open(CONTACTS_FILE, 'r', encoding='utf-8') as f:
  24. return json.load(f)
  25. except Exception:
  26. return []
  27. def save_contacts(contacts: List[Dict[str, Any]]):
  28. """保存联系人信息到 contacts.json"""
  29. try:
  30. with open(CONTACTS_FILE, 'w', encoding='utf-8') as f:
  31. json.dump(contacts, f, ensure_ascii=False, indent=2)
  32. except Exception as e:
  33. print(f"保存联系人失败: {e}")
  34. def list_contacts_info() -> List[Dict[str, str]]:
  35. """
  36. 1. 列出所有联系人信息
  37. 读取 contacts.json 中的每一个联系人的 name、description,以字典列表返回
  38. """
  39. contacts = load_contacts()
  40. return [{"name": c.get("name", ""), "description": c.get("description", "")} for c in contacts]
  41. def get_contact_full_info(name: str) -> Optional[Dict[str, Any]]:
  42. """
  43. 2. 根据联系人名称获取联系人完整字典信息
  44. 从 contacts.json 中读取每一个联系人做名称匹配,返回数据中的所有字段为一个字典对象
  45. """
  46. contacts = load_contacts()
  47. for c in contacts:
  48. if c.get("name") == name:
  49. return c
  50. return None
  51. def get_contact_by_id(id_value: str) -> Optional[Dict[str, Any]]:
  52. """根据 chat_id 或 open_id 获取联系人信息"""
  53. contacts = load_contacts()
  54. for c in contacts:
  55. if c.get("chat_id") == id_value or c.get("open_id") == id_value:
  56. return c
  57. return None
  58. def update_contact_chat_id(name: str, chat_id: str):
  59. """
  60. 3. 更新某一个联系人的 chat_id
  61. """
  62. contacts = load_contacts()
  63. updated = False
  64. for c in contacts:
  65. if c.get("name") == name:
  66. if not c.get("chat_id"):
  67. c["chat_id"] = chat_id
  68. updated = True
  69. break
  70. if updated:
  71. save_contacts(contacts)
  72. # ==================== 二、聊天记录文件管理 ====================
  73. def _ensure_chat_history_dir():
  74. if not os.path.exists(CHAT_HISTORY_DIR):
  75. os.makedirs(CHAT_HISTORY_DIR)
  76. def get_chat_file_path(contact_name: str) -> str:
  77. _ensure_chat_history_dir()
  78. return os.path.join(CHAT_HISTORY_DIR, f"chat_{contact_name}.json")
  79. def load_chat_history(contact_name: str) -> List[Dict[str, Any]]:
  80. path = get_chat_file_path(contact_name)
  81. if os.path.exists(path):
  82. try:
  83. with open(path, 'r', encoding='utf-8') as f:
  84. return json.load(f)
  85. except Exception:
  86. return []
  87. return []
  88. def save_chat_history(contact_name: str, history: List[Dict[str, Any]]):
  89. path = get_chat_file_path(contact_name)
  90. try:
  91. with open(path, 'w', encoding='utf-8') as f:
  92. json.dump(history, f, ensure_ascii=False, indent=2)
  93. except Exception as e:
  94. print(f"保存聊天记录失败: {e}")
  95. def update_unread_count(contact_name: str, increment: int = 1, reset: bool = False):
  96. """更新未读消息摘要"""
  97. _ensure_chat_history_dir()
  98. summary = {}
  99. if os.path.exists(UNREAD_SUMMARY_FILE):
  100. try:
  101. with open(UNREAD_SUMMARY_FILE, 'r', encoding='utf-8') as f:
  102. summary = json.load(f)
  103. except Exception:
  104. summary = {}
  105. if reset:
  106. summary[contact_name] = 0
  107. else:
  108. summary[contact_name] = summary.get(contact_name, 0) + increment
  109. try:
  110. with open(UNREAD_SUMMARY_FILE, 'w', encoding='utf-8') as f:
  111. json.dump(summary, f, ensure_ascii=False, indent=2)
  112. except Exception as e:
  113. print(f"更新未读摘要失败: {e}")
  114. # ==================== 三、@tool 工具 ====================
  115. @tool(
  116. hidden_params=["context"],
  117. groups=["feishu"],
  118. display={
  119. "zh": {
  120. "name": "获取飞书联系人列表",
  121. "params": {}
  122. },
  123. "en": {
  124. "name": "Get Feishu Contact List",
  125. "params": {}
  126. }
  127. }
  128. )
  129. async def feishu_get_contact_list(context: Optional[ToolContext] = None) -> ToolResult:
  130. """
  131. 获取所有联系人的名称和描述。
  132. Args:
  133. context: 工具执行上下文(可选)
  134. """
  135. contacts = list_contacts_info()
  136. return ToolResult(
  137. title="获取联系人列表成功",
  138. output=json.dumps(contacts, ensure_ascii=False, indent=2),
  139. metadata={"contacts": contacts}
  140. )
  141. @tool(
  142. hidden_params=["context"],
  143. groups=["feishu"],
  144. display={
  145. "zh": {
  146. "name": "给飞书联系人发送消息",
  147. "params": {
  148. "contact_name": "联系人名称",
  149. "content": "消息内容。OpenAI 多模态格式列表 (例如: [{'type': 'text', 'text': '你好'}, {'type': 'image_url', 'image_url': {'url': '...'}}])"
  150. }
  151. },
  152. "en": {
  153. "name": "Send Message to Feishu Contact",
  154. "params": {
  155. "contact_name": "Contact Name",
  156. "content": "Message content. OpenAI multimodal list format."
  157. }
  158. }
  159. }
  160. )
  161. async def feishu_send_message_to_contact(
  162. contact_name: str,
  163. content: MessageContent,
  164. context: Optional[ToolContext] = None
  165. ) -> ToolResult:
  166. """
  167. 给指定的联系人发送消息。支持发送文本和图片,OpenAI 多模态格式,会自动转换为飞书相应的格式并发起多次发送。
  168. Args:
  169. contact_name: 飞书联系人的名称
  170. content: 消息内容。OpenAI 多模态列表格式。
  171. """
  172. contact = get_contact_full_info(contact_name)
  173. if not contact:
  174. return ToolResult(title="发送失败", output=f"未找到联系人: {contact_name}", error="Contact not found")
  175. client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
  176. # 确定接收者 ID (优先使用 chat_id,否则使用 open_id)
  177. receive_id = contact.get("chat_id") or contact.get("open_id") or contact.get("user_id")
  178. if not receive_id:
  179. return ToolResult(title="发送失败", output="联系人 ID 信息缺失", error="Receiver ID not found in contacts.json")
  180. # 如果 content 是字符串,尝试解析为 JSON
  181. if isinstance(content, str):
  182. try:
  183. parsed = json.loads(content)
  184. if isinstance(parsed, (list, dict)):
  185. content = parsed
  186. except (json.JSONDecodeError, TypeError):
  187. pass
  188. try:
  189. last_res = None
  190. if isinstance(content, str):
  191. last_res = client.send_message(to=receive_id, text=content)
  192. elif isinstance(content, list):
  193. for item in content:
  194. item_type = item.get("type")
  195. if item_type == "text":
  196. last_res = client.send_message(to=receive_id, text=item.get("text", ""))
  197. elif item_type == "image_url":
  198. img_info = item.get("image_url", {})
  199. url = img_info.get("url")
  200. if url.startswith("data:image"):
  201. # 处理 base64 图片
  202. try:
  203. if "," in url:
  204. _, encoded = url.split(",", 1)
  205. else:
  206. encoded = url
  207. image_bytes = base64.b64decode(encoded)
  208. last_res = client.send_image(to=receive_id, image=image_bytes)
  209. except Exception as e:
  210. print(f"解析 base64 图片失败: {e}")
  211. elif url.startswith("http://") or url.startswith("https://"):
  212. # 处理网络 URL
  213. try:
  214. async with httpx.AsyncClient() as httpx_client:
  215. img_resp = await httpx_client.get(url, timeout=15.0)
  216. img_resp.raise_for_status()
  217. last_res = client.send_image(to=receive_id, image=img_resp.content)
  218. except Exception as e:
  219. print(f"下载图片失败: {e}")
  220. else:
  221. # 处理本地文件路径
  222. try:
  223. local_path = os.path.abspath(url)
  224. if os.path.isfile(local_path):
  225. last_res = client.send_image(to=receive_id, image=local_path)
  226. else:
  227. print(f"本地图片文件不存在: {local_path}")
  228. except Exception as e:
  229. print(f"读取本地图片失败: {e}")
  230. elif isinstance(content, dict):
  231. # 如果是单块格式也支持一下
  232. item_type = content.get("type")
  233. if item_type == "text":
  234. last_res = client.send_message(to=receive_id, text=content.get("text", ""))
  235. elif item_type == "image_url":
  236. # ... 逻辑与上面类似,为了简洁这里也可以统一转成 list 处理
  237. content = [content]
  238. # 此处递归或重写逻辑,这里选择简单地重新判断
  239. return await feishu_send_message_to_contact(contact_name, content, context)
  240. else:
  241. return ToolResult(title="发送失败", output="不支持的内容格式", error="Invalid content format")
  242. if last_res:
  243. # 更新 chat_id
  244. update_contact_chat_id(contact_name, last_res.chat_id)
  245. # [待开启] 发送即记录:为了维护完整的聊天记录,将机器人发出的消息也保存到本地文件
  246. try:
  247. history = load_chat_history(contact_name)
  248. history.append({
  249. "role": "assistant",
  250. "message_id": last_res.message_id,
  251. "content": content if isinstance(content, list) else [{"type": "text", "text": content}]
  252. })
  253. save_chat_history(contact_name, history)
  254. # 机器人回复了,将该联系人的未读计数重置为 0
  255. update_unread_count(contact_name, reset=True)
  256. except Exception as e:
  257. print(f"记录发送的消息失败: {e}")
  258. return ToolResult(
  259. title=f"消息已成功发送至 {contact_name}",
  260. output=f"发送成功。消息 ID: {last_res.message_id}",
  261. metadata={"message_id": last_res.message_id, "chat_id": last_res.chat_id}
  262. )
  263. return ToolResult(title="发送失败", output="没有执行成功的发送操作")
  264. except Exception as e:
  265. return ToolResult(title="发送异常", output=str(e), error=str(e))
  266. @tool(
  267. hidden_params=["context"],
  268. groups=["feishu"],
  269. display={
  270. "zh": {
  271. "name": "获取飞书联系人回复",
  272. "params": {
  273. "contact_name": "联系人名称",
  274. "wait_time_seconds": "可选,如果当前没有新回复,则最多等待指定的秒数。在等待期间会每秒检查一次,一旦有新回复则立即返回。超过时长仍无回复则返回空。"
  275. }
  276. },
  277. "en": {
  278. "name": "Get Feishu Contact Replies",
  279. "params": {
  280. "contact_name": "Contact Name",
  281. "wait_time_seconds": "Optional. If there are no new replies, wait up to the specified number of seconds. It will check every second and return immediately if a new reply is detected. If no reply is received after the duration, it returns empty."
  282. }
  283. }
  284. }
  285. )
  286. async def feishu_get_contact_replies(
  287. contact_name: str,
  288. wait_time_seconds: Optional[int] = None,
  289. context: Optional[ToolContext] = None
  290. ) -> ToolResult:
  291. """
  292. 获取指定联系人的最新回复消息。
  293. 返回的数据格式为 OpenAI 多模态消息内容列表。
  294. 只抓取自上一个机器人消息之后的用户回复。
  295. Args:
  296. contact_name: 飞书联系人的名称
  297. wait_time_seconds: 可选的最大轮询等待时间。如果暂时没有新回复,将每秒检查一次直到有回复或超时。
  298. context: 工具执行上下文(可选)
  299. """
  300. contact = get_contact_full_info(contact_name)
  301. if not contact:
  302. return ToolResult(title="获取失败", output=f"未找到联系人: {contact_name}", error="Contact not found")
  303. chat_id = contact.get("chat_id")
  304. if not chat_id:
  305. return ToolResult(title="获取失败", output=f"联系人 {contact_name} 尚未建立会话 (无 chat_id)", error="No chat_id")
  306. client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
  307. try:
  308. def get_replies():
  309. msg_list_res = client.get_message_list(chat_id=chat_id)
  310. if not msg_list_res or "items" not in msg_list_res:
  311. return []
  312. openai_blocks = []
  313. # 遍历消息列表 (最新的在前)
  314. for msg in msg_list_res["items"]:
  315. if msg.get("sender_type") == "app":
  316. # 碰到机器人的消息即停止
  317. break
  318. content_blocks = _convert_feishu_msg_to_openai_content(client, msg)
  319. openai_blocks.extend(content_blocks)
  320. # 反转列表以保持时间正序 (旧 -> 新)
  321. openai_blocks.reverse()
  322. return openai_blocks
  323. openai_blocks = get_replies()
  324. # 如果初始没有获取到回复,且设置了等待时间,则开始轮询
  325. if not openai_blocks and wait_time_seconds and wait_time_seconds > 0:
  326. for _ in range(int(wait_time_seconds)):
  327. await asyncio.sleep(1)
  328. openai_blocks = get_replies()
  329. if openai_blocks:
  330. break
  331. return ToolResult(
  332. title=f"获取 {contact_name} 回复成功",
  333. output=json.dumps(openai_blocks, ensure_ascii=False, indent=2) if openai_blocks else "目前没有新的用户回复",
  334. metadata={"replies": openai_blocks}
  335. )
  336. except Exception as e:
  337. return ToolResult(title="获取回复异常", output=str(e), error=str(e))
  338. def _convert_feishu_msg_to_openai_content(client: FeishuClient, msg: Dict[str, Any]) -> List[Dict[str, Any]]:
  339. """将单条飞书消息内容转换为 OpenAI 多模态格式块列表"""
  340. blocks = []
  341. msg_type = msg.get("content_type")
  342. raw_content = msg.get("content", "")
  343. message_id = msg.get("message_id")
  344. if msg_type == "text":
  345. blocks.append({"type": "text", "text": raw_content})
  346. elif msg_type == "image":
  347. try:
  348. content_dict = json.loads(raw_content)
  349. image_key = content_dict.get("image_key")
  350. if image_key and message_id:
  351. img_bytes = client.download_message_resource(
  352. message_id=message_id,
  353. file_key=image_key,
  354. resource_type="image"
  355. )
  356. b64_str = base64.b64encode(img_bytes).decode('utf-8')
  357. blocks.append({
  358. "type": "image_url",
  359. "image_url": {"url": f"data:image/png;base64,{b64_str}"}
  360. })
  361. except Exception as e:
  362. print(f"转换图片消息失败: {e}")
  363. blocks.append({"type": "text", "text": "[图片内容获取失败]"})
  364. elif msg_type == "post":
  365. blocks.append({"type": "text", "text": raw_content})
  366. else:
  367. blocks.append({"type": "text", "text": f"[{msg_type} 消息]: {raw_content}"})
  368. return blocks
  369. @tool(
  370. hidden_params=["context"],
  371. groups=["feishu"],
  372. display={
  373. "zh": {
  374. "name": "获取飞书聊天历史记录",
  375. "params": {
  376. "contact_name": "联系人名称",
  377. "start_time": "起始时间戳 (秒),可选",
  378. "end_time": "结束时间戳 (秒),可选",
  379. "page_size": "分页大小,默认 20",
  380. "page_token": "分页令牌,用于加载下一页,可选"
  381. }
  382. },
  383. "en": {
  384. "name": "Get Feishu Chat History",
  385. "params": {
  386. "contact_name": "Contact Name",
  387. "start_time": "Start timestamp (seconds), optional",
  388. "end_time": "End timestamp (seconds), optional",
  389. "page_size": "Page size, default 20",
  390. "page_token": "Page token for next page, optional"
  391. }
  392. }
  393. }
  394. )
  395. async def feishu_get_chat_history(
  396. contact_name: str,
  397. start_time: Optional[int] = None,
  398. end_time: Optional[int] = None,
  399. page_size: int = 20,
  400. page_token: Optional[str] = None,
  401. context: Optional[ToolContext] = None
  402. ) -> ToolResult:
  403. """
  404. 根据联系人名称获取完整的历史聊天记录。
  405. 支持通过时间戳进行范围筛选,并支持分页获取。
  406. 返回的消息按时间倒序排列(最新的在前面)。
  407. Args:
  408. contact_name: 飞书联系人的名称
  409. start_time: 筛选起始时间的时间戳(秒),可选
  410. end_time: 筛选结束时间的时间戳(秒),可选
  411. page_size: 每页消息数量,默认为 20
  412. page_token: 分页令牌,用于加载上一页/下一页,可选
  413. context: 工具执行上下文(可选)
  414. """
  415. contact = get_contact_full_info(contact_name)
  416. if not contact:
  417. return ToolResult(title="获取历史失败", output=f"未找到联系人: {contact_name}", error="Contact not found")
  418. chat_id = contact.get("chat_id")
  419. if not chat_id:
  420. return ToolResult(title="获取历史失败", output=f"联系人 {contact_name} 尚未建立会话 (无 chat_id)", error="No chat_id")
  421. client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET)
  422. try:
  423. res = client.get_message_list(
  424. chat_id=chat_id,
  425. start_time=start_time,
  426. end_time=end_time,
  427. page_size=page_size,
  428. page_token=page_token
  429. )
  430. if not res or "items" not in res:
  431. return ToolResult(title="获取历史失败", output="请求接口失败或返回为空")
  432. # 将所有消息转换为 OpenAI 多模态格式
  433. formatted_messages = []
  434. for msg in res["items"]:
  435. formatted_messages.append({
  436. "message_id": msg.get("message_id"),
  437. "sender_id": msg.get("sender_id"),
  438. "sender_type": "assistant" if msg.get("sender_type") == "app" else "user",
  439. "create_time": msg.get("create_time"),
  440. "content": _convert_feishu_msg_to_openai_content(client, msg)
  441. })
  442. result_data = {
  443. "messages": formatted_messages,
  444. "page_token": res.get("page_token"),
  445. "has_more": res.get("has_more")
  446. }
  447. return ToolResult(
  448. title=f"获取 {contact_name} 历史记录成功",
  449. output=json.dumps(result_data, ensure_ascii=False, indent=2),
  450. metadata=result_data
  451. )
  452. except Exception as e:
  453. return ToolResult(title="获取历史异常", output=str(e), error=str(e))