_mapper.py 826 B

123456789101112131415161718192021222324252627282930
  1. from __future__ import annotations
  2. from typing import Any, Dict, List, Optional, Tuple
  3. from app.core.database import DatabaseManager
  4. from ._const import LongArticlesMcpConst
  5. class LongArticlesMcpMapper(LongArticlesMcpConst):
  6. """MCP 数据访问层:只负责拼 SQL + 访问 DB,不做业务编排。"""
  7. def __init__(self, pool: DatabaseManager):
  8. self.pool = pool
  9. # ---- 下面是各查询方法,返回原始数据 ----
  10. async def query_decode_response(
  11. self,
  12. page: int,
  13. page_size: int,
  14. sort_by: str | None,
  15. sort_order: str | None,
  16. filters: Optional[Dict[str, Any]] = None,
  17. ) -> Tuple[int, List[Dict[str, Any]]]:
  18. """查询解构结果(待实现)。返回 (total, items)。"""
  19. pass
# Public API of this module: the names exported by a star-import.
__all__ = ["LongArticlesMcpMapper"]