| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- from __future__ import annotations
- from pydantic import ValidationError
- from quart import Blueprint, jsonify
- from app.api.service import TaskManager, TaskScheduler
- from app.api.v1.utils import ApiDependencies
- from app.api.v1.utils import RunTaskRequest, TaskListRequest, CancelTaskRequest
- from app.api.v1.utils import parse_json, validation_error_response
- from app.infra.shared.tools import generate_task_trace_id
- def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
- bp = Blueprint("tasks", __name__)
- @bp.route("/run_task", methods=["POST"])
- async def run_task():
- trace_id = generate_task_trace_id()
- try:
- _, body = await parse_json(RunTaskRequest)
- except ValidationError as e:
- payload, status = validation_error_response(e)
- return jsonify(payload), status
- scheduler = TaskScheduler(
- body, deps.log, deps.db, trace_id, deps.config, deps.task_registry
- )
- result = await scheduler.deal()
- return jsonify(result)
- @bp.route("/tasks", methods=["POST"])
- async def list_tasks():
- try:
- _, body = await parse_json(TaskListRequest)
- except ValidationError as e:
- payload, status = validation_error_response(e)
- return jsonify(payload), status
- manager = TaskManager(pool=deps.db, data=body, config=deps.config)
- result = await manager.list_tasks()
- return jsonify(result)
- @bp.route("/cancel_task", methods=["POST"])
- async def cancel_task():
- try:
- _, body = await parse_json(CancelTaskRequest)
- except ValidationError as e:
- payload, status = validation_error_response(e)
- return jsonify(payload), status
- trace_id = body["trace_id"]
- scheduler = TaskScheduler(
- body, deps.log, deps.db, trace_id, deps.config, deps.task_registry
- )
- cancelled = await scheduler.cancel_task(trace_id)
- return jsonify({
- "code": 0 if cancelled else 4004,
- "message": "Task cancelled" if cancelled else "Task not found or already finished",
- "trace_id": trace_id,
- })
- @bp.route("/running_tasks", methods=["GET"])
- async def running_tasks():
- tasks = await deps.task_registry.get_running_tasks()
- return jsonify({"code": 0, "data": tasks, "total": len(tasks)})
- return bp
|