| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- from __future__ import annotations
- import logging
- from collections.abc import Mapping, Sequence
- from typing import Any, Literal, assert_never
- import httpx
- from gateway.core.channels.feishu.types import (
- FeishuReplyContext,
- IncomingFeishuEvent,
- mapping_to_feishu_event,
- )
- logger = logging.getLogger(__name__)
- # 飞书开放平台 reaction emoji_type,与 openclaw-lark FeishuEmoji.TYPING 一致
- TYPING_REACTION_EMOJI = "Typing"
class WebhookParseError(ValueError):
    """Raised when an inbound Feishu webhook payload fails validation."""
class FeishuHttpAdapterClient:
    """Async JSON client for this repository's standalone Feishu HTTP adapter service.

    The adapter listens on port 4380 by default. Every request opens a
    short-lived ``httpx.AsyncClient``; transport errors raised by httpx are
    not caught here and propagate to the caller.
    """

    def __init__(self, base_url: str, *, timeout: float = 120.0) -> None:
        """Store the adapter base URL (trailing slashes removed) and request timeout.

        Args:
            base_url: Root URL of the adapter, e.g. ``http://127.0.0.1:4380``.
            timeout: Timeout in seconds handed to ``httpx.AsyncClient``.
        """
        self._base = base_url.rstrip("/")
        self._timeout = timeout

    def _url(self, path: str) -> str:
        """Join ``path`` onto the base URL, inserting a leading ``/`` if it is missing."""
        suffix = path if path.startswith("/") else f"/{path}"
        return f"{self._base}{suffix}"

    @staticmethod
    def _normalize(data: Any, status_code: int) -> dict[str, Any]:
        """Coerce a decoded JSON payload into the ``{"ok": ...}`` result convention.

        Args:
            data: Whatever the response body decoded to.
            status_code: HTTP status code of the response.

        Returns:
            ``data`` itself when it is a dict that already carries ``ok`` or the
            status is below 400; a copy marked ``ok: False`` (with the status
            code) for HTTP errors lacking ``ok``; an ``unexpected_response_shape``
            error dict when the payload is not a JSON object.
        """
        if not isinstance(data, dict):
            return {
                "ok": False,
                "error": "unexpected_response_shape",
                "status_code": status_code,
            }
        if status_code >= 400 and "ok" not in data:
            # An HTTP error without an explicit flag must not look like success.
            return {**data, "ok": False, "status_code": status_code}
        return data

    @classmethod
    def _read_json(cls, resp: httpx.Response) -> dict[str, Any]:
        """Decode ``resp`` as JSON and normalize it; never raises on a bad body."""
        try:
            data = resp.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            return {
                "ok": False,
                "error": "invalid_json",
                "status_code": resp.status_code,
            }
        return cls._normalize(data, resp.status_code)

    async def post_json(self, path: str, body: Mapping[str, Any]) -> dict[str, Any]:
        """POST ``body`` as JSON to ``path`` and return the normalized response dict.

        Args:
            path: Adapter route, with or without a leading ``/``.
            body: JSON-serializable mapping; it is copied into a plain dict.

        Returns:
            The adapter's JSON object, or an ``ok: False`` error dict when the
            body is not valid JSON, is not an object, or is an unflagged HTTP error.
        """
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            resp = await client.post(self._url(path), json=dict(body))
            return self._read_json(resp)

    async def get_json(self, path: str) -> dict[str, Any]:
        """GET ``path`` and return the normalized response dict.

        Failure handling matches ``post_json``: HTTP errors that lack an ``ok``
        field are reported as ``ok: False`` together with the status code.
        """
        async with httpx.AsyncClient(timeout=self._timeout) as client:
            resp = await client.get(self._url(path))
            return self._read_json(resp)
- FeishuHttpMessageKind = Literal["text", "card", "media", "raw"]
class FeishuConnector:
    """Feishu connector: parses normalized inbound events and sends messages via the adapter.

    Inbound JSON validation/parsing lives in ``parse_feishu_inbound_event``; it
    is a static method that needs no HTTP client, so it can be called on the class.

    Outbound calls go through ``FeishuHttpAdapterClient`` to the Node adapter.
    This class does not hold the app_secret; authentication and tokens are
    handled on the Node side.

    Sending mirrors the ``kind`` values of the adapter's POST ``/feishu/send-message``:
    ``text`` (rich-text post), ``card`` (interactive card), ``media`` (uploaded
    from a URL, then sent according to its type) and ``raw`` (explicit
    ``msg_type`` plus JSON ``content``, covering open-platform types such as
    text/post/image/share card/sticker).
    """

    @staticmethod
    def parse_feishu_inbound_event(body: Mapping[str, Any]) -> IncomingFeishuEvent:
        """Validate and parse the JSON written by the adapter's ``forwardEventToGateway``.

        Supported ``event_type`` values are ``message``, ``reaction`` and
        ``card_action`` (matching the adapter's server.ts). ``message`` and
        ``reaction`` events must carry both ``chat_id`` and ``message_id``;
        ``card_action`` and unrecognized types are returned without further checks.

        Args:
            body: Decoded JSON object of the inbound event.

        Returns:
            The event produced by ``mapping_to_feishu_event``.

        Raises:
            WebhookParseError: If the body is empty, is not a mapping, or lacks
                a required field.
        """
        if not body:
            raise WebhookParseError("empty body")
        if not isinstance(body, Mapping):
            # Without this guard a JSON array/string would fail on ``.get`` with
            # AttributeError, which ``handle_webhook`` does not catch.
            raise WebhookParseError("body must be a JSON object")

        event_type = body.get("event_type")
        if not isinstance(event_type, str) or not event_type:
            raise WebhookParseError("missing or invalid event_type")
        app_id = body.get("app_id")
        if not isinstance(app_id, str) or not app_id:
            raise WebhookParseError("missing or invalid app_id")

        event = mapping_to_feishu_event(body)
        if event_type in ("message", "reaction"):
            if not event.chat_id:
                raise WebhookParseError(f"{event_type} event requires chat_id")
            if not event.message_id:
                raise WebhookParseError(f"{event_type} event requires message_id")
        return event

    def __init__(
        self,
        *,
        feishu_http_base_url: str = "http://127.0.0.1:4380",
        timeout: float = 120.0,
    ) -> None:
        """Create the connector and its adapter HTTP client.

        Args:
            feishu_http_base_url: Base URL of the Feishu HTTP adapter service.
            timeout: Per-request timeout in seconds.
        """
        self._http = FeishuHttpAdapterClient(feishu_http_base_url, timeout=timeout)

    def handle_webhook(self, event: Mapping[str, Any]) -> dict[str, Any]:
        """Validate a webhook payload (the ``handle_webhook`` of channels.md).

        Returns:
            ``{"ok": True, "parsed": True, "event_type": ...}`` on success, or
            ``{"ok": False, "error": <message>}`` when validation fails.
        """
        try:
            parsed = self.parse_feishu_inbound_event(event)
        except WebhookParseError as exc:
            return {"ok": False, "error": str(exc)}
        return {"ok": True, "parsed": True, "event_type": parsed.event_type}

    def parse_normalized_event(self, payload: Mapping[str, Any]) -> IncomingFeishuEvent:
        """Instance-level alias of ``parse_feishu_inbound_event``."""
        return self.parse_feishu_inbound_event(payload)

    async def send_message(
        self,
        user_id: str,
        text: str,
        *,
        reply_context: FeishuReplyContext | None = None,
    ) -> dict[str, Any]:
        """Send plain text using the channels.md-compatible signature.

        ``user_id`` is not used; sending relies on the ``chat_id`` carried by
        ``reply_context`` (consistent with server.ts ``/feishu/send-message``).

        Returns:
            The adapter result, or an ``ok: False`` dict when ``reply_context``
            is missing.
        """
        if reply_context is None:
            return {"ok": False, "error": "reply_context_required_for_feishu_adapter"}
        return await self.send_text(reply_context, text)

    def _send_message_base(
        self,
        ctx: FeishuReplyContext,
        *,
        reply_in_thread: bool | None = None,
    ) -> dict[str, Any]:
        """Build the request fields shared by every ``/feishu/send-message`` kind.

        ``account_id`` and ``reply_to_message_id`` are included only when the
        context provides them; ``reply_in_thread`` only when explicitly set.
        """
        body: dict[str, Any] = {"chat_id": ctx.chat_id}
        if ctx.account_id:
            body["account_id"] = ctx.account_id
        if ctx.message_id:
            body["reply_to_message_id"] = ctx.message_id
        if reply_in_thread is not None:
            body["reply_in_thread"] = reply_in_thread
        return body

    async def post_send_message(self, body: Mapping[str, Any]) -> dict[str, Any]:
        """POST ``/feishu/send-message``.

        ``body`` must contain ``kind`` and the fields that kind requires (as
        defined by the adapter). A result without a truthy ``ok`` is logged as
        a warning and still returned to the caller.
        """
        # ``post_json`` copies the mapping itself, so no extra copy is made here.
        result = await self._http.post_json("/feishu/send-message", body)
        if not result.get("ok"):
            logger.warning("feishu send-message failed: %s", result)
        return result

    async def send_text(
        self,
        ctx: FeishuReplyContext,
        text: str,
        *,
        reply_in_thread: bool | None = None,
    ) -> dict[str, Any]:
        """Send ``text`` with ``kind: text``."""
        body = self._send_message_base(ctx, reply_in_thread=reply_in_thread)
        body["kind"] = "text"
        body["text"] = text
        return await self.post_send_message(body)

    async def send_card(
        self,
        ctx: FeishuReplyContext,
        card: Mapping[str, Any],
        *,
        reply_in_thread: bool | None = None,
    ) -> dict[str, Any]:
        """Send an interactive card (``msg_type: interactive``)."""
        body = self._send_message_base(ctx, reply_in_thread=reply_in_thread)
        body["kind"] = "card"
        body["card"] = dict(card)
        return await self.post_send_message(body)

    async def send_media(
        self,
        ctx: FeishuReplyContext,
        media_url: str,
        *,
        reply_in_thread: bool | None = None,
        media_local_roots: Sequence[str] | None = None,
        file_name: str | None = None,
    ) -> dict[str, Any]:
        """Send an image, video, regular file or audio clip.

        The adapter decides the concrete type (opus/mp4, etc.) from the file
        extension and content. ``media_url`` may be an http(s) URL or a local
        path the adapter allows; local paths must fall under the server-side
        ``media_local_roots`` allow-list.
        """
        body = self._send_message_base(ctx, reply_in_thread=reply_in_thread)
        body["kind"] = "media"
        body["media_url"] = media_url
        if media_local_roots is not None:
            body["media_local_roots"] = list(media_local_roots)
        if file_name:
            body["file_name"] = file_name
        return await self.post_send_message(body)

    async def send_raw_im(
        self,
        ctx: FeishuReplyContext,
        msg_type: str,
        content: str | Mapping[str, Any],
        *,
        reply_in_thread: bool | None = None,
        uuid: str | None = None,
    ) -> dict[str, Any]:
        """Send any supported ``msg_type`` as the bot.

        ``content`` is a valid JSON string or an object. Common examples:
        ``text`` -> ``{"text": "..."}``; ``share_chat`` -> ``{"chat_id": "oc_xxx"}``;
        ``image`` -> ``{"image_key": "img_xxx"}`` (upload first). The exact
        formats are defined by the Feishu IM open-platform documentation.
        """
        body = self._send_message_base(ctx, reply_in_thread=reply_in_thread)
        body["kind"] = "raw"
        body["msg_type"] = msg_type
        body["content"] = content if isinstance(content, str) else dict(content)
        if uuid:
            body["uuid"] = uuid
        return await self.post_send_message(body)

    async def send(
        self,
        ctx: FeishuReplyContext,
        *,
        kind: FeishuHttpMessageKind = "text",
        reply_in_thread: bool | None = None,
        text: str | None = None,
        card: Mapping[str, Any] | None = None,
        media_url: str | None = None,
        media_local_roots: Sequence[str] | None = None,
        file_name: str | None = None,
        msg_type: str | None = None,
        content: str | Mapping[str, Any] | None = None,
        uuid: str | None = None,
    ) -> dict[str, Any]:
        """Unified send entry point aligned with the adapter's ``kind`` values.

        Returns:
            The adapter result, or an ``ok: False`` dict naming the argument
            that the chosen ``kind`` requires but was not supplied.
        """
        if kind == "text":
            if not text:
                return {"ok": False, "error": "missing_text"}
            return await self.send_text(ctx, text, reply_in_thread=reply_in_thread)
        if kind == "card":
            if card is None:
                return {"ok": False, "error": "missing_card"}
            return await self.send_card(ctx, card, reply_in_thread=reply_in_thread)
        if kind == "media":
            if not media_url:
                return {"ok": False, "error": "missing_media_url"}
            return await self.send_media(
                ctx,
                media_url,
                reply_in_thread=reply_in_thread,
                media_local_roots=media_local_roots,
                file_name=file_name,
            )
        if kind == "raw":
            if not msg_type or content is None:
                return {"ok": False, "error": "raw_requires_msg_type_and_content"}
            return await self.send_raw_im(
                ctx,
                msg_type,
                content,
                reply_in_thread=reply_in_thread,
                uuid=uuid,
            )
        # Exhaustiveness check for type checkers; raises at runtime for unknown kinds.
        assert_never(kind)

    async def list_feishu_app_accounts(self) -> dict[str, Any]:
        """Fetch the adapter's GET ``/accounts`` list of enabled, configured Feishu apps (bots).

        This is unrelated to end-user open_ids; user names/avatars would need
        the open platform's contact APIs instead.

        Returns:
            ``{"ok": True, "accounts": [...]}``. Lookup failures are logged and
            reported as an empty list rather than raised (best effort).
        """
        try:
            data = await self._http.get_json("/accounts")
        except Exception as exc:  # best-effort boundary: never raise to callers
            logger.warning("list_feishu_app_accounts: adapter request failed: %s", exc)
            return {"ok": True, "accounts": []}
        if data.get("ok") and "accounts" in data:
            return {"ok": True, "accounts": data["accounts"]}
        logger.warning("list_feishu_app_accounts: unexpected adapter response: %s", data)
        return {"ok": True, "accounts": []}

    async def get_user_info(self, user_id: str) -> dict[str, Any]:
        """Return a placeholder profile for a Feishu user.

        The HTTP adapter does not currently expose contact lookup, so
        ``profile`` is always ``None``. For the list of configured apps use
        ``list_feishu_app_accounts()`` (not to be confused with ``/accounts``).
        """
        return {"ok": True, "user_id": user_id, "profile": None}

    @staticmethod
    def _reaction_body(
        action: str,
        message_id: str,
        emoji: str,
        account_id: str | None,
    ) -> dict[str, Any]:
        """Build the ``/feishu/react`` request body; ``account_id`` is added only when set."""
        body: dict[str, Any] = {
            "action": action,
            "message_id": message_id,
            "emoji": emoji,
        }
        if account_id:
            body["account_id"] = account_id
        return body

    async def _post_reaction(self, body: Mapping[str, Any]) -> dict[str, Any]:
        """POST ``/feishu/react`` and log a warning when the result is not ok."""
        result = await self._http.post_json("/feishu/react", body)
        if not result.get("ok"):
            logger.warning("feishu react failed: %s", result)
        return result

    async def add_message_reaction(
        self,
        message_id: str,
        emoji: str,
        *,
        account_id: str | None = None,
    ) -> dict[str, Any]:
        """POST ``/feishu/react`` to add a bot reaction to a message.

        ``emoji`` is an open-platform emoji_type such as ``Typing``.
        """
        return await self._post_reaction(
            self._reaction_body("add", message_id, emoji, account_id)
        )

    async def remove_bot_reactions_for_emoji(
        self,
        message_id: str,
        emoji: str,
        *,
        account_id: str | None = None,
    ) -> dict[str, Any]:
        """Remove this bot's ``emoji`` reactions from the message (adapter ``action: remove``)."""
        return await self._post_reaction(
            self._reaction_body("remove", message_id, emoji, account_id)
        )
|