Explorar el Código

长文-聚类优化

luojunhui hace 2 días
padre
commit
a81274e1a9

+ 13 - 13
app/api/v1/endpoints/mcp.py

@@ -5,7 +5,7 @@ from quart import Blueprint, jsonify
 
 from app.api.v1.utils import ApiDependencies, LongArticlesMcpRequest
 from app.api.v1.utils import parse_json, validation_error_response
-from app.domains.mcp import LongArticleMCP
+from app.domains.mcp import deal, UnknownMcpTaskName
 
 
 def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
@@ -13,12 +13,11 @@ def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
 
     @bp.route("/long_articles_mcp/<string:task_name>", methods=["POST"])
     async def long_articles_mcp(task_name: str):
-        """
-        MCP 交互接口
+        """MCP 交互接口
 
         - POST /api/long_articles_mcp/<task_name>
-        - 通过 task_name 分发到 LongArticleMCP 内对应的方法
-        - 请求体为 JSON,可选字段如 limit、offset 等,由各 task 按需使用
+        - 通过 task_name 分发到 MCP 内对应的方法
+        - 请求体为 JSON,可选字段如 page、page_size、sort_by、sort_order 等
         """
         try:
             req, _ = await parse_json(LongArticlesMcpRequest)
@@ -26,15 +25,17 @@ def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
             payload, status = validation_error_response(e)
             return jsonify(payload), status
 
-        # 使用校验后的模型转 dict,保证 limit/offset 等类型正确,且含 extra 透传字段
+        # 使用校验后的模型转 dict,保证类型正确,且含 extra 透传字段
         params = req.model_dump(exclude_none=True)
 
-        mcp = LongArticleMCP(pool=deps.db, log_service=deps.log)
-        handler_map = {
-            "get_decode_response": mcp.get_decode_response,
-        }
-        handler = handler_map.get(task_name)
-        if handler is None:
+        try:
+            result = await deal(
+                task_name=task_name,
+                pool=deps.db,
+                log_service=deps.log,
+                params=params,
+            )
+        except UnknownMcpTaskName:
             return (
                 jsonify(
                     {
@@ -46,7 +47,6 @@ def create_mcp_bp(deps: ApiDependencies) -> Blueprint:
                 404,
             )
 
-        result = await handler(params=params)
         return jsonify({"code": 0, "message": "success", "data": result})
 
     return bp

+ 2 - 3
app/domains/mcp/__init__.py

@@ -1,4 +1,3 @@
-from .long_articles_mcp import LongArticleMCP
-
-__all__ = ["LongArticleMCP"]
+from .entrance import deal, UnknownMcpTaskName
 
+__all__ = ["deal", "UnknownMcpTaskName"]

+ 27 - 0
app/domains/mcp/_const.py

@@ -0,0 +1,27 @@
+import math
+
+
+class LongArticlesMcpConst:
+    """MCP 配置层:排序字段、SQL 片段、分页默认值等。"""
+    # 分页配置
+    DEFAULT_PAGE = 1
+    DEFAULT_PAGE_SIZE = 20
+    MAX_PAGE_SIZE = 100
+
+    @staticmethod
+    def normalize_pagination(page: int | None, page_size: int | None) -> tuple[int, int]:
+        page = page or LongArticlesMcpConst.DEFAULT_PAGE
+        page = max(page, 1)
+
+        page_size = page_size or LongArticlesMcpConst.DEFAULT_PAGE_SIZE
+        page_size = max(1, min(page_size, LongArticlesMcpConst.MAX_PAGE_SIZE))
+        return page, page_size
+
+    @staticmethod
+    def calc_total_pages(total: int, page_size: int) -> int:
+        if total <= 0:
+            return 0
+        return math.ceil(total / page_size)
+
+
+__all__ = ["LongArticlesMcpConst"]

+ 24 - 0
app/domains/mcp/_handler_map.py

@@ -0,0 +1,24 @@
+from typing import Any, Awaitable, Callable, Dict
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+
+HandlerType = Callable[[DatabaseManager, LogService, Dict[str, Any] | None], Awaitable[Any]]
+
+
+async def _get_decode_response_wrapper(
+    pool: DatabaseManager,
+    log_service: LogService,
+    params: Dict[str, Any] | None,
+):
+    # 这里只是为 handler_map 统一签名做一层薄封装
+    return await get_decode_response(pool=pool, log_service=log_service, params=params)
+
+
+HANDLER_MAP: Dict[str, HandlerType] = {
+    "get_decode_response": _get_decode_response_wrapper,
+}
+
+
+__all__ = ["HANDLER_MAP"]

+ 21 - 0
app/domains/mcp/_mapper.py

@@ -0,0 +1,21 @@
+from typing import Any, Dict, List, Optional
+
+from app.core.database import DatabaseManager
+
+from ._const import LongArticlesMcpConst
+
+
+class LongArticlesMcpMapper(LongArticlesMcpConst):
+    """MCP 方法层:只负责拼 SQL + 访问 DB,不做业务编排。"""
+
+    def __init__(self, pool: DatabaseManager):
+        self.pool = pool
+
+    # 解构
+
+    # 抓取
+
+    # 统计
+
+
+__all__ = ["LongArticlesMcpMapper"]

+ 0 - 70
app/domains/mcp/long_articles_mcp.py

@@ -1,70 +0,0 @@
-import math
-from typing import Any, Dict, Optional
-
-from app.core.database import DatabaseManager
-from app.core.observability import LogService
-
-
-class LongArticleMCP:
-    def __init__(self, pool: DatabaseManager, log_service: LogService):
-        self.pool = pool
-        self.log_service = log_service
-
-    SORTABLE_FIELDS = {
-        "read_cnt",
-        "read_median_multiplier",
-        "publish_time",
-    }
-
-    BASE_WHERE = "WHERE t1.status = 2"
-
-    BASE_FROM = """
-        FROM long_articles_decode_tasks t1
-        JOIN ad_platform_accounts_daily_detail t2
-        ON t1.wx_sn = t2.wx_sn
-    """
-
-    async def get_decode_response(self, params: Optional[Dict[str, Any]] = None):
-        params = params or {}
-        page = params.get("page", 1)
-        page_size = params.get("page_size", 20)
-        sort_by = params.get("sort_by")
-        sort_order = params.get("sort_order", "asc")
-
-        count_query = f"SELECT COUNT(*) AS total {self.BASE_FROM} {self.BASE_WHERE}"
-        count_result = await self.pool.async_fetch(query=count_query)
-        total = count_result[0]["total"] if count_result else 0
-        total_pages = math.ceil(total / page_size) if total > 0 else 0
-
-        data_query = f"""
-            SELECT t2.gh_id,
-                 t2.account_name,
-                 t2.article_title,
-                 t2.article_link,
-                 t2.article_cover,
-                 t2.article_text,
-                 t2.read_cnt,
-                 t2.read_median_multiplier,
-                 from_unixtime(t2.publish_timestamp, '%%Y-%%m-%%d %%H:%%i:%%s') as publish_time,
-                 t1.result
-            {self.BASE_FROM}
-            {self.BASE_WHERE}
-        """
-
-        if sort_by and sort_by in self.SORTABLE_FIELDS:
-            direction = "ASC" if sort_order == "asc" else "DESC"
-            order_column = "t2.publish_timestamp" if sort_by == "publish_time" else f"t2.{sort_by}"
-            data_query += f" ORDER BY {order_column} {direction}"
-
-        offset = (page - 1) * page_size
-        data_query += " LIMIT %s OFFSET %s"
-        rows = await self.pool.async_fetch(query=data_query, params=(page_size, offset))
-
-        return {
-            "total": total,
-            "page": page,
-            "page_size": page_size,
-            "total_pages": total_pages,
-            "items": rows,
-        }
-

+ 1 - 0
app/domains/monitor_tasks/gzh_article_monitor.py

@@ -286,6 +286,7 @@ class InnerGzhArticlesMonitor(MonitorConst):
         )
         try:
             response = await get_article_detail(url, is_cache=False)
+            print(response)
             response_code = response["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 error_detail = response.get("msg")