浏览代码

新增置信区间下限指标

luojunhui 1 月之前
父节点
当前提交
e503cc660b
共有 4 个文件被更改,包括 5 次插入75 次删除
  1. 1 2
      applications/tasks/__init__.py
  2. 0 51
      applications/tasks/task_manager.py
  3. 2 4
      applications/tasks/task_scheduler.py
  4. 2 18
      routes/blueprint.py

+ 1 - 2
applications/tasks/__init__.py

@@ -1,2 +1 @@
-from .task_scheduler import TaskScheduler
-from .task_manager import TaskManager
+from .task_scheduler import TaskScheduler

+ 0 - 51
applications/tasks/task_manager.py

@@ -1,51 +0,0 @@
-# task_manager.py
-import asyncio
-from datetime import datetime
-from typing import Optional
-
-class TaskManager:
-    """跨请求共享的任务池:trace_id> Task + 元数据"""
-    def __init__(self):
-        self._tasks: dict[str, asyncio.Task] = {}
-        self._meta:  dict[str, dict]        = {}
-
-    def register(self, task_id: str, task: asyncio.Task, task_name: str):
-        self._tasks[task_id] = task
-        self._meta[task_id]  = {
-            "name":      task_name,
-            "started":   datetime.now().isoformat(timespec="seconds"),
-        }
-
-    def list(self):
-        """返回所有活跃任务及其状态"""
-        return [
-            {
-                "task_id":   tid,
-                "name":      self._meta[tid]["name"],
-                "started":   self._meta[tid]["started"],
-                "done":      t.done(),
-                "cancelled": t.cancelled(),
-            }
-            for tid, t in self._tasks.items()
-        ]
-
-    async def cancel(self, task_id: str) -> bool:
-        t: Optional[asyncio.Task] = self._tasks.get(task_id)
-        if not t:           # 已经结束或 id 写错
-            return True
-        if t.done():
-            self._cleanup(task_id)
-            return True
-
-        t.cancel()
-        try:
-            await t        # 等协程跑完 finally
-        except asyncio.CancelledError:
-            pass
-        finally:
-            self._cleanup(task_id)
-        return True
-
-    def _cleanup(self, task_id: str):
-        self._tasks.pop(task_id, None)
-        self._meta.pop(task_id,  None)

+ 2 - 4
applications/tasks/task_scheduler.py

@@ -14,14 +14,13 @@ class TaskScheduler(TaskHandler):
     """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
 
     # ---------- 初始化 ----------
-    def __init__(self, task_manager, data, log_service, db_client, trace_id):
+    def __init__(self, data, log_service, db_client, trace_id):
         super().__init__(data, log_service, db_client, trace_id)
         self.data = data
         self.log_client = log_service
         self.db_client = db_client
         self.table = "long_articles_task_manager"
         self.trace_id = trace_id
-        self.task_manager = task_manager
 
     # ---------- 公共数据库工具 ----------
     async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
@@ -138,8 +137,7 @@ class TaskScheduler(TaskHandler):
             finally:
                 await self._release_task(status)
 
-        task: asyncio.Task = asyncio.create_task(_wrapper(), name=task_name)
-        self.task_manager.register(task_id=self.trace_id, task=task, task_name=task_name)
+        asyncio.create_task(_wrapper(), name=task_name)
         return await task_schedule_response.success_response(
             task_name=task_name,
             data={"code": 0, "message": "task started", "trace_id": self.trace_id},

+ 2 - 18
routes/blueprint.py

@@ -2,11 +2,9 @@ from quart import Blueprint, jsonify, request
 from applications.ab_test import GetCoverService
 from applications.utils import generate_task_trace_id
 
-from applications.tasks import TaskScheduler, TaskManager
+from applications.tasks import TaskScheduler
 
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
-task_manager_pool = TaskManager()
-print("协程管理池初始化成功")
 
 
 def server_routes(pools, log_service):
@@ -21,7 +19,7 @@ def server_routes(pools, log_service):
     async def run_task():
         trace_id = generate_task_trace_id()
         data = await request.get_json()
-        task_scheduler = TaskScheduler(task_manager_pool, data, log_service, pools, trace_id)
+        task_scheduler = TaskScheduler(data, log_service, pools, trace_id)
         response = await task_scheduler.deal()
         return jsonify(response)
 
@@ -30,18 +28,4 @@ def server_routes(pools, log_service):
         # data = await request.get_json()
         return jsonify({"message": "hello world"})
 
-    @server_blueprint.route("/list_tasks", methods=["GET"])
-    async def list_tasks():
-        return jsonify(task_manager_pool.list())
-
-    @server_blueprint.route("/cancel_task", methods=["POST"])
-    async def cancel_task():
-        data = await request.get_json()
-        task_id = data["task_id"]
-        flag = await task_manager_pool.cancel(task_id)
-        message = f"Task {task_id} canceled {'successfully' if flag else 'failed'}"
-        return jsonify({
-            "message": message
-        })
-
     return server_blueprint