from __future__ import annotations import logging from collections.abc import Mapping, Sequence from typing import Any, Literal, assert_never import httpx from gateway.core.channels.feishu.types import ( FeishuReplyContext, IncomingFeishuEvent, mapping_to_feishu_event, ) logger = logging.getLogger(__name__) # 飞书开放平台 reaction emoji_type,与 openclaw-lark FeishuEmoji.TYPING 一致 TYPING_REACTION_EMOJI = "Typing" class WebhookParseError(ValueError): pass class FeishuHttpAdapterClient: """调用本仓库 Feishu 独立 HTTP 适配服务(默认端口 4380)。""" def __init__(self, base_url: str, *, timeout: float = 120.0) -> None: self._base = base_url.rstrip("/") self._timeout = timeout async def post_json(self, path: str, body: Mapping[str, Any]) -> dict[str, Any]: url = f"{self._base}{path if path.startswith('/') else '/' + path}" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.post(url, json=dict(body)) try: data = resp.json() except Exception: data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code} if not isinstance(data, dict): return {"ok": False, "error": "unexpected_response_shape"} if resp.status_code >= 400 and "ok" not in data: data = {**data, "ok": False, "status_code": resp.status_code} return data async def get_json(self, path: str) -> dict[str, Any]: url = f"{self._base}{path if path.startswith('/') else '/' + path}" async with httpx.AsyncClient(timeout=self._timeout) as client: resp = await client.get(url) try: data = resp.json() except Exception: return {"ok": False, "error": "invalid_json", "status_code": resp.status_code} return data if isinstance(data, dict) else {"ok": False, "error": "unexpected_response_shape"} FeishuHttpMessageKind = Literal["text", "card", "media", "raw"] class FeishuConnector: """ 飞书连接器:解析 Feishu HTTP 适配层规范化入站事件,经适配服务发送消息。 入站 JSON 校验/解析见 `parse_feishu_inbound_event`(不依赖 HTTP 客户端,可按类调用)。 内部使用 ``FeishuHttpAdapterClient`` 访问 Node 适配器;不持有 app_secret,鉴权与 token 均在 Node 侧完成。 发送能力与适配器 POST ``/feishu/send-message`` 一致: ``text``(富文本 post)、``card``(交互卡片)、``media``(URL 上传后按类型发送)、 ``raw``(显式 ``msg_type`` + JSON ``content``,覆盖 text/post/image/分享名片/贴纸等开放平台类型)。 """ @staticmethod def parse_feishu_inbound_event(body: Mapping[str, Any]) -> IncomingFeishuEvent: """ 校验并解析 Feishu HTTP 适配层 ``forwardEventToGateway`` 写入的 JSON。 支持 event_type: message | reaction | card_action(与适配服务 server.ts 一致)。 """ if not body: raise WebhookParseError("empty body") event_type = body.get("event_type") if not isinstance(event_type, str) or not event_type: raise WebhookParseError("missing or invalid event_type") app_id = body.get("app_id") if not isinstance(app_id, str) or not app_id: raise WebhookParseError("missing or invalid app_id") event = mapping_to_feishu_event(body) if event_type == "message": if not event.chat_id: raise WebhookParseError("message event requires chat_id") if event.message_id is None or event.message_id == "": raise WebhookParseError("message event requires message_id") elif event_type == "reaction": if not event.chat_id: raise WebhookParseError("reaction event requires chat_id") if not event.message_id: raise WebhookParseError("reaction event requires message_id") elif event_type == "card_action": pass else: pass return event def __init__( self, *, feishu_http_base_url: str = "http://127.0.0.1:4380", timeout: float = 120.0, ) -> None: self._http = FeishuHttpAdapterClient(feishu_http_base_url, timeout=timeout) def handle_webhook(self, event: Mapping[str, Any]) -> dict[str, Any]: """校验 payload(对应 channels.md 的 handle_webhook)。""" try: ev: IncomingFeishuEvent = self.parse_feishu_inbound_event(event) return {"ok": True, "parsed": True, "event_type": ev.event_type} except WebhookParseError as e: return {"ok": False, "error": str(e)} def parse_normalized_event(self, payload: Mapping[str, Any]) -> IncomingFeishuEvent: return self.parse_feishu_inbound_event(payload) async def send_message( self, user_id: str, text: str, *, reply_context: FeishuReplyContext | None = None, ) -> dict[str, Any]: """ channels.md 签名兼容:user_id 未使用时需传入 reply_context。 实际发送依赖 chat_id(与 server.ts /feishu/send-message 一致)。 """ if reply_context is None: return {"ok": False, "error": "reply_context_required_for_feishu_adapter"} return await self.send_text(reply_context, text) def _send_message_base( self, ctx: FeishuReplyContext, *, reply_in_thread: bool | None = None, ) -> dict[str, Any]: body: dict[str, Any] = {"chat_id": ctx.chat_id} if ctx.account_id: body["account_id"] = ctx.account_id if ctx.message_id: body["reply_to_message_id"] = ctx.message_id if reply_in_thread is not None: body["reply_in_thread"] = reply_in_thread return body async def post_send_message(self, body: Mapping[str, Any]) -> dict[str, Any]: """POST ``/feishu/send-message``;``body`` 需含 ``kind`` 及对应字段(与适配器一致)。""" payload = dict(body) result = await self._http.post_json("/feishu/send-message", payload) if not result.get("ok"): logger.warning("feishu send-message failed: %s", result) return result async def send_text( self, ctx: FeishuReplyContext, text: str, *, reply_in_thread: bool | None = None, ) -> dict[str, Any]: body = self._send_message_base(ctx, reply_in_thread=reply_in_thread) body["kind"] = "text" body["text"] = text return await self.post_send_message(body) async def send_card( self, ctx: FeishuReplyContext, card: Mapping[str, Any], *, reply_in_thread: bool | None = None, ) -> dict[str, Any]: """发送交互卡片(``msg_type: interactive``)。""" body = self._send_message_base(ctx, reply_in_thread=reply_in_thread) body["kind"] = "card" body["card"] = dict(card) return await self.post_send_message(body) async def send_media( self, ctx: FeishuReplyContext, media_url: str, *, reply_in_thread: bool | None = None, media_local_roots: Sequence[str] | None = None, file_name: str | None = None, ) -> dict[str, Any]: """ 发送图片 / 视频 / 普通文件 / 音频(opus/mp4 等由适配器根据扩展名与内容判定)。 ``media_url`` 可为 http(s) 或适配器允许的本地路径;本地路径需配合服务端 ``media_local_roots`` 白名单。 """ body = self._send_message_base(ctx, reply_in_thread=reply_in_thread) body["kind"] = "media" body["media_url"] = media_url if media_local_roots is not None: body["media_local_roots"] = list(media_local_roots) if file_name: body["file_name"] = file_name return await self.post_send_message(body) async def send_raw_im( self, ctx: FeishuReplyContext, msg_type: str, content: str | Mapping[str, Any], *, reply_in_thread: bool | None = None, uuid: str | None = None, ) -> dict[str, Any]: """ 使用机器人身份发送任意已支持的 ``msg_type``,``content`` 为合法 JSON 字符串或对象。 常用示例:``text`` → ``{"text": "你好"}``;``share_chat`` → ``{"chat_id": "oc_xxx"}``; ``image`` → ``{"image_key": "img_xxx"}``(需先上传)。具体格式以飞书 IM 开放文档为准。 """ body = self._send_message_base(ctx, reply_in_thread=reply_in_thread) body["kind"] = "raw" body["msg_type"] = msg_type body["content"] = content if isinstance(content, str) else dict(content) if uuid: body["uuid"] = uuid return await self.post_send_message(body) async def send( self, ctx: FeishuReplyContext, *, kind: FeishuHttpMessageKind = "text", reply_in_thread: bool | None = None, text: str | None = None, card: Mapping[str, Any] | None = None, media_url: str | None = None, media_local_roots: Sequence[str] | None = None, file_name: str | None = None, msg_type: str | None = None, content: str | Mapping[str, Any] | None = None, uuid: str | None = None, ) -> dict[str, Any]: """与适配器 ``kind`` 对齐的统一发送入口。""" if kind == "text": if not text: return {"ok": False, "error": "missing_text"} return await self.send_text(ctx, text, reply_in_thread=reply_in_thread) if kind == "card": if card is None: return {"ok": False, "error": "missing_card"} return await self.send_card(ctx, card, reply_in_thread=reply_in_thread) if kind == "media": if not media_url: return {"ok": False, "error": "missing_media_url"} return await self.send_media( ctx, media_url, reply_in_thread=reply_in_thread, media_local_roots=media_local_roots, file_name=file_name, ) if kind == "raw": if not msg_type or content is None: return {"ok": False, "error": "raw_requires_msg_type_and_content"} return await self.send_raw_im( ctx, msg_type, content, reply_in_thread=reply_in_thread, uuid=uuid, ) assert_never(kind) async def list_feishu_app_accounts(self) -> dict[str, Any]: """ 拉取适配器 GET `/accounts`:已启用、已配置的飞书应用(机器人)列表。 与终端用户 open_id 无关;若要用户姓名/头像等需走开放平台 contact 等接口。 """ try: data = await self._http.get_json("/accounts") if data.get("ok") and "accounts" in data: return {"ok": True, "accounts": data.get("accounts")} except Exception as e: logger.debug("list_feishu_app_accounts: %s", e) return {"ok": True, "accounts": []} async def get_user_info(self, user_id: str) -> dict[str, Any]: """ 飞书「用户」个人资料:当前 HTTP 适配器未暴露联系人查询,仅回传占位。 已配置应用列表请用 `list_feishu_app_accounts()`(勿与 `/accounts` 语义混淆)。 """ return {"ok": True, "user_id": user_id, "profile": None} async def add_message_reaction( self, message_id: str, emoji: str, *, account_id: str | None = None, ) -> dict[str, Any]: """POST ``/feishu/react``,为消息添加机器人表情(``emoji`` 为开放平台 emoji_type,如 ``Typing``)。""" body: dict[str, Any] = { "action": "add", "message_id": message_id, "emoji": emoji, } if account_id: body["account_id"] = account_id return await self._http.post_json("/feishu/react", body) async def remove_bot_reactions_for_emoji( self, message_id: str, emoji: str, *, account_id: str | None = None, ) -> dict[str, Any]: """移除该消息上本机器人添加的指定 ``emoji`` 表情(适配器 ``action: remove``)。""" body: dict[str, Any] = { "action": "remove", "message_id": message_id, "emoji": emoji, } if account_id: body["account_id"] = account_id return await self._http.post_json("/feishu/react", body)