kevin.yang 3 giorni fa
parent
commit
1f63318b3d

+ 2 - 2
gateway/core/channels/__init__.py

@@ -6,10 +6,10 @@ HTTP 路由由各渠道 Api 类(如 ``FeishuChannelApi.build_router``)构建
 ``/api/channels/feishu/inbound/webhook``)。
 ``/api/channels/feishu/inbound/webhook``)。
 """
 """
 
 
-from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.api import FeishuChannelApi
 from gateway.core.channels.feishu.connector import FeishuConnector
 from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.http_run_executor import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
 from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
 from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
@@ -27,8 +27,8 @@ __all__ = [
     "ChannelManager",
     "ChannelManager",
     "ChannelRegistry",
     "ChannelRegistry",
     "ChannelTraceRouter",
     "ChannelTraceRouter",
-    "EchoExecutorBackend",
     "ExecutorBackend",
     "ExecutorBackend",
+    "FeishuHttpRunApiExecutor",
     "FeishuChannelConfig",
     "FeishuChannelConfig",
     "FeishuChannelManager",
     "FeishuChannelManager",
     "FeishuChannelApi",
     "FeishuChannelApi",

+ 0 - 2
gateway/core/channels/backends/__init__.py

@@ -1,7 +1,5 @@
-from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 
 
 __all__ = [
 __all__ = [
-    "EchoExecutorBackend",
     "MemoryTraceBackend",
     "MemoryTraceBackend",
 ]
 ]

+ 0 - 41
gateway/core/channels/backends/echo_executor.py

@@ -1,41 +0,0 @@
-"""
-通用回显执行器。
-
-适用于任何实现了 ``send_text(reply_context, text) -> dict`` 的渠道连接器,
-无需感知具体渠道类型(飞书、微信等)。
-"""
-
-from __future__ import annotations
-
-import logging
-import uuid
-from typing import Any
-
-logger = logging.getLogger(__name__)
-
-
-class EchoExecutorBackend:
-    """默认执行器:将用户消息原文回显,用于验证 Gateway → 渠道适配层 → IM 全链路。"""
-
-    def __init__(self, *, prefix: str = "[Gateway] ", enabled: bool = True) -> None:
-        self._prefix = prefix
-        self._enabled = enabled
-
-    async def handle_inbound_message(
-        self,
-        trace_id: str,
-        text: str,
-        reply_context: Any,
-        connector: Any,
-        *,
-        event: Any,
-    ) -> str:
-        task_id = f"task-{uuid.uuid4()}"
-        if not self._enabled:
-            logger.info("EchoExecutor disabled, skip reply trace_id=%s", trace_id)
-            return task_id
-        reply_body = f"{self._prefix}{text}" if text else f"{self._prefix}(空消息)"
-        result = await connector.send_text(reply_context, reply_body)
-        if not result.get("ok"):
-            logger.error("send_text failed trace_id=%s result=%s", trace_id, result)
-        return task_id

+ 178 - 0
gateway/core/channels/feishu/http_run_executor.py

@@ -0,0 +1,178 @@
+"""
+飞书执行器:通过 HTTP 调用 Agent 的 ``run_api``(``POST /api/traces`` / ``POST /api/traces/{id}/run``)。
+
+按飞书 ``user_id``(与 ``DefaultUserIdentityResolver`` 一致)维护 API ``trace_id``,与首次创建 / 续跑语义对齐。
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import uuid
+from typing import Any
+
+import httpx
+
+from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
+
+logger = logging.getLogger(__name__)
+
+
+def _format_api_error(status_code: int, body_text: str) -> str:
+    try:
+        import json
+
+        data = json.loads(body_text)
+        detail = data.get("detail")
+        if isinstance(detail, str):
+            return detail
+        if isinstance(detail, list):
+            return json.dumps(detail, ensure_ascii=False)
+        if detail is not None:
+            return str(detail)
+    except Exception:
+        pass
+    return (body_text or "")[:800] or f"HTTP {status_code}"
+
+
+def _append_feishu_context_block(
+    text: str,
+    event: IncomingFeishuEvent,
+    reply_context: FeishuReplyContext,
+) -> str:
+    """在用户文本后附加结构化上下文,便于后续工具(Feishu HTTP)读取。"""
+    core = text.strip() if text else ""
+    if not core:
+        core = "(空消息)"
+    lines = [
+        core,
+        "",
+        "[飞书上下文 · 工具调用时请使用下列字段,勿向用户复述本段]",
+        f"account_id={reply_context.account_id or ''}",
+        f"app_id={reply_context.app_id}",
+        f"chat_id={reply_context.chat_id}",
+        f"message_id={reply_context.message_id or ''}",
+        f"open_id={reply_context.open_id or ''}",
+        f"chat_type={event.chat_type or ''}",
+    ]
+    return "\n".join(lines)
+
+
+class FeishuHttpRunApiExecutor:
+    """调用 ``agent/trace/run_api`` 暴露的 Trace HTTP API,触发后台 Agent 运行。"""
+
+    def __init__(
+        self,
+        *,
+        base_url: str,
+        timeout: float,
+        identity_resolver: Any,
+        model: str = "gpt-4o",
+        max_iterations: int = 200,
+        temperature: float = 0.3,
+        notify_on_submit: bool = True,
+    ) -> None:
+        self._base = base_url.rstrip("/")
+        self._timeout = timeout
+        self._identity = identity_resolver
+        self._model = model
+        self._max_iterations = max_iterations
+        self._temperature = temperature
+        self._notify = notify_on_submit
+        self._map_lock = asyncio.Lock()
+        self._api_trace_by_user: dict[str, str] = {}
+
+    async def handle_inbound_message(
+        self,
+        trace_id: str,
+        text: str,
+        reply_context: FeishuReplyContext,
+        connector: Any,
+        *,
+        event: IncomingFeishuEvent,
+    ) -> str:
+        _ = trace_id
+        user_id = self._identity.resolve_user_id(event)
+        content = _append_feishu_context_block(text, event, reply_context)
+        task_id = f"task-{uuid.uuid4()}"
+
+        async with self._map_lock:
+            api_trace_id = self._api_trace_by_user.get(user_id)
+
+        try:
+            async with httpx.AsyncClient(timeout=self._timeout) as client:
+                if api_trace_id is None:
+                    resp = await client.post(
+                        f"{self._base}/api/traces",
+                        json={
+                            "messages": [{"role": "user", "content": content}],
+                            "model": self._model,
+                            "temperature": self._temperature,
+                            "max_iterations": self._max_iterations,
+                            "uid": user_id,
+                            "name": f"feishu-{user_id}",
+                        },
+                    )
+                else:
+                    resp = await client.post(
+                        f"{self._base}/api/traces/{api_trace_id}/run",
+                        json={"messages": [{"role": "user", "content": content}]},
+                    )
+        except httpx.RequestError as exc:
+            logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
+            await connector.send_text(
+                reply_context,
+                f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
+            )
+            return task_id
+
+        body_text = resp.text
+        if resp.status_code == 409:
+            await connector.send_text(
+                reply_context,
+                "[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
+            )
+            return task_id
+
+        if resp.status_code >= 400:
+            err = _format_api_error(resp.status_code, body_text)
+            logger.warning(
+                "FeishuHttpRunApiExecutor: API 错误 status=%s user_id=%s detail=%s",
+                resp.status_code,
+                user_id,
+                err,
+            )
+            await connector.send_text(
+                reply_context,
+                f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
+            )
+            return task_id
+
+        try:
+            data = resp.json()
+        except Exception:
+            await connector.send_text(
+                reply_context,
+                "[Gateway] Agent API 返回非 JSON,已放弃解析。",
+            )
+            return task_id
+
+        resolved_id = data.get("trace_id")
+        if not isinstance(resolved_id, str) or not resolved_id:
+            await connector.send_text(
+                reply_context,
+                "[Gateway] Agent API 响应缺少 trace_id。",
+            )
+            return task_id
+
+        async with self._map_lock:
+            if user_id not in self._api_trace_by_user:
+                self._api_trace_by_user[user_id] = resolved_id
+
+        if self._notify:
+            await connector.send_text(
+                reply_context,
+                f"[Gateway] 已提交 Agent(API trace_id={resolved_id}),后台执行中。",
+            )
+
+        return task_id

+ 24 - 14
gateway/core/channels/feishu/manager.py

@@ -1,21 +1,18 @@
 from __future__ import annotations
 from __future__ import annotations
 
 
-import logging
 import os
 import os
 from collections.abc import Mapping
 from collections.abc import Mapping
 from dataclasses import dataclass
 from dataclasses import dataclass
 from typing import Any
 from typing import Any
 
 
-from gateway.core.channels.backends.echo_executor import EchoExecutorBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.backends.memory_trace import MemoryTraceBackend
 from gateway.core.channels.feishu.connector import FeishuConnector, WebhookParseError
 from gateway.core.channels.feishu.connector import FeishuConnector, WebhookParseError
+from gateway.core.channels.feishu.http_run_executor import FeishuHttpRunApiExecutor
 from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.feishu.router import FeishuMessageRouter
 from gateway.core.channels.manager import ChannelRegistry
 from gateway.core.channels.manager import ChannelRegistry
 from gateway.core.channels.types import RouteResult
 from gateway.core.channels.types import RouteResult
 
 
-logger = logging.getLogger(__name__)
-
 
 
 @dataclass
 @dataclass
 class FeishuChannelConfig:
 class FeishuChannelConfig:
@@ -26,14 +23,17 @@ class FeishuChannelConfig:
     auto_create_trace: bool = True
     auto_create_trace: bool = True
     workspace_prefix: str = "feishu"
     workspace_prefix: str = "feishu"
     default_agent_type: str = "personal_assistant"
     default_agent_type: str = "personal_assistant"
-    echo_replies: bool = True
-    echo_prefix: str = "[Gateway] "
     dispatch_reactions: bool = False
     dispatch_reactions: bool = False
     dispatch_card_actions: bool = False
     dispatch_card_actions: bool = False
+    agent_api_base_url: str = "http://127.0.0.1:8000"
+    agent_run_model: str = "gpt-4o"
+    agent_run_max_iterations: int = 200
+    agent_run_temperature: float = 0.3
+    feishu_run_notify_on_submit: bool = True
 
 
 
 
 class FeishuChannelManager(ChannelRegistry):
 class FeishuChannelManager(ChannelRegistry):
-    """飞书渠道:组装连接器、Trace 后端、执行器与消息路由;继承 ``ChannelRegistry`` 的注册/启停能力。"""
+    """飞书渠道:组装连接器、Trace 后端、HTTP Run API 执行器与消息路由。"""
 
 
     def __init__(self, config: FeishuChannelConfig | None = None) -> None:
     def __init__(self, config: FeishuChannelConfig | None = None) -> None:
         super().__init__()
         super().__init__()
@@ -45,11 +45,16 @@ class FeishuChannelManager(ChannelRegistry):
             timeout=self._config.http_timeout,
             timeout=self._config.http_timeout,
         )
         )
         self._trace_backend = MemoryTraceBackend()
         self._trace_backend = MemoryTraceBackend()
-        self._executor = EchoExecutorBackend(
-            prefix=self._config.echo_prefix,
-            enabled=self._config.echo_replies,
-        )
         self._identity = DefaultUserIdentityResolver()
         self._identity = DefaultUserIdentityResolver()
+        self._executor = FeishuHttpRunApiExecutor(
+            base_url=self._config.agent_api_base_url,
+            timeout=self._config.http_timeout,
+            identity_resolver=self._identity,
+            model=self._config.agent_run_model,
+            max_iterations=self._config.agent_run_max_iterations,
+            temperature=self._config.agent_run_temperature,
+            notify_on_submit=self._config.feishu_run_notify_on_submit,
+        )
         self._router = FeishuMessageRouter(
         self._router = FeishuMessageRouter(
             connector=self._connector,
             connector=self._connector,
             trace_backend=self._trace_backend,
             trace_backend=self._trace_backend,
@@ -81,10 +86,15 @@ class FeishuChannelManager(ChannelRegistry):
             FeishuChannelConfig(
             FeishuChannelConfig(
                 feishu_http_base_url=os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip(),
                 feishu_http_base_url=os.getenv("FEISHU_HTTP_BASE_URL", "http://127.0.0.1:4380").strip(),
                 http_timeout=float(os.getenv("FEISHU_HTTP_TIMEOUT", "120")),
                 http_timeout=float(os.getenv("FEISHU_HTTP_TIMEOUT", "120")),
-                echo_replies=os.getenv("CHANNELS_ECHO_REPLIES", "true").lower() in ("1", "true", "yes"),
-                echo_prefix=os.getenv("CHANNELS_ECHO_PREFIX", "[Gateway] "),
                 dispatch_reactions=os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes"),
                 dispatch_reactions=os.getenv("CHANNELS_DISPATCH_REACTIONS", "false").lower() in ("1", "true", "yes"),
-                dispatch_card_actions=os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "false").lower() in ("1", "true", "yes"),
+                dispatch_card_actions=os.getenv("CHANNELS_DISPATCH_CARD_ACTIONS", "false").lower()
+                in ("1", "true", "yes"),
+                agent_api_base_url=os.getenv("GATEWAY_AGENT_API_BASE_URL", "http://127.0.0.1:8000").strip(),
+                agent_run_model=os.getenv("FEISHU_AGENT_RUN_MODEL", "gpt-4o").strip(),
+                agent_run_max_iterations=int(os.getenv("FEISHU_AGENT_RUN_MAX_ITERATIONS", "200")),
+                agent_run_temperature=float(os.getenv("FEISHU_AGENT_RUN_TEMPERATURE", "0.3")),
+                feishu_run_notify_on_submit=os.getenv("CHANNELS_FEISHU_RUN_NOTIFY", "true").lower()
+                in ("1", "true", "yes"),
             )
             )
         )
         )
 
 

+ 1 - 1
gateway/core/channels/protocols.py

@@ -30,7 +30,7 @@ class ExecutorBackend(Protocol):
     """接收解析后的入站消息,触发业务处理,返回 task_id。
     """接收解析后的入站消息,触发业务处理,返回 task_id。
 
 
     ``reply_context`` 与 ``event`` 的具体类型由各渠道自行约束;
     ``reply_context`` 与 ``event`` 的具体类型由各渠道自行约束;
-    ``connector`` 须实现 ``send_text(reply_context, text) -> dict`` 以支持回显
+    ``connector`` 须实现 ``send_text(reply_context, text) -> dict`` 以便向用户发送状态或结果
     """
     """
 
 
     async def handle_inbound_message(
     async def handle_inbound_message(