"""飞书入站事件路由:Trace 会话准备、Executor 提交、绑定 Agent ``trace_id``。""" from __future__ import annotations import logging from collections.abc import Mapping from typing import Any from gateway.core.channels.feishu.connector import FeishuConnector from gateway.core.channels.feishu.protocols import ( FeishuExecutorBackend, FeishuUserIdentityResolver, ) from gateway.core.channels.feishu.types import ( FeishuReplyContext, IncomingFeishuEvent, feishu_event_to_mapping, ) from gateway.core.channels.protocols import TraceBackend from gateway.core.channels.router import ChannelTraceRouter from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult logger = logging.getLogger(__name__) def routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str | None, str | None]: """ 当规范化 JSON 未带 chat_id 时,从飞书 card.action.trigger 原始体兜底解析。 常见路径:event.context.open_chat_id / open_message_id(或顶层 open_chat_id)。 """ if not raw: return None, None, None ev: Any = raw.get("event") if isinstance(raw.get("event"), dict) else raw if not isinstance(ev, dict): return None, None, None ctx = ev.get("context") if isinstance(ev.get("context"), dict) else {} def pick(*keys: str) -> str | None: for d in (ev, ctx): for k in keys: v = d.get(k) if isinstance(v, str) and v.strip(): return v.strip() return None chat_id = pick("open_chat_id", "chat_id") message_id = pick("open_message_id", "message_id") chat_type = pick("chat_type") return chat_id, message_id, chat_type def as_opt_str(v: Any) -> str | None: if v is None: return None s = str(v) return s if s else None class FeishuMessageRouter(ChannelTraceRouter): """ 飞书消息路由:prepare_session → Executor → bind_agent_trace_id。 ``reaction`` / ``card_action`` 是否触发续跑由 ``dispatch_*`` 控制; ``card_action`` 多用于 OAuth 等卡片交互后让 Agent 继续执行。 """ def __init__( self, *, connector: FeishuConnector, trace_backend: TraceBackend, executor_backend: FeishuExecutorBackend, identity_resolver: FeishuUserIdentityResolver, workspace_prefix: str = CHANNEL_FEISHU, default_agent_type: str = "personal_assistant", auto_create_trace: bool = True, dispatch_reactions: bool = False, dispatch_card_actions: bool = False, ) -> None: super().__init__( trace_backend=trace_backend, workspace_prefix=workspace_prefix, default_agent_type=default_agent_type, ) self._connector = connector self._executor = executor_backend self._identity = identity_resolver self._auto_create = auto_create_trace self._dispatch_reactions = dispatch_reactions self._dispatch_card_actions = dispatch_card_actions def _reply_context_from_event(self, event: IncomingFeishuEvent) -> FeishuReplyContext | None: chat_id = event.chat_id message_id = event.message_id if not chat_id and event.event_type == "card_action": c, m, _ = routing_from_card_action_raw(event.raw) chat_id = chat_id or c message_id = message_id or m if not chat_id: logger.warning("missing chat_id, cannot reply: %s", feishu_event_to_mapping(event)) return None return FeishuReplyContext( account_id=event.account_id, app_id=event.app_id, chat_id=chat_id, message_id=message_id, open_id=event.open_id, ) def _synthetic_text_for_event(self, event: IncomingFeishuEvent) -> str | None: if event.event_type == "reaction" and event.emoji: return f"[系统-表情] {event.emoji} message_id={event.message_id or ''}" if event.event_type == "card_action": return ( "[系统-卡片交互] 用户已在飞书内完成卡片操作(如授权确认)," "请结合当前上下文继续执行未完成任务,必要时重试刚才失败的工具。" f" action={event.action or ''} operation_id={event.operation_id or ''}" ) return None async def route_feishu_inbound_event(self, event: IncomingFeishuEvent) -> RouteResult: """处理 Feishu HTTP 适配服务转发的规范化入站事件。""" user_id = self._identity.resolve_user_id(event) workspace_id = self._workspace_id_for_user(user_id) dispatch = False text: str | None = None if event.event_type == "message": dispatch = True text = event.content or "" elif event.event_type == "reaction" and self._dispatch_reactions: dispatch = True text = self._synthetic_text_for_event(event) or "" elif event.event_type == "card_action" and self._dispatch_card_actions: dispatch = True text = self._synthetic_text_for_event(event) or "" if not dispatch: return RouteResult( ok=True, skipped=True, reason=f"event_type_not_dispatched:{event.event_type}", user_id=user_id, ) if not self._auto_create: return RouteResult(ok=False, error="auto_create_trace_disabled", user_id=user_id) meta = feishu_event_to_mapping(event) await self._trace.prepare_session( channel=CHANNEL_FEISHU, user_id=user_id, workspace_id=workspace_id, agent_type=self._agent_type, metadata=meta, ) existing_agent_trace_id = await self._trace.get_existing_trace_id(CHANNEL_FEISHU, user_id) ctx = self._reply_context_from_event(event) if ctx is None: return RouteResult( ok=False, error="missing_chat_id_for_reply", trace_id=existing_agent_trace_id or "", user_id=user_id, workspace_id=workspace_id, ) task_id, agent_trace_id = await self._executor.handle_inbound_message( existing_agent_trace_id or "", text or "", ctx, self._connector, event=event, ) if agent_trace_id: await self._trace.bind_agent_trace_id( channel=CHANNEL_FEISHU, user_id=user_id, workspace_id=workspace_id, agent_trace_id=agent_trace_id, agent_type=self._agent_type, metadata=meta, ) return RouteResult( ok=True, trace_id=agent_trace_id or existing_agent_trace_id or "", task_id=task_id, user_id=user_id, workspace_id=workspace_id, ) async def route_message(self, channel: str, user_id: str, message: Mapping[str, Any]) -> str: """ 通用入口:message 建议包含 text、可选飞书上下文字段 (account_id, app_id, chat_id, message_id, open_id)。 """ text = str(message.get("text") or message.get("content") or "") workspace_id = self._workspace_id_for_user(user_id) meta = dict(message) if isinstance(message, dict) else {} await self._trace.prepare_session( channel=channel, user_id=user_id, workspace_id=workspace_id, agent_type=self._agent_type, metadata=meta, ) existing = await self._trace.get_existing_trace_id(channel, user_id) ctx = FeishuReplyContext( account_id=as_opt_str(message.get("account_id")), app_id=str(message.get("app_id") or ""), chat_id=str(message.get("chat_id") or ""), message_id=as_opt_str(message.get("message_id")), open_id=as_opt_str(message.get("open_id")), ) if not ctx.app_id or not ctx.chat_id: raise ValueError("route_message requires app_id and chat_id in message for Feishu reply") synthetic = IncomingFeishuEvent( event_type="message", app_id=ctx.app_id, account_id=ctx.account_id, open_id=ctx.open_id, chat_type=as_opt_str(message.get("chat_type")), chat_id=ctx.chat_id, message_id=ctx.message_id, content=text, raw=dict(message) if isinstance(message, dict) else {}, ) task_id, agent_trace_id = await self._executor.handle_inbound_message( existing or "", text, ctx, self._connector, event=synthetic ) if agent_trace_id: await self._trace.bind_agent_trace_id( channel=channel, user_id=user_id, workspace_id=workspace_id, agent_trace_id=agent_trace_id, agent_type=self._agent_type, metadata=meta, ) return task_id async def send_agent_reply( self, trace_id: str, content: str, metadata: Mapping[str, Any] | None = None, ) -> dict[str, Any]: """ Executor 完成后由业务调用:根据 metadata 中的飞书上下文发消息。 metadata 键:account_id?, app_id, chat_id, message_id?, open_id? """ meta = dict(metadata or {}) ctx = FeishuReplyContext( account_id=as_opt_str(meta.get("account_id")), app_id=str(meta.get("app_id") or ""), chat_id=str(meta.get("chat_id") or ""), message_id=as_opt_str(meta.get("message_id")), open_id=as_opt_str(meta.get("open_id")), ) if not ctx.chat_id: return {"ok": False, "error": "metadata missing chat_id", "trace_id": trace_id} _ = trace_id return await self._connector.send_text(ctx, content) __all__ = ["FeishuMessageRouter", "routing_from_card_action_raw", "as_opt_str"]