| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- """
- 飞书渠道 FastAPI 路由。
- ``FeishuChannelApi`` 以类方法作为路由处理器,依赖通过构造函数注入,
- 避免闭包捕获 channel_manager 的写法,也使各处理方法可独立测试。
- """
- from __future__ import annotations
- import logging
- from typing import Any
- from fastapi import APIRouter, HTTPException, Request
- from gateway.core.channels.feishu.manager import FeishuChannelManager
# Module-level logger named after this module; used by the route handlers below.
logger = logging.getLogger(__name__)
class FeishuChannelApi:
    """HTTP routes for the Feishu channel.

    The instance keeps a reference to the channel manager and exposes its own
    bound methods as route handlers, so the manager arrives through the
    constructor instead of being captured by a closure and each handler can
    be called directly in tests.

    Implements the ``ChannelPlugin`` protocol; routes are mounted
    automatically by ``load_enabled_channels``.
    """

    def __init__(self, channel_manager: FeishuChannelManager) -> None:
        """Store the manager that every route handler delegates to."""
        self._manager = channel_manager

    @classmethod
    def from_env(cls) -> FeishuChannelApi:
        """Build an instance around ``FeishuChannelManager.from_env()``.

        Intended to be called by ``load_enabled_channels``.
        """
        return cls(FeishuChannelManager.from_env())

    async def inbound_webhook(self, request: Request) -> dict[str, Any]:
        """POST /api/channels/feishu/inbound/webhook

        The Feishu HTTP adapter service forwards normalized JSON to this
        endpoint via ``GATEWAY_FEISHU_WEBHOOK_URL``.

        Returns:
            A dict with ``ok``, ``skipped``, ``reason``, ``trace_id``,
            ``task_id``, ``user_id`` and ``workspace_id`` copied from the
            manager's result, plus ``error`` when the result carries one.

        Raises:
            HTTPException: 400 ``invalid_json`` when the body cannot be read
                as JSON; 400 ``body_must_be_object`` when the decoded body is
                not a dict.
        """
        try:
            body = await request.json()
        except Exception as exc:
            # Keep the original parsing failure attached as the cause so it
            # is visible in tracebacks instead of being silently replaced.
            raise HTTPException(status_code=400, detail="invalid_json") from exc

        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="body_must_be_object")

        result = await self._manager.handle_feishu_inbound_webhook(body)

        # Only a failed result that carries an error message is logged.
        if not result.ok and result.error:
            logger.warning("feishu inbound webhook error: %s", result.error)

        out: dict[str, Any] = {
            "ok": result.ok,
            "skipped": result.skipped,
            "reason": result.reason,
            "trace_id": result.trace_id,
            "task_id": result.task_id,
            "user_id": result.user_id,
            "workspace_id": result.workspace_id,
        }
        # The "error" key is present only when there is an error to report.
        if result.error:
            out["error"] = result.error
        return out

    async def channel_status(self, channel_id: str) -> dict[str, Any]:
        """GET /api/channels/{channel_id}/status

        Returns whatever the manager's ``get_channel_status`` reports for
        ``channel_id``.
        """
        return self._manager.get_channel_status(channel_id)

    def build_router(self) -> APIRouter:
        """Return the ``/api/channels`` APIRouter with all routes mounted.

        The webhook route is registered on a nested ``/feishu`` router; the
        status route is registered on the parent router before the nested
        router is included.
        """
        channels = APIRouter(prefix="/api/channels", tags=["channels"])
        feishu = APIRouter(prefix="/feishu", tags=["feishu"])

        feishu.add_api_route(
            "/inbound/webhook",
            self.inbound_webhook,
            methods=["POST"],
        )
        channels.add_api_route(
            "/{channel_id}/status",
            self.channel_status,
            methods=["GET"],
        )

        channels.include_router(feishu)
        return channels
|