""" 飞书消息处理客户端 基于 OpenClaw 项目的飞书集成代码整理 依赖安装: pip install lark-oapi websocket-client requests 使用示例: client = FeishuClient(app_id="cli_xxx", app_secret="xxx") # 发送消息 client.send_message(to="ou_xxx", text="Hello!") # 监听消息 client.start_websocket(on_message=my_handler) """ import json import io import logging import os import tempfile import threading from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Dict, List, Optional, Union import lark_oapi as lark from lark_oapi.api.contact.v3 import GetUserRequest, GetUserResponse from lark_oapi.api.im.v1 import ( CreateMessageRequest, CreateMessageRequestBody, ReplyMessageRequest, ReplyMessageRequestBody, GetMessageRequest, GetMessageResponse, PatchMessageRequest, PatchMessageRequestBody, CreateImageRequest, CreateImageRequestBody, GetImageRequest, CreateFileRequest, CreateFileRequestBody, GetMessageResourceRequest, GetMessageResourceResponse, ListMessageRequest, ListMessageResponse ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class FeishuDomain(Enum): """飞书域名""" FEISHU = "https://open.feishu.cn" # 中国版 LARK = "https://open.larksuite.com" # 国际版 class ChatType(Enum): """聊天类型""" P2P = "p2p" # 私聊 GROUP = "group" # 群聊 class ReceiveIdType(Enum): """接收者ID类型""" OPEN_ID = "open_id" USER_ID = "user_id" UNION_ID = "union_id" EMAIL = "email" CHAT_ID = "chat_id" @dataclass class FeishuMessageEvent: """飞书消息事件""" message_id: str chat_id: str chat_type: ChatType content: str content_type: str # text, image, file, post, etc. sender_open_id: str sender_user_id: Optional[str] = None sender_name: Optional[str] = None root_id: Optional[str] = None # 根消息ID(话题) parent_id: Optional[str] = None # 父消息ID(回复) mentions: List[Dict] = field(default_factory=list) mentioned_bot: bool = False @dataclass class SendResult: """发送结果""" message_id: str chat_id: str class FeishuClient: """ 飞书客户端 功能: - 发送/接收消息 - 上传/下载媒体文件 - WebSocket 实时监听 """ def __init__( self, app_id: str, app_secret: str, domain: FeishuDomain = FeishuDomain.FEISHU, encrypt_key: Optional[str] = None, verification_token: Optional[str] = None, ): """ 初始化飞书客户端 Args: app_id: 飞书应用 App ID app_secret: 飞书应用 App Secret domain: 飞书域名 (FEISHU 或 LARK) encrypt_key: 事件加密密钥 (可选) verification_token: 事件验证令牌 (可选) """ self.app_id = app_id self.app_secret = app_secret self.domain = domain self.encrypt_key = encrypt_key self.verification_token = verification_token # 创建 Lark 客户端 self.client = lark.Client.builder() \ .app_id(app_id) \ .app_secret(app_secret) \ .domain(domain.value) \ .build() # 缓存 self._bot_open_id: Optional[str] = None self._sender_name_cache: Dict[str, str] = {} # ==================== 消息发送 ==================== def send_message( self, to: str, text: str, reply_to_message_id: Optional[str] = None, receive_id_type: Optional[ReceiveIdType] = None, ) -> SendResult: """ 发送文本消息 Args: to: 接收者ID (open_id, user_id, chat_id 等) text: 消息文本 reply_to_message_id: 回复的消息ID (可选) receive_id_type: 接收者ID类型 (可选,自动推断) Returns: SendResult: 发送结果 """ if receive_id_type is None: receive_id_type = self._resolve_receive_id_type(to) # 构建富文本消息 (支持 Markdown) content = json.dumps({ "zh_cn": { "content": [[{"tag": "md", "text": text}]] } }) if reply_to_message_id: # 回复消息 request = ReplyMessageRequest.builder() \ .message_id(reply_to_message_id) \ .request_body(ReplyMessageRequestBody.builder() .content(content) .msg_type("post") .build()) \ .build() response = self.client.im.v1.message.reply(request) else: # 新消息 request = CreateMessageRequest.builder() \ .receive_id_type(receive_id_type.value) \ .request_body(CreateMessageRequestBody.builder() .receive_id(to) .content(content) .msg_type("post") .build()) \ .build() response = self.client.im.v1.message.create(request) if not response.success(): raise Exception(f"发送消息失败: {response.msg} (code: {response.code})") return SendResult( message_id=response.data.message_id, chat_id=response.data.chat_id ) def send_card( self, to: str, card: Dict[str, Any], reply_to_message_id: Optional[str] = None, receive_id_type: Optional[ReceiveIdType] = None, ) -> SendResult: """ 发送卡片消息 (交互式消息) Args: to: 接收者ID card: 卡片内容 (JSON 结构) reply_to_message_id: 回复的消息ID (可选) receive_id_type: 接收者ID类型 (可选) Returns: SendResult: 发送结果 """ if receive_id_type is None: receive_id_type = self._resolve_receive_id_type(to) content = json.dumps(card) if reply_to_message_id: request = ReplyMessageRequest.builder() \ .message_id(reply_to_message_id) \ .request_body(ReplyMessageRequestBody.builder() .content(content) .msg_type("interactive") .build()) \ .build() response = self.client.im.v1.message.reply(request) else: request = CreateMessageRequest.builder() \ .receive_id_type(receive_id_type.value) \ .request_body(CreateMessageRequestBody.builder() .receive_id(to) .content(content) .msg_type("interactive") .build()) \ .build() response = self.client.im.v1.message.create(request) if not response.success(): raise Exception(f"发送卡片失败: {response.msg}") return SendResult( message_id=response.data.message_id, chat_id=response.data.chat_id ) def send_markdown_card( self, to: str, text: str, reply_to_message_id: Optional[str] = None, ) -> SendResult: """ 发送 Markdown 卡片 (更好的格式渲染) Args: to: 接收者ID text: Markdown 文本 reply_to_message_id: 回复的消息ID (可选) Returns: SendResult: 发送结果 """ card = { "config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": text}] } return self.send_card(to, card, reply_to_message_id) # ==================== 媒体处理 ==================== def upload_image( self, image: Union[bytes, str], image_type: str = "message" ) -> str: """ 上传图片 """ file_obj = None try: # 1. 准备文件对象 if isinstance(image, str): # 如果是路径,直接打开 file_obj = open(image, "rb") else: # 如果是二进制数据,使用内存文件 (避免写磁盘) file_obj = io.BytesIO(image) # 某些 SDK/API 依赖文件名来判断 Content-Type,我们手动给一个名字 # 如果知道真实格式更好,不知道则默认 .png 或 .bin file_obj.name = "upload.png" # 2. 构建请求 # 注意:这里直接传入 file_obj request = CreateImageRequest.builder() \ .request_body(CreateImageRequestBody.builder() .image_type(image_type) .image(file_obj) .build()) \ .build() # 3. 发起请求 response = self.client.im.v1.image.create(request) if not response.success(): raise Exception(f"上传图片失败: {response.msg}") return response.data.image_key finally: # 4. 显式关闭文件句柄 if file_obj and not isinstance(file_obj, io.BytesIO): file_obj.close() def download_image(self, image_key: str) -> bytes: """ 下载图片 Args: image_key: 图片 key Returns: bytes: 图片数据 """ request = GetImageRequest.builder() \ .image_key(image_key) \ .build() response = self.client.im.v1.image.get(request) if not response.success(): raise Exception(f"下载图片失败: {response.msg}") return response.file.read() def send_image( self, to: str, image: Union[bytes, str], reply_to_message_id: Optional[str] = None, ) -> SendResult: """ 发送图片消息 Args: to: 接收者ID image: 图片数据或文件路径 reply_to_message_id: 回复的消息ID (可选) Returns: SendResult: 发送结果 """ image_key = self.upload_image(image) content = json.dumps({"image_key": image_key}) receive_id_type = self._resolve_receive_id_type(to) if reply_to_message_id: request = ReplyMessageRequest.builder() \ .message_id(reply_to_message_id) \ .request_body(ReplyMessageRequestBody.builder() .content(content) .msg_type("image") .build()) \ .build() response = self.client.im.v1.message.reply(request) else: request = CreateMessageRequest.builder() \ .receive_id_type(receive_id_type.value) \ .request_body(CreateMessageRequestBody.builder() .receive_id(to) .content(content) .msg_type("image") .build()) \ .build() response = self.client.im.v1.message.create(request) if not response.success(): raise Exception(f"发送图片失败: {response.msg}") return SendResult( message_id=response.data.message_id, chat_id=response.data.chat_id ) def upload_file( self, file: Union[bytes, str], file_name: str, file_type: str = "stream", ) -> str: """ 上传文件 Args: file: 文件数据或路径 file_name: 文件名 file_type: 文件类型 (opus/mp4/pdf/doc/xls/ppt/stream) Returns: str: file_key """ if isinstance(file, str): with open(file, "rb") as f: file_data = f.read() if not file_name: file_name = os.path.basename(file) else: file_data = file with tempfile.NamedTemporaryFile(delete=False) as tmp: tmp.write(file_data) tmp_path = tmp.name try: request = CreateFileRequest.builder() \ .request_body(CreateFileRequestBody.builder() .file_type(file_type) .file_name(file_name) .file(open(tmp_path, "rb")) .build()) \ .build() response = self.client.im.v1.file.create(request) if not response.success(): raise Exception(f"上传文件失败: {response.msg}") return response.data.file_key finally: os.unlink(tmp_path) def send_file( self, to: str, file: Union[bytes, str], file_name: str, reply_to_message_id: Optional[str] = None, ) -> SendResult: """ 发送文件消息 Args: to: 接收者ID file: 文件数据或路径 file_name: 文件名 reply_to_message_id: 回复的消息ID (可选) Returns: SendResult: 发送结果 """ file_type = self._detect_file_type(file_name) file_key = self.upload_file(file, file_name, file_type) content = json.dumps({"file_key": file_key}) receive_id_type = self._resolve_receive_id_type(to) if reply_to_message_id: request = ReplyMessageRequest.builder() \ .message_id(reply_to_message_id) \ .request_body(ReplyMessageRequestBody.builder() .content(content) .msg_type("file") .build()) \ .build() response = self.client.im.v1.message.reply(request) else: request = CreateMessageRequest.builder() \ .receive_id_type(receive_id_type.value) \ .request_body(CreateMessageRequestBody.builder() .receive_id(to) .content(content) .msg_type("file") .build()) \ .build() response = self.client.im.v1.message.create(request) if not response.success(): raise Exception(f"发送文件失败: {response.msg}") return SendResult( message_id=response.data.message_id, chat_id=response.data.chat_id ) def download_message_resource( self, message_id: str, file_key: str, resource_type: str = "file" ) -> bytes: """ 下载消息中的资源文件 Args: message_id: 消息ID file_key: 文件 key resource_type: 资源类型 ("image" 或 "file") Returns: bytes: 文件数据 """ request = GetMessageResourceRequest.builder() \ .message_id(message_id) \ .file_key(file_key) \ .type(resource_type) \ .build() response = self.client.im.v1.message_resource.get(request) if not response.success(): raise Exception(f"下载资源失败: {response.msg}") return response.file.read() # ==================== 消息获取 ==================== def get_message(self, message_id: str) -> Optional[Dict]: """ 获取消息详情 Args: message_id: 消息ID Returns: Dict: 消息详情,失败返回 None """ request = GetMessageRequest.builder() \ .message_id(message_id) \ .build() response = self.client.im.v1.message.get(request) if not response.success(): return None items = response.data.items if not items: return None item = items[0] content = item.body.content if item.body else "" # 解析文本内容 try: parsed = json.loads(content) if item.msg_type == "text" and "text" in parsed: content = parsed["text"] except: pass return { "message_id": item.message_id, "chat_id": item.chat_id, "sender_id": item.sender.id if item.sender else None, "sender_type": item.sender.sender_type if item.sender else None, "content": content, "content_type": item.msg_type, "create_time": item.create_time, } def get_message_list(self, chat_id: str, start_time: Optional[Union[str, int]] = None, end_time: Optional[Union[str, int]] = None, page_size: int = 20, page_token: Optional[str] = None) -> Optional[Dict]: """ 获取消息列表 Args: chat_id: 会话 ID start_time: 起始时间 (可选) end_time: 结束时间 (可选) page_size: 分页大小 (默认 20) page_token: 分页令牌 (可选) Returns: Dict: 包含消息列表和分页信息,失败返回 None """ builder = ListMessageRequest.builder() \ .container_id_type("chat") \ .container_id(chat_id) \ .sort_type("ByCreateTimeDesc") \ .page_size(page_size) if start_time is not None: builder.start_time(str(start_time)) if end_time is not None: builder.end_time(str(end_time)) if page_token: builder.page_token(page_token) request = builder.build() # 发起请求 response: ListMessageResponse = self.client.im.v1.message.list(request) # 处理失败返回 if not response.success(): logger.error( f"client.im.v1.message.list failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}") return None # 构建返回结果 messages = [] if response.data.items: for item in response.data.items: content = item.body.content if item.body else "" # 解析文本内容 try: parsed = json.loads(content) if item.msg_type == "text" and "text" in parsed: content = parsed["text"] except: pass messages.append({ "message_id": item.message_id, "chat_id": item.chat_id, "sender_id": item.sender.id if item.sender else None, "sender_type": item.sender.sender_type if item.sender else None, "content": content, "content_type": item.msg_type, "create_time": item.create_time, }) return { "items": messages, "page_token": response.data.page_token, "has_more": response.data.has_more } # ==================== 用户信息 ==================== def get_user_info(self, open_id: str) -> Optional[Dict]: """ 获取用户信息 Args: open_id: 用户 open_id Returns: Dict: 用户信息,失败返回 None """ # 检查缓存 if open_id in self._sender_name_cache: return {"name": self._sender_name_cache[open_id]} request = GetUserRequest.builder() \ .user_id(open_id) \ .user_id_type("open_id") \ .build() response = self.client.contact.v3.user.get(request) if not response.success(): return None user = response.data.user name = user.name or user.en_name or user.nickname if name: self._sender_name_cache[open_id] = name return { "open_id": user.open_id, "user_id": user.user_id, "name": name, "en_name": user.en_name, "nickname": user.nickname, "email": user.email, "mobile": user.mobile, "avatar": user.avatar.avatar_origin if user.avatar else None, } # ==================== WebSocket 监听 ==================== def start_websocket( self, on_message: Callable[[FeishuMessageEvent], None], on_bot_added: Optional[Callable[[str], None]] = None, on_bot_removed: Optional[Callable[[str], None]] = None, blocking: bool = True, ): """ 启动 WebSocket 监听消息 Args: on_message: 消息回调函数 on_bot_added: 机器人被添加到群的回调 (可选) on_bot_removed: 机器人被移出群的回调 (可选) blocking: 是否阻塞当前线程 """ # 创建事件处理器 # 注意: lark-oapi SDK 的回调函数只接受一个参数 (data) event_handler = lark.EventDispatcherHandler.builder( self.encrypt_key or "", self.verification_token or "" ).register_p2_im_message_receive_v1( lambda data: self._handle_message_event(data, on_message) ) if on_bot_added: event_handler = event_handler.register_p2_im_chat_member_bot_added_v1( lambda data: on_bot_added(data.event.chat_id) ) if on_bot_removed: event_handler = event_handler.register_p2_im_chat_member_bot_deleted_v1( lambda data: on_bot_removed(data.event.chat_id) ) handler = event_handler.build() # 创建 WebSocket 客户端 ws_client = lark.ws.Client( self.app_id, self.app_secret, event_handler=handler, domain=lark.FEISHU_DOMAIN if self.domain == FeishuDomain.FEISHU else lark.LARK_DOMAIN, log_level=lark.LogLevel.INFO, ) logger.info("启动飞书 WebSocket 监听...") if blocking: ws_client.start() else: thread = threading.Thread(target=ws_client.start, daemon=True) thread.start() return thread def _handle_message_event( self, data, callback: Callable[[FeishuMessageEvent], None] ): """处理消息事件""" try: # data 结构: P2ImMessageReceiveV1 对象 # data.event 包含实际的事件数据 event = data.event msg = event.message sender = event.sender # 解析消息内容 content = self._parse_message_content(msg.content, msg.message_type) # 检查是否 @了机器人 mentioned_bot = self._check_bot_mentioned(msg.mentions) # 去除 @机器人 的文本 if msg.mentions: content = self._strip_bot_mention(content, msg.mentions) # 构建事件对象 message_event = FeishuMessageEvent( message_id=msg.message_id, chat_id=msg.chat_id, chat_type=ChatType(msg.chat_type), content=content, content_type=msg.message_type, sender_open_id=sender.sender_id.open_id if sender.sender_id else "", sender_user_id=sender.sender_id.user_id if sender.sender_id else None, root_id=msg.root_id, parent_id=msg.parent_id, mentions=[], # 简化处理 mentioned_bot=mentioned_bot, ) # 尝试获取发送者名称 (可能会失败,不影响主流程) try: if message_event.sender_open_id: user_info = self.get_user_info(message_event.sender_open_id) if user_info: message_event.sender_name = user_info.get("name") except Exception as e: logger.debug(f"获取用户信息失败: {e}") callback(message_event) except Exception as e: logger.error(f"处理消息事件失败: {e}", exc_info=True) # ==================== 辅助方法 ==================== def _resolve_receive_id_type(self, receive_id: str) -> ReceiveIdType: """推断接收者ID类型""" if receive_id.startswith("ou_"): return ReceiveIdType.OPEN_ID elif receive_id.startswith("on_"): return ReceiveIdType.UNION_ID elif receive_id.startswith("oc_"): return ReceiveIdType.CHAT_ID elif "@" in receive_id: return ReceiveIdType.EMAIL else: return ReceiveIdType.USER_ID def _parse_message_content(self, content: str, message_type: str) -> str: """解析消息内容""" try: parsed = json.loads(content) if message_type == "text": return parsed.get("text", "") elif message_type == "post": return self._parse_post_content(parsed) return content except: return content def _parse_post_content(self, parsed: Dict) -> str: """解析富文本消息""" title = parsed.get("title", "") content_blocks = parsed.get("content", []) text_parts = [title] if title else [] for paragraph in content_blocks: if isinstance(paragraph, list): for element in paragraph: if element.get("tag") == "text": text_parts.append(element.get("text", "")) elif element.get("tag") == "a": text_parts.append(element.get("text", element.get("href", ""))) elif element.get("tag") == "at": text_parts.append(f"@{element.get('user_name', '')}") return "\n".join(text_parts).strip() or "[富文本消息]" def _check_bot_mentioned(self, mentions: Optional[List]) -> bool: """检查是否 @了机器人""" if not mentions: return False if not self._bot_open_id: # 如果没有缓存机器人 open_id,假设有 mention 就是 @了机器人 return len(mentions) > 0 return any(m.id.open_id == self._bot_open_id for m in mentions) def _strip_bot_mention(self, text: str, mentions: List) -> str: """去除 @机器人 的文本""" result = text for mention in mentions: name = mention.name if hasattr(mention, 'name') else "" key = mention.key if hasattr(mention, 'key') else "" if name: result = result.replace(f"@{name}", "").strip() if key: result = result.replace(key, "").strip() return result def _detect_file_type(self, file_name: str) -> str: """检测文件类型""" ext = os.path.splitext(file_name)[1].lower() type_map = { ".opus": "opus", ".ogg": "opus", ".mp4": "mp4", ".mov": "mp4", ".avi": "mp4", ".pdf": "pdf", ".doc": "doc", ".docx": "doc", ".xls": "xls", ".xlsx": "xls", ".ppt": "ppt", ".pptx": "ppt", } return type_map.get(ext, "stream") # ==================== 使用示例 ==================== if __name__ == "__main__": # 从环境变量获取配置 APP_ID = os.getenv("FEISHU_APP_ID", "cli_a90fe317987a9cc9") APP_SECRET = os.getenv("FEISHU_APP_SECRET", "nn2dWuXTiRA2N6xodbm4g0qz1AfM2ayi") if not APP_ID or not APP_SECRET: print("请设置环境变量 FEISHU_APP_ID 和 FEISHU_APP_SECRET") exit(1) # 创建客户端 client = FeishuClient( app_id=APP_ID, app_secret=APP_SECRET, domain=FeishuDomain.FEISHU, ) # 消息处理回调 def handle_message(event: FeishuMessageEvent): print(f"\n收到消息:") print(f" 发送者: {event.sender_name or event.sender_open_id}") print(f" 类型: {event.chat_type.value}") print(f" 内容: {event.content}") print(f" @机器人: {event.mentioned_bot}") # 自动回复示例 if event.chat_type == ChatType.P2P or event.mentioned_bot: # 先回复文字 reply_text = f"收到你的消息: {event.content}" chat_id = event.chat_id content = event.content content_type = event.content_type # image、text等 open_id = event.sender_open_id client.send_message( to=event.chat_id, text=reply_text, reply_to_message_id=event.message_id ) print(f" 已回复文字: {reply_text}") # 再回复一张图片 (读取当前目录下的 hanli.png) try: image_path = os.path.join(os.path.dirname(__file__) or ".", "hanli.png") if os.path.exists(image_path): client.send_image( to=event.chat_id, image=image_path, ) print(f" 已回复图片: {image_path}") else: print(f" 图片不存在: {image_path}") except Exception as e: print(f" 回复图片失败: {e}") # 启动 WebSocket 监听 print("启动飞书消息监听...") print("按 Ctrl+C 退出") try: client.start_websocket( on_message=handle_message, on_bot_added=lambda chat_id: print(f"机器人被添加到群: {chat_id}"), on_bot_removed=lambda chat_id: print(f"机器人被移出群: {chat_id}"), blocking=True ) # res = client.get_message_list(chat_id='oc_56e85f0e2c97405d176729b62d8f56e5', start_time=0, end_time=1770623620) # print(f"获取消息列表结果: {json.dumps(res, indent=4, ensure_ascii=False)}") except KeyboardInterrupt: print("\n退出")