|
|
@@ -1,14 +1,15 @@
|
|
|
"""
|
|
|
-飞书执行器:通过 HTTP 调用 Agent 的 ``run_api``(``POST /api/traces`` / ``POST /api/traces/{id}/run``)。
|
|
|
-
|
|
|
-按飞书 ``user_id``(与 ``DefaultUserIdentityResolver`` 一致)维护 API ``trace_id``,与首次创建 / 续跑语义对齐。
|
|
|
+飞书执行器:通过 HTTP 调用 Agent 的 ``run_api``,并在后台轮询 ``GET /api/traces/{id}/messages``
|
|
|
+将主路径上每一条 assistant 消息转发到飞书。
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
import asyncio
|
|
|
import logging
|
|
|
+import time
|
|
|
import uuid
|
|
|
+from copy import copy
|
|
|
from typing import Any
|
|
|
|
|
|
import httpx
|
|
|
@@ -17,6 +18,14 @@ from gateway.core.channels.feishu.types import FeishuReplyContext, IncomingFeish
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
+_TERMINAL_STATUSES = frozenset({"completed", "failed", "stopped"})
|
|
|
+
|
|
|
+# 同一 trace 仅一个轮询任务,避免并发重复推送
|
|
|
+_poll_tasks: dict[str, asyncio.Task[None]] = {}
|
|
|
+_poll_tasks_lock = asyncio.Lock()
|
|
|
+# trace_id → 已成功推送到飞书的 assistant sequence(跨多次 run,避免重复发送)
|
|
|
+_assistant_sent_sequences: dict[str, set[int]] = {}
|
|
|
+
|
|
|
|
|
|
def _format_api_error(status_code: int, body_text: str) -> str:
|
|
|
try:
|
|
|
@@ -58,8 +67,215 @@ def _append_feishu_context_block(
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
|
|
|
+def _assistant_wire_to_feishu_text(msg: dict[str, Any]) -> str | None:
|
|
|
+ """从 ``GET .../messages`` 返回的单条消息 dict 提取可发给用户的文本;无可展示内容则返回 None。"""
|
|
|
+ if msg.get("role") != "assistant":
|
|
|
+ return None
|
|
|
+ content = msg.get("content")
|
|
|
+ parts: list[str] = []
|
|
|
+
|
|
|
+ if isinstance(content, dict):
|
|
|
+ text = (content.get("text") or "").strip()
|
|
|
+ if text:
|
|
|
+ parts.append(text)
|
|
|
+ tool_calls = content.get("tool_calls")
|
|
|
+ if tool_calls:
|
|
|
+ desc = (msg.get("description") or "").strip()
|
|
|
+ parts.append(desc if desc else "[工具调用]")
|
|
|
+ elif isinstance(content, str) and content.strip():
|
|
|
+ parts.append(content.strip())
|
|
|
+ else:
|
|
|
+ desc = (msg.get("description") or "").strip()
|
|
|
+ if desc:
|
|
|
+ parts.append(desc)
|
|
|
+
|
|
|
+ if not parts:
|
|
|
+ return None
|
|
|
+ return "\n".join(parts)
|
|
|
+
|
|
|
+
|
|
|
+def _truncate_for_im(text: str, max_chars: int) -> str:
|
|
|
+ if len(text) <= max_chars:
|
|
|
+ return text
|
|
|
+ return text[: max_chars - 80] + "\n\n…(内容过长已截断)"
|
|
|
+
|
|
|
+
|
|
|
+def _message_sequence(msg: dict[str, Any]) -> int | None:
|
|
|
+ s = msg.get("sequence")
|
|
|
+ if s is None:
|
|
|
+ return None
|
|
|
+ if isinstance(s, int):
|
|
|
+ return s
|
|
|
+ if isinstance(s, float):
|
|
|
+ return int(s)
|
|
|
+ if isinstance(s, str):
|
|
|
+ try:
|
|
|
+ return int(s)
|
|
|
+ except ValueError:
|
|
|
+ return None
|
|
|
+ try:
|
|
|
+ return int(s)
|
|
|
+ except (TypeError, ValueError):
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+async def _poll_assistants_to_feishu(
|
|
|
+ *,
|
|
|
+ agent_base_url: str,
|
|
|
+ trace_id: str,
|
|
|
+ reply_ctx: FeishuReplyContext,
|
|
|
+ connector: Any,
|
|
|
+ poll_interval: float,
|
|
|
+ poll_request_timeout: float,
|
|
|
+ terminal_grace_rounds: int,
|
|
|
+ poll_max_seconds: float,
|
|
|
+ max_text_chars: int,
|
|
|
+) -> None:
|
|
|
+ """
|
|
|
+ 轮询 Trace 状态与主路径消息,将尚未推送过的 assistant 消息按 sequence 顺序发到飞书。
|
|
|
+ """
|
|
|
+ if trace_id not in _assistant_sent_sequences:
|
|
|
+ _assistant_sent_sequences[trace_id] = set()
|
|
|
+ sent_sequences = _assistant_sent_sequences[trace_id]
|
|
|
+ grace = 0
|
|
|
+ started = time.monotonic()
|
|
|
+ base = agent_base_url.rstrip("/")
|
|
|
+
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ if poll_max_seconds > 0 and (time.monotonic() - started) >= poll_max_seconds:
|
|
|
+ logger.warning(
|
|
|
+ "feishu poll: trace_id=%s stopped by poll_max_seconds=%s",
|
|
|
+ trace_id,
|
|
|
+ poll_max_seconds,
|
|
|
+ )
|
|
|
+ break
|
|
|
+
|
|
|
+ status = "running"
|
|
|
+ try:
|
|
|
+ async with httpx.AsyncClient(timeout=poll_request_timeout) as client:
|
|
|
+ tr = await client.get(f"{base}/api/traces/{trace_id}")
|
|
|
+ if tr.status_code == 404:
|
|
|
+ logger.warning("feishu poll: trace %s not found, stop", trace_id)
|
|
|
+ break
|
|
|
+ if tr.status_code >= 400:
|
|
|
+ logger.warning(
|
|
|
+ "feishu poll: GET trace failed status=%s body=%s",
|
|
|
+ tr.status_code,
|
|
|
+ tr.text[:300],
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ data = tr.json()
|
|
|
+ trace_obj = data.get("trace") or {}
|
|
|
+ status = str(trace_obj.get("status") or "running")
|
|
|
+
|
|
|
+ ms = await client.get(
|
|
|
+ f"{base}/api/traces/{trace_id}/messages",
|
|
|
+ params={"mode": "main_path"},
|
|
|
+ )
|
|
|
+ if ms.status_code != 200:
|
|
|
+ logger.warning(
|
|
|
+ "feishu poll: GET messages failed status=%s",
|
|
|
+ ms.status_code,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ payload = ms.json()
|
|
|
+ raw_list = payload.get("messages") or []
|
|
|
+ assistants = [
|
|
|
+ m
|
|
|
+ for m in raw_list
|
|
|
+ if isinstance(m, dict) and m.get("role") == "assistant"
|
|
|
+ ]
|
|
|
+ assistants.sort(key=lambda m: (_message_sequence(m) or 0))
|
|
|
+
|
|
|
+ for m in assistants:
|
|
|
+ seq = _message_sequence(m)
|
|
|
+ if seq is None:
|
|
|
+ continue
|
|
|
+ if seq in sent_sequences:
|
|
|
+ continue
|
|
|
+ body = _assistant_wire_to_feishu_text(m)
|
|
|
+ if body is None:
|
|
|
+ sent_sequences.add(seq)
|
|
|
+ continue
|
|
|
+ body = _truncate_for_im(body, max_text_chars)
|
|
|
+ try:
|
|
|
+ result = await connector.send_text(reply_ctx, body)
|
|
|
+ if result.get("ok"):
|
|
|
+ sent_sequences.add(seq)
|
|
|
+ else:
|
|
|
+ logger.error(
|
|
|
+ "feishu poll: send_text failed seq=%s result=%s",
|
|
|
+ seq,
|
|
|
+ result,
|
|
|
+ )
|
|
|
+ except Exception:
|
|
|
+ logger.exception(
|
|
|
+ "feishu poll: send_text exception seq=%s",
|
|
|
+ seq,
|
|
|
+ )
|
|
|
+ except httpx.RequestError as exc:
|
|
|
+ logger.warning("feishu poll: request error trace_id=%s err=%s", trace_id, exc)
|
|
|
+
|
|
|
+ if status in _TERMINAL_STATUSES:
|
|
|
+ grace += 1
|
|
|
+ if grace >= terminal_grace_rounds:
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ grace = 0
|
|
|
+
|
|
|
+ await asyncio.sleep(poll_interval)
|
|
|
+ finally:
|
|
|
+ cur = asyncio.current_task()
|
|
|
+ async with _poll_tasks_lock:
|
|
|
+ if _poll_tasks.get(trace_id) is cur:
|
|
|
+ _poll_tasks.pop(trace_id, None)
|
|
|
+
|
|
|
+
|
|
|
+def _schedule_assistant_poll(
|
|
|
+ *,
|
|
|
+ agent_base_url: str,
|
|
|
+ trace_id: str,
|
|
|
+ reply_context: FeishuReplyContext,
|
|
|
+ connector: Any,
|
|
|
+ poll_interval: float,
|
|
|
+ poll_request_timeout: float,
|
|
|
+ terminal_grace_rounds: int,
|
|
|
+ poll_max_seconds: float,
|
|
|
+ max_text_chars: int,
|
|
|
+) -> None:
|
|
|
+ """同一 trace 仅保留一个活跃轮询任务。"""
|
|
|
+
|
|
|
+ async def _runner() -> None:
|
|
|
+ await _poll_assistants_to_feishu(
|
|
|
+ agent_base_url=agent_base_url,
|
|
|
+ trace_id=trace_id,
|
|
|
+ reply_ctx=reply_context,
|
|
|
+ connector=connector,
|
|
|
+ poll_interval=poll_interval,
|
|
|
+ poll_request_timeout=poll_request_timeout,
|
|
|
+ terminal_grace_rounds=terminal_grace_rounds,
|
|
|
+ poll_max_seconds=poll_max_seconds,
|
|
|
+ max_text_chars=max_text_chars,
|
|
|
+ )
|
|
|
+
|
|
|
+ async def _spawn() -> None:
|
|
|
+ async with _poll_tasks_lock:
|
|
|
+ existing = _poll_tasks.get(trace_id)
|
|
|
+ if existing is not None and not existing.done():
|
|
|
+ return
|
|
|
+ task = asyncio.create_task(_runner())
|
|
|
+ _poll_tasks[trace_id] = task
|
|
|
+
|
|
|
+ try:
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ except RuntimeError:
|
|
|
+ return
|
|
|
+ _ = loop.create_task(_spawn())
|
|
|
+
|
|
|
+
|
|
|
class FeishuHttpRunApiExecutor:
|
|
|
- """调用 ``agent/trace/run_api`` 暴露的 Trace HTTP API,触发后台 Agent 运行。"""
|
|
|
+ """调用 Agent Trace HTTP API,并可选轮询 assistant 消息转发到飞书。"""
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
@@ -67,10 +283,16 @@ class FeishuHttpRunApiExecutor:
|
|
|
base_url: str,
|
|
|
timeout: float,
|
|
|
identity_resolver: Any,
|
|
|
- model: str = "gpt-4o",
|
|
|
+ model: str = "qwen3.5-flash",
|
|
|
max_iterations: int = 200,
|
|
|
temperature: float = 0.3,
|
|
|
notify_on_submit: bool = True,
|
|
|
+ poll_assistant_messages: bool = True,
|
|
|
+ poll_interval_seconds: float = 1.0,
|
|
|
+ poll_request_timeout: float = 30.0,
|
|
|
+ poll_terminal_grace_rounds: int = 2,
|
|
|
+ poll_max_seconds: float = 0.0,
|
|
|
+ assistant_max_text_chars: int = 8000,
|
|
|
) -> None:
|
|
|
self._base = base_url.rstrip("/")
|
|
|
self._timeout = timeout
|
|
|
@@ -79,6 +301,12 @@ class FeishuHttpRunApiExecutor:
|
|
|
self._max_iterations = max_iterations
|
|
|
self._temperature = temperature
|
|
|
self._notify = notify_on_submit
|
|
|
+ self._poll_assistants = poll_assistant_messages
|
|
|
+ self._poll_interval = poll_interval_seconds
|
|
|
+ self._poll_req_timeout = poll_request_timeout
|
|
|
+ self._poll_grace = poll_terminal_grace_rounds
|
|
|
+ self._poll_max_seconds = poll_max_seconds
|
|
|
+ self._assistant_max_chars = assistant_max_text_chars
|
|
|
self._map_lock = asyncio.Lock()
|
|
|
self._api_trace_by_user: dict[str, str] = {}
|
|
|
|
|
|
@@ -175,4 +403,17 @@ class FeishuHttpRunApiExecutor:
|
|
|
f"[Gateway] 已提交 Agent(API trace_id={resolved_id}),后台执行中。",
|
|
|
)
|
|
|
|
|
|
+ if self._poll_assistants:
|
|
|
+ _schedule_assistant_poll(
|
|
|
+ agent_base_url=self._base,
|
|
|
+ trace_id=resolved_id,
|
|
|
+ reply_context=copy(reply_context),
|
|
|
+ connector=connector,
|
|
|
+ poll_interval=self._poll_interval,
|
|
|
+ poll_request_timeout=self._poll_req_timeout,
|
|
|
+ terminal_grace_rounds=self._poll_grace,
|
|
|
+ poll_max_seconds=self._poll_max_seconds,
|
|
|
+ max_text_chars=self._assistant_max_chars,
|
|
|
+ )
|
|
|
+
|
|
|
return task_id
|