kevin.yang 4 dagar sedan
förälder
incheckning
d10358d769

+ 3 - 1
docker-compose.yml

@@ -12,7 +12,7 @@ services:
       - .env
     environment:
       - FEISHU_HTTP_PORT=4380
-      - GATEWAY_FEISHU_WEBHOOK_URL=http://localhost:8000/api/channels/feishu/openclaw/webhook
+      - GATEWAY_FEISHU_WEBHOOK_URL=http://gateway:8000/api/channels/feishu/inbound/webhook
     ports:
       - "4380:4380"
     networks:
@@ -33,6 +33,8 @@ services:
       - workspace_shared:/root/.gateway/shared
       # 将宿主机的 docker.sock 暴露给 gateway,用于创建 workspace 容器
       - /var/run/docker.sock:/var/run/docker.sock:ro
+    ports:
+      - "8000:8000"
     networks:
       - agent
 

+ 28 - 10
gateway/core/channels/__init__.py

@@ -1,21 +1,39 @@
 """
-Gateway Channels:外部渠道接入(个人助理型飞书等)
+Gateway Channels:外部渠道接入。
 
-与 openclaw-lark-patch HTTP 服务配合:`GATEWAY_FEISHU_WEBHOOK_URL` 指向
-`POST /api/channels/feishu/openclaw/webhook`。
+HTTP 路由由 ``gateway.core.channels.feishu.manager.build_channels_api_router`` 定义;
+应用入口 ``gateway_server`` 对其 ``include_router``(如
+``/api/channels/feishu/inbound/webhook``,``GATEWAY_FEISHU_WEBHOOK_URL``)。
 """
 
-# from gateway.core.channels.channel_manager import ChannelManager, FeishuChannelConfig
 from gateway.core.channels.feishu.connector import FeishuConnector
-# from gateway.core.channels.router import MessageRouter, RouteResult
-from gateway.core.channels.types import FeishuReplyContext, IncomingFeishuEvent
+from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
+from gateway.core.channels.feishu.manager import (
+    FeishuChannelConfig,
+    FeishuChannelManager,
+    channel_manager_from_env,
+)
+from gateway.core.channels.manager import ChannelRegistry
+from gateway.core.channels.router import ChannelTraceRouter
+from gateway.core.channels.feishu.router import FeishuMessageRouter
+from gateway.core.channels.types import CHANNEL_FEISHU, CHANNEL_WECHAT, RouteResult
+
+ChannelManager = FeishuChannelManager
+MessageRouter = FeishuMessageRouter
 
 __all__ = [
-    # "ChannelManager",
-    # "FeishuChannelConfig",
+    "CHANNEL_FEISHU",
+    "CHANNEL_WECHAT",
+    "ChannelManager",
+    "ChannelRegistry",
+    "ChannelTraceRouter",
+    "FeishuChannelConfig",
+    "FeishuChannelManager",
     "FeishuConnector",
+    "FeishuMessageRouter",
     "FeishuReplyContext",
     "IncomingFeishuEvent",
-    # "MessageRouter",
-    # "RouteResult",
+    "MessageRouter",
+    "RouteResult",
+    "channel_manager_from_env",
 ]

+ 1 - 1
gateway/core/channels/backends/echo_executor.py

@@ -4,7 +4,7 @@ import logging
 import uuid
 from typing import TYPE_CHECKING, Any
 
-from gateway.core.channels.types import FeishuReplyContext, IncomingFeishuEvent
+from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
 
 if TYPE_CHECKING:
     from gateway.core.channels.feishu.connector import FeishuConnector

+ 1 - 1
gateway/core/channels/backends/user_id.py

@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from gateway.core.channels.types import IncomingFeishuEvent
+from gateway.core.channels.feishu.types import IncomingFeishuEvent
 
 
 class DefaultUserIdentityResolver:

+ 31 - 1
gateway/core/channels/feishu/__init__.py

@@ -1,3 +1,33 @@
 from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.manager import (
+    FeishuChannelConfig,
+    FeishuChannelManager,
+    build_channels_api_router,
+    channel_manager_from_env,
+)
+from gateway.core.channels.feishu.router import (
+    FeishuExecutorBackend,
+    FeishuMessageRouter,
+    FeishuUserIdentityResolver,
+)
+from gateway.core.channels.feishu.types import (
+    FeishuReplyContext,
+    IncomingFeishuEvent,
+    feishu_event_to_mapping,
+    mapping_to_feishu_event,
+)
 
-__all__ = ["FeishuConnector"]
+__all__ = [
+    "FeishuChannelConfig",
+    "FeishuChannelManager",
+    "FeishuConnector",
+    "FeishuExecutorBackend",
+    "FeishuMessageRouter",
+    "build_channels_api_router",
+    "FeishuReplyContext",
+    "FeishuUserIdentityResolver",
+    "IncomingFeishuEvent",
+    "channel_manager_from_env",
+    "feishu_event_to_mapping",
+    "mapping_to_feishu_event",
+]

+ 0 - 41
gateway/core/channels/feishu/api.py

@@ -1,41 +0,0 @@
-from __future__ import annotations
-
-import logging
-from collections.abc import Mapping
-from typing import Any
-
-import httpx
-
-logger = logging.getLogger(__name__)
-
-
-class FeishuHttpAdapterClient:
-    """调用 openclaw-lark-patch HTTP 服务(默认端口 4380)。"""
-
-    def __init__(self, base_url: str, *, timeout: float = 120.0) -> None:
-        self._base = base_url.rstrip("/")
-        self._timeout = timeout
-
-    async def post_json(self, path: str, body: Mapping[str, Any]) -> dict[str, Any]:
-        url = f"{self._base}{path if path.startswith('/') else '/' + path}"
-        async with httpx.AsyncClient(timeout=self._timeout) as client:
-            resp = await client.post(url, json=dict(body))
-            try:
-                data = resp.json()
-            except Exception:
-                data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code}
-            if not isinstance(data, dict):
-                return {"ok": False, "error": "unexpected_response_shape"}
-            if resp.status_code >= 400 and "ok" not in data:
-                data = {**data, "ok": False, "status_code": resp.status_code}
-            return data
-
-    async def get_json(self, path: str) -> dict[str, Any]:
-        url = f"{self._base}{path if path.startswith('/') else '/' + path}"
-        async with httpx.AsyncClient(timeout=self._timeout) as client:
-            resp = await client.get(url)
-            try:
-                data = resp.json()
-            except Exception:
-                return {"ok": False, "error": "invalid_json", "status_code": resp.status_code}
-            return data if isinstance(data, dict) else {"ok": False, "error": "unexpected_response_shape"}

+ 86 - 8
gateway/core/channels/feishu/connector.py

@@ -4,26 +4,104 @@ import logging
 from collections.abc import Mapping, Sequence
 from typing import Any, Literal, assert_never
 
-from gateway.core.channels.feishu.api import FeishuHttpAdapterClient
-from gateway.core.channels.feishu.webhook import WebhookParseError, parse_openclaw_normalized
-from gateway.core.channels.types import FeishuReplyContext, IncomingFeishuEvent
+import httpx
+
+from gateway.core.channels.feishu.types import (
+    FeishuReplyContext,
+    IncomingFeishuEvent,
+    mapping_to_feishu_event,
+)
 
 logger = logging.getLogger(__name__)
 
+
+class WebhookParseError(ValueError):
+    pass
+
+
+class FeishuHttpAdapterClient:
+    """调用本仓库 Feishu 独立 HTTP 适配服务(默认端口 4380)。"""
+
+    def __init__(self, base_url: str, *, timeout: float = 120.0) -> None:
+        self._base = base_url.rstrip("/")
+        self._timeout = timeout
+
+    async def post_json(self, path: str, body: Mapping[str, Any]) -> dict[str, Any]:
+        url = f"{self._base}{path if path.startswith('/') else '/' + path}"
+        async with httpx.AsyncClient(timeout=self._timeout) as client:
+            resp = await client.post(url, json=dict(body))
+            try:
+                data = resp.json()
+            except Exception:
+                data = {"ok": False, "error": "invalid_json", "status_code": resp.status_code}
+            if not isinstance(data, dict):
+                return {"ok": False, "error": "unexpected_response_shape"}
+            if resp.status_code >= 400 and "ok" not in data:
+                data = {**data, "ok": False, "status_code": resp.status_code}
+            return data
+
+    async def get_json(self, path: str) -> dict[str, Any]:
+        url = f"{self._base}{path if path.startswith('/') else '/' + path}"
+        async with httpx.AsyncClient(timeout=self._timeout) as client:
+            resp = await client.get(url)
+            try:
+                data = resp.json()
+            except Exception:
+                return {"ok": False, "error": "invalid_json", "status_code": resp.status_code}
+            return data if isinstance(data, dict) else {"ok": False, "error": "unexpected_response_shape"}
+
+
 FeishuHttpMessageKind = Literal["text", "card", "media", "raw"]
 
 
 class FeishuConnector:
     """
-    飞书连接器:解析 openclaw 规范化事件,经 Feishu HTTP 适配器发送消息。
+    飞书连接器:解析 Feishu HTTP 适配层规范化入站事件,经适配服务发送消息。
 
-    不持有 app_secret;鉴权与 token 均在 Node 侧完成。
+    入站 JSON 校验/解析见 `parse_feishu_inbound_event`(不依赖 HTTP 客户端,可按类调用)。
+
+    内部使用 ``FeishuHttpAdapterClient`` 访问 Node 适配器;不持有 app_secret,鉴权与 token 均在 Node 侧完成。
 
     发送能力与适配器 POST ``/feishu/send-message`` 一致:
     ``text``(富文本 post)、``card``(交互卡片)、``media``(URL 上传后按类型发送)、
     ``raw``(显式 ``msg_type`` + JSON ``content``,覆盖 text/post/image/分享名片/贴纸等开放平台类型)。
     """
 
+    @staticmethod
+    def parse_feishu_inbound_event(body: Mapping[str, Any]) -> IncomingFeishuEvent:
+        """
+        校验并解析 Feishu HTTP 适配层 ``forwardEventToGateway`` 写入的 JSON。
+
+        支持 event_type: message | reaction | card_action(与适配服务 server.ts 一致)。
+        """
+        if not body:
+            raise WebhookParseError("empty body")
+        event_type = body.get("event_type")
+        if not isinstance(event_type, str) or not event_type:
+            raise WebhookParseError("missing or invalid event_type")
+        app_id = body.get("app_id")
+        if not isinstance(app_id, str) or not app_id:
+            raise WebhookParseError("missing or invalid app_id")
+
+        event = mapping_to_feishu_event(body)
+
+        if event_type == "message":
+            if not event.chat_id:
+                raise WebhookParseError("message event requires chat_id")
+            if event.message_id is None or event.message_id == "":
+                raise WebhookParseError("message event requires message_id")
+        elif event_type == "reaction":
+            if not event.chat_id:
+                raise WebhookParseError("reaction event requires chat_id")
+            if not event.message_id:
+                raise WebhookParseError("reaction event requires message_id")
+        elif event_type == "card_action":
+            pass
+        else:
+            pass
+
+        return event
+
     def __init__(
         self,
         *,
@@ -35,13 +113,13 @@ class FeishuConnector:
     def handle_webhook(self, event: Mapping[str, Any]) -> dict[str, Any]:
         """校验 payload(对应 channels.md 的 handle_webhook)。"""
         try:
-            ev: IncomingFeishuEvent = parse_openclaw_normalized(event)
+            ev: IncomingFeishuEvent = self.parse_feishu_inbound_event(event)
             return {"ok": True, "parsed": True, "event_type": ev.event_type}
         except WebhookParseError as e:
             return {"ok": False, "error": str(e)}
 
     def parse_normalized_event(self, payload: Mapping[str, Any]) -> IncomingFeishuEvent:
-        return parse_openclaw_normalized(payload)
+        return self.parse_feishu_inbound_event(payload)
 
     async def send_message(
         self,
@@ -56,7 +134,7 @@ class FeishuConnector:
         实际发送依赖 chat_id(与 server.ts /feishu/send-message 一致)。
         """
         if reply_context is None:
-            return {"ok": False, "error": "reply_context_required_for_openclaw_bridge"}
+            return {"ok": False, "error": "reply_context_required_for_feishu_adapter"}
         return await self.send_text(reply_context, text)
 
     def _send_message_base(

+ 160 - 0
gateway/core/channels/feishu/manager.py

@@ -0,0 +1,160 @@
+from __future__ import annotations
+
+import logging
+import os
+from collections.abc import Mapping
+from dataclasses import dataclass
+from typing import Any
+
+from fastapi import APIRouter, HTTPException, Request
+
+from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
+from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
+from gateway.core.channels.backends.user_id import DefaultUserIdentityResolver
+from gateway.core.channels.feishu.connector import FeishuConnector, WebhookParseError
+from gateway.core.channels.feishu.router import FeishuMessageRouter
+from gateway.core.channels.manager import ChannelRegistry
+from gateway.core.channels.types import RouteResult
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class FeishuChannelConfig:
+    channel_id: str = "feishu"
+    feishu_http_base_url: str = "http://127.0.0.1:4380"
+    http_timeout: float = 120.0
+    enabled: bool = True
+    auto_create_trace: bool = True
+    workspace_prefix: str = "feishu"
+    default_agent_type: str = "personal_assistant"
+    echo_replies: bool = True
+    echo_prefix: str = "[Gateway] "
+    dispatch_reactions: bool = False
+    dispatch_card_actions: bool = False
+
+
+class FeishuChannelManager(ChannelRegistry):
+    """
+    飞书渠道:配置、连接器、路由与入站 webhook;继承 ``ChannelRegistry`` 的注册/启停能力。
+    """
+
+    def __init__(self, config: FeishuChannelConfig | None = None) -> None:
+        super().__init__()
+        self._config = config or FeishuChannelConfig()
+        self.register_channel(self._config.channel_id, self._config)
+
+        self._connector = FeishuConnector(
+            feishu_http_base_url=self._config.feishu_http_base_url,
+            timeout=self._config.http_timeout,
+        )
+        self._trace_backend = MemoryTraceBackend()
+        self._executor = EchoExecutorBackend(
+            prefix=self._config.echo_prefix,
+            enabled=self._config.echo_replies,
+        )
+        self._identity = DefaultUserIdentityResolver()
+        self._router = FeishuMessageRouter(
+            connector=self._connector,
+            trace_backend=self._trace_backend,
+            executor_backend=self._executor,
+            identity_resolver=self._identity,
+            workspace_prefix=self._config.workspace_prefix,
+            default_agent_type=self._config.default_agent_type,
+            auto_create_trace=self._config.auto_create_trace,
+            dispatch_reactions=self._config.dispatch_reactions,
+            dispatch_card_actions=self._config.dispatch_card_actions,
+        )
+
+    @property
+    def config(self) -> FeishuChannelConfig:
+        return self._config
+
+    @property
+    def feishu_connector(self) -> FeishuConnector:
+        return self._connector
+
+    @property
+    def message_router(self) -> FeishuMessageRouter:
+        return self._router
+
+    async def handle_feishu_inbound_webhook(self, body: Mapping[str, Any]) -> RouteResult:
+        """POST /api/channels/feishu/inbound/webhook"""
+        cid = self._config.channel_id
+        if not self._running.get(cid, False):
+            return RouteResult(ok=False, error="channel_stopped")
+
+        try:
+            event = FeishuConnector.parse_feishu_inbound_event(body)
+        except WebhookParseError as e:
+            return RouteResult(ok=False, error=str(e))
+
+        return await self._router.route_feishu_inbound_event(event)
+
+
+def build_channels_api_router(channel_manager: FeishuChannelManager) -> APIRouter:
+    """
+    构建 ``/api/channels`` 下的 FastAPI 路由(飞书 webhook、``GET .../status``)。
+
+    放在本模块而非 ``router.py``,避免 ``manager`` ↔ ``FeishuMessageRouter`` 循环导入。
+    """
+    channels = APIRouter(prefix="/api/channels", tags=["channels"])
+    feishu = APIRouter(prefix="/feishu", tags=["feishu"])
+
+    @feishu.post("/inbound/webhook")
+    async def feishu_inbound_webhook(request: Request) -> dict[str, Any]:
+        """
+        Feishu HTTP 适配服务经 ``GATEWAY_FEISHU_WEBHOOK_URL`` POST 规范化 JSON 到此端点。
+        """
+        try:
+            body = await request.json()
+        except Exception:
+            raise HTTPException(status_code=400, detail="invalid_json")
+
+        if not isinstance(body, dict):
+            raise HTTPException(status_code=400, detail="body_must_be_object")
+
+        result = await channel_manager.handle_feishu_inbound_webhook(body)
+        if not result.ok and result.error:
+            logger.warning("feishu inbound webhook error: %s", result.error)
+
+        out: dict[str, Any] = {
+            "ok": result.ok,
+            "skipped": result.skipped,
+            "reason": result.reason,
+            "trace_id": result.trace_id,
+            "task_id": result.task_id,
+            "user_id": result.user_id,
+            "workspace_id": result.workspace_id,
+        }
+        if result.error:
+            out["error"] = result.error
+        return out
+
+    @channels.get("/{channel_id}/status")
+    async def channel_status(channel_id: str) -> dict[str, Any]:
+        return channel_manager.get_channel_status(channel_id)
+
+    channels.include_router(feishu)
+    return channels
+
+
+def channel_manager_from_env() -> FeishuChannelManager:
+    """从环境变量构造(与 docker-compose / .env 配合)。"""
+    base = os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip()
+    timeout = float(os.getenv("FEISHU_HTTP_TIMEOUT", "120"))
+    echo = os.getenv("CHANNELS_ECHO_REPLIES", "true").lower() in ("1", "true", "yes")
+    reactions = os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes")
+    cards = os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "false").lower() in ("1", "true", "yes")
+    prefix = os.getenv("CHANNELS_ECHO_PREFIX", "[Gateway] ")
+
+    return FeishuChannelManager(
+        FeishuChannelConfig(
+            feishu_http_base_url=base,
+            http_timeout=timeout,
+            echo_replies=echo,
+            echo_prefix=prefix,
+            dispatch_reactions=reactions,
+            dispatch_card_actions=cards,
+        )
+    )

+ 1 - 1
gateway/core/channels/feishu/openclaw-lark-patch/src/http/server.ts

@@ -106,7 +106,7 @@ const logger: Logger = {
 
 const GATEWAY_FEISHU_WEBHOOK_URL =
   process.env.GATEWAY_FEISHU_WEBHOOK_URL ??
-  'http://localhost:8000/api/channels/feishu/openclaw/webhook'
+  'http://localhost:8000/api/channels/feishu/inbound/webhook'
 
 // 飞书官方(开放平台):IM 上传图片单文件 ≤10MB;云空间/文件「直接上传」单文件 ≤20MB,更大需分片上传。
 // 此处为自建服务接收 JSON 的上限,默认 20MB 与直传文件上限对齐;超大媒体应使用 media_url / 飞书分片 API,而非放大 JSON。

+ 220 - 0
gateway/core/channels/feishu/router.py

@@ -0,0 +1,220 @@
+from __future__ import annotations
+
+import logging
+from collections.abc import Mapping
+from typing import Any, Protocol, runtime_checkable
+
+from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.types import (
+    FeishuReplyContext,
+    IncomingFeishuEvent,
+    feishu_event_to_mapping,
+)
+from gateway.core.channels.manager import TraceBackend
+from gateway.core.channels.router import ChannelTraceRouter
+from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult
+
+logger = logging.getLogger(__name__)
+
+
+@runtime_checkable
+class FeishuExecutorBackend(Protocol):
+    """与 Executor 对接前的抽象:处理入站用户文本并负责回呼飞书。"""
+
+    async def handle_inbound_message(
+        self,
+        trace_id: str,
+        text: str,
+        reply_context: FeishuReplyContext,
+        connector: Any,
+        *,
+        event: IncomingFeishuEvent,
+    ) -> str:
+        """返回 task_id 或占位 id。"""
+        ...
+
+
+@runtime_checkable
+class FeishuUserIdentityResolver(Protocol):
+    """将飞书事件映射为网关内统一 user_id(后续可换 DB 映射表)。"""
+
+    def resolve_user_id(self, event: IncomingFeishuEvent) -> str:
+        ...
+
+
+class FeishuMessageRouter(ChannelTraceRouter):
+    """
+    飞书消息路由:用户 → trace_id → Executor;与 channels.md 中 MessageRouter 一致。
+
+    非 message 事件(reaction / card_action)默认跳过执行器,仅返回 200。
+    """
+
+    def __init__(
+        self,
+        *,
+        connector: FeishuConnector,
+        trace_backend: TraceBackend,
+        executor_backend: FeishuExecutorBackend,
+        identity_resolver: FeishuUserIdentityResolver,
+        workspace_prefix: str = CHANNEL_FEISHU,
+        default_agent_type: str = "personal_assistant",
+        auto_create_trace: bool = True,
+        dispatch_reactions: bool = False,
+        dispatch_card_actions: bool = False,
+    ) -> None:
+        super().__init__(
+            trace_backend=trace_backend,
+            workspace_prefix=workspace_prefix,
+            default_agent_type=default_agent_type,
+        )
+        self._connector = connector
+        self._executor = executor_backend
+        self._identity = identity_resolver
+        self._auto_create = auto_create_trace
+        self._dispatch_reactions = dispatch_reactions
+        self._dispatch_card_actions = dispatch_card_actions
+
+    def _reply_context_from_event(self, event: IncomingFeishuEvent) -> FeishuReplyContext | None:
+        if not event.chat_id:
+            logger.warning("missing chat_id, cannot reply: %s", feishu_event_to_mapping(event))
+            return None
+        return FeishuReplyContext(
+            account_id=event.account_id,
+            app_id=event.app_id,
+            chat_id=event.chat_id,
+            message_id=event.message_id,
+            open_id=event.open_id,
+        )
+
+    def _synthetic_text_for_event(self, event: IncomingFeishuEvent) -> str | None:
+        if event.event_type == "reaction" and event.emoji:
+            return f"[系统-表情] {event.emoji} message_id={event.message_id or ''}"
+        if event.event_type == "card_action":
+            return (
+                f"[系统-卡片] action={event.action or ''} "
+                f"operation_id={event.operation_id or ''}"
+            )
+        return None
+
+    async def route_feishu_inbound_event(self, event: IncomingFeishuEvent) -> RouteResult:
+        """处理 Feishu HTTP 适配服务转发的规范化入站事件。"""
+        user_id = self._identity.resolve_user_id(event)
+        workspace_id = self._workspace_id_for_user(user_id)
+
+        dispatch = False
+        text: str | None = None
+        if event.event_type == "message":
+            dispatch = True
+            text = event.content or ""
+        elif event.event_type == "reaction" and self._dispatch_reactions:
+            dispatch = True
+            text = self._synthetic_text_for_event(event) or ""
+        elif event.event_type == "card_action" and self._dispatch_card_actions:
+            dispatch = True
+            text = self._synthetic_text_for_event(event) or ""
+
+        if not dispatch:
+            return RouteResult(
+                ok=True,
+                skipped=True,
+                reason=f"event_type_not_dispatched:{event.event_type}",
+                user_id=user_id,
+            )
+
+        if not self._auto_create:
+            return RouteResult(ok=False, error="auto_create_trace_disabled", user_id=user_id)
+
+        trace_id = await self._trace.get_or_create_trace(
+            channel=CHANNEL_FEISHU,
+            user_id=user_id,
+            workspace_id=workspace_id,
+            agent_type=self._agent_type,
+            metadata=feishu_event_to_mapping(event),
+        )
+
+        ctx = self._reply_context_from_event(event)
+        if ctx is None:
+            return RouteResult(
+                ok=False,
+                error="missing_chat_id_for_reply",
+                trace_id=trace_id,
+                user_id=user_id,
+                workspace_id=workspace_id,
+            )
+
+        task_id = await self._executor.handle_inbound_message(
+            trace_id,
+            text or "",
+            ctx,
+            self._connector,
+            event=event,
+        )
+        return RouteResult(
+            ok=True,
+            trace_id=trace_id,
+            task_id=task_id,
+            user_id=user_id,
+            workspace_id=workspace_id,
+        )
+
+    async def route_message(self, channel: str, user_id: str, message: Mapping[str, Any]) -> str:
+        """
+        通用入口:message 建议包含 text、可选飞书上下文字段
+        (account_id, app_id, chat_id, message_id, open_id)。
+        """
+        text = str(message.get("text") or message.get("content") or "")
+        trace_id = await self.get_trace_id(channel, user_id)
+        ctx = FeishuReplyContext(
+            account_id=_as_opt_str(message.get("account_id")),
+            app_id=str(message.get("app_id") or ""),
+            chat_id=str(message.get("chat_id") or ""),
+            message_id=_as_opt_str(message.get("message_id")),
+            open_id=_as_opt_str(message.get("open_id")),
+        )
+        if not ctx.app_id or not ctx.chat_id:
+            raise ValueError("route_message requires app_id and chat_id in message for Feishu reply")
+        synthetic = IncomingFeishuEvent(
+            event_type="message",
+            app_id=ctx.app_id,
+            account_id=ctx.account_id,
+            open_id=ctx.open_id,
+            chat_type=_as_opt_str(message.get("chat_type")),
+            chat_id=ctx.chat_id,
+            message_id=ctx.message_id,
+            content=text,
+            raw=dict(message) if isinstance(message, dict) else {},
+        )
+        return await self._executor.handle_inbound_message(
+            trace_id, text, ctx, self._connector, event=synthetic
+        )
+
+    async def send_agent_reply(
+        self,
+        trace_id: str,
+        content: str,
+        metadata: Mapping[str, Any] | None = None,
+    ) -> dict[str, Any]:
+        """
+        Executor 完成后由业务调用:根据 metadata 中的飞书上下文发消息。
+
+        metadata 键:account_id?, app_id, chat_id, message_id?, open_id?
+        """
+        meta = dict(metadata or {})
+        ctx = FeishuReplyContext(
+            account_id=_as_opt_str(meta.get("account_id")),
+            app_id=str(meta.get("app_id") or ""),
+            chat_id=str(meta.get("chat_id") or ""),
+            message_id=_as_opt_str(meta.get("message_id")),
+            open_id=_as_opt_str(meta.get("open_id")),
+        )
+        if not ctx.chat_id:
+            return {"ok": False, "error": "metadata missing chat_id", "trace_id": trace_id}
+        _ = trace_id
+        return await self._connector.send_text(ctx, content)
+
+
+def _as_opt_str(v: Any) -> str | None:
+    if v is None:
+        return None
+    s = str(v)
+    return s if s else None

+ 88 - 0
gateway/core/channels/feishu/types.py

@@ -0,0 +1,88 @@
+"""
+飞书渠道专用数据类型(规范化入站事件、回包上下文等)。
+"""
+
+from __future__ import annotations
+
+from collections.abc import Mapping
+from dataclasses import dataclass, field
+from typing import Any
+
+
+@dataclass(slots=True)
+class IncomingFeishuEvent:
+    """Feishu HTTP 适配服务 ``forwardEventToGateway`` 写入的规范化结构(见适配服务 server.ts)。"""
+
+    event_type: str
+    app_id: str
+    account_id: str | None
+    open_id: str | None
+    tenant_id: str | None = None
+    chat_type: str | None = None
+    chat_id: str | None = None
+    message_id: str | None = None
+    content: str | None = None
+    emoji: str | None = None
+    action_time: str | None = None
+    action: str | None = None
+    operation_id: str | None = None
+    raw: dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass(slots=True)
+class FeishuReplyContext:
+    """调用 Feishu HTTP 适配服务 ``/feishu/send-message`` 时需要的上下文。"""
+
+    account_id: str | None
+    app_id: str
+    chat_id: str
+    message_id: str | None = None
+    open_id: str | None = None
+
+
+def feishu_event_to_mapping(event: IncomingFeishuEvent) -> dict[str, Any]:
+    """供日志与 Trace 元数据序列化。"""
+    return {
+        "event_type": event.event_type,
+        "app_id": event.app_id,
+        "account_id": event.account_id,
+        "open_id": event.open_id,
+        "tenant_id": event.tenant_id,
+        "chat_type": event.chat_type,
+        "chat_id": event.chat_id,
+        "message_id": event.message_id,
+        "content": event.content,
+        "emoji": event.emoji,
+        "action": event.action,
+        "operation_id": event.operation_id,
+    }
+
+
+def mapping_to_feishu_event(body: Mapping[str, Any]) -> IncomingFeishuEvent:
+    """从 JSON 反序列化(宽松,缺省字段为 None)。"""
+    raw = body.get("raw")
+    raw_dict: dict[str, Any] = raw if isinstance(raw, dict) else {}
+    return IncomingFeishuEvent(
+        event_type=str(body.get("event_type") or ""),
+        app_id=str(body.get("app_id") or ""),
+        account_id=_opt_str(body.get("account_id")),
+        open_id=_opt_str(body.get("open_id")),
+        tenant_id=_opt_str(body.get("tenant_id")),
+        chat_type=_opt_str(body.get("chat_type")),
+        chat_id=_opt_str(body.get("chat_id")),
+        message_id=_opt_str(body.get("message_id")),
+        content=_opt_str(body.get("content")),
+        emoji=_opt_str(body.get("emoji")),
+        action_time=_opt_str(body.get("action_time")),
+        action=_opt_str(body.get("action")),
+        operation_id=_opt_str(body.get("operation_id")),
+        raw=raw_dict,
+    )
+
+
+def _opt_str(v: Any) -> str | None:
+    if v is None:
+        return None
+    if isinstance(v, str):
+        return v if v else None
+    return str(v)

+ 0 - 55
gateway/core/channels/feishu/webhook.py

@@ -1,55 +0,0 @@
-from __future__ import annotations
-
-from collections.abc import Mapping
-from typing import Any
-
-from gateway.core.channels.types import IncomingFeishuEvent, mapping_to_event
-
-
-class WebhookParseError(ValueError):
-    pass
-
-
-def parse_openclaw_normalized(body: Mapping[str, Any]) -> IncomingFeishuEvent:
-    """
-    校验并解析 Node 服务 `forwardEventToGateway` 的 JSON。
-
-    支持 event_type: message | reaction | card_action(与 server.ts 一致)。
-    """
-    if not body:
-        raise WebhookParseError("empty body")
-    event_type = body.get("event_type")
-    if not isinstance(event_type, str) or not event_type:
-        raise WebhookParseError("missing or invalid event_type")
-    app_id = body.get("app_id")
-    if not isinstance(app_id, str) or not app_id:
-        raise WebhookParseError("missing or invalid app_id")
-
-    event = mapping_to_event(body)
-
-    if event_type == "message":
-        if not event.chat_id:
-            raise WebhookParseError("message event requires chat_id")
-        if event.message_id is None or event.message_id == "":
-            raise WebhookParseError("message event requires message_id")
-    elif event_type == "reaction":
-        if not event.chat_id:
-            raise WebhookParseError("reaction event requires chat_id")
-        if not event.message_id:
-            raise WebhookParseError("reaction event requires message_id")
-    elif event_type == "card_action":
-        pass
-    else:
-        # 仍返回结构化事件,由上层决定是否忽略
-        pass
-
-    return event
-
-
-def handle_webhook_dict(body: Mapping[str, Any]) -> dict[str, Any]:
-    """FeishuConnector.handle_webhook 风格的同步封装(供非 async 调用方)。"""
-    try:
-        ev = parse_openclaw_normalized(body)
-        return {"ok": True, "event_type": ev.event_type, "app_id": ev.app_id}
-    except WebhookParseError as e:
-        return {"ok": False, "error": str(e)}

+ 56 - 0
gateway/core/channels/manager.py

@@ -0,0 +1,56 @@
+from __future__ import annotations
+
+from typing import Protocol, runtime_checkable
+
+
+@runtime_checkable
+class TraceBackend(Protocol):
+    """与 Lifecycle.TraceManager 对接前的抽象:按渠道用户解析 trace_id。"""
+
+    async def get_or_create_trace(
+        self,
+        *,
+        channel: str,
+        user_id: str,
+        workspace_id: str,
+        agent_type: str,
+        metadata: dict[str, object],
+    ) -> str:
+        ...
+
+
+@runtime_checkable
+class ChannelRegistration(Protocol):
+    """注册到 ``ChannelRegistry`` 的渠道配置需至少提供 ``enabled``。"""
+
+    enabled: bool
+
+
+class ChannelRegistry:
+    """
+    与具体 IM 无关:渠道注册、启停、状态查询。
+    """
+
+    def __init__(self) -> None:
+        self._registry: dict[str, ChannelRegistration] = {}
+        self._running: dict[str, bool] = {}
+
+    def register_channel(self, channel_id: str, cfg: ChannelRegistration) -> None:
+        self._registry[channel_id] = cfg
+        if channel_id not in self._running:
+            self._running[channel_id] = cfg.enabled
+
+    def start_channel(self, channel_id: str) -> None:
+        self._running[channel_id] = True
+
+    def stop_channel(self, channel_id: str) -> None:
+        self._running[channel_id] = False
+
+    def get_channel_status(self, channel_id: str) -> dict[str, str | bool]:
+        cfg = self._registry.get(channel_id)
+        return {
+            "channel_id": channel_id,
+            "registered": cfg is not None,
+            "running": bool(self._running.get(channel_id)),
+            "enabled": bool(cfg and cfg.enabled),
+        }

+ 0 - 46
gateway/core/channels/protocols.py

@@ -1,46 +0,0 @@
-from __future__ import annotations
-
-from typing import Any, Protocol, runtime_checkable
-
-from gateway.core.channels.types import FeishuReplyContext, IncomingFeishuEvent
-
-
-@runtime_checkable
-class TraceBackend(Protocol):
-    """与 Lifecycle.TraceManager 对接前的抽象:按渠道用户解析 trace_id。"""
-
-    async def get_or_create_trace(
-        self,
-        *,
-        channel: str,
-        user_id: str,
-        workspace_id: str,
-        agent_type: str,
-        metadata: dict[str, Any],
-    ) -> str:
-        ...
-
-
-@runtime_checkable
-class ExecutorBackend(Protocol):
-    """与 Executor 对接前的抽象:处理入站用户文本并负责回呼飞书。"""
-
-    async def handle_inbound_message(
-        self,
-        trace_id: str,
-        text: str,
-        reply_context: FeishuReplyContext,
-        connector: Any,
-        *,
-        event: IncomingFeishuEvent,
-    ) -> str:
-        """返回 task_id 或占位 id。"""
-        ...
-
-
-@runtime_checkable
-class UserIdentityResolver(Protocol):
-    """将飞书事件映射为网关内统一 user_id(后续可换 DB 映射表)。"""
-
-    def resolve_user_id(self, event: IncomingFeishuEvent) -> str:
-        ...

+ 40 - 0
gateway/core/channels/router.py

@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+from gateway.core.channels.manager import TraceBackend
+
+
+class ChannelTraceRouter:
+    """
+    与具体 IM 无关:按渠道 user_id 解析 workspace_id,并委托 TraceBackend 获取/创建 trace。
+
+    飞书等渠道的入站消息路由见 ``gateway.core.channels.feishu.router.FeishuMessageRouter``。
+    """
+
+    def __init__(
+        self,
+        *,
+        trace_backend: TraceBackend,
+        workspace_prefix: str,
+        default_agent_type: str = "personal_assistant",
+    ) -> None:
+        self._trace = trace_backend
+        self._workspace_prefix = workspace_prefix
+        self._agent_type = default_agent_type
+
+    def _workspace_id_for_user(self, user_id: str) -> str:
+        return f"{self._workspace_prefix}:{user_id}"
+
+    async def get_trace_id(self, channel: str, user_id: str, *, create_if_missing: bool = True) -> str:
+        """获取或创建 Trace ID(对应 channels.md get_trace_id / create_trace_for_user 语义)。"""
+        if not create_if_missing:
+            raise NotImplementedError("仅内存后端支持 create_if_missing=False 时需扩展 TraceBackend")
+        return await self._trace.get_or_create_trace(
+            channel=channel,
+            user_id=user_id,
+            workspace_id=self._workspace_id_for_user(user_id),
+            agent_type=self._agent_type,
+            metadata={"source": "channels.channel_trace_router"},
+        )
+
+    async def create_trace_for_user(self, channel: str, user_id: str) -> str:
+        return await self.get_trace_id(channel, user_id, create_if_missing=True)

+ 24 - 76
gateway/core/channels/types.py

@@ -1,84 +1,32 @@
+"""
+Channels 公共类型:与具体 IM 厂商无关的标识、路由结果等。
+
+各渠道专有结构放在 ``gateway.core.channels.<channel>.types``(如 ``feishu.types``)。
+"""
+
 from __future__ import annotations
 
 from collections.abc import Mapping
-from dataclasses import dataclass, field
-from typing import Any
-
+from dataclasses import dataclass
+from typing import Any, Final, TypeAlias
 
-@dataclass(slots=True)
-class IncomingFeishuEvent:
-    """openclaw-lark-patch `forwardEventToGateway` 写入的规范化结构(见 server.ts)。"""
+# —— 渠道标识(配置、Trace 分片、URL 路径等与之一致)——
+CHANNEL_FEISHU: Final[str] = "feishu"
+CHANNEL_WECHAT: Final[str] = "wechat"
 
-    event_type: str
-    app_id: str
-    account_id: str | None
-    open_id: str | None
-    tenant_id: str | None = None
-    chat_type: str | None = None
-    chat_id: str | None = None
-    message_id: str | None = None
-    content: str | None = None
-    emoji: str | None = None
-    action_time: str | None = None
-    action: str | None = None
-    operation_id: str | None = None
-    raw: dict[str, Any] = field(default_factory=dict)
+# HTTP / Webhook 解析前的通用载荷形态(多为 JSON 对象)
+ChannelInboundPayload: TypeAlias = Mapping[str, Any]
 
 
 @dataclass(slots=True)
-class FeishuReplyContext:
-    """调用 Feishu HTTP 适配器 `/feishu/send-message` 时需要的上下文。"""
-
-    account_id: str | None
-    app_id: str
-    chat_id: str
-    message_id: str | None = None
-    open_id: str | None = None
-
-
-def event_to_mapping(event: IncomingFeishuEvent) -> dict[str, Any]:
-    """供日志与扩展序列化使用。"""
-    return {
-        "event_type": event.event_type,
-        "app_id": event.app_id,
-        "account_id": event.account_id,
-        "open_id": event.open_id,
-        "tenant_id": event.tenant_id,
-        "chat_type": event.chat_type,
-        "chat_id": event.chat_id,
-        "message_id": event.message_id,
-        "content": event.content,
-        "emoji": event.emoji,
-        "action": event.action,
-        "operation_id": event.operation_id,
-    }
-
-
-def mapping_to_event(body: Mapping[str, Any]) -> IncomingFeishuEvent:
-    """从 JSON 反序列化(宽松,缺省字段为 None)。"""
-    raw = body.get("raw")
-    raw_dict: dict[str, Any] = raw if isinstance(raw, dict) else {}
-    return IncomingFeishuEvent(
-        event_type=str(body.get("event_type") or ""),
-        app_id=str(body.get("app_id") or ""),
-        account_id=_opt_str(body.get("account_id")),
-        open_id=_opt_str(body.get("open_id")),
-        tenant_id=_opt_str(body.get("tenant_id")),
-        chat_type=_opt_str(body.get("chat_type")),
-        chat_id=_opt_str(body.get("chat_id")),
-        message_id=_opt_str(body.get("message_id")),
-        content=_opt_str(body.get("content")),
-        emoji=_opt_str(body.get("emoji")),
-        action_time=_opt_str(body.get("action_time")),
-        action=_opt_str(body.get("action")),
-        operation_id=_opt_str(body.get("operation_id")),
-        raw=raw_dict,
-    )
-
-
-def _opt_str(v: Any) -> str | None:
-    if v is None:
-        return None
-    if isinstance(v, str):
-        return v if v else None
-    return str(v)
+class RouteResult:
+    """入站消息经 ChannelManager / MessageRouter 处理后的统一结果(与厂商无关)。"""
+
+    ok: bool
+    skipped: bool = False
+    reason: str | None = None
+    trace_id: str | None = None
+    task_id: str | None = None
+    user_id: str | None = None
+    workspace_id: str | None = None
+    error: str | None = None

+ 5 - 2
gateway/core/router.py

@@ -1,12 +1,15 @@
 """
 Gateway Router
 
-消息路由、在线状态查询
+Agent WebSocket 注册、消息发送、在线状态查询。
 """
 
+from __future__ import annotations
+
 import json
 import logging
-from typing import Dict, Any, Optional
+from typing import Any, Dict, Optional
+
 from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException
 from pydantic import BaseModel
 

+ 16 - 9
gateway_server.py

@@ -4,19 +4,23 @@ A2A IM Gateway Server
 启动 Gateway 服务器,提供 Agent 注册和消息路由
 """
 
-import asyncio
 import logging
+
+import uvicorn
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
-import uvicorn
 
+from gateway.core.channels.feishu.manager import (
+    build_channels_api_router,
+    channel_manager_from_env,
+)
 from gateway.core.registry import AgentRegistry
 from gateway.core.router import GatewayRouter
 
 # 配置日志
 logging.basicConfig(
     level=logging.INFO,
-    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 )
 
 logger = logging.getLogger(__name__)
@@ -28,7 +32,7 @@ def create_gateway_app() -> FastAPI:
     app = FastAPI(
         title="A2A IM Gateway",
         description="Agent 即时通讯网关",
-        version="1.0.0"
+        version="1.0.0",
     )
 
     # 添加 CORS 中间件
@@ -43,11 +47,14 @@ def create_gateway_app() -> FastAPI:
     # 创建 Registry
     registry = AgentRegistry(heartbeat_timeout=60)
 
-    # 创建 Router
-    router = GatewayRouter(registry)
+    # 创建 Gateway Router
+    gateway_router = GatewayRouter(registry)
+
+    # 注册 Gateway 路由
+    app.include_router(gateway_router.router)
 
-    # 注册路由
-    app.include_router(router.router)
+    channel_manager = channel_manager_from_env()
+    app.include_router(build_channels_api_router(channel_manager))
 
     # 启动和关闭事件
     @app.on_event("startup")
@@ -78,7 +85,7 @@ def main():
         app,
         host="0.0.0.0",
         port=8000,
-        log_level="info"
+        log_level="info",
     )