| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808 |
- """
- 飞书 ↔ Agent Trace 桥接(模块 ``gateway.core.channels.feishu.bridge``)。
- 职责概览:用户从飞书发来消息后,经 ``FeishuHttpRunApiExecutor`` 调用 Agent 的 Trace HTTP API
- (``POST /api/traces`` 建链或 ``POST /api/traces/{id}/run`` 续跑),再经 WebSocket 订阅
- ``/api/traces/{id}/watch`` 跟单,把 **assistant 最终回复** 推回飞书;可选挂载 Workspace /
- ``gateway_exec``(Docker 容器)生命周期,与 Trace 终态联动。
- 文件内分区(单模块,避免过度拆包):
- 1. **Agent 请求体 / 飞书上下文** — ``append_feishu_context_block``、``feishu_adapter_payload`` 等
- 2. **Trace / WS 消息解析** — ``TERMINAL_STATUSES``、assistant 正文提取、``trace_watch_ws_url``
- 3. **跟单与 Typing** — ``poll_assistants_to_feishu``、``schedule_trace_followup``
- 4. **``FeishuHttpRunApiExecutor``** — 飞书入站入口
- 转发规则:不推 ``branch_type=reflection``;不推仍含 ``tool_calls`` 的中间轮;避免 ``description`` 与 ``text`` 重复拼接。
- """
- from __future__ import annotations
- import asyncio
- import json
- import logging
- import time
- import uuid
- from collections.abc import Awaitable, Callable
- from copy import copy
- from typing import Any
- import httpx
- from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeishuEvent
- from gateway.core.lifecycle.trace.backend import LifecycleTraceBackend
- from gateway.core.lifecycle.workspace import WorkspaceManager
- __all__ = [
- "FeishuHttpRunApiExecutor",
- "FollowupFinishedCallback",
- "TERMINAL_STATUSES",
- "append_feishu_context_block",
- "feishu_adapter_payload",
- "format_api_error",
- "normalized_agent_trace_id",
- "schedule_trace_followup",
- ]
- logger = logging.getLogger(__name__)
- # =============================================================================
- # 1. Agent 请求体与飞书上下文
- # =============================================================================
- def normalized_agent_trace_id(raw: str) -> str | None:
- t = (raw or "").strip()
- return t if t else None
- def format_api_error(status_code: int, body_text: str) -> str:
- try:
- data = json.loads(body_text)
- detail = data.get("detail")
- if isinstance(detail, str):
- return detail
- if isinstance(detail, list):
- return json.dumps(detail, ensure_ascii=False)
- if detail is not None:
- return str(detail)
- except Exception:
- pass
- return (body_text or "")[:800] or f"HTTP {status_code}"
- def append_feishu_context_block(
- text: str,
- event: IncomingFeishuEvent,
- reply_context: FeishuReplyContext,
- ) -> str:
- core = text.strip() if text else ""
- if not core:
- core = "(空消息)"
- lines = [
- core,
- "",
- "[飞书上下文 · 工具调用时请使用下列字段,勿向用户复述本段]",
- f"account_id={reply_context.account_id or ''}",
- f"app_id={reply_context.app_id}",
- f"chat_id={reply_context.chat_id}",
- f"message_id={reply_context.message_id or ''}",
- f"open_id={reply_context.open_id or ''}",
- f"chat_type={event.chat_type or ''}",
- ]
- return "\n".join(lines)
- def feishu_adapter_payload(
- event: IncomingFeishuEvent,
- reply_context: FeishuReplyContext,
- ) -> dict[str, str]:
- return {
- "account_id": reply_context.account_id or "",
- "app_id": reply_context.app_id,
- "chat_id": reply_context.chat_id,
- "message_id": reply_context.message_id or "",
- "sender_open_id": reply_context.open_id or "",
- "chat_type": event.chat_type or "",
- }
- # =============================================================================
- # 2. Trace / WebSocket 消息解析
- # =============================================================================
- TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})
- def assistant_content_has_tool_calls(msg: dict[str, Any]) -> bool:
- if msg.get("role") != "assistant":
- return False
- c = msg.get("content")
- if not isinstance(c, dict):
- return False
- tc = c.get("tool_calls")
- if tc is None:
- return False
- if isinstance(tc, list):
- return len(tc) > 0
- return bool(tc)
- def assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
- if msg.get("role") != "assistant":
- return None
- content = msg.get("content")
- parts: list[str] = []
- if isinstance(content, dict):
- text = (content.get("text") or "").strip()
- tool_calls = content.get("tool_calls")
- desc = (msg.get("description") or "").strip()
- if text:
- parts.append(text)
- if tool_calls:
- if not text:
- parts.append(desc if desc else "[工具调用]")
- elif desc and desc != text:
- parts.append(desc)
- elif isinstance(content, str) and content.strip():
- parts.append(content.strip())
- else:
- desc = (msg.get("description") or "").strip()
- if desc:
- parts.append(desc)
- if not parts:
- return None
- return "\n".join(parts)
- def truncate_for_im(text: str, max_chars: int) -> str:
- if len(text) <= max_chars:
- return text
- return text[: max_chars - 80] + "\n\n…(内容过长已截断)"
- def trace_watch_ws_url(http_base: str, trace_id: str) -> str:
- b = http_base.strip().rstrip("/")
- if b.startswith("https://"):
- origin = "wss://" + b[8:]
- elif b.startswith("http://"):
- origin = "ws://" + b[7:]
- else:
- origin = "ws://" + b
- return f"{origin}/api/traces/{trace_id}/watch"
- def message_sequence(msg: dict[str, Any]) -> int | None:
- s = msg.get("sequence")
- if s is None:
- return None
- if isinstance(s, int):
- return s
- if isinstance(s, float):
- return int(s)
- if isinstance(s, str):
- try:
- return int(s)
- except ValueError:
- return None
- try:
- return int(s)
- except (TypeError, ValueError):
- return None
- def watch_ws_payload_to_dict(raw: Any) -> dict[str, Any] | None:
- if isinstance(raw, (bytes, bytearray)):
- raw = raw.decode("utf-8", errors="replace")
- if not isinstance(raw, str):
- return None
- try:
- data = json.loads(raw)
- except (json.JSONDecodeError, TypeError):
- return None
- return data if isinstance(data, dict) else None
- # =============================================================================
- # 3. Trace 跟单、Typing、HTTP 兜底
- # =============================================================================
- FollowupFinishedCallback = Callable[[str, str], Awaitable[None]]
- """``(trace_id, reason)`` · ``terminal`` | ``timeout`` | ``not_found``"""
- _poll_tasks: dict[str, asyncio.Task[None]] = {}
- _poll_tasks_lock = asyncio.Lock()
- _assistant_sent_sequences: dict[str, set[int]] = {}
- _typing_cleanup_lock = asyncio.Lock()
- _pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}
- async def remove_typing_reaction_safe(
- connector: Any,
- message_id: str,
- account_id: str | None,
- emoji: str,
- *,
- log_label: str,
- ) -> None:
- try:
- r = await connector.remove_bot_reactions_for_emoji(
- message_id,
- emoji,
- account_id=account_id,
- )
- if not r.get("ok"):
- logger.warning(
- "%s: remove reaction failed mid=%s result=%s",
- log_label,
- message_id,
- r,
- )
- except Exception:
- logger.exception("%s: remove reaction exception mid=%s", log_label, message_id)
- async def register_pending_typing_cleanup(
- trace_id: str,
- message_id: str,
- account_id: str | None,
- ) -> None:
- async with _typing_cleanup_lock:
- _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))
- async def remove_typing_immediate(
- connector: Any,
- message_id: str | None,
- account_id: str | None,
- emoji: str,
- ) -> None:
- if not message_id:
- return
- await remove_typing_reaction_safe(
- connector,
- message_id,
- account_id,
- emoji,
- log_label="feishu typing",
- )
- async def flush_pending_typing_cleanups(
- connector: Any,
- trace_id: str,
- emoji: str,
- ) -> None:
- async with _typing_cleanup_lock:
- pairs = _pending_typing_by_trace.pop(trace_id, [])
- for mid, acc in pairs:
- await remove_typing_reaction_safe(
- connector,
- mid,
- acc,
- emoji,
- log_label="feishu typing cleanup",
- )
- async def inbound_fail_reply(
- connector: Any,
- reply_context: FeishuReplyContext,
- *,
- typing_placed: bool,
- typing_emoji: str,
- message: str,
- ) -> None:
- if typing_placed:
- await remove_typing_immediate(
- connector,
- reply_context.message_id,
- reply_context.account_id,
- typing_emoji,
- )
- await connector.send_text(reply_context, message)
- async def forward_one_assistant_to_feishu(
- m: dict[str, Any],
- *,
- sent_sequences: set[int],
- reply_ctx: FeishuReplyContext,
- connector: Any,
- max_text_chars: int,
- ) -> None:
- seq = message_sequence(m)
- if seq is None or m.get("role") != "assistant":
- return
- if seq in sent_sequences:
- return
- if m.get("branch_type") == "reflection":
- sent_sequences.add(seq)
- return
- if assistant_content_has_tool_calls(m):
- sent_sequences.add(seq)
- return
- body = assistant_wire_to_feishu_text(m)
- if body is None:
- sent_sequences.add(seq)
- return
- body = truncate_for_im(body, max_text_chars)
- try:
- result = await connector.send_text(reply_ctx, body)
- if result.get("ok"):
- sent_sequences.add(seq)
- else:
- logger.error("feishu forward: send_text failed seq=%s result=%s", seq, result)
- except Exception:
- logger.exception("feishu forward: send_text exception seq=%s", seq)
- async def poll_assistants_to_feishu(
- *,
- agent_base_url: str,
- trace_id: str,
- reply_ctx: FeishuReplyContext,
- connector: Any,
- poll_interval: float,
- poll_request_timeout: float,
- terminal_grace_rounds: int,
- poll_max_seconds: float,
- max_text_chars: int,
- forward_assistants: bool = True,
- typing_emoji_for_cleanup: str = "Typing",
- on_finished: FollowupFinishedCallback | None = None,
- ) -> None:
- if trace_id not in _assistant_sent_sequences:
- _assistant_sent_sequences[trace_id] = set()
- sent_sequences = _assistant_sent_sequences[trace_id]
- grace = 0
- started = time.monotonic()
- base = agent_base_url.rstrip("/")
- ws = None
- try:
- import websockets
- ws = await websockets.connect(
- trace_watch_ws_url(base, trace_id),
- max_size=10_000_000,
- ping_interval=20,
- ping_timeout=60,
- )
- logger.info("feishu: trace watch WS connected trace_id=%s", trace_id)
- except Exception as e:
- logger.warning("feishu: trace watch WS connect failed: %s", e)
- ws = None
- forward_warned = False
- exit_reason: str | None = None
- async def _dispatch_watch_event(data: dict[str, Any]) -> str:
- ev = data.get("event")
- if ev == "message_added" and forward_assistants:
- msg = data.get("message")
- if isinstance(msg, dict):
- await forward_one_assistant_to_feishu(
- msg,
- sent_sequences=sent_sequences,
- reply_ctx=reply_ctx,
- connector=connector,
- max_text_chars=max_text_chars,
- )
- if ev == "trace_status_changed":
- st = data.get("status")
- if isinstance(st, str) and st in TERMINAL_STATUSES:
- return st
- if ev == "trace_completed":
- return "completed"
- return "running"
- try:
- while True:
- if poll_max_seconds > 0 and (time.monotonic() - started) >= poll_max_seconds:
- logger.warning(
- "feishu watch: trace_id=%s stopped by poll_max_seconds=%s",
- trace_id,
- poll_max_seconds,
- )
- exit_reason = "timeout"
- break
- status_hint = "running"
- if ws is not None:
- stream = ws
- try:
- raw = await asyncio.wait_for(stream.recv(), timeout=poll_interval)
- except asyncio.TimeoutError:
- raw = None
- except Exception as e:
- logger.warning("feishu watch WS error, HTTP status fallback: %s", e)
- try:
- await stream.close()
- except Exception:
- pass
- ws = None
- raw = None
- while raw is not None:
- data = watch_ws_payload_to_dict(raw)
- if data is not None:
- st = await _dispatch_watch_event(data)
- if st in TERMINAL_STATUSES:
- status_hint = st
- try:
- raw = await asyncio.wait_for(stream.recv(), timeout=0.001)
- except asyncio.TimeoutError:
- raw = None
- except Exception:
- raw = None
- else:
- await asyncio.sleep(poll_interval)
- if forward_assistants and not forward_warned:
- logger.error(
- "feishu: WebSocket 不可用,无法推送 assistant;"
- "仅 HTTP 查询 trace 状态以结束跟单(不拉 messages)"
- )
- forward_warned = True
- effective = status_hint
- if ws is None:
- try:
- async with httpx.AsyncClient(timeout=poll_request_timeout) as client:
- tr = await client.get(f"{base}/api/traces/{trace_id}")
- if tr.status_code == 404:
- logger.warning("feishu watch: trace %s not found, stop", trace_id)
- exit_reason = "not_found"
- break
- if tr.status_code >= 400:
- logger.warning(
- "feishu watch: GET trace failed status=%s body=%s",
- tr.status_code,
- tr.text[:300],
- )
- else:
- body = tr.json()
- trace_obj = body.get("trace") or {}
- st = str(trace_obj.get("status") or "running")
- if st in TERMINAL_STATUSES:
- effective = st
- except httpx.RequestError as exc:
- logger.warning("feishu watch: HTTP status check error trace_id=%s err=%s", trace_id, exc)
- if effective in TERMINAL_STATUSES:
- grace += 1
- if grace >= terminal_grace_rounds:
- exit_reason = "terminal"
- break
- else:
- grace = 0
- finally:
- if ws is not None:
- try:
- await ws.close()
- except Exception:
- pass
- await flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
- cur = asyncio.current_task()
- async with _poll_tasks_lock:
- if _poll_tasks.get(trace_id) is cur:
- _ = _poll_tasks.pop(trace_id, None)
- if on_finished is not None and exit_reason is not None:
- try:
- await on_finished(trace_id, exit_reason)
- except Exception:
- logger.exception(
- "feishu watch: on_finished failed trace_id=%s reason=%s",
- trace_id,
- exit_reason,
- )
- def schedule_trace_followup(
- *,
- agent_base_url: str,
- trace_id: str,
- reply_context: FeishuReplyContext,
- connector: Any,
- poll_interval: float,
- poll_request_timeout: float,
- terminal_grace_rounds: int,
- poll_max_seconds: float,
- max_text_chars: int,
- forward_assistants: bool,
- typing_emoji: str,
- on_finished: FollowupFinishedCallback | None = None,
- ) -> None:
- async def _runner() -> None:
- await poll_assistants_to_feishu(
- agent_base_url=agent_base_url,
- trace_id=trace_id,
- reply_ctx=reply_context,
- connector=connector,
- poll_interval=poll_interval,
- poll_request_timeout=poll_request_timeout,
- terminal_grace_rounds=terminal_grace_rounds,
- poll_max_seconds=poll_max_seconds,
- max_text_chars=max_text_chars,
- forward_assistants=forward_assistants,
- typing_emoji_for_cleanup=typing_emoji,
- on_finished=on_finished,
- )
- async def _spawn() -> None:
- async with _poll_tasks_lock:
- existing = _poll_tasks.get(trace_id)
- if existing is not None and not existing.done():
- return
- _poll_tasks[trace_id] = asyncio.create_task(_runner())
- try:
- loop = asyncio.get_running_loop()
- except RuntimeError:
- return
- _ = loop.create_task(_spawn())
- # =============================================================================
- # 4. FeishuHttpRunApiExecutor
- # =============================================================================
- class FeishuHttpRunApiExecutor:
- """调用 Agent Trace HTTP API;WebSocket 将 assistant 转发到飞书。"""
- def __init__(
- self,
- *,
- base_url: str,
- timeout: float,
- identity_resolver: Any,
- model: str = "qwen3.5-flash",
- max_iterations: int = 200,
- temperature: float = 0.3,
- notify_on_submit: bool = True,
- poll_assistant_messages: bool = True,
- poll_interval_seconds: float = 1.0,
- poll_request_timeout: float = 30.0,
- poll_terminal_grace_rounds: int = 2,
- poll_max_seconds: float = 0.0,
- assistant_max_text_chars: int = 8000,
- typing_reaction_enabled: bool = True,
- typing_reaction_emoji: str = "Typing",
- workspace_manager: WorkspaceManager | None = None,
- workspace_prefix: str = "feishu",
- channel_id: str = "feishu",
- lifecycle_trace_backend: LifecycleTraceBackend | None = None,
- stop_container_on_trace_terminal: bool = False,
- stop_container_on_trace_not_found: bool = True,
- release_ref_on_trace_terminal: bool = False,
- ) -> None:
- self._base = base_url.rstrip("/")
- self._timeout = timeout
- self._identity = identity_resolver
- self._model = model
- self._max_iterations = max_iterations
- self._temperature = temperature
- self._notify = notify_on_submit
- self._poll_assistants = poll_assistant_messages
- self._poll_interval = poll_interval_seconds
- self._poll_req_timeout = poll_request_timeout
- self._poll_grace = poll_terminal_grace_rounds
- self._poll_max_seconds = poll_max_seconds
- self._assistant_max_chars = assistant_max_text_chars
- self._typing_reaction_enabled = typing_reaction_enabled
- self._typing_emoji = typing_reaction_emoji
- self._workspace_manager = workspace_manager
- self._workspace_prefix = workspace_prefix
- self._channel_id = channel_id
- self._lifecycle_trace_backend = lifecycle_trace_backend
- self._stop_container_on_trace_terminal = stop_container_on_trace_terminal
- self._stop_container_on_trace_not_found = stop_container_on_trace_not_found
- self._release_ref_on_trace_terminal = release_ref_on_trace_terminal
- def _gateway_exec_for_user(self, user_id: str) -> dict[str, Any] | None:
- wm = self._workspace_manager
- if wm is None:
- return None
- wid = f"{self._workspace_prefix}:{user_id}"
- cid = wm.get_workspace_container_id(wid)
- if not cid:
- return None
- return {
- "docker_container": cid,
- "container_user": "agent",
- "container_workdir": "/home/agent/workspace",
- }
- async def handle_inbound_message(
- self,
- existing_agent_trace_id: str,
- text: str,
- reply_context: FeishuReplyContext,
- connector: Any,
- *,
- event: IncomingFeishuEvent,
- ) -> tuple[str, str]:
- user_id = self._identity.resolve_user_id(event)
- content = append_feishu_context_block(text, event, reply_context)
- task_id = f"task-{uuid.uuid4()}"
- typing_placed = False
- if (
- self._typing_reaction_enabled
- and reply_context.message_id
- and event.event_type == "message"
- ):
- try:
- react_res = await connector.add_message_reaction(
- reply_context.message_id,
- self._typing_emoji,
- account_id=reply_context.account_id,
- )
- typing_placed = bool(react_res.get("ok"))
- if not typing_placed:
- logger.warning(
- "feishu typing: add reaction failed mid=%s result=%s",
- reply_context.message_id,
- react_res,
- )
- except Exception:
- logger.exception(
- "feishu typing: add reaction exception mid=%s",
- reply_context.message_id,
- )
- api_trace_id = normalized_agent_trace_id(existing_agent_trace_id)
- adapter = feishu_adapter_payload(event, reply_context)
- gateway_exec = self._gateway_exec_for_user(user_id)
- try:
- async with httpx.AsyncClient(timeout=self._timeout) as client:
- if api_trace_id is None:
- body: dict[str, Any] = {
- "messages": [{"role": "user", "content": content}],
- "model": self._model,
- "temperature": self._temperature,
- "max_iterations": self._max_iterations,
- "uid": user_id,
- "name": f"feishu-{user_id}",
- "feishu_adapter": adapter,
- }
- if gateway_exec:
- body["gateway_exec"] = gateway_exec
- resp = await client.post(f"{self._base}/api/traces", json=body)
- else:
- body = {
- "messages": [{"role": "user", "content": content}],
- "feishu_adapter": adapter,
- }
- if gateway_exec:
- body["gateway_exec"] = gateway_exec
- resp = await client.post(
- f"{self._base}/api/traces/{api_trace_id}/run",
- json=body,
- )
- except httpx.RequestError as exc:
- logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
- await inbound_fail_reply(
- connector,
- reply_context,
- typing_placed=typing_placed,
- typing_emoji=self._typing_emoji,
- message=f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
- )
- return task_id, ""
- body_text = resp.text
- if resp.status_code == 409:
- await inbound_fail_reply(
- connector,
- reply_context,
- typing_placed=typing_placed,
- typing_emoji=self._typing_emoji,
- message="[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
- )
- return task_id, ""
- if resp.status_code >= 400:
- err = format_api_error(resp.status_code, body_text)
- logger.warning(
- "FeishuHttpRunApiExecutor: API 错误 status=%s user_id=%s detail=%s",
- resp.status_code,
- user_id,
- err,
- )
- await inbound_fail_reply(
- connector,
- reply_context,
- typing_placed=typing_placed,
- typing_emoji=self._typing_emoji,
- message=f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
- )
- return task_id, ""
- try:
- data = resp.json()
- except Exception:
- await inbound_fail_reply(
- connector,
- reply_context,
- typing_placed=typing_placed,
- typing_emoji=self._typing_emoji,
- message="[Gateway] Agent API 返回非 JSON,已放弃解析。",
- )
- return task_id, ""
- resolved_id = data.get("trace_id")
- if not isinstance(resolved_id, str) or not resolved_id:
- await inbound_fail_reply(
- connector,
- reply_context,
- typing_placed=typing_placed,
- typing_emoji=self._typing_emoji,
- message="[Gateway] Agent API 响应缺少 trace_id。",
- )
- return task_id, ""
- if self._notify:
- await connector.send_text(
- reply_context,
- f"[Gateway] 已提交 Agent(API trace_id={resolved_id}),后台执行中。",
- )
- if typing_placed:
- user_mid = reply_context.message_id
- if user_mid:
- await register_pending_typing_cleanup(
- resolved_id,
- user_mid,
- reply_context.account_id,
- )
- if self._poll_assistants or typing_placed:
- wid = f"{self._workspace_prefix}:{user_id}"
- async def _on_followup_finished(tid: str, reason: str) -> None:
- if tid != resolved_id:
- return
- wm = self._workspace_manager
- if wm is None:
- return
- stop = False
- if reason == "terminal" and self._stop_container_on_trace_terminal:
- stop = True
- elif reason == "not_found" and self._stop_container_on_trace_not_found:
- stop = True
- if stop:
- await wm.stop_workspace_sandbox(wid)
- if (
- reason == "terminal"
- and self._release_ref_on_trace_terminal
- and self._lifecycle_trace_backend is not None
- ):
- await self._lifecycle_trace_backend.forget_trace_binding(
- self._channel_id,
- user_id,
- workspace_id=wid,
- )
- schedule_trace_followup(
- agent_base_url=self._base,
- trace_id=resolved_id,
- reply_context=copy(reply_context),
- connector=connector,
- poll_interval=self._poll_interval,
- poll_request_timeout=self._poll_req_timeout,
- terminal_grace_rounds=self._poll_grace,
- poll_max_seconds=self._poll_max_seconds,
- max_text_chars=self._assistant_max_chars,
- forward_assistants=self._poll_assistants,
- typing_emoji=self._typing_emoji,
- on_finished=_on_followup_finished,
- )
- return task_id, resolved_id
|