import json import os import base64 import httpx import asyncio from typing import Optional, List, Dict, Any, Union from .feishu_client import FeishuClient, FeishuDomain from agent.tools import tool, ToolResult, ToolContext from agent.trace.models import MessageContent # 从环境变量获取飞书配置 # 也可以在此设置硬编码的默认值,但推荐使用环境变量 FEISHU_APP_ID = os.getenv("FEISHU_APP_ID", "cli_a90fe317987a9cc9") FEISHU_APP_SECRET = os.getenv("FEISHU_APP_SECRET", "nn2dWuXTiRA2N6xodbm4g0qz1AfM2ayi") CONTACTS_FILE = os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..")), "config", "feishu_contacts.json") CHAT_HISTORY_DIR = os.path.join(os.path.dirname(__file__), "chat_history") UNREAD_SUMMARY_FILE = os.path.join(CHAT_HISTORY_DIR, "chat_summary.json") # ==================== 一、文件内使用的功能函数 ==================== def load_contacts() -> List[Dict[str, Any]]: """读取 contacts.json 中的所有联系人""" if not os.path.exists(CONTACTS_FILE): return [] try: with open(CONTACTS_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception: return [] def save_contacts(contacts: List[Dict[str, Any]]): """保存联系人信息到 contacts.json""" try: with open(CONTACTS_FILE, 'w', encoding='utf-8') as f: json.dump(contacts, f, ensure_ascii=False, indent=2) except Exception as e: print(f"保存联系人失败: {e}") def list_contacts_info() -> List[Dict[str, str]]: """ 1. 列出所有联系人信息 读取 contacts.json 中的每一个联系人的 name、description,以字典列表返回 """ contacts = load_contacts() return [{"name": c.get("name", ""), "description": c.get("description", "")} for c in contacts] def get_contact_full_info(name: str) -> Optional[Dict[str, Any]]: """ 2. 根据联系人名称获取联系人完整字典信息 从 contacts.json 中读取每一个联系人做名称匹配,返回数据中的所有字段为一个字典对象 """ contacts = load_contacts() for c in contacts: if c.get("name") == name: return c return None def get_contact_by_id(id_value: str) -> Optional[Dict[str, Any]]: """根据 chat_id 或 open_id 获取联系人信息""" contacts = load_contacts() for c in contacts: if c.get("chat_id") == id_value or c.get("open_id") == id_value: return c return None def update_contact_chat_id(name: str, chat_id: str): """ 3. 更新某一个联系人的 chat_id """ contacts = load_contacts() updated = False for c in contacts: if c.get("name") == name: if not c.get("chat_id"): c["chat_id"] = chat_id updated = True break if updated: save_contacts(contacts) # ==================== 二、聊天记录文件管理 ==================== def _ensure_chat_history_dir(): if not os.path.exists(CHAT_HISTORY_DIR): os.makedirs(CHAT_HISTORY_DIR) def get_chat_file_path(contact_name: str) -> str: _ensure_chat_history_dir() return os.path.join(CHAT_HISTORY_DIR, f"chat_{contact_name}.json") def load_chat_history(contact_name: str) -> List[Dict[str, Any]]: path = get_chat_file_path(contact_name) if os.path.exists(path): try: with open(path, 'r', encoding='utf-8') as f: return json.load(f) except Exception: return [] return [] def save_chat_history(contact_name: str, history: List[Dict[str, Any]]): path = get_chat_file_path(contact_name) try: with open(path, 'w', encoding='utf-8') as f: json.dump(history, f, ensure_ascii=False, indent=2) except Exception as e: print(f"保存聊天记录失败: {e}") def update_unread_count(contact_name: str, increment: int = 1, reset: bool = False): """更新未读消息摘要""" _ensure_chat_history_dir() summary = {} if os.path.exists(UNREAD_SUMMARY_FILE): try: with open(UNREAD_SUMMARY_FILE, 'r', encoding='utf-8') as f: summary = json.load(f) except Exception: summary = {} if reset: summary[contact_name] = 0 else: summary[contact_name] = summary.get(contact_name, 0) + increment try: with open(UNREAD_SUMMARY_FILE, 'w', encoding='utf-8') as f: json.dump(summary, f, ensure_ascii=False, indent=2) except Exception as e: print(f"更新未读摘要失败: {e}") # ==================== 三、@tool 工具 ==================== @tool( hidden_params=["context"], display={ "zh": { "name": "获取飞书联系人列表", "params": {} }, "en": { "name": "Get Feishu Contact List", "params": {} } } ) async def feishu_get_contact_list(context: Optional[ToolContext] = None) -> ToolResult: """ 获取所有联系人的名称和描述。 Args: context: 工具执行上下文(可选) """ contacts = list_contacts_info() return ToolResult( title="获取联系人列表成功", output=json.dumps(contacts, ensure_ascii=False, indent=2), metadata={"contacts": contacts} ) @tool( hidden_params=["context"], display={ "zh": { "name": "给飞书联系人发送消息", "params": { "contact_name": "联系人名称", "content": "消息内容。OpenAI 多模态格式列表 (例如: [{'type': 'text', 'text': '你好'}, {'type': 'image_url', 'image_url': {'url': '...'}}])" } }, "en": { "name": "Send Message to Feishu Contact", "params": { "contact_name": "Contact Name", "content": "Message content. OpenAI multimodal list format." } } } ) async def feishu_send_message_to_contact( contact_name: str, content: MessageContent, context: Optional[ToolContext] = None ) -> ToolResult: """ 给指定的联系人发送消息。支持发送文本和图片,OpenAI 多模态格式,会自动转换为飞书相应的格式并发起多次发送。 Args: contact_name: 飞书联系人的名称 content: 消息内容。OpenAI 多模态列表格式。 """ contact = get_contact_full_info(contact_name) if not contact: return ToolResult(title="发送失败", output=f"未找到联系人: {contact_name}", error="Contact not found") client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET) # 确定接收者 ID (优先使用 chat_id,否则使用 open_id) receive_id = contact.get("chat_id") or contact.get("open_id") or contact.get("user_id") if not receive_id: return ToolResult(title="发送失败", output="联系人 ID 信息缺失", error="Receiver ID not found in contacts.json") # 如果 content 是字符串,尝试解析为 JSON if isinstance(content, str): try: parsed = json.loads(content) if isinstance(parsed, (list, dict)): content = parsed except (json.JSONDecodeError, TypeError): pass try: last_res = None if isinstance(content, str): last_res = client.send_message(to=receive_id, text=content) elif isinstance(content, list): for item in content: item_type = item.get("type") if item_type == "text": last_res = client.send_message(to=receive_id, text=item.get("text", "")) elif item_type == "image_url": img_info = item.get("image_url", {}) url = img_info.get("url") if url.startswith("data:image"): # 处理 base64 图片 try: if "," in url: _, encoded = url.split(",", 1) else: encoded = url image_bytes = base64.b64decode(encoded) last_res = client.send_image(to=receive_id, image=image_bytes) except Exception as e: print(f"解析 base64 图片失败: {e}") else: # 处理网络 URL try: async with httpx.AsyncClient() as httpx_client: img_resp = await httpx_client.get(url, timeout=15.0) img_resp.raise_for_status() last_res = client.send_image(to=receive_id, image=img_resp.content) except Exception as e: print(f"下载图片失败: {e}") elif isinstance(content, dict): # 如果是单块格式也支持一下 item_type = content.get("type") if item_type == "text": last_res = client.send_message(to=receive_id, text=content.get("text", "")) elif item_type == "image_url": # ... 逻辑与上面类似,为了简洁这里也可以统一转成 list 处理 content = [content] # 此处递归或重写逻辑,这里选择简单地重新判断 return await feishu_send_message_to_contact(contact_name, content, context) else: return ToolResult(title="发送失败", output="不支持的内容格式", error="Invalid content format") if last_res: # 更新 chat_id update_contact_chat_id(contact_name, last_res.chat_id) # [待开启] 发送即记录:为了维护完整的聊天记录,将机器人发出的消息也保存到本地文件 try: history = load_chat_history(contact_name) history.append({ "role": "assistant", "message_id": last_res.message_id, "content": content if isinstance(content, list) else [{"type": "text", "text": content}] }) save_chat_history(contact_name, history) # 机器人回复了,将该联系人的未读计数重置为 0 update_unread_count(contact_name, reset=True) except Exception as e: print(f"记录发送的消息失败: {e}") return ToolResult( title=f"消息已成功发送至 {contact_name}", output=f"发送成功。消息 ID: {last_res.message_id}", metadata={"message_id": last_res.message_id, "chat_id": last_res.chat_id} ) return ToolResult(title="发送失败", output="没有执行成功的发送操作") except Exception as e: return ToolResult(title="发送异常", output=str(e), error=str(e)) @tool( hidden_params=["context"], display={ "zh": { "name": "获取飞书联系人回复", "params": { "contact_name": "联系人名称", "wait_time_seconds": "可选,如果当前没有新回复,则最多等待指定的秒数。在等待期间会每秒检查一次,一旦有新回复则立即返回。超过时长仍无回复则返回空。" } }, "en": { "name": "Get Feishu Contact Replies", "params": { "contact_name": "Contact Name", "wait_time_seconds": "Optional. If there are no new replies, wait up to the specified number of seconds. It will check every second and return immediately if a new reply is detected. If no reply is received after the duration, it returns empty." } } } ) async def feishu_get_contact_replies( contact_name: str, wait_time_seconds: Optional[int] = None, context: Optional[ToolContext] = None ) -> ToolResult: """ 获取指定联系人的最新回复消息。 返回的数据格式为 OpenAI 多模态消息内容列表。 只抓取自上一个机器人消息之后的用户回复。 Args: contact_name: 飞书联系人的名称 wait_time_seconds: 可选的最大轮询等待时间。如果暂时没有新回复,将每秒检查一次直到有回复或超时。 context: 工具执行上下文(可选) """ contact = get_contact_full_info(contact_name) if not contact: return ToolResult(title="获取失败", output=f"未找到联系人: {contact_name}", error="Contact not found") chat_id = contact.get("chat_id") if not chat_id: return ToolResult(title="获取失败", output=f"联系人 {contact_name} 尚未建立会话 (无 chat_id)", error="No chat_id") client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET) try: def get_replies(): msg_list_res = client.get_message_list(chat_id=chat_id) if not msg_list_res or "items" not in msg_list_res: return [] openai_blocks = [] # 遍历消息列表 (最新的在前) for msg in msg_list_res["items"]: if msg.get("sender_type") == "app": # 碰到机器人的消息即停止 break content_blocks = _convert_feishu_msg_to_openai_content(client, msg) openai_blocks.extend(content_blocks) # 反转列表以保持时间正序 (旧 -> 新) openai_blocks.reverse() return openai_blocks openai_blocks = get_replies() # 如果初始没有获取到回复,且设置了等待时间,则开始轮询 if not openai_blocks and wait_time_seconds and wait_time_seconds > 0: for _ in range(int(wait_time_seconds)): await asyncio.sleep(1) openai_blocks = get_replies() if openai_blocks: break return ToolResult( title=f"获取 {contact_name} 回复成功", output=json.dumps(openai_blocks, ensure_ascii=False, indent=2) if openai_blocks else "目前没有新的用户回复", metadata={"replies": openai_blocks} ) except Exception as e: return ToolResult(title="获取回复异常", output=str(e), error=str(e)) def _convert_feishu_msg_to_openai_content(client: FeishuClient, msg: Dict[str, Any]) -> List[Dict[str, Any]]: """将单条飞书消息内容转换为 OpenAI 多模态格式块列表""" blocks = [] msg_type = msg.get("content_type") raw_content = msg.get("content", "") message_id = msg.get("message_id") if msg_type == "text": blocks.append({"type": "text", "text": raw_content}) elif msg_type == "image": try: content_dict = json.loads(raw_content) image_key = content_dict.get("image_key") if image_key and message_id: img_bytes = client.download_message_resource( message_id=message_id, file_key=image_key, resource_type="image" ) b64_str = base64.b64encode(img_bytes).decode('utf-8') blocks.append({ "type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64_str}"} }) except Exception as e: print(f"转换图片消息失败: {e}") blocks.append({"type": "text", "text": "[图片内容获取失败]"}) elif msg_type == "post": blocks.append({"type": "text", "text": raw_content}) else: blocks.append({"type": "text", "text": f"[{msg_type} 消息]: {raw_content}"}) return blocks @tool( hidden_params=["context"], display={ "zh": { "name": "获取飞书聊天历史记录", "params": { "contact_name": "联系人名称", "start_time": "起始时间戳 (秒),可选", "end_time": "结束时间戳 (秒),可选", "page_size": "分页大小,默认 20", "page_token": "分页令牌,用于加载下一页,可选" } }, "en": { "name": "Get Feishu Chat History", "params": { "contact_name": "Contact Name", "start_time": "Start timestamp (seconds), optional", "end_time": "End timestamp (seconds), optional", "page_size": "Page size, default 20", "page_token": "Page token for next page, optional" } } } ) async def feishu_get_chat_history( contact_name: str, start_time: Optional[int] = None, end_time: Optional[int] = None, page_size: int = 20, page_token: Optional[str] = None, context: Optional[ToolContext] = None ) -> ToolResult: """ 根据联系人名称获取完整的历史聊天记录。 支持通过时间戳进行范围筛选,并支持分页获取。 返回的消息按时间倒序排列(最新的在前面)。 Args: contact_name: 飞书联系人的名称 start_time: 筛选起始时间的时间戳(秒),可选 end_time: 筛选结束时间的时间戳(秒),可选 page_size: 每页消息数量,默认为 20 page_token: 分页令牌,用于加载上一页/下一页,可选 context: 工具执行上下文(可选) """ contact = get_contact_full_info(contact_name) if not contact: return ToolResult(title="获取历史失败", output=f"未找到联系人: {contact_name}", error="Contact not found") chat_id = contact.get("chat_id") if not chat_id: return ToolResult(title="获取历史失败", output=f"联系人 {contact_name} 尚未建立会话 (无 chat_id)", error="No chat_id") client = FeishuClient(app_id=FEISHU_APP_ID, app_secret=FEISHU_APP_SECRET) try: res = client.get_message_list( chat_id=chat_id, start_time=start_time, end_time=end_time, page_size=page_size, page_token=page_token ) if not res or "items" not in res: return ToolResult(title="获取历史失败", output="请求接口失败或返回为空") # 将所有消息转换为 OpenAI 多模态格式 formatted_messages = [] for msg in res["items"]: formatted_messages.append({ "message_id": msg.get("message_id"), "sender_id": msg.get("sender_id"), "sender_type": "assistant" if msg.get("sender_type") == "app" else "user", "create_time": msg.get("create_time"), "content": _convert_feishu_msg_to_openai_content(client, msg) }) result_data = { "messages": formatted_messages, "page_token": res.get("page_token"), "has_more": res.get("has_more") } return ToolResult( title=f"获取 {contact_name} 历史记录成功", output=json.dumps(result_data, ensure_ascii=False, indent=2), metadata=result_data ) except Exception as e: return ToolResult(title="获取历史异常", output=str(e), error=str(e))