echo_executor.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. """
  2. 通用回显执行器。
  3. 适用于任何实现了 ``send_text(reply_context, text) -> dict`` 的渠道连接器,
  4. 无需感知具体渠道类型(飞书、微信等)。
  5. """
  6. from __future__ import annotations
  7. import logging
  8. import uuid
  9. from typing import Any
  10. logger = logging.getLogger(__name__)
  11. class EchoExecutorBackend:
  12. """默认执行器:将用户消息原文回显,用于验证 Gateway → 渠道适配层 → IM 全链路。"""
  13. def __init__(self, *, prefix: str = "[Gateway] ", enabled: bool = True) -> None:
  14. self._prefix = prefix
  15. self._enabled = enabled
  16. async def handle_inbound_message(
  17. self,
  18. trace_id: str,
  19. text: str,
  20. reply_context: Any,
  21. connector: Any,
  22. *,
  23. event: Any,
  24. ) -> str:
  25. task_id = f"task-{uuid.uuid4()}"
  26. if not self._enabled:
  27. logger.info("EchoExecutor disabled, skip reply trace_id=%s", trace_id)
  28. return task_id
  29. reply_body = f"{self._prefix}{text}" if text else f"{self._prefix}(空消息)"
  30. result = await connector.send_text(reply_context, reply_body)
  31. if not result.get("ok"):
  32. logger.error("send_text failed trace_id=%s result=%s", trace_id, result)
  33. return task_id