router.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. """飞书入站事件路由:Trace 会话准备、Executor 提交、绑定 Agent ``trace_id``。"""
  2. from __future__ import annotations
  3. import logging
  4. from collections.abc import Mapping
  5. from typing import Any
  6. from gateway.core.channels.feishu.connector import FeishuConnector
  7. from gateway.core.channels.feishu.protocols import (
  8. FeishuExecutorBackend,
  9. FeishuUserIdentityResolver,
  10. )
  11. from gateway.core.channels.feishu.types import (
  12. FeishuReplyContext,
  13. IncomingFeishuEvent,
  14. feishu_event_to_mapping,
  15. )
  16. from gateway.core.channels.protocols import TraceBackend
  17. from gateway.core.channels.router import ChannelTraceRouter
  18. from gateway.core.channels.types import CHANNEL_FEISHU, RouteResult
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
  20. def routing_from_card_action_raw(raw: dict[str, Any]) -> tuple[str | None, str | None, str | None]:
  21. """
  22. 当规范化 JSON 未带 chat_id 时,从飞书 card.action.trigger 原始体兜底解析。
  23. 常见路径:event.context.open_chat_id / open_message_id(或顶层 open_chat_id)。
  24. """
  25. if not raw:
  26. return None, None, None
  27. ev: Any = raw.get("event") if isinstance(raw.get("event"), dict) else raw
  28. if not isinstance(ev, dict):
  29. return None, None, None
  30. ctx = ev.get("context") if isinstance(ev.get("context"), dict) else {}
  31. def pick(*keys: str) -> str | None:
  32. for d in (ev, ctx):
  33. for k in keys:
  34. v = d.get(k)
  35. if isinstance(v, str) and v.strip():
  36. return v.strip()
  37. return None
  38. chat_id = pick("open_chat_id", "chat_id")
  39. message_id = pick("open_message_id", "message_id")
  40. chat_type = pick("chat_type")
  41. return chat_id, message_id, chat_type
  42. def as_opt_str(v: Any) -> str | None:
  43. if v is None:
  44. return None
  45. s = str(v)
  46. return s if s else None
  47. class FeishuMessageRouter(ChannelTraceRouter):
  48. """
  49. 飞书消息路由:prepare_session → Executor → bind_agent_trace_id。
  50. ``reaction`` / ``card_action`` 是否触发续跑由 ``dispatch_*`` 控制;
  51. ``card_action`` 多用于 OAuth 等卡片交互后让 Agent 继续执行。
  52. """
  53. def __init__(
  54. self,
  55. *,
  56. connector: FeishuConnector,
  57. trace_backend: TraceBackend,
  58. executor_backend: FeishuExecutorBackend,
  59. identity_resolver: FeishuUserIdentityResolver,
  60. workspace_prefix: str = CHANNEL_FEISHU,
  61. default_agent_type: str = "personal_assistant",
  62. auto_create_trace: bool = True,
  63. dispatch_reactions: bool = False,
  64. dispatch_card_actions: bool = False,
  65. ) -> None:
  66. super().__init__(
  67. trace_backend=trace_backend,
  68. workspace_prefix=workspace_prefix,
  69. default_agent_type=default_agent_type,
  70. )
  71. self._connector = connector
  72. self._executor = executor_backend
  73. self._identity = identity_resolver
  74. self._auto_create = auto_create_trace
  75. self._dispatch_reactions = dispatch_reactions
  76. self._dispatch_card_actions = dispatch_card_actions
  77. def _reply_context_from_event(self, event: IncomingFeishuEvent) -> FeishuReplyContext | None:
  78. chat_id = event.chat_id
  79. message_id = event.message_id
  80. if not chat_id and event.event_type == "card_action":
  81. c, m, _ = routing_from_card_action_raw(event.raw)
  82. chat_id = chat_id or c
  83. message_id = message_id or m
  84. if not chat_id:
  85. logger.warning("missing chat_id, cannot reply: %s", feishu_event_to_mapping(event))
  86. return None
  87. return FeishuReplyContext(
  88. account_id=event.account_id,
  89. app_id=event.app_id,
  90. chat_id=chat_id,
  91. message_id=message_id,
  92. open_id=event.open_id,
  93. )
  94. def _synthetic_text_for_event(self, event: IncomingFeishuEvent) -> str | None:
  95. if event.event_type == "reaction" and event.emoji:
  96. return f"[系统-表情] {event.emoji} message_id={event.message_id or ''}"
  97. if event.event_type == "card_action":
  98. return (
  99. "[系统-卡片交互] 用户已在飞书内完成卡片操作(如授权确认),"
  100. "请结合当前上下文继续执行未完成任务,必要时重试刚才失败的工具。"
  101. f" action={event.action or ''} operation_id={event.operation_id or ''}"
  102. )
  103. return None
  104. async def route_feishu_inbound_event(self, event: IncomingFeishuEvent) -> RouteResult:
  105. """处理 Feishu HTTP 适配服务转发的规范化入站事件。"""
  106. user_id = self._identity.resolve_user_id(event)
  107. workspace_id = self._workspace_id_for_user(user_id)
  108. dispatch = False
  109. text: str | None = None
  110. if event.event_type == "message":
  111. dispatch = True
  112. text = event.content or ""
  113. elif event.event_type == "reaction" and self._dispatch_reactions:
  114. dispatch = True
  115. text = self._synthetic_text_for_event(event) or ""
  116. elif event.event_type == "card_action" and self._dispatch_card_actions:
  117. dispatch = True
  118. text = self._synthetic_text_for_event(event) or ""
  119. if not dispatch:
  120. return RouteResult(
  121. ok=True,
  122. skipped=True,
  123. reason=f"event_type_not_dispatched:{event.event_type}",
  124. user_id=user_id,
  125. )
  126. if not self._auto_create:
  127. return RouteResult(ok=False, error="auto_create_trace_disabled", user_id=user_id)
  128. meta = feishu_event_to_mapping(event)
  129. await self._trace.prepare_session(
  130. channel=CHANNEL_FEISHU,
  131. user_id=user_id,
  132. workspace_id=workspace_id,
  133. agent_type=self._agent_type,
  134. metadata=meta,
  135. )
  136. existing_agent_trace_id = await self._trace.get_existing_trace_id(CHANNEL_FEISHU, user_id)
  137. ctx = self._reply_context_from_event(event)
  138. if ctx is None:
  139. return RouteResult(
  140. ok=False,
  141. error="missing_chat_id_for_reply",
  142. trace_id=existing_agent_trace_id or "",
  143. user_id=user_id,
  144. workspace_id=workspace_id,
  145. )
  146. task_id, agent_trace_id = await self._executor.handle_inbound_message(
  147. existing_agent_trace_id or "",
  148. text or "",
  149. ctx,
  150. self._connector,
  151. event=event,
  152. )
  153. if agent_trace_id:
  154. await self._trace.bind_agent_trace_id(
  155. channel=CHANNEL_FEISHU,
  156. user_id=user_id,
  157. workspace_id=workspace_id,
  158. agent_trace_id=agent_trace_id,
  159. agent_type=self._agent_type,
  160. metadata=meta,
  161. )
  162. return RouteResult(
  163. ok=True,
  164. trace_id=agent_trace_id or existing_agent_trace_id or "",
  165. task_id=task_id,
  166. user_id=user_id,
  167. workspace_id=workspace_id,
  168. )
  169. async def route_message(self, channel: str, user_id: str, message: Mapping[str, Any]) -> str:
  170. """
  171. 通用入口:message 建议包含 text、可选飞书上下文字段
  172. (account_id, app_id, chat_id, message_id, open_id)。
  173. """
  174. text = str(message.get("text") or message.get("content") or "")
  175. workspace_id = self._workspace_id_for_user(user_id)
  176. meta = dict(message) if isinstance(message, dict) else {}
  177. await self._trace.prepare_session(
  178. channel=channel,
  179. user_id=user_id,
  180. workspace_id=workspace_id,
  181. agent_type=self._agent_type,
  182. metadata=meta,
  183. )
  184. existing = await self._trace.get_existing_trace_id(channel, user_id)
  185. ctx = FeishuReplyContext(
  186. account_id=as_opt_str(message.get("account_id")),
  187. app_id=str(message.get("app_id") or ""),
  188. chat_id=str(message.get("chat_id") or ""),
  189. message_id=as_opt_str(message.get("message_id")),
  190. open_id=as_opt_str(message.get("open_id")),
  191. )
  192. if not ctx.app_id or not ctx.chat_id:
  193. raise ValueError("route_message requires app_id and chat_id in message for Feishu reply")
  194. synthetic = IncomingFeishuEvent(
  195. event_type="message",
  196. app_id=ctx.app_id,
  197. account_id=ctx.account_id,
  198. open_id=ctx.open_id,
  199. chat_type=as_opt_str(message.get("chat_type")),
  200. chat_id=ctx.chat_id,
  201. message_id=ctx.message_id,
  202. content=text,
  203. raw=dict(message) if isinstance(message, dict) else {},
  204. )
  205. task_id, agent_trace_id = await self._executor.handle_inbound_message(
  206. existing or "", text, ctx, self._connector, event=synthetic
  207. )
  208. if agent_trace_id:
  209. await self._trace.bind_agent_trace_id(
  210. channel=channel,
  211. user_id=user_id,
  212. workspace_id=workspace_id,
  213. agent_trace_id=agent_trace_id,
  214. agent_type=self._agent_type,
  215. metadata=meta,
  216. )
  217. return task_id
  218. async def send_agent_reply(
  219. self,
  220. trace_id: str,
  221. content: str,
  222. metadata: Mapping[str, Any] | None = None,
  223. ) -> dict[str, Any]:
  224. """
  225. Executor 完成后由业务调用:根据 metadata 中的飞书上下文发消息。
  226. metadata 键:account_id?, app_id, chat_id, message_id?, open_id?
  227. """
  228. meta = dict(metadata or {})
  229. ctx = FeishuReplyContext(
  230. account_id=as_opt_str(meta.get("account_id")),
  231. app_id=str(meta.get("app_id") or ""),
  232. chat_id=str(meta.get("chat_id") or ""),
  233. message_id=as_opt_str(meta.get("message_id")),
  234. open_id=as_opt_str(meta.get("open_id")),
  235. )
  236. if not ctx.chat_id:
  237. return {"ok": False, "error": "metadata missing chat_id", "trace_id": trace_id}
  238. _ = trace_id
  239. return await self._connector.send_text(ctx, content)
# Names exported by ``from <this module> import *``.
__all__ = ["FeishuMessageRouter", "routing_from_card_action_raw", "as_opt_str"]