from __future__ import annotations

from pydantic import ValidationError
from quart import Blueprint, jsonify

from app.api.service import TaskManager, TaskScheduler
from app.api.v1.utils import ApiDependencies
from app.api.v1.utils import RunTaskRequest, TaskListRequest, CancelTaskRequest
from app.api.v1.utils import parse_json, validation_error_response
from app.infra.shared.tools import generate_task_trace_id


async def _read_validated_body(model):
    """Parse the current request's JSON against *model*.

    Returns a ``(body, failure)`` pair. On success ``failure`` is ``None``
    and ``body`` is the second element of what ``parse_json`` returned.
    When ``parse_json`` raises ``ValidationError``, ``body`` is ``None`` and
    ``failure`` is the ``(jsonified payload, status)`` tuple built from
    ``validation_error_response``, ready to be returned from a route handler.
    """
    try:
        _, parsed = await parse_json(model)
    except ValidationError as exc:
        error_payload, error_status = validation_error_response(exc)
        return None, (jsonify(error_payload), error_status)
    return parsed, None


def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
    """Build the ``tasks`` blueprint.

    Registers four routes on a new ``Blueprint("tasks", __name__)``:

    * ``POST /run_task``      -- schedule a task via ``TaskScheduler.deal``.
    * ``POST /tasks``         -- list tasks via ``TaskManager.list_tasks``.
    * ``POST /cancel_task``   -- cancel a task via ``TaskScheduler.cancel_task``.
    * ``GET  /running_tasks`` -- report what ``task_registry`` says is running.

    Args:
        deps: Shared dependencies; the handlers read its ``log``, ``db``,
            ``config`` and ``task_registry`` attributes.

    Returns:
        The configured blueprint.
    """
    blueprint = Blueprint("tasks", __name__)

    @blueprint.route("/run_task", methods=["POST"])
    async def run_task():
        """Validate the request and hand it to a new ``TaskScheduler``."""
        # The trace id is generated before the body is parsed, so one is
        # produced even for requests that fail validation.
        new_trace_id = generate_task_trace_id()

        request_body, failure = await _read_validated_body(RunTaskRequest)
        if failure is not None:
            return failure

        task_scheduler = TaskScheduler(
            request_body,
            deps.log,
            deps.db,
            new_trace_id,
            deps.config,
            deps.task_registry,
        )
        return jsonify(await task_scheduler.deal())

    @blueprint.route("/tasks", methods=["POST"])
    async def list_tasks():
        """Validate the request and return ``TaskManager.list_tasks()`` as JSON."""
        request_body, failure = await _read_validated_body(TaskListRequest)
        if failure is not None:
            return failure

        task_manager = TaskManager(
            pool=deps.db,
            data=request_body,
            config=deps.config,
        )
        return jsonify(await task_manager.list_tasks())

    @blueprint.route("/cancel_task", methods=["POST"])
    async def cancel_task():
        """Cancel the task named by the request's ``trace_id``.

        The JSON reply carries ``code`` 0 when ``cancel_task`` reports a
        truthy result and 4004 otherwise, plus a matching ``message`` and
        the ``trace_id`` that was requested.
        """
        request_body, failure = await _read_validated_body(CancelTaskRequest)
        if failure is not None:
            return failure

        target_trace_id = request_body["trace_id"]
        task_scheduler = TaskScheduler(
            request_body,
            deps.log,
            deps.db,
            target_trace_id,
            deps.config,
            deps.task_registry,
        )
        was_cancelled = await task_scheduler.cancel_task(target_trace_id)

        if was_cancelled:
            code, message = 0, "Task cancelled"
        else:
            code, message = 4004, "Task not found or already finished"

        return jsonify({
            "code": code,
            "message": message,
            "trace_id": target_trace_id,
        })

    @blueprint.route("/running_tasks", methods=["GET"])
    async def running_tasks():
        """Return the registry's running tasks together with their count."""
        active_tasks = await deps.task_registry.get_running_tasks()
        response = {
            "code": 0,
            "data": active_tasks,
            "total": len(active_tasks),
        }
        return jsonify(response)

    return blueprint