| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 |
- """飞书入站事件路由:Trace 会话准备、Executor 提交、绑定 Agent ``trace_id``。"""
- from __future__ import annotations
- import logging
- from collections.abc import Mapping
- from typing import Any
- from gateway.core.channels.feishu.connector import FeishuConnector
- from gateway.core.channels.feishu.protocols import (
- FeishuExecutorBackend,
- FeishuUserIdentityResolver,
- )
- from gateway.core.channels.feishu.types import (
- FeishuReplyContext,
- IncomingFeishuEvent,
- feishu_event_to_mapping,
- )
- from gateway.core.channels.protocols import TraceBackend
- from gateway.core.channels.router import ChannelTraceRouter
- from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult
- logger = logging.getLogger(__name__)
- def routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str | None, str | None]:
- """
- 当规范化 JSON 未带 chat_id 时,从飞书 card.action.trigger 原始体兜底解析。
- 常见路径:event.context.open_chat_id / open_message_id(或顶层 open_chat_id)。
- """
- if not raw:
- return None, None, None
- ev: Any = raw.get("event") if isinstance(raw.get("event"), dict) else raw
- if not isinstance(ev, dict):
- return None, None, None
- ctx = ev.get("context") if isinstance(ev.get("context"), dict) else {}
- def pick(*keys: str) -> str | None:
- for d in (ev, ctx):
- for k in keys:
- v = d.get(k)
- if isinstance(v, str) and v.strip():
- return v.strip()
- return None
- chat_id = pick("open_chat_id", "chat_id")
- message_id = pick("open_message_id", "message_id")
- chat_type = pick("chat_type")
- return chat_id, message_id, chat_type
- def as_opt_str(v: Any) -> str | None:
- if v is None:
- return None
- s = str(v)
- return s if s else None
- class FeishuMessageRouter(ChannelTraceRouter):
- """
- 飞书消息路由:prepare_session → Executor → bind_agent_trace_id。
- ``reaction`` / ``card_action`` 是否触发续跑由 ``dispatch_*`` 控制;
- ``card_action`` 多用于 OAuth 等卡片交互后让 Agent 继续执行。
- """
- def __init__(
- self,
- *,
- connector: FeishuConnector,
- trace_backend: TraceBackend,
- executor_backend: FeishuExecutorBackend,
- identity_resolver: FeishuUserIdentityResolver,
- workspace_prefix: str = CHANNEL_FEISHU,
- default_agent_type: str = "personal_assistant",
- auto_create_trace: bool = True,
- dispatch_reactions: bool = False,
- dispatch_card_actions: bool = False,
- ) -> None:
- super().__init__(
- trace_backend=trace_backend,
- workspace_prefix=workspace_prefix,
- default_agent_type=default_agent_type,
- )
- self._connector = connector
- self._executor = executor_backend
- self._identity = identity_resolver
- self._auto_create = auto_create_trace
- self._dispatch_reactions = dispatch_reactions
- self._dispatch_card_actions = dispatch_card_actions
- def _reply_context_from_event(self, event: IncomingFeishuEvent) -> FeishuReplyContext | None:
- chat_id = event.chat_id
- message_id = event.message_id
- if not chat_id and event.event_type == "card_action":
- c, m, _ = routing_from_card_action_raw(event.raw)
- chat_id = chat_id or c
- message_id = message_id or m
- if not chat_id:
- logger.warning("missing chat_id, cannot reply: %s", feishu_event_to_mapping(event))
- return None
- return FeishuReplyContext(
- account_id=event.account_id,
- app_id=event.app_id,
- chat_id=chat_id,
- message_id=message_id,
- open_id=event.open_id,
- )
- def _synthetic_text_for_event(self, event: IncomingFeishuEvent) -> str | None:
- if event.event_type == "reaction" and event.emoji:
- return f"[系统-表情] {event.emoji} message_id={event.message_id or ''}"
- if event.event_type == "card_action":
- return (
- "[系统-卡片交互] 用户已在飞书内完成卡片操作(如授权确认),"
- "请结合当前上下文继续执行未完成任务,必要时重试刚才失败的工具。"
- f" action={event.action or ''} operation_id={event.operation_id or ''}"
- )
- return None
- async def route_feishu_inbound_event(self, event: IncomingFeishuEvent) -> RouteResult:
- """处理 Feishu HTTP 适配服务转发的规范化入站事件。"""
- user_id = self._identity.resolve_user_id(event)
- workspace_id = self._workspace_id_for_user(user_id)
- dispatch = False
- text: str | None = None
- if event.event_type == "message":
- dispatch = True
- text = event.content or ""
- elif event.event_type == "reaction" and self._dispatch_reactions:
- dispatch = True
- text = self._synthetic_text_for_event(event) or ""
- elif event.event_type == "card_action" and self._dispatch_card_actions:
- dispatch = True
- text = self._synthetic_text_for_event(event) or ""
- if not dispatch:
- return RouteResult(
- ok=True,
- skipped=True,
- reason=f"event_type_not_dispatched:{event.event_type}",
- user_id=user_id,
- )
- if not self._auto_create:
- return RouteResult(ok=False, error="auto_create_trace_disabled", user_id=user_id)
- meta = feishu_event_to_mapping(event)
- await self._trace.prepare_session(
- channel=CHANNEL_FEISHU,
- user_id=user_id,
- workspace_id=workspace_id,
- agent_type=self._agent_type,
- metadata=meta,
- )
- existing_agent_trace_id = await self._trace.get_existing_trace_id(CHANNEL_FEISHU, user_id)
- ctx = self._reply_context_from_event(event)
- if ctx is None:
- return RouteResult(
- ok=False,
- error="missing_chat_id_for_reply",
- trace_id=existing_agent_trace_id or "",
- user_id=user_id,
- workspace_id=workspace_id,
- )
- task_id, agent_trace_id = await self._executor.handle_inbound_message(
- existing_agent_trace_id or "",
- text or "",
- ctx,
- self._connector,
- event=event,
- )
- if agent_trace_id:
- await self._trace.bind_agent_trace_id(
- channel=CHANNEL_FEISHU,
- user_id=user_id,
- workspace_id=workspace_id,
- agent_trace_id=agent_trace_id,
- agent_type=self._agent_type,
- metadata=meta,
- )
- return RouteResult(
- ok=True,
- trace_id=agent_trace_id or existing_agent_trace_id or "",
- task_id=task_id,
- user_id=user_id,
- workspace_id=workspace_id,
- )
- async def route_message(self, channel: str, user_id: str, message: Mapping[str, Any]) -> str:
- """
- 通用入口:message 建议包含 text、可选飞书上下文字段
- (account_id, app_id, chat_id, message_id, open_id)。
- """
- text = str(message.get("text") or message.get("content") or "")
- workspace_id = self._workspace_id_for_user(user_id)
- meta = dict(message) if isinstance(message, dict) else {}
- await self._trace.prepare_session(
- channel=channel,
- user_id=user_id,
- workspace_id=workspace_id,
- agent_type=self._agent_type,
- metadata=meta,
- )
- existing = await self._trace.get_existing_trace_id(channel, user_id)
- ctx = FeishuReplyContext(
- account_id=as_opt_str(message.get("account_id")),
- app_id=str(message.get("app_id") or ""),
- chat_id=str(message.get("chat_id") or ""),
- message_id=as_opt_str(message.get("message_id")),
- open_id=as_opt_str(message.get("open_id")),
- )
- if not ctx.app_id or not ctx.chat_id:
- raise ValueError("route_message requires app_id and chat_id in message for Feishu reply")
- synthetic = IncomingFeishuEvent(
- event_type="message",
- app_id=ctx.app_id,
- account_id=ctx.account_id,
- open_id=ctx.open_id,
- chat_type=as_opt_str(message.get("chat_type")),
- chat_id=ctx.chat_id,
- message_id=ctx.message_id,
- content=text,
- raw=dict(message) if isinstance(message, dict) else {},
- )
- task_id, agent_trace_id = await self._executor.handle_inbound_message(
- existing or "", text, ctx, self._connector, event=synthetic
- )
- if agent_trace_id:
- await self._trace.bind_agent_trace_id(
- channel=channel,
- user_id=user_id,
- workspace_id=workspace_id,
- agent_trace_id=agent_trace_id,
- agent_type=self._agent_type,
- metadata=meta,
- )
- return task_id
- async def send_agent_reply(
- self,
- trace_id: str,
- content: str,
- metadata: Mapping[str, Any] | None = None,
- ) -> dict[str, Any]:
- """
- Executor 完成后由业务调用:根据 metadata 中的飞书上下文发消息。
- metadata 键:account_id?, app_id, chat_id, message_id?, open_id?
- """
- meta = dict(metadata or {})
- ctx = FeishuReplyContext(
- account_id=as_opt_str(meta.get("account_id")),
- app_id=str(meta.get("app_id") or ""),
- chat_id=str(meta.get("chat_id") or ""),
- message_id=as_opt_str(meta.get("message_id")),
- open_id=as_opt_str(meta.get("open_id")),
- )
- if not ctx.chat_id:
- return {"ok": False, "error": "metadata missing chat_id", "trace_id": trace_id}
- _ = trace_id
- return await self._connector.send_text(ctx, content)
- __all__ = ["FeishuMessageRouter", "routing_from_card_action_raw", "as_opt_str"]
|