Browse Source

添加emoji reaction

kevin.yang 3 days ago
parent
commit
5b4ce023a2

+ 2 - 1
gateway/core/channels/feishu/__init__.py

@@ -1,5 +1,5 @@
 from gateway.core.channels.feishu.api import FeishuChannelApi
-from gateway.core.channels.feishu.connector import FeishuConnector
+from gateway.core.channels.feishu.connector import FeishuConnector, TYPING_REACTION_EMOJI
 from gateway.core.channels.feishu.identity import DefaultUserIdentityResolver
 from gateway.core.channels.feishu.manager import FeishuChannelConfig, FeishuChannelManager
 from gateway.core.channels.feishu.router import (
@@ -20,6 +20,7 @@ __all__ = [
     "FeishuChannelConfig",
     "FeishuChannelManager",
     "FeishuConnector",
+    "TYPING_REACTION_EMOJI",
     "FeishuExecutorBackend",
     "FeishuMessageRouter",
     "FeishuReplyContext",

+ 37 - 0
gateway/core/channels/feishu/connector.py

@@ -14,6 +14,9 @@ from gateway.core.channels.feishu.types import (
 
 logger = logging.getLogger(__name__)
 
+# 飞书开放平台 reaction emoji_type,与 openclaw-lark FeishuEmoji.TYPING 一致
+TYPING_REACTION_EMOJI = "Typing"
+
 
 class WebhookParseError(ValueError):
     pass
@@ -299,3 +302,37 @@ class FeishuConnector:
         已配置应用列表请用 `list_feishu_app_accounts()`(勿与 `/accounts` 语义混淆)。
         """
         return {"ok": True, "user_id": user_id, "profile": None}
+
+    async def add_message_reaction(
+        self,
+        message_id: str,
+        emoji: str,
+        *,
+        account_id: str | None = None,
+    ) -> dict[str, Any]:
+        """POST ``/feishu/react``,为消息添加机器人表情(``emoji`` 为开放平台 emoji_type,如 ``Typing``)。"""
+        body: dict[str, Any] = {
+            "action": "add",
+            "message_id": message_id,
+            "emoji": emoji,
+        }
+        if account_id:
+            body["account_id"] = account_id
+        return await self._http.post_json("/feishu/react", body)
+
+    async def remove_bot_reactions_for_emoji(
+        self,
+        message_id: str,
+        emoji: str,
+        *,
+        account_id: str | None = None,
+    ) -> dict[str, Any]:
+        """移除该消息上本机器人添加的指定 ``emoji`` 表情(适配器 ``action: remove``)。"""
+        body: dict[str, Any] = {
+            "action": "remove",
+            "message_id": message_id,
+            "emoji": emoji,
+        }
+        if account_id:
+            body["account_id"] = account_id
+        return await self._http.post_json("/feishu/react", body)

+ 187 - 49
gateway/core/channels/feishu/http_run_executor.py

@@ -25,6 +25,9 @@ _poll_tasks: dict[str, asyncio.Task[None]] = {}
 _poll_tasks_lock = asyncio.Lock()
 # trace_id → 已成功推送到飞书的 assistant sequence(跨多次 run,避免重复发送)
 _assistant_sent_sequences: dict[str, set[int]] = {}
+# trace_id → 待在本次任务结束时移除 Typing 表情的用户消息(可多条,同一 trace 并发续跑时合并清理)
+_typing_cleanup_lock = asyncio.Lock()
+_pending_typing_by_trace: dict[str, list[tuple[str, str | None]]] = {}
 
 
 def _format_api_error(status_code: int, body_text: str) -> str:
@@ -119,6 +122,63 @@ def _message_sequence(msg: dict[str, Any]) -> int | None:
         return None
 
 
+async def _register_pending_typing_cleanup(
+    trace_id: str,
+    message_id: str,
+    account_id: str | None,
+) -> None:
+    async with _typing_cleanup_lock:
+        _pending_typing_by_trace.setdefault(trace_id, []).append((message_id, account_id))
+
+
+async def _remove_typing_immediate(
+    connector: Any,
+    message_id: str | None,
+    account_id: str | None,
+    emoji: str,
+) -> None:
+    if not message_id:
+        return
+    try:
+        r = await connector.remove_bot_reactions_for_emoji(
+            message_id,
+            emoji,
+            account_id=account_id,
+        )
+        if not r.get("ok"):
+            logger.warning(
+                "feishu typing: immediate remove failed mid=%s result=%s",
+                message_id,
+                r,
+            )
+    except Exception:
+        logger.exception("feishu typing: immediate remove exception mid=%s", message_id)
+
+
+async def _flush_pending_typing_cleanups(
+    connector: Any,
+    trace_id: str,
+    emoji: str,
+) -> None:
+    async with _typing_cleanup_lock:
+        pairs = _pending_typing_by_trace.pop(trace_id, [])
+    for mid, acc in pairs:
+        try:
+            r = await connector.remove_bot_reactions_for_emoji(
+                mid,
+                emoji,
+                account_id=acc,
+            )
+            if not r.get("ok"):
+                logger.warning(
+                    "feishu typing cleanup: remove reaction failed mid=%s result=%s",
+                    mid,
+                    r,
+                )
+        except Exception:
+            logger.exception("feishu typing cleanup: remove reaction exception mid=%s", mid)
+
+
 async def _poll_assistants_to_feishu(
     *,
     agent_base_url: str,
@@ -130,9 +190,11 @@ async def _poll_assistants_to_feishu(
     terminal_grace_rounds: int,
     poll_max_seconds: float,
     max_text_chars: int,
+    forward_assistants: bool = True,
+    typing_emoji_for_cleanup: str = "Typing",
 ) -> None:
     """
-    轮询 Trace 状态与主路径消息,将尚未推送过的 assistant 消息按 sequence 顺序发到飞书
+    轮询 Trace 直至终态;可选将主路径 assistant 消息发到飞书;结束时清理已登记的 Typing 表情
     """
     if trace_id not in _assistant_sent_sequences:
         _assistant_sent_sequences[trace_id] = set()
@@ -169,51 +231,52 @@ async def _poll_assistants_to_feishu(
                         trace_obj = data.get("trace") or {}
                         status = str(trace_obj.get("status") or "running")
 
-                    ms = await client.get(
-                        f"{base}/api/traces/{trace_id}/messages",
-                        params={"mode": "main_path"},
-                    )
-                    if ms.status_code != 200:
-                        logger.warning(
-                            "feishu poll: GET messages failed status=%s",
-                            ms.status_code,
+                    if forward_assistants:
+                        ms = await client.get(
+                            f"{base}/api/traces/{trace_id}/messages",
+                            params={"mode": "main_path"},
                         )
-                    else:
-                        payload = ms.json()
-                        raw_list = payload.get("messages") or []
-                        assistants = [
-                            m
-                            for m in raw_list
-                            if isinstance(m, dict) and m.get("role") == "assistant"
-                        ]
-                        assistants.sort(key=lambda m: (_message_sequence(m) or 0))
-
-                        for m in assistants:
-                            seq = _message_sequence(m)
-                            if seq is None:
-                                continue
-                            if seq in sent_sequences:
-                                continue
-                            body = _assistant_wire_to_feishu_text(m)
-                            if body is None:
-                                sent_sequences.add(seq)
-                                continue
-                            body = _truncate_for_im(body, max_text_chars)
-                            try:
-                                result = await connector.send_text(reply_ctx, body)
-                                if result.get("ok"):
+                        if ms.status_code != 200:
+                            logger.warning(
+                                "feishu poll: GET messages failed status=%s",
+                                ms.status_code,
+                            )
+                        else:
+                            payload = ms.json()
+                            raw_list = payload.get("messages") or []
+                            assistants = [
+                                m
+                                for m in raw_list
+                                if isinstance(m, dict) and m.get("role") == "assistant"
+                            ]
+                            assistants.sort(key=lambda m: (_message_sequence(m) or 0))
+
+                            for m in assistants:
+                                seq = _message_sequence(m)
+                                if seq is None:
+                                    continue
+                                if seq in sent_sequences:
+                                    continue
+                                body = _assistant_wire_to_feishu_text(m)
+                                if body is None:
                                     sent_sequences.add(seq)
-                                else:
-                                    logger.error(
-                                        "feishu poll: send_text failed seq=%s result=%s",
+                                    continue
+                                body = _truncate_for_im(body, max_text_chars)
+                                try:
+                                    result = await connector.send_text(reply_ctx, body)
+                                    if result.get("ok"):
+                                        sent_sequences.add(seq)
+                                    else:
+                                        logger.error(
+                                            "feishu poll: send_text failed seq=%s result=%s",
+                                            seq,
+                                            result,
+                                        )
+                                except Exception:
+                                    logger.exception(
+                                        "feishu poll: send_text exception seq=%s",
                                         seq,
-                                        result,
                                     )
-                            except Exception:
-                                logger.exception(
-                                    "feishu poll: send_text exception seq=%s",
-                                    seq,
-                                )
             except httpx.RequestError as exc:
                 logger.warning("feishu poll: request error trace_id=%s err=%s", trace_id, exc)
 
@@ -226,13 +289,14 @@ async def _poll_assistants_to_feishu(
 
             await asyncio.sleep(poll_interval)
     finally:
+        await _flush_pending_typing_cleanups(connector, trace_id, typing_emoji_for_cleanup)
         cur = asyncio.current_task()
         async with _poll_tasks_lock:
             if _poll_tasks.get(trace_id) is cur:
-                _poll_tasks.pop(trace_id, None)
+                _ = _poll_tasks.pop(trace_id, None)
 
 
-def _schedule_assistant_poll(
+def _schedule_trace_followup(
     *,
     agent_base_url: str,
     trace_id: str,
@@ -243,8 +307,10 @@ def _schedule_assistant_poll(
     terminal_grace_rounds: int,
     poll_max_seconds: float,
     max_text_chars: int,
+    forward_assistants: bool,
+    typing_emoji: str,
 ) -> None:
-    """同一 trace 仅保留一个活跃轮询任务。"""
+    """同一 trace 仅保留一个活跃跟单任务(转发 assistant + 或在终态时清理 Typing)。"""
 
     async def _runner() -> None:
         await _poll_assistants_to_feishu(
@@ -257,6 +323,8 @@ def _schedule_assistant_poll(
             terminal_grace_rounds=terminal_grace_rounds,
             poll_max_seconds=poll_max_seconds,
             max_text_chars=max_text_chars,
+            forward_assistants=forward_assistants,
+            typing_emoji_for_cleanup=typing_emoji,
         )
 
     async def _spawn() -> None:
@@ -264,8 +332,7 @@ def _schedule_assistant_poll(
             existing = _poll_tasks.get(trace_id)
             if existing is not None and not existing.done():
                 return
-            task = asyncio.create_task(_runner())
-            _poll_tasks[trace_id] = task
+            _poll_tasks[trace_id] = asyncio.create_task(_runner())
 
     try:
         loop = asyncio.get_running_loop()
@@ -293,6 +360,8 @@ class FeishuHttpRunApiExecutor:
         poll_terminal_grace_rounds: int = 2,
         poll_max_seconds: float = 0.0,
         assistant_max_text_chars: int = 8000,
+        typing_reaction_enabled: bool = True,
+        typing_reaction_emoji: str = "Typing",
     ) -> None:
         self._base = base_url.rstrip("/")
         self._timeout = timeout
@@ -307,6 +376,8 @@ class FeishuHttpRunApiExecutor:
         self._poll_grace = poll_terminal_grace_rounds
         self._poll_max_seconds = poll_max_seconds
         self._assistant_max_chars = assistant_max_text_chars
+        self._typing_reaction_enabled = typing_reaction_enabled
+        self._typing_emoji = typing_reaction_emoji
         self._map_lock = asyncio.Lock()
         self._api_trace_by_user: dict[str, str] = {}
 
@@ -324,6 +395,27 @@ class FeishuHttpRunApiExecutor:
         content = _append_feishu_context_block(text, event, reply_context)
         task_id = f"task-{uuid.uuid4()}"
 
+        typing_placed = False
+        if self._typing_reaction_enabled and reply_context.message_id:
+            try:
+                react_res = await connector.add_message_reaction(
+                    reply_context.message_id,
+                    self._typing_emoji,
+                    account_id=reply_context.account_id,
+                )
+                typing_placed = bool(react_res.get("ok"))
+                if not typing_placed:
+                    logger.warning(
+                        "feishu typing: add reaction failed mid=%s result=%s",
+                        reply_context.message_id,
+                        react_res,
+                    )
+            except Exception:
+                logger.exception(
+                    "feishu typing: add reaction exception mid=%s",
+                    reply_context.message_id,
+                )
+
         async with self._map_lock:
             api_trace_id = self._api_trace_by_user.get(user_id)
 
@@ -348,6 +440,13 @@ class FeishuHttpRunApiExecutor:
                     )
         except httpx.RequestError as exc:
             logger.exception("FeishuHttpRunApiExecutor: Agent API 请求失败 user_id=%s", user_id)
+            if typing_placed:
+                await _remove_typing_immediate(
+                    connector,
+                    reply_context.message_id,
+                    reply_context.account_id,
+                    self._typing_emoji,
+                )
             await connector.send_text(
                 reply_context,
                 f"[Gateway] 无法连接 Agent API({self._base}):{exc}",
@@ -356,6 +455,13 @@ class FeishuHttpRunApiExecutor:
 
         body_text = resp.text
         if resp.status_code == 409:
+            if typing_placed:
+                await _remove_typing_immediate(
+                    connector,
+                    reply_context.message_id,
+                    reply_context.account_id,
+                    self._typing_emoji,
+                )
             await connector.send_text(
                 reply_context,
                 "[Gateway] 当前会话在 Agent 侧仍在运行,请稍后再发消息。",
@@ -370,6 +476,13 @@ class FeishuHttpRunApiExecutor:
                 user_id,
                 err,
             )
+            if typing_placed:
+                await _remove_typing_immediate(
+                    connector,
+                    reply_context.message_id,
+                    reply_context.account_id,
+                    self._typing_emoji,
+                )
             await connector.send_text(
                 reply_context,
                 f"[Gateway] Agent 启动失败({resp.status_code}):{err}",
@@ -379,6 +492,13 @@ class FeishuHttpRunApiExecutor:
         try:
             data = resp.json()
         except Exception:
+            if typing_placed:
+                await _remove_typing_immediate(
+                    connector,
+                    reply_context.message_id,
+                    reply_context.account_id,
+                    self._typing_emoji,
+                )
             await connector.send_text(
                 reply_context,
                 "[Gateway] Agent API 返回非 JSON,已放弃解析。",
@@ -387,6 +507,13 @@ class FeishuHttpRunApiExecutor:
 
         resolved_id = data.get("trace_id")
         if not isinstance(resolved_id, str) or not resolved_id:
+            if typing_placed:
+                await _remove_typing_immediate(
+                    connector,
+                    reply_context.message_id,
+                    reply_context.account_id,
+                    self._typing_emoji,
+                )
             await connector.send_text(
                 reply_context,
                 "[Gateway] Agent API 响应缺少 trace_id。",
@@ -403,8 +530,17 @@ class FeishuHttpRunApiExecutor:
                 f"[Gateway] 已提交 Agent(API trace_id={resolved_id}),后台执行中。",
             )
 
-        if self._poll_assistants:
-            _schedule_assistant_poll(
+        if typing_placed:
+            user_mid = reply_context.message_id
+            if user_mid:
+                await _register_pending_typing_cleanup(
+                    resolved_id,
+                    user_mid,
+                    reply_context.account_id,
+                )
+
+        if self._poll_assistants or typing_placed:
+            _schedule_trace_followup(
                 agent_base_url=self._base,
                 trace_id=resolved_id,
                 reply_context=copy(reply_context),
@@ -414,6 +550,8 @@ class FeishuHttpRunApiExecutor:
                 terminal_grace_rounds=self._poll_grace,
                 poll_max_seconds=self._poll_max_seconds,
                 max_text_chars=self._assistant_max_chars,
+                forward_assistants=self._poll_assistants,
+                typing_emoji=self._typing_emoji,
             )
 
         return task_id

+ 8 - 0
gateway/core/channels/feishu/manager.py

@@ -37,6 +37,8 @@ class FeishuChannelConfig:
     poll_terminal_grace_rounds: int = 2
     poll_max_seconds: float = 0.0
     assistant_max_text_chars: int = 8000
+    typing_reaction_enabled: bool = True
+    typing_reaction_emoji: str = "Typing"
 
 
 class FeishuChannelManager(ChannelRegistry):
@@ -67,6 +69,8 @@ class FeishuChannelManager(ChannelRegistry):
             poll_terminal_grace_rounds=self._config.poll_terminal_grace_rounds,
             poll_max_seconds=self._config.poll_max_seconds,
             assistant_max_text_chars=self._config.assistant_max_text_chars,
+            typing_reaction_enabled=self._config.typing_reaction_enabled,
+            typing_reaction_emoji=self._config.typing_reaction_emoji,
         )
         self._router = FeishuMessageRouter(
             connector=self._connector,
@@ -115,6 +119,10 @@ class FeishuChannelManager(ChannelRegistry):
                 poll_terminal_grace_rounds=int(os.getenv("FEISHU_AGENT_POLL_GRACE_ROUNDS", "2")),
                 poll_max_seconds=float(os.getenv("FEISHU_AGENT_POLL_MAX_SECONDS", "0")),
                 assistant_max_text_chars=int(os.getenv("FEISHU_AGENT_ASSISTANT_MAX_CHARS", "8000")),
+                typing_reaction_enabled=os.getenv("FEISHU_TYPING_REACTION", "true").lower()
+                in ("1", "true", "yes"),
+                typing_reaction_emoji=os.getenv("FEISHU_TYPING_REACTION_EMOJI", "Typing").strip()
+                or "Typing",
             )
         )