"""Blueprint factory for the task-related HTTP endpoints."""

from __future__ import annotations

from pydantic import ValidationError
from quart import Blueprint, jsonify

from app.api.service import TaskManager, TaskScheduler
from app.api.v1.utils import (
    ApiDependencies,
    RunTaskRequest,
    TaskListRequest,
    parse_json,
    validation_error_response,
)
from app.infra.shared.tools import generate_task_trace_id


def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
    """Build the ``tasks`` blueprint with its two POST routes.

    Args:
        deps: Shared dependencies; this factory reads ``deps.log``,
            ``deps.db`` and ``deps.config`` and hands them to the
            service objects created per request.

    Returns:
        A blueprint named ``"tasks"`` exposing ``POST /run_task`` and
        ``POST /tasks``.
    """
    tasks_bp = Blueprint("tasks", __name__)

    @tasks_bp.route("/run_task", methods=["POST"])
    async def run_task():
        """Validate a ``RunTaskRequest`` body and pass it to ``TaskScheduler``."""
        # The trace id is created before parsing, so one is generated even
        # for requests that fail validation.
        task_trace_id = generate_task_trace_id()

        try:
            # Only the second element of the parse_json result is used here.
            _, run_request = await parse_json(RunTaskRequest)
        except ValidationError as exc:
            error_payload, http_status = validation_error_response(exc)
            return jsonify(error_payload), http_status

        outcome = await TaskScheduler(
            run_request,
            deps.log,
            deps.db,
            task_trace_id,
            deps.config,
        ).deal()
        return jsonify(outcome)

    @tasks_bp.route("/tasks", methods=["POST"])
    async def list_tasks():
        """Validate a ``TaskListRequest`` body and return ``TaskManager.list_tasks()``."""
        try:
            _, list_request = await parse_json(TaskListRequest)
        except ValidationError as exc:
            error_payload, http_status = validation_error_response(exc)
            return jsonify(error_payload), http_status

        listing = await TaskManager(
            pool=deps.db,
            data=list_request,
            config=deps.config,
        ).list_tasks()
        return jsonify(listing)

    return tasks_bp