from quart import Blueprint, jsonify, request from applications.ab_test import GetCoverService from applications.utils import generate_task_trace_id from applications.tasks import TaskScheduler, TaskManager server_blueprint = Blueprint("api", __name__, url_prefix="/api") task_manager_pool = TaskManager() print("协程管理池初始化成功") def server_routes(pools, log_service): @server_blueprint.route("/get_cover", methods=["POST"]) async def get_cover(): params = await request.get_json() task = GetCoverService(pools, params) return jsonify(await task.deal()) @server_blueprint.route("/run_task", methods=["POST"]) async def run_task(): trace_id = generate_task_trace_id() data = await request.get_json() task_scheduler = TaskScheduler(task_manager_pool, data, log_service, pools, trace_id) response = await task_scheduler.deal() return jsonify(response) @server_blueprint.route("/health", methods=["GET"]) async def hello_world(): # data = await request.get_json() return jsonify({"message": "hello world"}) @server_blueprint.route("/list_tasks", methods=["GET"]) async def list_tasks(): return jsonify(task_manager_pool.list()) @server_blueprint.route("/cancel_task", methods=["POST"]) async def cancel_task(): data = await request.get_json() task_id = data["task_id"] flag = await task_manager_pool.cancel(task_id) message = f"Task {task_id} canceled {'successfully' if flag else 'failed'}" return jsonify({ "message": message }) return server_blueprint