tasks.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from __future__ import annotations
  2. from pydantic import ValidationError
  3. from quart import Blueprint, jsonify
  4. from app.api.service import TaskManager, TaskScheduler
  5. from app.api.v1.utils import ApiDependencies
  6. from app.api.v1.utils import RunTaskRequest, TaskListRequest, CancelTaskRequest
  7. from app.api.v1.utils import parse_json, validation_error_response
  8. from app.infra.shared.tools import generate_task_trace_id
  9. def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
  10. bp = Blueprint("tasks", __name__)
  11. @bp.route("/run_task", methods=["POST"])
  12. async def run_task():
  13. trace_id = generate_task_trace_id()
  14. try:
  15. _, body = await parse_json(RunTaskRequest)
  16. except ValidationError as e:
  17. payload, status = validation_error_response(e)
  18. return jsonify(payload), status
  19. scheduler = TaskScheduler(
  20. body, deps.log, deps.db, trace_id, deps.config, deps.task_registry
  21. )
  22. result = await scheduler.deal()
  23. return jsonify(result)
  24. @bp.route("/tasks", methods=["POST"])
  25. async def list_tasks():
  26. try:
  27. _, body = await parse_json(TaskListRequest)
  28. except ValidationError as e:
  29. payload, status = validation_error_response(e)
  30. return jsonify(payload), status
  31. manager = TaskManager(pool=deps.db, data=body, config=deps.config)
  32. result = await manager.list_tasks()
  33. return jsonify(result)
  34. @bp.route("/cancel_task", methods=["POST"])
  35. async def cancel_task():
  36. try:
  37. _, body = await parse_json(CancelTaskRequest)
  38. except ValidationError as e:
  39. payload, status = validation_error_response(e)
  40. return jsonify(payload), status
  41. trace_id = body["trace_id"]
  42. scheduler = TaskScheduler(
  43. body, deps.log, deps.db, trace_id, deps.config, deps.task_registry
  44. )
  45. cancelled = await scheduler.cancel_task(trace_id)
  46. return jsonify({
  47. "code": 0 if cancelled else 4004,
  48. "message": "Task cancelled" if cancelled else "Task not found or already finished",
  49. "trace_id": trace_id,
  50. })
  51. @bp.route("/running_tasks", methods=["GET"])
  52. async def running_tasks():
  53. tasks = await deps.task_registry.get_running_tasks()
  54. return jsonify({"code": 0, "data": tasks, "total": len(tasks)})
  55. return bp