| 123456789101112131415161718192021222324252627282930 |
- from __future__ import annotations
- from typing import Any, Dict, List, Optional, Tuple
- from app.core.database import DatabaseManager
- from ._const import LongArticlesMcpConst
class LongArticlesMcpMapper(LongArticlesMcpConst):
    """MCP data-access layer.

    Responsible only for assembling SQL and talking to the database;
    business orchestration does not belong here.
    """

    def __init__(self, pool: DatabaseManager):
        """Store the database manager that the query methods will use."""
        self.pool = pool

    # ---- Query methods follow; each one returns raw data ----

    async def query_decode_response(
        self,
        page: int,
        page_size: int,
        sort_by: str | None,
        sort_order: str | None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> Tuple[int, List[Dict[str, Any]]]:
        """Query decode results (not implemented yet).

        The intended result is a ``(total, items)`` pair.

        NOTE: this is still a stub. None of the arguments are read, and
        awaiting the coroutine currently yields ``None`` instead of the
        annotated tuple.
        """
        return None
- __all__ = ["LongArticlesMcpMapper"]  # explicit public API: only the mapper class is exported
|