| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945 |
- """
- 飞书消息处理客户端
- 基于 OpenClaw 项目的飞书集成代码整理
- 依赖安装:
- pip install lark-oapi websocket-client requests
- 使用示例:
- client = FeishuClient(app_id="cli_xxx", app_secret="xxx")
- # 发送消息
- client.send_message(to="ou_xxx", text="Hello!")
- # 监听消息
- client.start_websocket(on_message=my_handler)
- """
- import json
- import io
- import logging
- import os
- import tempfile
- import threading
- from dataclasses import dataclass, field
- from enum import Enum
- from typing import Any, Callable, Dict, List, Optional, Union
- import lark_oapi as lark
- from lark_oapi.api.contact.v3 import GetUserRequest, GetUserResponse
- from lark_oapi.api.im.v1 import (
- CreateMessageRequest, CreateMessageRequestBody,
- ReplyMessageRequest, ReplyMessageRequestBody,
- GetMessageRequest, GetMessageResponse,
- PatchMessageRequest, PatchMessageRequestBody,
- CreateImageRequest, CreateImageRequestBody,
- GetImageRequest,
- CreateFileRequest, CreateFileRequestBody,
- GetMessageResourceRequest, GetMessageResourceResponse,
- ListMessageRequest, ListMessageResponse
- )
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- class FeishuDomain(Enum):
- """飞书域名"""
- FEISHU = "https://open.feishu.cn" # 中国版
- LARK = "https://open.larksuite.com" # 国际版
- class ChatType(Enum):
- """聊天类型"""
- P2P = "p2p" # 私聊
- GROUP = "group" # 群聊
- class ReceiveIdType(Enum):
- """接收者ID类型"""
- OPEN_ID = "open_id"
- USER_ID = "user_id"
- UNION_ID = "union_id"
- EMAIL = "email"
- CHAT_ID = "chat_id"
- @dataclass
- class FeishuMessageEvent:
- """飞书消息事件"""
- message_id: str
- chat_id: str
- chat_type: ChatType
- content: str
- content_type: str # text, image, file, post, etc.
- sender_open_id: str
- sender_user_id: Optional[str] = None
- sender_name: Optional[str] = None
- root_id: Optional[str] = None # 根消息ID(话题)
- parent_id: Optional[str] = None # 父消息ID(回复)
- mentions: List[Dict] = field(default_factory=list)
- mentioned_bot: bool = False
- @dataclass
- class SendResult:
- """发送结果"""
- message_id: str
- chat_id: str
- class FeishuClient:
- """
- 飞书客户端
- 功能:
- - 发送/接收消息
- - 上传/下载媒体文件
- - WebSocket 实时监听
- """
- def __init__(
- self,
- app_id: str,
- app_secret: str,
- domain: FeishuDomain = FeishuDomain.FEISHU,
- encrypt_key: Optional[str] = None,
- verification_token: Optional[str] = None,
- ):
- """
- 初始化飞书客户端
- Args:
- app_id: 飞书应用 App ID
- app_secret: 飞书应用 App Secret
- domain: 飞书域名 (FEISHU 或 LARK)
- encrypt_key: 事件加密密钥 (可选)
- verification_token: 事件验证令牌 (可选)
- """
- self.app_id = app_id
- self.app_secret = app_secret
- self.domain = domain
- self.encrypt_key = encrypt_key
- self.verification_token = verification_token
- # 创建 Lark 客户端
- self.client = lark.Client.builder() \
- .app_id(app_id) \
- .app_secret(app_secret) \
- .domain(domain.value) \
- .build()
- # 缓存
- self._bot_open_id: Optional[str] = None
- self._sender_name_cache: Dict[str, str] = {}
- # ==================== 消息发送 ====================
- def send_message(
- self,
- to: str,
- text: str,
- reply_to_message_id: Optional[str] = None,
- receive_id_type: Optional[ReceiveIdType] = None,
- ) -> SendResult:
- """
- 发送文本消息
- Args:
- to: 接收者ID (open_id, user_id, chat_id 等)
- text: 消息文本
- reply_to_message_id: 回复的消息ID (可选)
- receive_id_type: 接收者ID类型 (可选,自动推断)
- Returns:
- SendResult: 发送结果
- """
- if receive_id_type is None:
- receive_id_type = self._resolve_receive_id_type(to)
- # 构建富文本消息 (支持 Markdown)
- content = json.dumps({
- "zh_cn": {
- "content": [[{"tag": "md", "text": text}]]
- }
- })
- if reply_to_message_id:
- # 回复消息
- request = ReplyMessageRequest.builder() \
- .message_id(reply_to_message_id) \
- .request_body(ReplyMessageRequestBody.builder()
- .content(content)
- .msg_type("post")
- .build()) \
- .build()
- response = self.client.im.v1.message.reply(request)
- else:
- # 新消息
- request = CreateMessageRequest.builder() \
- .receive_id_type(receive_id_type.value) \
- .request_body(CreateMessageRequestBody.builder()
- .receive_id(to)
- .content(content)
- .msg_type("post")
- .build()) \
- .build()
- response = self.client.im.v1.message.create(request)
- if not response.success():
- raise Exception(f"发送消息失败: {response.msg} (code: {response.code})")
- return SendResult(
- message_id=response.data.message_id,
- chat_id=response.data.chat_id
- )
- def send_card(
- self,
- to: str,
- card: Dict[str, Any],
- reply_to_message_id: Optional[str] = None,
- receive_id_type: Optional[ReceiveIdType] = None,
- ) -> SendResult:
- """
- 发送卡片消息 (交互式消息)
- Args:
- to: 接收者ID
- card: 卡片内容 (JSON 结构)
- reply_to_message_id: 回复的消息ID (可选)
- receive_id_type: 接收者ID类型 (可选)
- Returns:
- SendResult: 发送结果
- """
- if receive_id_type is None:
- receive_id_type = self._resolve_receive_id_type(to)
- content = json.dumps(card)
- if reply_to_message_id:
- request = ReplyMessageRequest.builder() \
- .message_id(reply_to_message_id) \
- .request_body(ReplyMessageRequestBody.builder()
- .content(content)
- .msg_type("interactive")
- .build()) \
- .build()
- response = self.client.im.v1.message.reply(request)
- else:
- request = CreateMessageRequest.builder() \
- .receive_id_type(receive_id_type.value) \
- .request_body(CreateMessageRequestBody.builder()
- .receive_id(to)
- .content(content)
- .msg_type("interactive")
- .build()) \
- .build()
- response = self.client.im.v1.message.create(request)
- if not response.success():
- raise Exception(f"发送卡片失败: {response.msg}")
- return SendResult(
- message_id=response.data.message_id,
- chat_id=response.data.chat_id
- )
- def send_markdown_card(
- self,
- to: str,
- text: str,
- reply_to_message_id: Optional[str] = None,
- ) -> SendResult:
- """
- 发送 Markdown 卡片 (更好的格式渲染)
- Args:
- to: 接收者ID
- text: Markdown 文本
- reply_to_message_id: 回复的消息ID (可选)
- Returns:
- SendResult: 发送结果
- """
- card = {
- "config": {"wide_screen_mode": True},
- "elements": [{"tag": "markdown", "content": text}]
- }
- return self.send_card(to, card, reply_to_message_id)
- # ==================== 媒体处理 ====================
- def upload_image(
- self,
- image: Union[bytes, str],
- image_type: str = "message"
- ) -> str:
- """
- 上传图片
- """
- file_obj = None
- try:
- # 1. 准备文件对象
- if isinstance(image, str):
- # 如果是路径,直接打开
- file_obj = open(image, "rb")
- else:
- # 如果是二进制数据,使用内存文件 (避免写磁盘)
- file_obj = io.BytesIO(image)
- # 某些 SDK/API 依赖文件名来判断 Content-Type,我们手动给一个名字
- # 如果知道真实格式更好,不知道则默认 .png 或 .bin
- file_obj.name = "upload.png"
- # 2. 构建请求
- # 注意:这里直接传入 file_obj
- request = CreateImageRequest.builder() \
- .request_body(CreateImageRequestBody.builder()
- .image_type(image_type)
- .image(file_obj)
- .build()) \
- .build()
- # 3. 发起请求
- response = self.client.im.v1.image.create(request)
- if not response.success():
- raise Exception(f"上传图片失败: {response.msg}")
- return response.data.image_key
- finally:
- # 4. 显式关闭文件句柄
- if file_obj and not isinstance(file_obj, io.BytesIO):
- file_obj.close()
- def download_image(self, image_key: str) -> bytes:
- """
- 下载图片
- Args:
- image_key: 图片 key
- Returns:
- bytes: 图片数据
- """
- request = GetImageRequest.builder() \
- .image_key(image_key) \
- .build()
- response = self.client.im.v1.image.get(request)
- if not response.success():
- raise Exception(f"下载图片失败: {response.msg}")
- return response.file.read()
- def send_image(
- self,
- to: str,
- image: Union[bytes, str],
- reply_to_message_id: Optional[str] = None,
- ) -> SendResult:
- """
- 发送图片消息
- Args:
- to: 接收者ID
- image: 图片数据或文件路径
- reply_to_message_id: 回复的消息ID (可选)
- Returns:
- SendResult: 发送结果
- """
- image_key = self.upload_image(image)
- content = json.dumps({"image_key": image_key})
- receive_id_type = self._resolve_receive_id_type(to)
- if reply_to_message_id:
- request = ReplyMessageRequest.builder() \
- .message_id(reply_to_message_id) \
- .request_body(ReplyMessageRequestBody.builder()
- .content(content)
- .msg_type("image")
- .build()) \
- .build()
- response = self.client.im.v1.message.reply(request)
- else:
- request = CreateMessageRequest.builder() \
- .receive_id_type(receive_id_type.value) \
- .request_body(CreateMessageRequestBody.builder()
- .receive_id(to)
- .content(content)
- .msg_type("image")
- .build()) \
- .build()
- response = self.client.im.v1.message.create(request)
- if not response.success():
- raise Exception(f"发送图片失败: {response.msg}")
- return SendResult(
- message_id=response.data.message_id,
- chat_id=response.data.chat_id
- )
- def upload_file(
- self,
- file: Union[bytes, str],
- file_name: str,
- file_type: str = "stream",
- ) -> str:
- """
- 上传文件
- Args:
- file: 文件数据或路径
- file_name: 文件名
- file_type: 文件类型 (opus/mp4/pdf/doc/xls/ppt/stream)
- Returns:
- str: file_key
- """
- if isinstance(file, str):
- with open(file, "rb") as f:
- file_data = f.read()
- if not file_name:
- file_name = os.path.basename(file)
- else:
- file_data = file
- with tempfile.NamedTemporaryFile(delete=False) as tmp:
- tmp.write(file_data)
- tmp_path = tmp.name
- try:
- request = CreateFileRequest.builder() \
- .request_body(CreateFileRequestBody.builder()
- .file_type(file_type)
- .file_name(file_name)
- .file(open(tmp_path, "rb"))
- .build()) \
- .build()
- response = self.client.im.v1.file.create(request)
- if not response.success():
- raise Exception(f"上传文件失败: {response.msg}")
- return response.data.file_key
- finally:
- os.unlink(tmp_path)
- def send_file(
- self,
- to: str,
- file: Union[bytes, str],
- file_name: str,
- reply_to_message_id: Optional[str] = None,
- ) -> SendResult:
- """
- 发送文件消息
- Args:
- to: 接收者ID
- file: 文件数据或路径
- file_name: 文件名
- reply_to_message_id: 回复的消息ID (可选)
- Returns:
- SendResult: 发送结果
- """
- file_type = self._detect_file_type(file_name)
- file_key = self.upload_file(file, file_name, file_type)
- content = json.dumps({"file_key": file_key})
- receive_id_type = self._resolve_receive_id_type(to)
- if reply_to_message_id:
- request = ReplyMessageRequest.builder() \
- .message_id(reply_to_message_id) \
- .request_body(ReplyMessageRequestBody.builder()
- .content(content)
- .msg_type("file")
- .build()) \
- .build()
- response = self.client.im.v1.message.reply(request)
- else:
- request = CreateMessageRequest.builder() \
- .receive_id_type(receive_id_type.value) \
- .request_body(CreateMessageRequestBody.builder()
- .receive_id(to)
- .content(content)
- .msg_type("file")
- .build()) \
- .build()
- response = self.client.im.v1.message.create(request)
- if not response.success():
- raise Exception(f"发送文件失败: {response.msg}")
- return SendResult(
- message_id=response.data.message_id,
- chat_id=response.data.chat_id
- )
- def download_message_resource(
- self,
- message_id: str,
- file_key: str,
- resource_type: str = "file"
- ) -> bytes:
- """
- 下载消息中的资源文件
- Args:
- message_id: 消息ID
- file_key: 文件 key
- resource_type: 资源类型 ("image" 或 "file")
- Returns:
- bytes: 文件数据
- """
- request = GetMessageResourceRequest.builder() \
- .message_id(message_id) \
- .file_key(file_key) \
- .type(resource_type) \
- .build()
- response = self.client.im.v1.message_resource.get(request)
- if not response.success():
- raise Exception(f"下载资源失败: {response.msg}")
- return response.file.read()
- # ==================== 消息获取 ====================
- def get_message(self, message_id: str) -> Optional[Dict]:
- """
- 获取消息详情
- Args:
- message_id: 消息ID
- Returns:
- Dict: 消息详情,失败返回 None
- """
- request = GetMessageRequest.builder() \
- .message_id(message_id) \
- .build()
- response = self.client.im.v1.message.get(request)
- if not response.success():
- return None
- items = response.data.items
- if not items:
- return None
- item = items[0]
- content = item.body.content if item.body else ""
- # 解析文本内容
- try:
- parsed = json.loads(content)
- if item.msg_type == "text" and "text" in parsed:
- content = parsed["text"]
- except:
- pass
- return {
- "message_id": item.message_id,
- "chat_id": item.chat_id,
- "sender_id": item.sender.id if item.sender else None,
- "sender_type": item.sender.sender_type if item.sender else None,
- "content": content,
- "content_type": item.msg_type,
- "create_time": item.create_time,
- }
- def get_message_list(self, chat_id: str, start_time: Optional[Union[str, int]] = None, end_time: Optional[Union[str, int]] = None, page_size: int = 20, page_token: Optional[str] = None) -> Optional[Dict]:
- """
- 获取消息列表
- Args:
- chat_id: 会话 ID
- start_time: 起始时间 (可选)
- end_time: 结束时间 (可选)
- page_size: 分页大小 (默认 20)
- page_token: 分页令牌 (可选)
- Returns:
- Dict: 包含消息列表和分页信息,失败返回 None
- """
- builder = ListMessageRequest.builder() \
- .container_id_type("chat") \
- .container_id(chat_id) \
- .sort_type("ByCreateTimeDesc") \
- .page_size(page_size)
- if start_time is not None:
- builder.start_time(str(start_time))
- if end_time is not None:
- builder.end_time(str(end_time))
- if page_token:
- builder.page_token(page_token)
- request = builder.build()
- # 发起请求
- response: ListMessageResponse = self.client.im.v1.message.list(request)
- # 处理失败返回
- if not response.success():
- logger.error(
- f"client.im.v1.message.list failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}")
- return None
- # 构建返回结果
- messages = []
- if response.data.items:
- for item in response.data.items:
- content = item.body.content if item.body else ""
- # 解析文本内容
- try:
- parsed = json.loads(content)
- if item.msg_type == "text" and "text" in parsed:
- content = parsed["text"]
- except:
- pass
- messages.append({
- "message_id": item.message_id,
- "chat_id": item.chat_id,
- "sender_id": item.sender.id if item.sender else None,
- "sender_type": item.sender.sender_type if item.sender else None,
- "content": content,
- "content_type": item.msg_type,
- "create_time": item.create_time,
- })
- return {
- "items": messages,
- "page_token": response.data.page_token,
- "has_more": response.data.has_more
- }
- # ==================== 用户信息 ====================
- def get_user_info(self, open_id: str) -> Optional[Dict]:
- """
- 获取用户信息
- Args:
- open_id: 用户 open_id
- Returns:
- Dict: 用户信息,失败返回 None
- """
- # 检查缓存
- if open_id in self._sender_name_cache:
- return {"name": self._sender_name_cache[open_id]}
- request = GetUserRequest.builder() \
- .user_id(open_id) \
- .user_id_type("open_id") \
- .build()
- response = self.client.contact.v3.user.get(request)
- if not response.success():
- return None
- user = response.data.user
- name = user.name or user.en_name or user.nickname
- if name:
- self._sender_name_cache[open_id] = name
- return {
- "open_id": user.open_id,
- "user_id": user.user_id,
- "name": name,
- "en_name": user.en_name,
- "nickname": user.nickname,
- "email": user.email,
- "mobile": user.mobile,
- "avatar": user.avatar.avatar_origin if user.avatar else None,
- }
- # ==================== WebSocket 监听 ====================
- def start_websocket(
- self,
- on_message: Callable[[FeishuMessageEvent], None],
- on_bot_added: Optional[Callable[[str], None]] = None,
- on_bot_removed: Optional[Callable[[str], None]] = None,
- blocking: bool = True,
- ):
- """
- 启动 WebSocket 监听消息
- Args:
- on_message: 消息回调函数
- on_bot_added: 机器人被添加到群的回调 (可选)
- on_bot_removed: 机器人被移出群的回调 (可选)
- blocking: 是否阻塞当前线程
- """
- # 创建事件处理器
- # 注意: lark-oapi SDK 的回调函数只接受一个参数 (data)
- event_handler = lark.EventDispatcherHandler.builder(
- self.encrypt_key or "",
- self.verification_token or ""
- ).register_p2_im_message_receive_v1(
- lambda data: self._handle_message_event(data, on_message)
- )
- if on_bot_added:
- event_handler = event_handler.register_p2_im_chat_member_bot_added_v1(
- lambda data: on_bot_added(data.event.chat_id)
- )
- if on_bot_removed:
- event_handler = event_handler.register_p2_im_chat_member_bot_deleted_v1(
- lambda data: on_bot_removed(data.event.chat_id)
- )
- handler = event_handler.build()
- # 创建 WebSocket 客户端
- ws_client = lark.ws.Client(
- self.app_id,
- self.app_secret,
- event_handler=handler,
- domain=lark.FEISHU_DOMAIN if self.domain == FeishuDomain.FEISHU else lark.LARK_DOMAIN,
- log_level=lark.LogLevel.INFO,
- )
- logger.info("启动飞书 WebSocket 监听...")
- if blocking:
- ws_client.start()
- else:
- thread = threading.Thread(target=ws_client.start, daemon=True)
- thread.start()
- return thread
- def _handle_message_event(
- self,
- data,
- callback: Callable[[FeishuMessageEvent], None]
- ):
- """处理消息事件"""
- try:
- # data 结构: P2ImMessageReceiveV1 对象
- # data.event 包含实际的事件数据
- event = data.event
- msg = event.message
- sender = event.sender
- # 解析消息内容
- content = self._parse_message_content(msg.content, msg.message_type)
- # 检查是否 @了机器人
- mentioned_bot = self._check_bot_mentioned(msg.mentions)
- # 去除 @机器人 的文本
- if msg.mentions:
- content = self._strip_bot_mention(content, msg.mentions)
- # 构建事件对象
- message_event = FeishuMessageEvent(
- message_id=msg.message_id,
- chat_id=msg.chat_id,
- chat_type=ChatType(msg.chat_type),
- content=content,
- content_type=msg.message_type,
- sender_open_id=sender.sender_id.open_id if sender.sender_id else "",
- sender_user_id=sender.sender_id.user_id if sender.sender_id else None,
- root_id=msg.root_id,
- parent_id=msg.parent_id,
- mentions=[], # 简化处理
- mentioned_bot=mentioned_bot,
- )
- # 尝试获取发送者名称 (可能会失败,不影响主流程)
- try:
- if message_event.sender_open_id:
- user_info = self.get_user_info(message_event.sender_open_id)
- if user_info:
- message_event.sender_name = user_info.get("name")
- except Exception as e:
- logger.debug(f"获取用户信息失败: {e}")
- callback(message_event)
- except Exception as e:
- logger.error(f"处理消息事件失败: {e}", exc_info=True)
- # ==================== 辅助方法 ====================
- def _resolve_receive_id_type(self, receive_id: str) -> ReceiveIdType:
- """推断接收者ID类型"""
- if receive_id.startswith("ou_"):
- return ReceiveIdType.OPEN_ID
- elif receive_id.startswith("on_"):
- return ReceiveIdType.UNION_ID
- elif receive_id.startswith("oc_"):
- return ReceiveIdType.CHAT_ID
- elif "@" in receive_id:
- return ReceiveIdType.EMAIL
- else:
- return ReceiveIdType.USER_ID
- def _parse_message_content(self, content: str, message_type: str) -> str:
- """解析消息内容"""
- try:
- parsed = json.loads(content)
- if message_type == "text":
- return parsed.get("text", "")
- elif message_type == "post":
- return self._parse_post_content(parsed)
- return content
- except:
- return content
- def _parse_post_content(self, parsed: Dict) -> str:
- """解析富文本消息"""
- title = parsed.get("title", "")
- content_blocks = parsed.get("content", [])
- text_parts = [title] if title else []
- for paragraph in content_blocks:
- if isinstance(paragraph, list):
- for element in paragraph:
- if element.get("tag") == "text":
- text_parts.append(element.get("text", ""))
- elif element.get("tag") == "a":
- text_parts.append(element.get("text", element.get("href", "")))
- elif element.get("tag") == "at":
- text_parts.append(f"@{element.get('user_name', '')}")
- return "\n".join(text_parts).strip() or "[富文本消息]"
- def _check_bot_mentioned(self, mentions: Optional[List]) -> bool:
- """检查是否 @了机器人"""
- if not mentions:
- return False
- if not self._bot_open_id:
- # 如果没有缓存机器人 open_id,假设有 mention 就是 @了机器人
- return len(mentions) > 0
- return any(m.id.open_id == self._bot_open_id for m in mentions)
- def _strip_bot_mention(self, text: str, mentions: List) -> str:
- """去除 @机器人 的文本"""
- result = text
- for mention in mentions:
- name = mention.name if hasattr(mention, 'name') else ""
- key = mention.key if hasattr(mention, 'key') else ""
- if name:
- result = result.replace(f"@{name}", "").strip()
- if key:
- result = result.replace(key, "").strip()
- return result
- def _detect_file_type(self, file_name: str) -> str:
- """检测文件类型"""
- ext = os.path.splitext(file_name)[1].lower()
- type_map = {
- ".opus": "opus", ".ogg": "opus",
- ".mp4": "mp4", ".mov": "mp4", ".avi": "mp4",
- ".pdf": "pdf",
- ".doc": "doc", ".docx": "doc",
- ".xls": "xls", ".xlsx": "xls",
- ".ppt": "ppt", ".pptx": "ppt",
- }
- return type_map.get(ext, "stream")
- # ==================== 使用示例 ====================
- if __name__ == "__main__":
- # 从环境变量获取配置
- APP_ID = os.getenv("FEISHU_APP_ID", "cli_a90fe317987a9cc9")
- APP_SECRET = os.getenv("FEISHU_APP_SECRET", "nn2dWuXTiRA2N6xodbm4g0qz1AfM2ayi")
- if not APP_ID or not APP_SECRET:
- print("请设置环境变量 FEISHU_APP_ID 和 FEISHU_APP_SECRET")
- exit(1)
- # 创建客户端
- client = FeishuClient(
- app_id=APP_ID,
- app_secret=APP_SECRET,
- domain=FeishuDomain.FEISHU,
- )
- # 消息处理回调
- def handle_message(event: FeishuMessageEvent):
- print(f"\n收到消息:")
- print(f" 发送者: {event.sender_name or event.sender_open_id}")
- print(f" 类型: {event.chat_type.value}")
- print(f" 内容: {event.content}")
- print(f" @机器人: {event.mentioned_bot}")
- # 自动回复示例
- if event.chat_type == ChatType.P2P or event.mentioned_bot:
- # 先回复文字
- reply_text = f"收到你的消息: {event.content}"
- chat_id = event.chat_id
- content = event.content
- content_type = event.content_type # image、text等
- open_id = event.sender_open_id
- client.send_message(
- to=event.chat_id,
- text=reply_text,
- reply_to_message_id=event.message_id
- )
- print(f" 已回复文字: {reply_text}")
- # 再回复一张图片 (读取当前目录下的 hanli.png)
- try:
- image_path = os.path.join(os.path.dirname(__file__) or ".", "hanli.png")
- if os.path.exists(image_path):
- client.send_image(
- to=event.chat_id,
- image=image_path,
- )
- print(f" 已回复图片: {image_path}")
- else:
- print(f" 图片不存在: {image_path}")
- except Exception as e:
- print(f" 回复图片失败: {e}")
- # 启动 WebSocket 监听
- print("启动飞书消息监听...")
- print("按 Ctrl+C 退出")
- try:
- client.start_websocket(
- on_message=handle_message,
- on_bot_added=lambda chat_id: print(f"机器人被添加到群: {chat_id}"),
- on_bot_removed=lambda chat_id: print(f"机器人被移出群: {chat_id}"),
- blocking=True
- )
- # res = client.get_message_list(chat_id='oc_56e85f0e2c97405d176729b62d8f56e5', start_time=0, end_time=1770623620)
- # print(f"获取消息列表结果: {json.dumps(res, indent=4, ensure_ascii=False)}")
- except KeyboardInterrupt:
- print("\n退出")
|