api.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. """
  2. 飞书渠道 FastAPI 路由。
  3. ``FeishuChannelApi`` 以类方法作为路由处理器,依赖通过构造函数注入,
  4. 避免闭包捕获 channel_manager 的写法,也使各处理方法可独立测试。
  5. """
  6. from __future__ import annotations
  7. import logging
  8. from typing import Any
  9. from fastapi import APIRouter, HTTPException, Request
  10. from gateway.core.channels.feishu.manager import FeishuChannelManager
  11. logger = logging.getLogger(__name__)
  12. class FeishuChannelApi:
  13. """飞书渠道 HTTP 路由:持有 manager 引用,以方法作为路由处理器。
  14. 实现 ``ChannelPlugin`` Protocol,由 ``load_enabled_channels`` 自动挂载路由。
  15. """
  16. def __init__(self, channel_manager: FeishuChannelManager) -> None:
  17. self._manager = channel_manager
  18. @classmethod
  19. def from_env(cls) -> FeishuChannelApi:
  20. """从环境变量构造实例,供 ``load_enabled_channels`` 调用。"""
  21. return cls(FeishuChannelManager.from_env())
  22. async def inbound_webhook(self, request: Request) -> dict[str, Any]:
  23. """POST /api/channels/feishu/inbound/webhook
  24. Feishu HTTP 适配服务经 ``GATEWAY_FEISHU_WEBHOOK_URL`` 转发规范化 JSON 到此端点。
  25. """
  26. try:
  27. body = await request.json()
  28. except Exception:
  29. raise HTTPException(status_code=400, detail="invalid_json")
  30. if not isinstance(body, dict):
  31. raise HTTPException(status_code=400, detail="body_must_be_object")
  32. result = await self._manager.handle_feishu_inbound_webhook(body)
  33. if not result.ok and result.error:
  34. logger.warning("feishu inbound webhook error: %s", result.error)
  35. out: dict[str, Any] = {
  36. "ok": result.ok,
  37. "skipped": result.skipped,
  38. "reason": result.reason,
  39. "trace_id": result.trace_id,
  40. "task_id": result.task_id,
  41. "user_id": result.user_id,
  42. "workspace_id": result.workspace_id,
  43. }
  44. if result.error:
  45. out["error"] = result.error
  46. return out
  47. async def channel_status(self, channel_id: str) -> dict[str, Any]:
  48. """GET /api/channels/{channel_id}/status"""
  49. return self._manager.get_channel_status(channel_id)
  50. def build_router(self) -> APIRouter:
  51. """返回挂载好路由的 ``/api/channels`` APIRouter。"""
  52. channels = APIRouter(prefix="/api/channels", tags=["channels"])
  53. feishu = APIRouter(prefix="/feishu", tags=["feishu"])
  54. feishu.add_api_route(
  55. "/inbound/webhook",
  56. self.inbound_webhook,
  57. methods=["POST"],
  58. )
  59. channels.add_api_route(
  60. "/{channel_id}/status",
  61. self.channel_status,
  62. methods=["GET"],
  63. )
  64. channels.include_router(feishu)
  65. return channels