"""Feishu <-> Agent Trace bridge (module ``gateway.core.channels.feishu.bridge``).

Overview: after a user sends a message from Feishu, ``FeishuHttpRunApiExecutor``
calls the Agent's Trace HTTP API (``POST /api/traces`` to create a trace, or
``POST /api/traces/{id}/run`` to continue one), then subscribes over WebSocket
to ``/api/traces/{id}/watch`` to follow the trace and pushes the **final
assistant reply** back to Feishu. Optionally the Workspace / ``gateway_exec``
(Docker container) lifecycle is attached and linked to the trace's terminal
state.

Sections in this file (kept as a single module to avoid over-splitting):

1. **Agent request body / Feishu context** — ``append_feishu_context_block``,
   ``feishu_adapter_payload``, etc.
2. **Trace / WS message parsing** — ``TERMINAL_STATUSES``, assistant text
   extraction, ``trace_watch_ws_url``
3. **Follow-up and Typing** — ``poll_assistants_to_feishu``,
   ``schedule_trace_followup``
4. **``FeishuHttpRunApiExecutor``** — entry point for inbound Feishu messages

Forwarding rules: ``branch_type=reflection`` messages are not pushed;
intermediate rounds that still contain ``tool_calls`` are not pushed;
``description`` and ``text`` are not concatenated when they duplicate each
other.
"""

from __future__ import annotations

import asyncio
import json
import logging
import time
import uuid
from collections.abc import Awaitable, Callable
from copy import copy
from typing import Any

import httpx

from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
from gateway.core.lifecycle.trace.backend import LifecycleTraceBackend
from gateway.core.lifecycle.workspace import WorkspaceManager

# Public names exported by this module.
__all__ = [
    "FeishuHttpRunApiExecutor",
    "FollowupFinishedCallback",
    "TERMINAL_STATUSES",
    "append_feishu_context_block",
    "feishu_adapter_payload",
    "format_api_error",
    "normalized_agent_trace_id",
    "schedule_trace_followup",
]

# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)

# =============================================================================
# 1.
# Agent request body and Feishu context
# =============================================================================


def normalized_agent_trace_id(raw: str) -> str | None:
    """Return *raw* without surrounding whitespace, or ``None`` if nothing is left.

    A falsy *raw* (``None``, ``""``) is treated like an empty string.
    """
    return (raw or "").strip() or None


def format_api_error(status_code: int, body_text: str) -> str:
    """Build a short, human-readable message from an Agent API error response.

    When the body is a JSON object with a non-null ``detail`` field, that field
    is returned: strings as-is, lists and dicts re-encoded as JSON (non-ASCII
    characters kept readable), anything else via ``str()``. Otherwise the first
    800 characters of the raw body are returned, or ``"HTTP <status_code>"``
    when the body is empty.

    Args:
        status_code: HTTP status of the failed response; only used for the
            empty-body fallback.
        body_text: Raw response body.

    Returns:
        The message to show to the user / write to the log.
    """
    try:
        data = json.loads(body_text)
    except (TypeError, ValueError, RecursionError):
        # Not JSON (or not text at all): use the raw-body fallback below.
        data = None

    # Only a JSON *object* can carry a ``detail`` field.
    detail = data.get("detail") if isinstance(data, dict) else None
    if isinstance(detail, str):
        return detail
    if isinstance(detail, (list, dict)):
        # Structured details are rendered as JSON rather than a Python repr.
        return json.dumps(detail, ensure_ascii=False)
    if detail is not None:
        return str(detail)
    return (body_text or "")[:800] or f"HTTP {status_code}"


def append_feishu_context_block(
    text: str,
    event: IncomingFeishuEvent,
    reply_context: FeishuReplyContext,
) -> str:
    """Append a ``key=value`` Feishu context block to the user's message text.

    The result is the stripped *text* (or a placeholder when it is empty), a
    blank line, a header line, and one ``key=value`` line per context field.
    Optional fields that are falsy are rendered as empty values.

    Args:
        text: Message text received from Feishu; may be empty or ``None``.
        event: Inbound event; only ``chat_type`` is read.
        reply_context: Reply routing info; ``account_id``, ``app_id``,
            ``chat_id``, ``message_id`` and ``open_id`` are read.

    Returns:
        The newline-joined message content.
    """
    core = text.strip() if text else ""
    if not core:
        core = "(空消息)"
    context_fields = (
        ("account_id", reply_context.account_id or ""),
        ("app_id", reply_context.app_id),
        ("chat_id", reply_context.chat_id),
        ("message_id", reply_context.message_id or ""),
        ("open_id", reply_context.open_id or ""),
        ("chat_type", event.chat_type or ""),
    )
    lines = [
        core,
        "",
        "[飞书上下文 · 工具调用时请使用下列字段,勿向用户复述本段]",
    ]
    lines.extend(f"{key}={value}" for key, value in context_fields)
    return "\n".join(lines)


def feishu_adapter_payload(
    event: IncomingFeishuEvent,
    reply_context: FeishuReplyContext,
) -> dict[str, str]:
    """Return the ``feishu_adapter`` dict that is attached to Agent API requests.

    Carries the same fields as the text context block; note that the sender's
    ``open_id`` is exposed under the key ``sender_open_id``. Falsy optional
    fields become empty strings.
    """
    return {
        "account_id": reply_context.account_id or "",
        "app_id": reply_context.app_id,
        "chat_id": reply_context.chat_id,
        "message_id": reply_context.message_id or "",
        "sender_open_id": reply_context.open_id or "",
        "chat_type": event.chat_type or "",
    }


# =============================================================================
# 2.
# Trace / WebSocket message parsing
# =============================================================================

# Trace statuses after which the trace is considered finished.
TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})

# Marker appended by ``truncate_for_im`` when text is cut.
_TRUNCATION_SUFFIX = "\n\n…(内容过长已截断)"
# Headroom kept below ``max_chars`` when truncating (marker included).
_TRUNCATION_RESERVE = 80


def _stripped_str(value: Any) -> str:
    """Return *value* stripped when it is a string, otherwise ``""``."""
    return value.strip() if isinstance(value, str) else ""


def assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
    """Tell whether *msg* is an assistant message with non-empty ``tool_calls``.

    Only dict-shaped ``content`` is inspected; any other content shape, or a
    non-assistant role, yields ``False``.
    """
    if msg.get("role") != "assistant":
        return False
    content = msg.get("content")
    if not isinstance(content, dict):
        return False
    return bool(content.get("tool_calls"))


def assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
    """Extract the text to show in Feishu from an assistant wire message.

    * dict content: the stripped ``text`` comes first. When ``tool_calls`` is
      truthy, the message ``description`` is added as well — as a replacement
      (or the ``[工具调用]`` placeholder) when there is no text, or as a second
      line when it differs from the text.
    * non-blank string content: the stripped string.
    * anything else: the stripped ``description`` if present.

    Non-string ``text`` / ``description`` values are treated as empty instead
    of raising.

    Returns:
        The newline-joined text, or ``None`` for non-assistant messages and
        messages with nothing to show.
    """
    if msg.get("role") != "assistant":
        return None

    content = msg.get("content")
    desc = _stripped_str(msg.get("description"))
    parts: list[str] = []

    if isinstance(content, dict):
        text = _stripped_str(content.get("text"))
        if text:
            parts.append(text)
        if content.get("tool_calls"):
            if not text:
                parts.append(desc if desc else "[工具调用]")
            elif desc and desc != text:
                # Skip the description when it merely repeats the text.
                parts.append(desc)
    elif isinstance(content, str) and content.strip():
        parts.append(content.strip())
    elif desc:
        parts.append(desc)

    return "\n".join(parts) if parts else None


def truncate_for_im(text: str, max_chars: int) -> str:
    """Shorten *text* so that it fits into *max_chars* characters.

    Text that already fits is returned unchanged. Otherwise the text is cut and
    a truncation marker is appended; 80 characters of headroom (marker
    included) are kept below *max_chars*. When the budget is too small for that
    headroom, only the marker's own length is reserved, and when even the
    marker does not fit the text is hard-cut to *max_chars* without a marker.
    The result is never longer than *max_chars* (for ``max_chars >= 0``).
    """
    if len(text) <= max_chars:
        return text
    keep = max_chars - _TRUNCATION_RESERVE
    if keep <= 0:
        # A non-positive slice bound would either drop everything or, when
        # negative, count from the END of the text and overshoot the budget.
        keep = max_chars - len(_TRUNCATION_SUFFIX)
    if keep <= 0:
        return text[: max(max_chars, 0)]
    return text[:keep] + _TRUNCATION_SUFFIX


def trace_watch_ws_url(http_base: str, trace_id: str) -> str:
    """Derive the WebSocket watch URL of *trace_id* from the Agent's HTTP base URL.

    ``https://`` maps to ``wss://``, ``http://`` to ``ws://``; a base without a
    scheme is prefixed with ``ws://``. Surrounding whitespace and trailing
    slashes of *http_base* are dropped.
    """
    b = http_base.strip().rstrip("/")
    if b.startswith("https://"):
        origin = "wss://" + b[len("https://"):]
    elif b.startswith("http://"):
        origin = "ws://" + b[len("http://"):]
    else:
        origin = "ws://" + b
    return f"{origin}/api/traces/{trace_id}/watch"


def message_sequence(msg: dict[str, Any]) -> int | None:
    """Return the message's ``sequence`` as an int, or ``None`` if unusable.

    Ints are returned as-is; floats, numeric strings and other values are
    passed through ``int()``. Values that cannot be converted — including
    NaN / infinite floats — yield ``None`` instead of raising.
    """
    s = msg.get("sequence")
    if s is None:
        return None
    if isinstance(s, int):
        return s
    try:
        return int(s)
    except (TypeError, ValueError, OverflowError):
        # ValueError: non-numeric string or NaN; OverflowError: +/- infinity.
        return None


def watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
    """Decode a WebSocket frame into a dict, or ``None`` if it is not a JSON object.

    Bytes are decoded as UTF-8 with undecodable bytes replaced; frames that are
    neither text nor bytes are rejected.
    """
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8", errors="replace")
    if not isinstance(raw, str):
        return None
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    return data if isinstance(data, dict) else None


# =============================================================================
# 3. Trace follow-up, Typing, HTTP fallback
# =============================================================================

FollowupFinishedCallback = Callable[[str, str], Awaitable[None]]
"""``(trace_id, reason)`` · ``terminal`` | ``timeout`` | ``not_found``"""

# Follow-up task per trace_id; guarded by ``_poll_tasks_lock``.
_poll_tasks: dict[str, asyncio.Task[None]] = {}
_poll_tasks_lock = asyncio.Lock()
# Short-lived tasks created by ``schedule_trace_followup``; referenced here so
# they cannot be garbage-collected before they have run.
_spawn_tasks: set[asyncio.Task[None]] = set()
# Sequences already handled per trace_id (forwarded or deliberately skipped).
# NOTE(review): entries are never removed in this file — presumably so that a
# continued trace does not re-send old messages; confirm this growth is intended.
_assistant_sent_sequences: dict[str, set[int]] = {}
# (message_id, account_id) pairs whose typing reaction must be removed once the
# follow-up of the trace ends; guarded by ``_typing_cleanup_lock``.
_typing_cleanup_lock = asyncio.Lock()
_pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}


async def remove_typing_reaction_safe(
    connector: Any,
    message_id: str,
    account_id: str | None,
    emoji: str,
    *,
    log_label: str,
) -> None:
    """Remove the bot's *emoji* reaction from a message; never raises.

    A result without a truthy ``ok`` is logged as a warning, an exception is
    logged with its traceback. *log_label* prefixes both log lines.
    """
    try:
        r = await connector.remove_bot_reactions_for_emoji(
            message_id,
            emoji,
            account_id=account_id,
        )
        if not r.get("ok"):
            logger.warning(
                "%s: remove reaction failed mid=%s result=%s",
                log_label,
                message_id,
                r,
            )
    except Exception:
        logger.exception("%s: remove reaction exception mid=%s", log_label, message_id)


async def register_pending_typing_cleanup(
    trace_id: str,
    message_id: str,
    account_id: str | None,
) -> None:
    """Remember that *message_id* carries a typing reaction tied to *trace_id*."""
    async with _typing_cleanup_lock:
        _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))


async def remove_typing_immediate(
    connector: Any,
    message_id: str | None,
    account_id: str | None,
    emoji: str,
) -> None:
    """Remove the typing reaction right away; a missing *message_id* is a no-op."""
    if not message_id:
        return
    await remove_typing_reaction_safe(
        connector,
        message_id,
        account_id,
        emoji,
        log_label="feishu typing",
    )


async def flush_pending_typing_cleanups(
    connector: Any,
    trace_id: str,
    emoji: str,
) -> None:
    """Remove every typing reaction registered for *trace_id* and forget them."""
    # Take the list under the lock, but talk to Feishu outside of it.
    async with _typing_cleanup_lock:
        pairs = _pending_typing_by_trace.pop(trace_id, [])
    for mid, acc in pairs:
        await remove_typing_reaction_safe(
            connector,
            mid,
            acc,
            emoji,
            log_label="feishu typing cleanup",
        )


async def inbound_fail_reply(
    connector: Any,
    reply_context: FeishuReplyContext,
    *,
    typing_placed: bool,
    typing_emoji: str,
    message: str,
) -> None:
    """Report a failed inbound request to the user.

    Removes the typing reaction first when one was placed, then sends *message*
    via ``connector.send_text``.
    """
    if typing_placed:
        await remove_typing_immediate(
            connector,
            reply_context.message_id,
            reply_context.account_id,
            typing_emoji,
        )
    await connector.send_text(reply_context, message)


async def forward_one_assistant_to_feishu(
    m: dict[str, Any],
    *,
    sent_sequences: set[int],
    reply_ctx: FeishuReplyContext,
    connector: Any,
    max_text_chars: int,
) -> None:
    """Forward one assistant message to Feishu unless it must be skipped.

    Ignored: messages without a usable sequence, non-assistant messages, and
    sequences already in *sent_sequences*. Marked as handled without sending:
    ``branch_type == "reflection"``, messages that still carry tool calls, and
    messages with no text. Everything else is truncated to *max_text_chars*
    and sent; the sequence is only marked as handled when the send result
    reports ``ok``, so a failed send can be retried if the message is seen
    again. Send failures are logged, not raised.
    """
    seq = message_sequence(m)
    if seq is None or m.get("role") != "assistant":
        return
    if seq in sent_sequences:
        return
    if m.get("branch_type") == "reflection" or assistant_content_has_tool_calls(m):
        sent_sequences.add(seq)
        return
    body = assistant_wire_to_feishu_text(m)
    if body is None:
        sent_sequences.add(seq)
        return
    body = truncate_for_im(body, max_text_chars)
    try:
        result = await connector.send_text(reply_ctx, body)
        if result.get("ok"):
            sent_sequences.add(seq)
        else:
            logger.error("feishu forward: send_text failed seq=%s result=%s", seq, result)
    except Exception:
        logger.exception("feishu forward: send_text exception seq=%s", seq)


def _trace_status_from_body(body: Any) -> str:
    """Read ``trace.status`` from a ``GET /api/traces/{id}`` JSON body.

    Returns ``"running"`` when the body is not an object, has no ``trace``
    object, or the status is missing/empty.
    """
    trace_obj = body.get("trace") if isinstance(body, dict) else None
    status = trace_obj.get("status") if isinstance(trace_obj, dict) else None
    return str(status or "running")


async def poll_assistants_to_feishu(
    *,
    agent_base_url: str,
    trace_id: str,
    reply_ctx: FeishuReplyContext,
    connector: Any,
    poll_interval: float,
    poll_request_timeout: float,
    terminal_grace_rounds: int,
    poll_max_seconds: float,
    max_text_chars: int,
    forward_assistants: bool = True,
    typing_emoji_for_cleanup: str = "Typing",
    on_finished: FollowupFinishedCallback | None = None,
) -> None:
    """Follow a trace until it ends, forwarding assistant messages to Feishu.

    The trace's ``/watch`` WebSocket is the primary channel: ``message_added``
    events are forwarded (when *forward_assistants* is true) and status events
    drive termination. If the socket cannot be opened, or breaks later, the
    loop falls back to polling ``GET /api/traces/{id}`` for the status only —
    messages are not fetched over HTTP.

    Args:
        agent_base_url: HTTP base URL of the Agent API.
        trace_id: Trace to follow.
        reply_ctx: Where forwarded messages are sent.
        connector: Feishu connector; ``send_text`` and
            ``remove_bot_reactions_for_emoji`` are used.
        poll_interval: Seconds to wait for a WS frame per round, and the sleep
            between HTTP polls.
        poll_request_timeout: Timeout of each HTTP status request.
        terminal_grace_rounds: Number of consecutive rounds the trace must be
            seen in a terminal status before the loop ends; the extra rounds
            give trailing messages a chance to arrive.
        poll_max_seconds: Overall time limit; ``<= 0`` disables it.
        max_text_chars: Truncation limit for forwarded text.
        forward_assistants: Whether assistant messages are forwarded at all.
        typing_emoji_for_cleanup: Emoji of the typing reaction to remove when
            the follow-up ends.
        on_finished: Awaited with ``(trace_id, reason)`` when the loop ended
            with a reason (``terminal``, ``timeout`` or ``not_found``); its
            exceptions are logged and swallowed.
    """
    sent_sequences = _assistant_sent_sequences.setdefault(trace_id, set())
    grace = 0
    started = time.monotonic()
    base = agent_base_url.rstrip("/")

    ws = None
    try:
        # Imported lazily: a missing package degrades to HTTP-only status polling.
        import websockets

        ws = await websockets.connect(
            trace_watch_ws_url(base, trace_id),
            max_size=10_000_000,
            ping_interval=20,
            ping_timeout=60,
        )
        logger.info("feishu: trace watch WS connected trace_id=%s", trace_id)
    except Exception as e:
        logger.warning("feishu: trace watch WS connect failed: %s", e)
        ws = None

    forward_warned = False
    exit_reason: str | None = None
    # Last status reported over the WebSocket. It is kept across rounds so that
    # the grace counter can accumulate while the socket stays open: a status
    # event arrives only once, not once per round.
    ws_status = "running"

    async def _dispatch_watch_event(data: dict[str, Any]) -> str | None:
        """Handle one watch event; return the trace status it reports, if any."""
        ev = data.get("event")
        if ev == "message_added" and forward_assistants:
            msg = data.get("message")
            if isinstance(msg, dict):
                await forward_one_assistant_to_feishu(
                    msg,
                    sent_sequences=sent_sequences,
                    reply_ctx=reply_ctx,
                    connector=connector,
                    max_text_chars=max_text_chars,
                )
        if ev == "trace_status_changed":
            st = data.get("status")
            return st if isinstance(st, str) else None
        if ev == "trace_completed":
            return "completed"
        return None

    try:
        while True:
            if poll_max_seconds > 0 and (time.monotonic() - started) >= poll_max_seconds:
                logger.warning(
                    "feishu watch: trace_id=%s stopped by poll_max_seconds=%s",
                    trace_id,
                    poll_max_seconds,
                )
                exit_reason = "timeout"
                break

            if ws is not None:
                stream = ws
                try:
                    raw = await asyncio.wait_for(stream.recv(), timeout=poll_interval)
                except asyncio.TimeoutError:
                    raw = None
                except Exception as e:
                    logger.warning("feishu watch WS error, HTTP status fallback: %s", e)
                    try:
                        await stream.close()
                    except Exception:
                        pass
                    ws = None
                    raw = None
                    # From now on the HTTP status check is the source of truth.
                    ws_status = "running"
                # Drain everything that is already queued before the next round.
                while raw is not None:
                    data = watch_ws_payload_to_dict(raw)
                    if data is not None:
                        reported = await _dispatch_watch_event(data)
                        if reported is not None:
                            ws_status = reported
                    try:
                        raw = await asyncio.wait_for(stream.recv(), timeout=0.001)
                    except asyncio.TimeoutError:
                        raw = None
                    except Exception:
                        # A broken socket is detected and handled by the
                        # blocking recv() at the top of the next round.
                        raw = None
            else:
                await asyncio.sleep(poll_interval)
                if forward_assistants and not forward_warned:
                    logger.error(
                        "feishu: WebSocket 不可用,无法推送 assistant;"
                        "仅 HTTP 查询 trace 状态以结束跟单(不拉 messages)"
                    )
                    forward_warned = True

            effective = ws_status
            if ws is None:
                try:
                    async with httpx.AsyncClient(timeout=poll_request_timeout) as client:
                        tr = await client.get(f"{base}/api/traces/{trace_id}")
                    if tr.status_code == 404:
                        logger.warning("feishu watch: trace %s not found, stop", trace_id)
                        exit_reason = "not_found"
                        break
                    if tr.status_code >= 400:
                        logger.warning(
                            "feishu watch: GET trace failed status=%s body=%s",
                            tr.status_code,
                            tr.text[:300],
                        )
                    else:
                        try:
                            st = _trace_status_from_body(tr.json())
                        except ValueError:
                            # A non-JSON body must not kill the follow-up task;
                            # treat it like a failed check and retry next round.
                            logger.warning(
                                "feishu watch: GET trace returned non-JSON body trace_id=%s",
                                trace_id,
                            )
                            st = "running"
                        if st in TERMINAL_STATUSES:
                            effective = st
                except httpx.RequestError as exc:
                    logger.warning(
                        "feishu watch: HTTP status check error trace_id=%s err=%s",
                        trace_id,
                        exc,
                    )

            if effective in TERMINAL_STATUSES:
                grace += 1
                if grace >= terminal_grace_rounds:
                    exit_reason = "terminal"
                    break
            else:
                grace = 0
    finally:
        if ws is not None:
            try:
                await ws.close()
            except Exception:
                pass
        await flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
        # Unregister only if the registry still points at this very task.
        cur = asyncio.current_task()
        async with _poll_tasks_lock:
            if _poll_tasks.get(trace_id) is cur:
                _poll_tasks.pop(trace_id, None)
        if on_finished is not None and exit_reason is not None:
            try:
                await on_finished(trace_id, exit_reason)
            except Exception:
                logger.exception(
                    "feishu watch: on_finished failed trace_id=%s reason=%s",
                    trace_id,
                    exit_reason,
                )


def schedule_trace_followup(
    *,
    agent_base_url: str,
    trace_id: str,
    reply_context: FeishuReplyContext,
    connector: Any,
    poll_interval: float,
    poll_request_timeout: float,
    terminal_grace_rounds: int,
    poll_max_seconds: float,
    max_text_chars: int,
    forward_assistants: bool,
    typing_emoji: str,
    on_finished: FollowupFinishedCallback | None = None,
) -> None:
    """Start a background follow-up of *trace_id* unless one is already running.

    Must be called from within a running event loop; without one nothing is
    scheduled (a warning is logged). The arguments are passed through to
    ``poll_assistants_to_feishu``.
    """

    async def _runner() -> None:
        await poll_assistants_to_feishu(
            agent_base_url=agent_base_url,
            trace_id=trace_id,
            reply_ctx=reply_context,
            connector=connector,
            poll_interval=poll_interval,
            poll_request_timeout=poll_request_timeout,
            terminal_grace_rounds=terminal_grace_rounds,
            poll_max_seconds=poll_max_seconds,
            max_text_chars=max_text_chars,
            forward_assistants=forward_assistants,
            typing_emoji_for_cleanup=typing_emoji,
            on_finished=on_finished,
        )

    async def _spawn() -> None:
        # Check-and-register under the lock so that two concurrent calls for
        # the same trace cannot both start a follow-up.
        async with _poll_tasks_lock:
            existing = _poll_tasks.get(trace_id)
            if existing is not None and not existing.done():
                return
            _poll_tasks[trace_id] = asyncio.create_task(_runner())

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        logger.warning(
            "feishu: no running event loop, trace follow-up not scheduled trace_id=%s",
            trace_id,
        )
        return
    # The event loop only keeps weak references to tasks; hold a strong one
    # until the spawn task is done so it cannot disappear mid-execution.
    spawn_task = loop.create_task(_spawn())
    _spawn_tasks.add(spawn_task)
    spawn_task.add_done_callback(_spawn_tasks.discard)


# =============================================================================
# 4. FeishuHttpRunApiExecutor
# =============================================================================


class FeishuHttpRunApiExecutor:
    """Call the Agent Trace HTTP API; a WebSocket follow-up forwards assistant replies to Feishu."""

    def __init__(
        self,
        *,
        base_url: str,
        timeout: float,
        identity_resolver: Any,
        model: str = "qwen3.5-flash",
        max_iterations: int = 200,
        temperature: float = 0.3,
        notify_on_submit: bool = True,
        poll_assistant_messages: bool = True,
        poll_interval_seconds: float = 1.0,
        poll_request_timeout: float = 30.0,
        poll_terminal_grace_rounds: int = 2,
        poll_max_seconds: float = 0.0,
        assistant_max_text_chars: int = 8000,
        typing_reaction_enabled: bool = True,
        typing_reaction_emoji: str = "Typing",
        workspace_manager: WorkspaceManager | None = None,
        workspace_prefix: str = "feishu",
        channel_id: str = "feishu",
        lifecycle_trace_backend: LifecycleTraceBackend | None = None,
        stop_container_on_trace_terminal: bool = False,
        stop_container_on_trace_not_found: bool = True,
        release_ref_on_trace_terminal: bool = False,
    ) -> None:
        """Store the configuration; no I/O happens here.

        Args:
            base_url: Agent API base URL (trailing slashes are dropped).
            timeout: Timeout of the create/run HTTP request.
            identity_resolver: Object whose ``resolve_user_id(event)`` maps an
                inbound event to a user id.
            model, max_iterations, temperature: Sent when a new trace is created.
            notify_on_submit: Send a "submitted" notice to the chat after the
                Agent accepted the request.
            poll_assistant_messages: Forward assistant messages during follow-up.
            poll_interval_seconds, poll_request_timeout,
            poll_terminal_grace_rounds, poll_max_seconds,
            assistant_max_text_chars: Passed to the trace follow-up.
            typing_reaction_enabled, typing_reaction_emoji: Typing reaction on
                the user's message while the Agent works.
            workspace_manager: Optional manager used to look up and stop the
                user's workspace container.
            workspace_prefix: Prefix of the workspace id
                (``"<prefix>:<user_id>"``).
            channel_id: Channel id passed to the lifecycle trace backend.
            lifecycle_trace_backend: Optional backend whose trace binding is
                forgotten on terminal traces (see
                ``release_ref_on_trace_terminal``).
            stop_container_on_trace_terminal: Stop the workspace sandbox when
                the follow-up ends with reason ``terminal``.
            stop_container_on_trace_not_found: Stop the workspace sandbox when
                the follow-up ends with reason ``not_found``.
            release_ref_on_trace_terminal: Forget the trace binding when the
                follow-up ends with reason ``terminal``.
        """
        self._base = base_url.rstrip("/")
        self._timeout = timeout
        self._identity = identity_resolver
        self._model = model
        self._max_iterations = max_iterations
        self._temperature = temperature
        self._notify = notify_on_submit
        self._poll_assistants = poll_assistant_messages
        self._poll_interval = poll_interval_seconds
        self._poll_req_timeout = poll_request_timeout
        self._poll_grace = poll_terminal_grace_rounds
        self._poll_max_seconds = poll_max_seconds
        self._assistant_max_chars = assistant_max_text_chars
        self._typing_reaction_enabled = typing_reaction_enabled
        self._typing_emoji = typing_reaction_emoji
        self._workspace_manager = workspace_manager
        self._workspace_prefix = workspace_prefix
        self._channel_id = channel_id
        self._lifecycle_trace_backend = lifecycle_trace_backend
        self._stop_container_on_trace_terminal = stop_container_on_trace_terminal
        self._stop_container_on_trace_not_found = stop_container_on_trace_not_found
        self._release_ref_on_trace_terminal = release_ref_on_trace_terminal

    def _workspace_id(self, user_id: str) -> str:
        """Return the workspace id of *user_id* (``"<prefix>:<user_id>"``)."""
        return f"{self._workspace_prefix}:{user_id}"

    def _gateway_exec_for_user(self, user_id: str) -> dict[str, Any] | None:
        """Return the ``gateway_exec`` request section for the user's container.

        ``None`` when no workspace manager is configured or the user's
        workspace has no container id.
        """
        wm = self._workspace_manager
        if wm is None:
            return None
        cid = wm.get_workspace_container_id(self._workspace_id(user_id))
        if not cid:
            return None
        return {
            "docker_container": cid,
            "container_user": "agent",
            "container_workdir": "/home/agent/workspace",
        }

    async def handle_inbound_message(
        self,
        existing_agent_trace_id: str,
        text: str,
        reply_context: FeishuReplyContext,
        connector: Any,
        *,
        event: IncomingFeishuEvent,
    ) -> tuple[str, str]:
        """Submit an inbound Feishu message to the Agent and start the follow-up.

        Creates a new trace when *existing_agent_trace_id* is blank, otherwise
        continues that trace. On any failure the typing reaction (if placed) is
        removed and an explanatory message is sent to the chat.

        Args:
            existing_agent_trace_id: Trace to continue; blank to create one.
            text: Message text from Feishu.
            reply_context: Routing info for replies and reactions.
            connector: Feishu connector (``add_message_reaction``,
                ``send_text``, ...).
            event: The inbound event.

        Returns:
            ``(task_id, trace_id)``. ``task_id`` is a fresh ``task-<uuid4>``;
            ``trace_id`` is the id reported by the Agent, or ``""`` on failure.
        """
        user_id = self._identity.resolve_user_id(event)
        content = append_feishu_context_block(text, event, reply_context)
        task_id = f"task-{uuid.uuid4()}"

        typing_placed = False
        if (
            self._typing_reaction_enabled
            and reply_context.message_id
            and event.event_type == "message"
        ):
            try:
                react_res = await connector.add_message_reaction(
                    reply_context.message_id,
                    self._typing_emoji,
                    account_id=reply_context.account_id,
                )
                typing_placed = bool(react_res.get("ok"))
                if not typing_placed:
                    logger.warning(
                        "feishu typing: add reaction failed mid=%s result=%s",
                        reply_context.message_id,
                        react_res,
                    )
            except Exception:
                logger.exception(
                    "feishu typing: add reaction exception mid=%s",
                    reply_context.message_id,
                )

        async def _fail(message: str) -> tuple[str, str]:
            """Clear the typing reaction, tell the user, and build the failure result."""
            await inbound_fail_reply(
                connector,
                reply_context,
                typing_placed=typing_placed,
                typing_emoji=self._typing_emoji,
                message=message,
            )
            return task_id, ""

        api_trace_id = normalized_agent_trace_id(existing_agent_trace_id)
        adapter = feishu_adapter_payload(event, reply_context)
        gateway_exec = self._gateway_exec_for_user(user_id)

        body: dict[str, Any] = {"messages": [{"role": "user", "content": content}]}
        if api_trace_id is None:
            # New trace: the run parameters are only sent on creation.
            url = f"{self._base}/api/traces"
            body.update(
                {
                    "model": self._model,
                    "temperature": self._temperature,
                    "max_iterations": self._max_iterations,
                    "uid": user_id,
                    "name": f"feishu-{user_id}",
                }
            )
        else:
            url = f"{self._base}/api/traces/{api_trace_id}/run"
        body["feishu_adapter"] = adapter
        if gateway_exec:
            body["gateway_exec"] = gateway_exec

        try:
            async with httpx.AsyncClient(timeout=self._timeout) as client:
                resp = await client.post(url, json=body)
        except httpx.RequestError as exc:
            logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
            return await _fail(f"[Gateway] 无法连接 Agent API({self._base}):{exc}")

        body_text = resp.text
        if resp.status_code == 409:
            return await _fail("[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。")
        if resp.status_code >= 400:
            err = format_api_error(resp.status_code, body_text)
            logger.warning(
                "FeishuHttpRunApiExecutor: API 错误 status=%s user_id=%s detail=%s",
                resp.status_code,
                user_id,
                err,
            )
            return await _fail(f"[Gateway] Agent 启动失败({resp.status_code}):{err}")

        try:
            data = resp.json()
        except ValueError:
            return await _fail("[Gateway] Agent API 返回非 JSON,已放弃解析。")

        # A JSON body that is not an object cannot carry a trace_id either.
        resolved_id = data.get("trace_id") if isinstance(data, dict) else None
        if not isinstance(resolved_id, str) or not resolved_id:
            return await _fail("[Gateway] Agent API 响应缺少 trace_id。")

        if self._notify:
            await connector.send_text(
                reply_context,
                f"[Gateway] 已提交 Agent(API trace_id={resolved_id}),后台执行中。",
            )

        if typing_placed:
            user_mid = reply_context.message_id
            if user_mid:
                # The reaction is removed when the follow-up of this trace ends.
                await register_pending_typing_cleanup(
                    resolved_id,
                    user_mid,
                    reply_context.account_id,
                )

        if self._poll_assistants or typing_placed:
            wid = self._workspace_id(user_id)

            async def _on_followup_finished(tid: str, reason: str) -> None:
                """Apply the configured workspace / binding actions for *reason*."""
                if tid != resolved_id:
                    return
                wm = self._workspace_manager
                if wm is None:
                    return
                stop = (
                    reason == "terminal" and self._stop_container_on_trace_terminal
                ) or (
                    reason == "not_found" and self._stop_container_on_trace_not_found
                )
                if stop:
                    await wm.stop_workspace_sandbox(wid)
                if (
                    reason == "terminal"
                    and self._release_ref_on_trace_terminal
                    and self._lifecycle_trace_backend is not None
                ):
                    await self._lifecycle_trace_backend.forget_trace_binding(
                        self._channel_id,
                        user_id,
                        workspace_id=wid,
                    )

            schedule_trace_followup(
                agent_base_url=self._base,
                trace_id=resolved_id,
                # Copied so that later changes to the caller's context object
                # do not affect the background follow-up.
                reply_context=copy(reply_context),
                connector=connector,
                poll_interval=self._poll_interval,
                poll_request_timeout=self._poll_req_timeout,
                terminal_grace_rounds=self._poll_grace,
                poll_max_seconds=self._poll_max_seconds,
                max_text_chars=self._assistant_max_chars,
                forward_assistants=self._poll_assistants,
                typing_emoji=self._typing_emoji,
                on_finished=_on_followup_finished,
            )

        return task_id, resolved_id