echo_executor.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. from __future__ import annotations
  2. import logging
  3. import uuid
  4. from typing import TYPE_CHECKING, Any
  5. from gateway.core.channels.types import FeishuReplyContext, IncomingFeishuEvent
  6. if TYPE_CHECKING:
  7. from gateway.core.channels.feishu.connector import FeishuConnector
  8. logger = logging.getLogger(__name__)
  9. class EchoExecutorBackend:
  10. """默认执行器:回显或固定话术,验证「Gateway → Node → 飞书」链路。"""
  11. def __init__(self, *, prefix: str = "[Gateway] ", enabled: bool = True) -> None:
  12. self._prefix = prefix
  13. self._enabled = enabled
  14. async def handle_inbound_message(
  15. self,
  16. trace_id: str,
  17. text: str,
  18. reply_context: FeishuReplyContext,
  19. connector: Any,
  20. *,
  21. event: IncomingFeishuEvent,
  22. ) -> str:
  23. task_id = f"task-{uuid.uuid4()}"
  24. if not self._enabled:
  25. logger.info("EchoExecutor disabled, skip reply trace_id=%s", trace_id)
  26. return task_id
  27. reply_body = f"{self._prefix}{text}" if text else f"{self._prefix}(空消息)"
  28. result = await connector.send_text(reply_context, reply_body)
  29. if not result.get("ok"):
  30. logger.error("send_text failed trace_id=%s result=%s", trace_id, result)
  31. return task_id