kevin.yang hai 4 días
pai
achega
52ea70caab

+ 12 - 8
gateway/core/channels/__init__.py

@@ -1,21 +1,21 @@
 """
 Gateway Channels:外部渠道接入。
 
-HTTP 路由由 ``gateway.core.channels.feishu.manager.build_channels_api_router`` 定义
+HTTP 路由由 ``gateway.core.channels.feishu.api.build_channels_api_router`` 构建
 应用入口 ``gateway_server`` 对其 ``include_router``(如
 ``/api/channels/feishu/inbound/webhook``,``GATEWAY_FEISHU_WEBHOOK_URL``)。
 """
 
+from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
+from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
+from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
+from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
-from gateway.core.channels.feishu.manager import (
-    FeishuChannelConfig,
-    FeishuChannelManager,
-    channel_manager_from_env,
-)
 from gateway.core.channels.manager import ChannelRegistry
+from gateway.core.channels.protocols import ExecutorBackend, UserIdentityResolver
 from gateway.core.channels.router import ChannelTraceRouter
-from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.types import CHANNEL_FEISHU, CHANNEL_WECHAT, RouteResult
 
 ChannelManager = FeishuChannelManager
@@ -27,13 +27,17 @@ __all__ = [
     "ChannelManager",
     "ChannelRegistry",
     "ChannelTraceRouter",
+    "EchoExecutorBackend",
+    "ExecutorBackend",
     "FeishuChannelConfig",
     "FeishuChannelManager",
+    "FeishuChannelApi",
     "FeishuConnector",
     "FeishuMessageRouter",
     "FeishuReplyContext",
     "IncomingFeishuEvent",
+    "MemoryTraceBackend",
     "MessageRouter",
     "RouteResult",
-    "channel_manager_from_env",
+    "UserIdentityResolver",
 ]

+ 0 - 2
gateway/core/channels/backends/__init__.py

@@ -1,9 +1,7 @@
 from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
-from gateway.core.channels.backends.user_id import DefaultUserIdentityResolver
 
 __all__ = [
-    "DefaultUserIdentityResolver",
     "EchoExecutorBackend",
     "MemoryTraceBackend",
 ]

+ 11 - 9
gateway/core/channels/backends/echo_executor.py

@@ -1,19 +1,21 @@
+"""
+通用回显执行器。
+
+适用于任何实现了 ``send_text(reply_context, text) -> dict`` 的渠道连接器,
+无需感知具体渠道类型(飞书、微信等)。
+"""
+
 from __future__ import annotations
 
 import logging
 import uuid
-from typing import TYPE_CHECKING, Any
-
-from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
-
-if TYPE_CHECKING:
-    from gateway.core.channels.feishu.connector import FeishuConnector
+from typing import Any
 
 logger = logging.getLogger(__name__)
 
 
 class EchoExecutorBackend:
-    """默认执行器:回显或固定话术,验证「Gateway → Node → 飞书」链路。"""
+    """默认执行器:将用户消息原文回显,用于验证 Gateway → 渠道适配层 → IM 全链路。"""
 
     def __init__(self, *, prefix: str = "[Gateway] ", enabled: bool = True) -> None:
         self._prefix = prefix
@@ -23,10 +25,10 @@ class EchoExecutorBackend:
         self,
         trace_id: str,
         text: str,
-        reply_context: FeishuReplyContext,
+        reply_context: Any,
         connector: Any,
         *,
-        event: IncomingFeishuEvent,
+        event: Any,
     ) -> str:
         task_id = f"task-{uuid.uuid4()}"
         if not self._enabled:

+ 5 - 8
gateway/core/channels/feishu/__init__.py

@@ -1,10 +1,7 @@
+from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.connector import FeishuConnector
-from gateway.core.channels.feishu.manager import (
-    FeishuChannelConfig,
-    FeishuChannelManager,
-    build_channels_api_router,
-    channel_manager_from_env,
-)
+from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
+from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
 from gateway.core.channels.feishu.router import (
     FeishuExecutorBackend,
     FeishuMessageRouter,
@@ -18,16 +15,16 @@ from gateway.core.channels.feishu.types import (
 )
 
 __all__ = [
+    "FeishuChannelApi",
+    "DefaultUserIdentityResolver",
     "FeishuChannelConfig",
     "FeishuChannelManager",
     "FeishuConnector",
     "FeishuExecutorBackend",
     "FeishuMessageRouter",
-    "build_channels_api_router",
     "FeishuReplyContext",
     "FeishuUserIdentityResolver",
     "IncomingFeishuEvent",
-    "channel_manager_from_env",
     "feishu_event_to_mapping",
     "mapping_to_feishu_event",
 ]

+ 84 - 0
gateway/core/channels/feishu/api.py

@@ -0,0 +1,84 @@
+"""
+飞书渠道 FastAPI 路由。
+
+``FeishuChannelApi`` 以类方法作为路由处理器,依赖通过构造函数注入,
+避免闭包捕获 channel_manager 的写法,也使各处理方法可独立测试。
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Any
+
+from fastapi import APIRouter, HTTPException, Request
+
+from gateway.core.channels.feishu.manager import FeishuChannelManager
+
+logger = logging.getLogger(__name__)
+
+
+class FeishuChannelApi:
+    """飞书渠道 HTTP 路由:持有 manager 引用,以方法作为路由处理器。
+
+    实现 ``ChannelPlugin`` Protocol,可通过 ``ChannelLoader`` 自动注册。
+    """
+
+    def __init__(self, channel_manager: FeishuChannelManager) -> None:
+        self._manager = channel_manager
+
+    @classmethod
+    def from_env(cls) -> FeishuChannelApi:
+        """从环境变量构造实例,供 ``ChannelLoader`` 自动调用。"""
+        return cls(FeishuChannelManager.from_env())
+
+    async def inbound_webhook(self, request: Request) -> dict[str, Any]:
+        """POST /api/channels/feishu/inbound/webhook
+
+        Feishu HTTP 适配服务经 ``GATEWAY_FEISHU_WEBHOOK_URL`` 转发规范化 JSON 到此端点。
+        """
+        try:
+            body = await request.json()
+        except Exception:
+            raise HTTPException(status_code=400, detail="invalid_json")
+
+        if not isinstance(body, dict):
+            raise HTTPException(status_code=400, detail="body_must_be_object")
+
+        result = await self._manager.handle_feishu_inbound_webhook(body)
+        if not result.ok and result.error:
+            logger.warning("feishu inbound webhook error: %s", result.error)
+
+        out: dict[str, Any] = {
+            "ok": result.ok,
+            "skipped": result.skipped,
+            "reason": result.reason,
+            "trace_id": result.trace_id,
+            "task_id": result.task_id,
+            "user_id": result.user_id,
+            "workspace_id": result.workspace_id,
+        }
+        if result.error:
+            out["error"] = result.error
+        return out
+
+    async def channel_status(self, channel_id: str) -> dict[str, Any]:
+        """GET /api/channels/{channel_id}/status"""
+        return self._manager.get_channel_status(channel_id)
+
+    def build_router(self) -> APIRouter:
+        """返回挂载好路由的 ``/api/channels`` APIRouter。"""
+        channels = APIRouter(prefix="/api/channels", tags=["channels"])
+        feishu = APIRouter(prefix="/feishu", tags=["feishu"])
+
+        feishu.add_api_route(
+            "/inbound/webhook",
+            self.inbound_webhook,
+            methods=["POST"],
+        )
+        channels.add_api_route(
+            "/{channel_id}/status",
+            self.channel_status,
+            methods=["GET"],
+        )
+        channels.include_router(feishu)
+        return channels

+ 1 - 1
gateway/core/channels/backends/user_id.py → gateway/core/channels/feishu/identity.py

@@ -5,7 +5,7 @@ from gateway.core.channels.feishu.types import IncomingFeishuEvent
 
 class DefaultUserIdentityResolver:
     """
-    默认 user_id:feishu:{tenant_key}:{app_id}:{open_id}。
+    飞书用户身份解析:feishu:{tenant_key}:{app_id}:{open_id}。
 
     无 open_id 时退化为 chat_id,便于群场景或异常事件仍可按会话隔离。
     """

+ 17 - 75
gateway/core/channels/feishu/manager.py

@@ -6,12 +6,10 @@ from collections.abc import Mapping
 from dataclasses import dataclass
 from typing import Any
 
-from fastapi import APIRouter, HTTPException, Request
-
 from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
-from gateway.core.channels.backends.user_id import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.connector import FeishuConnector, WebhookParseError
+from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.manager import ChannelRegistry
 from gateway.core.channels.types import RouteResult
@@ -35,9 +33,7 @@ class FeishuChannelConfig:
 
 
 class FeishuChannelManager(ChannelRegistry):
-    """
-    飞书渠道:配置、连接器、路由与入站 webhook;继承 ``ChannelRegistry`` 的注册/启停能力。
-    """
+    """飞书渠道:组装连接器、Trace 后端、执行器与消息路由;继承 ``ChannelRegistry`` 的注册/启停能力。"""
 
     def __init__(self, config: FeishuChannelConfig | None = None) -> None:
         super().__init__()
@@ -78,8 +74,22 @@ class FeishuChannelManager(ChannelRegistry):
     def message_router(self) -> FeishuMessageRouter:
         return self._router
 
+    @classmethod
+    def from_env(cls) -> FeishuChannelManager:
+        """从环境变量构造实例(与 docker-compose / .env 配合)。"""
+        return cls(
+            FeishuChannelConfig(
+                feishu_http_base_url=os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip(),
+                http_timeout=float(os.getenv("FEISHU_HTTP_TIMEOUT", "120")),
+                echo_replies=os.getenv("CHANNELS_ECHO_REPLIES", "true").lower() in ("1", "true", "yes"),
+                echo_prefix=os.getenv("CHANNELS_ECHO_PREFIX", "[Gateway] "),
+                dispatch_reactions=os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes"),
+                dispatch_card_actions=os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "false").lower() in ("1", "true", "yes"),
+            )
+        )
+
     async def handle_feishu_inbound_webhook(self, body: Mapping[str, Any]) -> RouteResult:
-        """POST /api/channels/feishu/inbound/webhook"""
+        """处理飞书适配层 POST 到 ``/api/channels/feishu/inbound/webhook`` 的规范化事件。"""
         cid = self._config.channel_id
         if not self._running.get(cid, False):
             return RouteResult(ok=False, error="channel_stopped")
@@ -90,71 +100,3 @@ class FeishuChannelManager(ChannelRegistry):
             return RouteResult(ok=False, error=str(e))
 
         return await self._router.route_feishu_inbound_event(event)
-
-
-def build_channels_api_router(channel_manager: FeishuChannelManager) -> APIRouter:
-    """
-    构建 ``/api/channels`` 下的 FastAPI 路由(飞书 webhook、``GET .../status``)。
-
-    放在本模块而非 ``router.py``,避免 ``manager`` ↔ ``FeishuMessageRouter`` 循环导入。
-    """
-    channels = APIRouter(prefix="/api/channels", tags=["channels"])
-    feishu = APIRouter(prefix="/feishu", tags=["feishu"])
-
-    @feishu.post("/inbound/webhook")
-    async def feishu_inbound_webhook(request: Request) -> dict[str, Any]:
-        """
-        Feishu HTTP 适配服务经 ``GATEWAY_FEISHU_WEBHOOK_URL`` POST 规范化 JSON 到此端点。
-        """
-        try:
-            body = await request.json()
-        except Exception:
-            raise HTTPException(status_code=400, detail="invalid_json")
-
-        if not isinstance(body, dict):
-            raise HTTPException(status_code=400, detail="body_must_be_object")
-
-        result = await channel_manager.handle_feishu_inbound_webhook(body)
-        if not result.ok and result.error:
-            logger.warning("feishu inbound webhook error: %s", result.error)
-
-        out: dict[str, Any] = {
-            "ok": result.ok,
-            "skipped": result.skipped,
-            "reason": result.reason,
-            "trace_id": result.trace_id,
-            "task_id": result.task_id,
-            "user_id": result.user_id,
-            "workspace_id": result.workspace_id,
-        }
-        if result.error:
-            out["error"] = result.error
-        return out
-
-    @channels.get("/{channel_id}/status")
-    async def channel_status(channel_id: str) -> dict[str, Any]:
-        return channel_manager.get_channel_status(channel_id)
-
-    channels.include_router(feishu)
-    return channels
-
-
-def channel_manager_from_env() -> FeishuChannelManager:
-    """从环境变量构造(与 docker-compose / .env 配合)。"""
-    base = os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip()
-    timeout = float(os.getenv("FEISHU_HTTP_TIMEOUT", "120"))
-    echo = os.getenv("CHANNELS_ECHO_REPLIES", "true").lower() in ("1", "true", "yes")
-    reactions = os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes")
-    cards = os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "false").lower() in ("1", "true", "yes")
-    prefix = os.getenv("CHANNELS_ECHO_PREFIX", "[Gateway] ")
-
-    return FeishuChannelManager(
-        FeishuChannelConfig(
-            feishu_http_base_url=base,
-            http_timeout=timeout,
-            echo_replies=echo,
-            echo_prefix=prefix,
-            dispatch_reactions=reactions,
-            dispatch_card_actions=cards,
-        )
-    )

+ 5 - 4
gateway/core/channels/feishu/router.py

@@ -11,6 +11,7 @@ from gateway.core.channels.feishu.types import (
     feishu_event_to_mapping,
 )
 from gateway.core.channels.manager import TraceBackend
+from gateway.core.channels.protocols import ExecutorBackend, UserIdentityResolver
 from gateway.core.channels.router import ChannelTraceRouter
 from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult
 
@@ -18,8 +19,8 @@ logger = logging.getLogger(__name__)
 
 
 @runtime_checkable
-class FeishuExecutorBackend(Protocol):
-    """与 Executor 对接前的抽象:处理入站用户文本并负责回呼飞书。"""
+class FeishuExecutorBackend(ExecutorBackend, Protocol):
+    """飞书执行器——窄化 ``ExecutorBackend`` 的参数类型为飞书专属结构。"""
 
     async def handle_inbound_message(
         self,
@@ -35,8 +36,8 @@ class FeishuExecutorBackend(Protocol):
 
 
 @runtime_checkable
-class FeishuUserIdentityResolver(Protocol):
-    """将飞书事件映射为网关内统一 user_id(后续可换 DB 映射表)。"""
+class FeishuUserIdentityResolver(UserIdentityResolver, Protocol):
+    """飞书用户身份解析器——窄化 ``UserIdentityResolver`` 的事件类型为 ``IncomingFeishuEvent``。"""
 
     def resolve_user_id(self, event: IncomingFeishuEvent) -> str:
         ...

+ 54 - 0
gateway/core/channels/loader.py

@@ -0,0 +1,54 @@
+"""
+渠道自动加载器。
+
+``_CHANNEL_REGISTRY`` 登记所有可用渠道(channel_id → 插件类路径);
+``CHANNELS_ENABLED`` 环境变量(逗号分隔)控制运行时实际启动哪些渠道。
+
+新增渠道只需两步:
+  1. 在对应渠道模块的 Api 类上实现 ``ChannelPlugin`` Protocol
+     (``from_env()`` classmethod + ``build_router()`` 方法)
+  2. 在 ``_CHANNEL_REGISTRY`` 中追加一行
+"""
+
+from __future__ import annotations
+
+import importlib
+import logging
+import os
+
+from fastapi import APIRouter
+
+logger = logging.getLogger(__name__)
+
+# channel_id → 插件类的完整模块路径
+_CHANNEL_REGISTRY: dict[str, str] = {
+    "feishu": "gateway.core.channels.feishu.api.FeishuChannelApi",
+    # "wechat": "gateway.core.channels.wechat.api.WeChatChannelApi",
+}
+
+
+def _import_plugin(dotted_path: str) -> type:
+    module_path, cls_name = dotted_path.rsplit(".", 1)
+    module = importlib.import_module(module_path)
+    return getattr(module, cls_name)
+
+
+def load_enabled_channels() -> list[APIRouter]:
+    """读取 ``CHANNELS_ENABLED`` 并返回已启用渠道的路由列表。
+
+    未配置时默认启用 ``feishu``。未知渠道 ID 记录警告后跳过,不中断启动。
+    """
+    enabled_raw = os.getenv("CHANNELS_ENABLED", "feishu")
+    enabled = [c.strip() for c in enabled_raw.split(",") if c.strip()]
+
+    routers: list[APIRouter] = []
+    for channel_id in enabled:
+        if channel_id not in _CHANNEL_REGISTRY:
+            logger.warning("Channel '%s' is not in the registry, skipping", channel_id)
+            continue
+        plugin_cls = _import_plugin(_CHANNEL_REGISTRY[channel_id])
+        channel = plugin_cls.from_env()
+        routers.append(channel.build_router())
+        logger.info("Channel '%s' loaded", channel_id)
+
+    return routers

+ 61 - 0
gateway/core/channels/protocols.py

@@ -0,0 +1,61 @@
+"""
+渠道层通用 Protocol——所有 IM 渠道(飞书、微信等)共用的最小接口约定。
+
+各渠道可在自己的模块中声明更严格的子 Protocol(窄化参数类型),
+但 backends/ 下的通用实现只需满足此处的宽松签名即可跨渠道复用。
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
+
+if TYPE_CHECKING:
+    from fastapi import APIRouter
+
+
+@runtime_checkable
+class UserIdentityResolver(Protocol):
+    """将渠道入站事件映射为网关内统一 user_id。
+
+    各渠道实现可声明更严格的事件类型(如 ``IncomingFeishuEvent``),
+    runtime_checkable 仅校验方法存在性,不校验参数类型。
+    """
+
+    def resolve_user_id(self, event: Any) -> str:
+        ...
+
+
+@runtime_checkable
+class ExecutorBackend(Protocol):
+    """接收解析后的入站消息,触发业务处理,返回 task_id。
+
+    ``reply_context`` 与 ``event`` 的具体类型由各渠道自行约束;
+    ``connector`` 须实现 ``send_text(reply_context, text) -> dict`` 以支持回显。
+    """
+
+    async def handle_inbound_message(
+        self,
+        trace_id: str,
+        text: str,
+        reply_context: Any,
+        connector: Any,
+        *,
+        event: Any,
+    ) -> str:
+        ...
+
+
+@runtime_checkable
+class ChannelPlugin(Protocol):
+    """渠道插件接口——每个渠道的 Api 类须实现此接口以支持自动注册。
+
+    ``from_env()`` 负责从环境变量读取渠道配置并构造实例;
+    ``build_router()`` 返回该渠道挂载到 FastAPI 的 ``APIRouter``。
+    """
+
+    @classmethod
+    def from_env(cls) -> ChannelPlugin:
+        ...
+
+    def build_router(self) -> APIRouter:
+        ...

+ 3 - 6
gateway_server.py

@@ -10,10 +10,7 @@ import uvicorn
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 
-from gateway.core.channels.feishu.manager import (
-    build_channels_api_router,
-    channel_manager_from_env,
-)
+from gateway.core.channels.loader import load_enabled_channels
 from gateway.core.registry import AgentRegistry
 from gateway.core.router import GatewayRouter
 
@@ -53,8 +50,8 @@ def create_gateway_app() -> FastAPI:
     # 注册 Gateway 路由
     app.include_router(gateway_router.router)
 
-    channel_manager = channel_manager_from_env()
-    app.include_router(build_channels_api_router(channel_manager))
+    for router in load_enabled_channels():
+        app.include_router(router)
 
     # 启动和关闭事件
     @app.on_event("startup")