Browse Source

修复了重复回复的问题

kevin.yang 3 ngày trước cách đây
mục cha
commit
5bfa297ad5

+ 1 - 12
agent/core/runner.py

@@ -211,12 +211,7 @@ BUILTIN_TOOLS = [
     "browser_export_cookies",
     "browser_load_cookies",
 
-    # 飞书工具
-    "feishu_send_message_to_contact",
-    "feishu_get_chat_history",
-    "feishu_get_contact_replies",
-    "feishu_get_contact_list",
-    # 飞书 Node HTTP 适配层 (4380):/tools、/tool-call、/tool-calls/batch
+    # 仅飞书 Node HTTP 适配层:/tools、/tool-call、/tool-calls/batch
     "feishu_adapter_list_tools",
     "feishu_adapter_tool_call",
     "feishu_adapter_tool_calls_batch",
@@ -1056,12 +1051,6 @@ class AgentRunner:
                         head_sequence=head_seq,
                         completed_at=datetime.now(),
                     )
-                    # 广播状态变化给前端
-                    try:
-                        from agent.trace.websocket import broadcast_trace_status_changed
-                        await broadcast_trace_status_changed(trace_id, "stopped")
-                    except Exception:
-                        pass
                     trace_obj = await self.trace_store.get_trace(trace_id)
                     if trace_obj:
                         yield trace_obj

+ 1 - 6
agent/tools/builtin/feishu/__init__.py

@@ -1,5 +1,4 @@
-from agent.tools.builtin.feishu.chat import (feishu_get_chat_history, feishu_get_contact_replies,
-                                         feishu_send_message_to_contact,feishu_get_contact_list)
+# 仅导入会触发 @tool 注册的模块。直连 SDK 的会话工具在 chat.py,按需单独 import(如 chat_test),不默认注册。
 from agent.tools.builtin.feishu.http_adapter_tools import (
     feishu_adapter_list_tools,
     feishu_adapter_tool_call,
@@ -7,10 +6,6 @@ from agent.tools.builtin.feishu.http_adapter_tools import (
 )
 
 __all__ = [
-    "feishu_get_chat_history",
-    "feishu_get_contact_replies",
-    "feishu_send_message_to_contact",
-    "feishu_get_contact_list",
     "feishu_adapter_list_tools",
     "feishu_adapter_tool_call",
     "feishu_adapter_tool_calls_batch",

+ 4 - 0
agent/trace/models.py

@@ -453,6 +453,10 @@ class Message:
             result["cache_creation_tokens"] = self.cache_creation_tokens
         if self.cache_read_tokens is not None:
             result["cache_read_tokens"] = self.cache_read_tokens
+        if self.branch_type is not None:
+            result["branch_type"] = self.branch_type
+        if self.branch_id is not None:
+            result["branch_id"] = self.branch_id
         return result
 
 

+ 19 - 0
agent/trace/store.py

@@ -111,6 +111,7 @@ class FileSystemTraceStore:
         if not trace:
             return
 
+        prev_status = trace.status
         # 更新字段
         for key, value in updates.items():
             if hasattr(trace, key):
@@ -120,6 +121,24 @@ class FileSystemTraceStore:
         meta_file = self._get_meta_file(trace_id)
         meta_file.write_text(json.dumps(trace.to_dict(), indent=2, ensure_ascii=False), encoding="utf-8")
 
+        # 进入终态时广播,供 WebSocket 订阅方(如 Gateway 飞书跟单)结束跟单,无需再 HTTP 轮询
+        new_status = trace.status
+        if (
+            "status" in updates
+            and new_status in ("completed", "failed", "stopped")
+            and new_status != prev_status
+        ):
+            try:
+                from . import websocket as trace_ws
+
+                await trace_ws.broadcast_trace_status_changed(trace_id, new_status)
+            except Exception:
+                logger.exception(
+                    "broadcast_trace_status_changed failed trace_id=%s status=%s",
+                    trace_id,
+                    new_status,
+                )
+
     async def list_traces(
         self,
         mode: Optional[str] = None,

+ 296 - 154
gateway/core/channels/feishu/http_run_executor.py

@@ -1,11 +1,17 @@
 """
-飞书执行器:通过 HTTP 调用 Agent 的 ``run_api``,并在后台轮询 ``GET /api/traces/{id}/messages``
-将主路径上每一条 assistant 消息转发到飞书。
+飞书执行器:HTTP 调用 Agent ``run_api``,WebSocket 订阅 ``/api/traces/{id}/watch``,
+将 assistant 消息转发到飞书(不轮询 messages)。
+
+转发规则:
+- 不转发 ``branch_type=reflection``(完成后知识提取侧分支)
+- 不转发仍含 ``tool_calls`` 的中间轮,只推工具执行后的最终回复
+- 提取正文时避免 ``description`` 与 ``text`` 重复拼接
 """
 
 from __future__ import annotations
 
 import asyncio
+import json
 import logging
 import time
 import uuid
@@ -20,20 +26,21 @@ logger = logging.getLogger(__name__)
 
 _TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})
 
-# 同一 trace 仅一个轮询任务,避免并发重复推送
+# 同一 trace 仅一个跟单任务,避免并发重复推送
 _poll_tasks: dict[str, asyncio.Task[None]] = {}
 _poll_tasks_lock = asyncio.Lock()
 # trace_id → 已成功推送到飞书的 assistant sequence(跨多次 run,避免重复发送)
 _assistant_sent_sequences: dict[str, set[int]] = {}
-# trace_id → 待在本次任务结束时移除 Typing 表情的用户消息(可多条,同一 trace 并发续跑时合并清理)
+# trace_id → 待任务结束时移除 Typing 表情的用户消息
 _typing_cleanup_lock = asyncio.Lock()
 _pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}
 
 
+# ----- HTTP / Agent API -----
+
+
 def _format_api_error(status_code: int, body_text: str) -> str:
     try:
-        import json
-
         data = json.loads(body_text)
         detail = data.get("detail")
         if isinstance(detail, str):
@@ -47,6 +54,9 @@ def _format_api_error(status_code: int, body_text: str) -> str:
     return (body_text or "")[:800] or f"HTTP {status_code}"
 
 
+# ----- 飞书上下文(用户消息 / Trace.context)-----
+
+
 def _append_feishu_context_block(
     text: str,
     event: IncomingFeishuEvent,
@@ -74,7 +84,7 @@ def _feishu_adapter_payload(
     event: IncomingFeishuEvent,
     reply_context: FeishuReplyContext,
 ) -> dict[str, str]:
-    """写入 Agent Trace.context['feishu_adapter'],供 feishu_adapter_tool_call 对齐 Node /tool-call。"""
+    """写入 Trace.context['feishu_adapter'],供 feishu_adapter_tool_call 对齐 Node /tool-call。"""
     return {
         "account_id": reply_context.account_id or "",
         "app_id": reply_context.app_id,
@@ -85,8 +95,26 @@ def _feishu_adapter_payload(
     }
 
 
+# ----- Trace assistant → 飞书正文 -----
+
+
+def _assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
+    """assistant 是否仍带有待执行的 tool_calls(中间轮,不当最终回复推给用户)。"""
+    if msg.get("role") != "assistant":
+        return False
+    c = msg.get("content")
+    if not isinstance(c, dict):
+        return False
+    tc = c.get("tool_calls")
+    if tc is None:
+        return False
+    if isinstance(tc, list):
+        return len(tc) > 0
+    return bool(tc)
+
+
 def _assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
-    """从 ``GET .../messages`` 返回的单条消息 dict 提取可发给用户的文本;无可展示内容则返回 None。"""
+    """从 Trace 消息 dict 提取可发给用户的文本。"""
     if msg.get("role") != "assistant":
         return None
     content = msg.get("content")
@@ -94,12 +122,15 @@ def _assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
 
     if isinstance(content, dict):
         text = (content.get("text") or "").strip()
+        tool_calls = content.get("tool_calls")
+        desc = (msg.get("description") or "").strip()
         if text:
             parts.append(text)
-        tool_calls = content.get("tool_calls")
         if tool_calls:
-            desc = (msg.get("description") or "").strip()
-            parts.append(desc if desc else "[工具调用]")
+            if not text:
+                parts.append(desc if desc else "[工具调用]")
+            elif desc and desc != text:
+                parts.append(desc)
     elif isinstance(content, str) and content.strip():
         parts.append(content.strip())
     else:
@@ -118,6 +149,18 @@ def _truncate_for_im(text: str, max_chars: int) -> str:
     return text[: max_chars - 80] + "\n\n…(内容过长已截断)"
 
 
+def _trace_watch_ws_url(http_base: str, trace_id: str) -> str:
+    """Agent HTTP 根地址 → ``/api/traces/{id}/watch`` 的 WebSocket URL。"""
+    b = http_base.strip().rstrip("/")
+    if b.startswith("https://"):
+        origin = "wss://" + b[8:]
+    elif b.startswith("http://"):
+        origin = "ws://" + b[7:]
+    else:
+        origin = "ws://" + b
+    return f"{origin}/api/traces/{trace_id}/watch"
+
+
 def _message_sequence(msg: dict[str, Any]) -> int | None:
     s = msg.get("sequence")
     if s is None:
@@ -137,23 +180,29 @@ def _message_sequence(msg: dict[str, Any]) -> int | None:
         return None
 
 
-async def _register_pending_typing_cleanup(
-    trace_id: str,
-    message_id: str,
-    account_id: str | None,
-) -> None:
-    async with _typing_cleanup_lock:
-        _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))
+def _watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
+    if isinstance(raw, (bytes, bytearray)):
+        raw = raw.decode("utf-8", errors="replace")
+    if not isinstance(raw, str):
+        return None
+    try:
+        data = json.loads(raw)
+    except (json.JSONDecodeError, TypeError):
+        return None
+    return data if isinstance(data, dict) else None
 
 
-async def _remove_typing_immediate(
+# ----- Typing 表情 -----
+
+
+async def _remove_typing_reaction_safe(
     connector: Any,
-    message_id: str | None,
+    message_id: str,
     account_id: str | None,
     emoji: str,
+    *,
+    log_label: str,
 ) -> None:
-    if not message_id:
-        return
     try:
         r = await connector.remove_bot_reactions_for_emoji(
             message_id,
@@ -162,12 +211,39 @@ async def _remove_typing_immediate(
         )
         if not r.get("ok"):
             logger.warning(
-                "feishu typing: immediate remove failed mid=%s result=%s",
+                "%s: remove reaction failed mid=%s result=%s",
+                log_label,
                 message_id,
                 r,
             )
     except Exception:
-        logger.exception("feishu typing: immediate remove exception mid=%s", message_id)
+        logger.exception("%s: remove reaction exception mid=%s", log_label, message_id)
+
+
+async def _register_pending_typing_cleanup(
+    trace_id: str,
+    message_id: str,
+    account_id: str | None,
+) -> None:
+    async with _typing_cleanup_lock:
+        _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))
+
+
+async def _remove_typing_immediate(
+    connector: Any,
+    message_id: str | None,
+    account_id: str | None,
+    emoji: str,
+) -> None:
+    if not message_id:
+        return
+    await _remove_typing_reaction_safe(
+        connector,
+        message_id,
+        account_id,
+        emoji,
+        log_label="feishu typing",
+    )
 
 
 async def _flush_pending_typing_cleanups(
@@ -178,20 +254,50 @@ async def _flush_pending_typing_cleanups(
     async with _typing_cleanup_lock:
         pairs = _pending_typing_by_trace.pop(trace_id, [])
     for mid, acc in pairs:
-        try:
-            r = await connector.remove_bot_reactions_for_emoji(
-                mid,
-                emoji,
-                account_id=acc,
-            )
-            if not r.get("ok"):
-                logger.warning(
-                    "feishu typing cleanup: remove reaction failed mid=%s result=%s",
-                    mid,
-                    r,
-                )
-        except Exception:
-            logger.exception("feishu typing cleanup: remove reaction exception mid=%s", mid)
+        await _remove_typing_reaction_safe(
+            connector,
+            mid,
+            acc,
+            emoji,
+            log_label="feishu typing cleanup",
+        )
+
+
+# ----- 跟单:WS 转发 assistant -----
+
+
+async def _forward_one_assistant_to_feishu(
+    m: dict[str, Any],
+    *,
+    sent_sequences: set[int],
+    reply_ctx: FeishuReplyContext,
+    connector: Any,
+    max_text_chars: int,
+) -> None:
+    seq = _message_sequence(m)
+    if seq is None or m.get("role") != "assistant":
+        return
+    if seq in sent_sequences:
+        return
+    if m.get("branch_type") == "reflection":
+        sent_sequences.add(seq)
+        return
+    if _assistant_content_has_tool_calls(m):
+        sent_sequences.add(seq)
+        return
+    body = _assistant_wire_to_feishu_text(m)
+    if body is None:
+        sent_sequences.add(seq)
+        return
+    body = _truncate_for_im(body, max_text_chars)
+    try:
+        result = await connector.send_text(reply_ctx, body)
+        if result.get("ok"):
+            sent_sequences.add(seq)
+        else:
+            logger.error("feishu forward: send_text failed seq=%s result=%s", seq, result)
+    except Exception:
+        logger.exception("feishu forward: send_text exception seq=%s", seq)
 
 
 async def _poll_assistants_to_feishu(
@@ -209,7 +315,8 @@ async def _poll_assistants_to_feishu(
     typing_emoji_for_cleanup: str = "Typing",
 ) -> None:
     """
-    轮询 Trace 直至终态;可选将主路径 assistant 消息发到飞书;结束时清理已登记的 Typing 表情。
+    WebSocket 订阅直至终态;转发 ``message_added`` 中的 assistant。
+    WS 不可用时仅 ``GET /api/traces/{id}`` 轮询状态(结束跟单 + 清理 Typing),不拉 messages。
     """
     if trace_id not in _assistant_sent_sequences:
         _assistant_sent_sequences[trace_id] = set()
@@ -218,92 +325,125 @@ async def _poll_assistants_to_feishu(
     started = time.monotonic()
     base = agent_base_url.rstrip("/")
 
+    ws = None
+    try:
+        import websockets
+
+        ws = await websockets.connect(
+            _trace_watch_ws_url(base, trace_id),
+            max_size=10_000_000,
+            ping_interval=20,
+            ping_timeout=60,
+        )
+        logger.info("feishu: trace watch WS connected trace_id=%s", trace_id)
+    except Exception as e:
+        logger.warning("feishu: trace watch WS connect failed: %s", e)
+        ws = None
+
+    forward_warned = False
+
+    async def _dispatch_watch_event(data: dict[str, Any]) -> str:
+        ev = data.get("event")
+        if ev == "message_added" and forward_assistants:
+            msg = data.get("message")
+            if isinstance(msg, dict):
+                await _forward_one_assistant_to_feishu(
+                    msg,
+                    sent_sequences=sent_sequences,
+                    reply_ctx=reply_ctx,
+                    connector=connector,
+                    max_text_chars=max_text_chars,
+                )
+        if ev == "trace_status_changed":
+            st = data.get("status")
+            if isinstance(st, str) and st in _TERMINAL_STATUSES:
+                return st
+        if ev == "trace_completed":
+            return "completed"
+        return "running"
+
     try:
         while True:
             if poll_max_seconds > 0 and (time.monotonic() - started) >= poll_max_seconds:
                 logger.warning(
-                    "feishu poll: trace_id=%s stopped by poll_max_seconds=%s",
+                    "feishu watch: trace_id=%s stopped by poll_max_seconds=%s",
                     trace_id,
                     poll_max_seconds,
                 )
                 break
 
-            status = "running"
-            try:
-                async with httpx.AsyncClient(timeout=poll_request_timeout) as client:
-                    tr = await client.get(f"{base}/api/traces/{trace_id}")
-                    if tr.status_code == 404:
-                        logger.warning("feishu poll: trace %s not found, stop", trace_id)
-                        break
-                    if tr.status_code >= 400:
-                        logger.warning(
-                            "feishu poll: GET trace failed status=%s body=%s",
-                            tr.status_code,
-                            tr.text[:300],
-                        )
-                    else:
-                        data = tr.json()
-                        trace_obj = data.get("trace") or {}
-                        status = str(trace_obj.get("status") or "running")
-
-                    if forward_assistants:
-                        ms = await client.get(
-                            f"{base}/api/traces/{trace_id}/messages",
-                            params={"mode": "main_path"},
-                        )
-                        if ms.status_code != 200:
+            status_hint = "running"
+
+            if ws is not None:
+                try:
+                    raw = await asyncio.wait_for(ws.recv(), timeout=poll_interval)
+                except asyncio.TimeoutError:
+                    raw = None
+                except Exception as e:
+                    logger.warning("feishu watch WS error, HTTP status fallback: %s", e)
+                    try:
+                        await ws.close()
+                    except Exception:
+                        pass
+                    ws = None
+                    raw = None
+
+                while raw is not None:
+                    data = _watch_ws_payload_to_dict(raw)
+                    if data is not None:
+                        st = await _dispatch_watch_event(data)
+                        if st in _TERMINAL_STATUSES:
+                            status_hint = st
+                    try:
+                        raw = await asyncio.wait_for(ws.recv(), timeout=0.001)
+                    except asyncio.TimeoutError:
+                        raw = None
+                    except Exception:
+                        raw = None
+            else:
+                await asyncio.sleep(poll_interval)
+                if forward_assistants and not forward_warned:
+                    logger.error(
+                        "feishu: WebSocket 不可用,无法推送 assistant;"
+                        "仅 HTTP 查询 trace 状态以结束跟单(不拉 messages)"
+                    )
+                    forward_warned = True
+
+            effective = status_hint
+            if ws is None:
+                try:
+                    async with httpx.AsyncClient(timeout=poll_request_timeout) as client:
+                        tr = await client.get(f"{base}/api/traces/{trace_id}")
+                        if tr.status_code == 404:
+                            logger.warning("feishu watch: trace %s not found, stop", trace_id)
+                            break
+                        if tr.status_code >= 400:
                             logger.warning(
-                                "feishu poll: GET messages failed status=%s",
-                                ms.status_code,
+                                "feishu watch: GET trace failed status=%s body=%s",
+                                tr.status_code,
+                                tr.text[:300],
                             )
                         else:
-                            payload = ms.json()
-                            raw_list = payload.get("messages") or []
-                            assistants = [
-                                m
-                                for m in raw_list
-                                if isinstance(m, dict) and m.get("role") == "assistant"
-                            ]
-                            assistants.sort(key=lambda m: (_message_sequence(m) or 0))
-
-                            for m in assistants:
-                                seq = _message_sequence(m)
-                                if seq is None:
-                                    continue
-                                if seq in sent_sequences:
-                                    continue
-                                body = _assistant_wire_to_feishu_text(m)
-                                if body is None:
-                                    sent_sequences.add(seq)
-                                    continue
-                                body = _truncate_for_im(body, max_text_chars)
-                                try:
-                                    result = await connector.send_text(reply_ctx, body)
-                                    if result.get("ok"):
-                                        sent_sequences.add(seq)
-                                    else:
-                                        logger.error(
-                                            "feishu poll: send_text failed seq=%s result=%s",
-                                            seq,
-                                            result,
-                                        )
-                                except Exception:
-                                    logger.exception(
-                                        "feishu poll: send_text exception seq=%s",
-                                        seq,
-                                    )
-            except httpx.RequestError as exc:
-                logger.warning("feishu poll: request error trace_id=%s err=%s", trace_id, exc)
-
-            if status in _TERMINAL_STATUSES:
+                            body = tr.json()
+                            trace_obj = body.get("trace") or {}
+                            st = str(trace_obj.get("status") or "running")
+                            if st in _TERMINAL_STATUSES:
+                                effective = st
+                except httpx.RequestError as exc:
+                    logger.warning("feishu watch: HTTP status check error trace_id=%s err=%s", trace_id, exc)
+
+            if effective in _TERMINAL_STATUSES:
                 grace += 1
                 if grace >= terminal_grace_rounds:
                     break
             else:
                 grace = 0
-
-            await asyncio.sleep(poll_interval)
     finally:
+        if ws is not None:
+            try:
+                await ws.close()
+            except Exception:
+                pass
         await _flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
         cur = asyncio.current_task()
         async with _poll_tasks_lock:
@@ -325,7 +465,7 @@ def _schedule_trace_followup(
     forward_assistants: bool,
     typing_emoji: str,
 ) -> None:
-    """同一 trace 仅保留一个活跃跟单任务(转发 assistant + 或在终态时清理 Typing)。"""
+    """同一 trace 仅保留一个活跃跟单任务。"""
 
     async def _runner() -> None:
         await _poll_assistants_to_feishu(
@@ -356,8 +496,30 @@ def _schedule_trace_followup(
     _ = loop.create_task(_spawn())
 
 
+# ----- 入站:提交 Agent -----
+
+
+async def _inbound_fail_reply(
+    connector: Any,
+    reply_context: FeishuReplyContext,
+    *,
+    typing_placed: bool,
+    typing_emoji: str,
+    message: str,
+) -> None:
+    """错误路径:先摘 Typing(若曾加上),再向用户发送说明。"""
+    if typing_placed:
+        await _remove_typing_immediate(
+            connector,
+            reply_context.message_id,
+            reply_context.account_id,
+            typing_emoji,
+        )
+    await connector.send_text(reply_context, message)
+
+
 class FeishuHttpRunApiExecutor:
-    """调用 Agent Trace HTTP API,并可选轮询 assistant 消息转发到飞书。"""
+    """调用 Agent Trace HTTP API,WebSocket 将 assistant 转发到飞书。"""
 
     def __init__(
         self,
@@ -461,31 +623,23 @@ class FeishuHttpRunApiExecutor:
                     )
         except httpx.RequestError as exc:
             logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
-            if typing_placed:
-                await _remove_typing_immediate(
-                    connector,
-                    reply_context.message_id,
-                    reply_context.account_id,
-                    self._typing_emoji,
-                )
-            await connector.send_text(
+            await _inbound_fail_reply(
+                connector,
                 reply_context,
-                f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
+                typing_placed=typing_placed,
+                typing_emoji=self._typing_emoji,
+                message=f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
             )
             return task_id
 
         body_text = resp.text
         if resp.status_code == 409:
-            if typing_placed:
-                await _remove_typing_immediate(
-                    connector,
-                    reply_context.message_id,
-                    reply_context.account_id,
-                    self._typing_emoji,
-                )
-            await connector.send_text(
+            await _inbound_fail_reply(
+                connector,
                 reply_context,
-                "[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
+                typing_placed=typing_placed,
+                typing_emoji=self._typing_emoji,
+                message="[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
             )
             return task_id
 
@@ -497,47 +651,35 @@ class FeishuHttpRunApiExecutor:
                 user_id,
                 err,
             )
-            if typing_placed:
-                await _remove_typing_immediate(
-                    connector,
-                    reply_context.message_id,
-                    reply_context.account_id,
-                    self._typing_emoji,
-                )
-            await connector.send_text(
+            await _inbound_fail_reply(
+                connector,
                 reply_context,
-                f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
+                typing_placed=typing_placed,
+                typing_emoji=self._typing_emoji,
+                message=f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
             )
             return task_id
 
         try:
             data = resp.json()
         except Exception:
-            if typing_placed:
-                await _remove_typing_immediate(
-                    connector,
-                    reply_context.message_id,
-                    reply_context.account_id,
-                    self._typing_emoji,
-                )
-            await connector.send_text(
+            await _inbound_fail_reply(
+                connector,
                 reply_context,
-                "[Gateway] Agent API 返回非 JSON,已放弃解析。",
+                typing_placed=typing_placed,
+                typing_emoji=self._typing_emoji,
+                message="[Gateway] Agent API 返回非 JSON,已放弃解析。",
             )
             return task_id
 
         resolved_id = data.get("trace_id")
         if not isinstance(resolved_id, str) or not resolved_id:
-            if typing_placed:
-                await _remove_typing_immediate(
-                    connector,
-                    reply_context.message_id,
-                    reply_context.account_id,
-                    self._typing_emoji,
-                )
-            await connector.send_text(
+            await _inbound_fail_reply(
+                connector,
                 reply_context,
-                "[Gateway] Agent API 响应缺少 trace_id。",
+                typing_placed=typing_placed,
+                typing_emoji=self._typing_emoji,
+                message="[Gateway] Agent API 响应缺少 trace_id。",
             )
             return task_id
 

+ 6 - 6
gateway/core/channels/feishu/manager.py

@@ -30,12 +30,12 @@ class FeishuChannelConfig:
     agent_run_max_iterations: int = 200
     agent_run_temperature: float = 0.3
     feishu_run_notify_on_submit: bool = True
-    # 轮询 Agent GET /api/traces/{id}/messages,逐条转发主路径 assistant 到飞书
-    poll_assistant_messages: bool = True
-    poll_interval_seconds: float = 1.0
-    poll_request_timeout: float = 30.0
-    poll_terminal_grace_rounds: int = 2
-    poll_max_seconds: float = 0.0
+    # 以下为「Trace 跟单」参数(WebSocket watch;不再 HTTP 轮询 messages)
+    poll_assistant_messages: bool = True  # 是否把 assistant 推到飞书(False 时仍可连 WS 等终态清 Typing)
+    poll_interval_seconds: float = 1.0  # WS recv 超时;无 WS 时 HTTP 查 trace 状态的间隔
+    poll_request_timeout: float = 30.0  # 仅 HTTP 兜底 GET /api/traces/{id} 的超时
+    poll_terminal_grace_rounds: int = 2  # 连续 N 轮终态后结束跟单
+    poll_max_seconds: float = 0.0  # 跟单最长秒数,0=不限制
     assistant_max_text_chars: int = 8000
     typing_reaction_enabled: bool = True
     typing_reaction_emoji: str = "Typing"

+ 5 - 0
gateway/core/channels/feishu/openclaw-lark-patch/src/http/config.yml

@@ -1,6 +1,11 @@
 feishu:
   domain: feishu
   ownerOnly: false
+  # CardKit 流式卡片:仅当回复由本 Node(openclaw 内置链路)直接发出时生效。
+  # 当前若走 Gateway → Python Agent → HTTP 发飞书,请同时开 Gateway 侧 FEISHU_ASSISTANT_VIA_WEBSOCKET。
+  streaming: true
+  # 私聊 auto → streaming;群聊 auto → static。若群聊也要流式卡片可改为:
+  # replyMode: { default: auto, direct: streaming, group: streaming }
   accounts:
     devops:
       appId: cli_a928053b7378dcef