فهرست منبع

bugfix-取消 mcp 代码

luojunhui 1 ماه پیش
والد
کامیت
e914e9ca6f

+ 2 - 2
app/api/v1/endpoints/__init__.py

@@ -3,7 +3,7 @@ from .health import create_health_bp
 from .tasks import create_tasks_bp
 from .tokens import create_tokens_bp
 from .monitor import create_monitor_bp
-# from .mcp import create_mcp_bp
+from .mcp import create_mcp_bp
 
 __all__ = [
     "create_abtest_bp",
@@ -11,5 +11,5 @@ __all__ = [
     "create_tasks_bp",
     "create_tokens_bp",
     "create_monitor_bp",
-    # "create_mcp_bp",
+    "create_mcp_bp",
 ]

+ 46 - 52
app/api/v1/endpoints/mcp.py

@@ -1,52 +1,46 @@
-# from __future__ import annotations
-#
-# from pydantic import ValidationError
-# from quart import Blueprint, jsonify
-#
-# from app.api.v1.utils import ApiDependencies, LongArticlesMcpRequest
-# from app.api.v1.utils import parse_json, validation_error_response
-# # from app.domains.mcp import deal, UnknownMcpTaskName
-#
-#
-# def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
-#     bp = Blueprint("mcp", __name__)
-#
-#     @bp.route("/long_articles_mcp/<string:task_name>", methods=["POST"])
-#     async def long_articles_mcp(task_name: str):
-#         """MCP 交互接口
-#
-#         - POST /api/long_articles_mcp/<task_name>
-#         - 通过 task_name 分发到 MCP 域内对应的方法
-#         - 请求体为 JSON,可选字段如 page、page_size、sort_by、sort_order 等
-#         """
-#         try:
-#             req, _ = await parse_json(LongArticlesMcpRequest)
-#         except ValidationError as e:
-#             payload, status = validation_error_response(e)
-#             return jsonify(payload), status
-#
-#         # 使用校验后的模型转 dict,保证类型正确,且含 extra 透传字段
-#         params = req.model_dump(exclude_none=True)
-#
-#         try:
-#             result = await deal(
-#                 task_name=task_name,
-#                 pool=deps.db,
-#                 log_service=deps.log,
-#                 params=params,
-#             )
-#         except UnknownMcpTaskName:
-#             return (
-#                 jsonify(
-#                     {
-#                         "code": 404,
-#                         "message": f"unknown task_name: {task_name}",
-#                         "data": None,
-#                     }
-#                 ),
-#                 404,
-#             )
-#
-#         return jsonify({"code": 0, "message": "success", "data": result})
-#
-#     return bp
+from __future__ import annotations
+
+from pydantic import ValidationError
+from quart import Blueprint, jsonify
+
+from app.api.v1.utils import ApiDependencies, LongArticlesMcpRequest
+from app.api.v1.utils import parse_json, validation_error_response
+from app.domains.mcp import LongArticlesMcp
+
+
+def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
+    bp = Blueprint("mcp", __name__)
+
+    @bp.route("/long_articles_mcp/<string:task_name>", methods=["POST"])
+    async def long_articles_mcp(task_name: str):
+        """MCP 交互接口
+
+        POST /api/long_articles_mcp/<task_name>
+        请求体为 JSON,可选字段: page, page_size, sort_by, sort_order
+        """
+        # 1. 请求体校验
+        try:
+            req, _ = await parse_json(LongArticlesMcpRequest)
+        except ValidationError as e:
+            payload, status = validation_error_response(e)
+            return jsonify(payload), status
+
+        params = req.model_dump(exclude_none=True)
+
+        # 2. 构造 MCP 入口并分发
+        mcp = LongArticlesMcp(
+            params=params,
+            pool=deps.db,
+            log_service=deps.log,
+            config=deps.config,
+        )
+        result = await mcp.deal(task_name=task_name, params=params)
+
+        # 3. deal() 已统一封装 {code, message, data},直接透传
+        http_status = 200 if result.get("code") == 0 else result.get("code", 500)
+        return jsonify(result), http_status
+
+    return bp
+
+
+__all__ = ["create_mcp_bp"]

+ 2 - 2
app/api/v1/routes/routes.py

@@ -9,7 +9,7 @@ from app.api.v1.endpoints import (
     create_tasks_bp,
     create_tokens_bp,
     create_monitor_bp,
-    # create_mcp_bp
+    create_mcp_bp
 )
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
@@ -33,7 +33,7 @@ def register_v1_blueprints(deps: ApiDependencies) -> Blueprint:
     api.register_blueprint(create_tokens_bp(deps))
     api.register_blueprint(create_abtest_bp(deps))
     api.register_blueprint(create_monitor_bp(deps))
-    # api.register_blueprint(create_mcp_bp(deps))
+    api.register_blueprint(create_mcp_bp(deps))
 
     return api
 

+ 3 - 0
app/domains/mcp/__init__.py

@@ -0,0 +1,3 @@
+from .entrance import LongArticlesMcp
+
+__all__ = ["LongArticlesMcp"]

+ 10 - 19
app/domains/mcp/_const.py

@@ -1,27 +1,18 @@
-import math
-
-
 class LongArticlesMcpConst:
-    """MCP 配置层:排序字段、SQL 片段、分页默认值等。"""
-    # 分页配置
+    """MCP 常量配置:分页默认值、排序字段映射。"""
+
+    # 分页
     DEFAULT_PAGE = 1
     DEFAULT_PAGE_SIZE = 20
     MAX_PAGE_SIZE = 100
 
-    @staticmethod
-    def normalize_pagination(page: int | None, page_size: int | None) -> tuple[int, int]:
-        page = page or LongArticlesMcpConst.DEFAULT_PAGE
-        page = max(page, 1)
-
-        page_size = page_size or LongArticlesMcpConst.DEFAULT_PAGE_SIZE
-        page_size = max(1, min(page_size, LongArticlesMcpConst.MAX_PAGE_SIZE))
-        return page, page_size
-
-    @staticmethod
-    def calc_total_pages(total: int, page_size: int) -> int:
-        if total <= 0:
-            return 0
-        return math.ceil(total / page_size)
+    # 排序:对外 sort_by 值 -> 实际 SQL 字段
+    SORT_FIELDS = {
+        "read_cnt": "t.read_cnt",
+        "read_median_multiplier": "t.read_median_multiplier",
+        "publish_time": "t.publish_time",
+    }
+    SORT_ORDERS = {"asc", "desc"}
 
 
 __all__ = ["LongArticlesMcpConst"]

+ 0 - 24
app/domains/mcp/_handler_map.py

@@ -1,24 +0,0 @@
-from typing import Any, Awaitable, Callable, Dict
-
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-
-HandlerType = Callable[[DatabaseManager, LogService, Dict[str, Any] | None], Awaitable[Any]]
-
-
-async def _get_decode_response_wrapper(
-    pool: DatabaseManager,
-    log_service: LogService,
-    params: Dict[str, Any] | None,
-):
-    # 这里只是为 handler_map 统一签名做一层薄封装
-    return await get_decode_response(pool=pool, log_service=log_service, params=params)
-
-
-HANDLER_MAP: Dict[str, HandlerType] = {
-    "get_decode_response": _get_decode_response_wrapper,
-}
-
-
-__all__ = ["HANDLER_MAP"]

+ 36 - 0
app/domains/mcp/_handlers.py

@@ -0,0 +1,36 @@
+from __future__ import annotations
+
+from typing import Any, Awaitable, Callable, Dict
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._mapper import LongArticlesMcpMapper
+from ._utils import LongArticlesMcpUtils
+
+
+class LongArticlesMcpHandlers:
+    """MCP 处理层:每个 task_name 对应一个同名 handler 方法。"""
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService, config: GlobalConfigSettings):
+        self.log_service = log_service
+        self.config = config
+        self.tools = LongArticlesMcpUtils()
+        self.mapper = LongArticlesMcpMapper(pool=pool)
+
+    def get_handler(self, task_name: str) -> Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]:
+        """根据 task_name 查找同名方法,未找到时抛出 KeyError。"""
+        method = getattr(self, task_name, None)
+        if method is None or not callable(method):
+            raise KeyError(task_name)
+        return method
+
+    # ---- 下面是各 task_name 对应的 handler 方法,统一签名 ----
+
+    async def get_decode_response(self, params: Dict[str, Any]) -> Dict[str, Any]:
+        """get_decode_response 处理逻辑(待实现)。"""
+        pass
+
+
+__all__ = ["LongArticlesMcpHandlers"]

+ 16 - 7
app/domains/mcp/_mapper.py

@@ -1,4 +1,6 @@
-from typing import Any, Dict, List, Optional
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional, Tuple
 
 from app.core.database import DatabaseManager
 
@@ -6,16 +8,23 @@ from ._const import LongArticlesMcpConst
 
 
 class LongArticlesMcpMapper(LongArticlesMcpConst):
-    """MCP 方法层:只负责拼 SQL + 访问 DB,不做业务编排。"""
+    """MCP 数据访问层:只负责拼 SQL + 访问 DB,不做业务编排。"""
 
     def __init__(self, pool: DatabaseManager):
         self.pool = pool
 
-    # 解构
-
-    # 抓取
-
-    # 统计
+    # ---- 下面是各查询方法,返回原始数据 ----
+
+    async def query_decode_response(
+        self,
+        page: int,
+        page_size: int,
+        sort_by: str | None,
+        sort_order: str | None,
+        filters: Optional[Dict[str, Any]] = None,
+    ) -> Tuple[int, List[Dict[str, Any]]]:
+        """查询解构结果(待实现)。返回 (total, items)。"""
+        pass
 
 
 __all__ = ["LongArticlesMcpMapper"]

+ 81 - 0
app/domains/mcp/_utils.py

@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+import math
+from typing import Any, Dict, Tuple
+
+from ._const import LongArticlesMcpConst
+
+
+class LongArticlesMcpUtils(LongArticlesMcpConst):
+    """MCP 工具层:参数解析、校验、分页计算与统一响应构造。"""
+
+    # ---------- 分页计算 ----------
+
+    @staticmethod
+    def normalize_pagination(page: int | None, page_size: int | None) -> Tuple[int, int]:
+        page = page or LongArticlesMcpConst.DEFAULT_PAGE
+        page = max(page, 1)
+
+        page_size = page_size or LongArticlesMcpConst.DEFAULT_PAGE_SIZE
+        page_size = max(1, min(page_size, LongArticlesMcpConst.MAX_PAGE_SIZE))
+        return page, page_size
+
+    @staticmethod
+    def calc_total_pages(total: int, page_size: int) -> int:
+        if total <= 0:
+            return 0
+        return math.ceil(total / page_size)
+
+    # ---------- 参数解析 ----------
+
+    @classmethod
+    def parse_pagination(cls, params: Dict[str, Any]) -> Tuple[int, int]:
+        page_raw = params.get("page")
+        page_size_raw = params.get("page_size")
+
+        try:
+            page = int(page_raw) if page_raw is not None else None
+            page_size = int(page_size_raw) if page_size_raw is not None else None
+        except (TypeError, ValueError) as exc:
+            raise ValueError("page and page_size must be integers") from exc
+
+        return cls.normalize_pagination(page, page_size)
+
+    @classmethod
+    def parse_sort(cls, params: Dict[str, Any]) -> Tuple[str | None, str | None]:
+        sort_by = params.get("sort_by")
+        sort_order = params.get("sort_order")
+
+        if sort_by is not None:
+            if sort_by not in cls.SORT_FIELDS:
+                raise ValueError(f"invalid sort_by: {sort_by}")
+        if sort_order is not None:
+            sort_order = str(sort_order).lower()
+            if sort_order not in cls.SORT_ORDERS:
+                raise ValueError(f"invalid sort_order: {sort_order}")
+
+        return sort_by, sort_order
+
+    # ---------- 响应构造 ----------
+
+    @staticmethod
+    def build_success(data: Any) -> Dict[str, Any]:
+        return {
+            "code": 0,
+            "message": "success",
+            "data": data,
+        }
+
+    @staticmethod
+    def build_error(code: int, message: str, errors: list[str] | None = None) -> Dict[str, Any]:
+        body: Dict[str, Any] = {
+            "code": code,
+            "message": message,
+            "data": None,
+        }
+        if errors is not None:
+            body["errors"] = errors
+        return body
+
+
+__all__ = ["LongArticlesMcpUtils"]

+ 53 - 0
app/domains/mcp/entrance.py

@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+import logging
+from typing import Any, Dict
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._handlers import LongArticlesMcpHandlers
+from ._utils import LongArticlesMcpUtils
+
+logger = logging.getLogger(__name__)
+
+
+class LongArticlesMcp:
+    """MCP 入口层:负责业务编排,统一分发与响应封装。"""
+
+    def __init__(
+        self,
+        params: Dict,
+        pool: DatabaseManager,
+        log_service: LogService,
+        config: GlobalConfigSettings,
+    ):
+        self.params = params
+        self.db_pool = pool
+        self.log_service = log_service
+        self.config = config
+        self.handler = LongArticlesMcpHandlers(pool=pool, log_service=log_service, config=config)
+        self.tools = LongArticlesMcpUtils()
+
+    async def deal(self, task_name: str, params: dict) -> Dict[str, Any]:
+        """基于 task_name 分发到具体 handler,统一封装 {code, message, data} 返回。"""
+        request_ctx = {"task_name": task_name}
+        logger.info("mcp.request.start ctx=%s params=%s", request_ctx, params)
+
+        try:
+            handler = self.handler.get_handler(task_name)
+        except KeyError:
+            logger.warning("mcp.unknown_task ctx=%s", request_ctx)
+            return self.tools.build_error(code=404, message=f"unknown task_name: {task_name}")
+
+        try:
+            data = await handler(params or {})
+            logger.info("mcp.request.success ctx=%s", request_ctx)
+            return self.tools.build_success(data=data)
+        except ValueError as exc:
+            logger.warning("mcp.validation_error ctx=%s error=%s", request_ctx, exc)
+            return self.tools.build_error(code=400, message="invalid request body", errors=[str(exc)])
+        except Exception as exc:
+            logger.error("mcp.internal_error ctx=%s error=%s", request_ctx, exc)
+            return self.tools.build_error(code=500, message="internal error")