tasks.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import annotations
  2. from pydantic import ValidationError
  3. from quart import Blueprint, jsonify, current_app
  4. from app.api.service import TaskManager, TaskScheduler
  5. from app.api.v1.utils import ApiDependencies
  6. from app.api.v1.utils import RunTaskRequest, TaskListRequest, CancelTaskRequest
  7. from app.api.v1.utils import parse_json, validation_error_response
  8. from app.infra.shared.tools import generate_task_trace_id
  9. def create_tasks_bp(deps: ApiDependencies) -> Blueprint:
  10. bp = Blueprint("tasks", __name__)
  11. @bp.route("/run_task", methods=["POST"])
  12. async def run_task():
  13. # 检查是否接受新任务
  14. if not current_app.config.get("ACCEPTING_TASKS", True):
  15. return jsonify({"code": 5003, "message": "Server is shutting down"}), 503
  16. trace_id = generate_task_trace_id()
  17. try:
  18. _, body = await parse_json(RunTaskRequest)
  19. except ValidationError as e:
  20. payload, status = validation_error_response(e)
  21. return jsonify(payload), status
  22. scheduler = TaskScheduler(body, deps.log, deps.db, trace_id, deps.config)
  23. result = await scheduler.deal()
  24. return jsonify(result)
  25. @bp.route("/tasks", methods=["POST"])
  26. async def list_tasks():
  27. try:
  28. _, body = await parse_json(TaskListRequest)
  29. except ValidationError as e:
  30. payload, status = validation_error_response(e)
  31. return jsonify(payload), status
  32. manager = TaskManager(pool=deps.db, data=body, config=deps.config)
  33. result = await manager.list_tasks()
  34. return jsonify(result)
  35. @bp.route("/cancel_task", methods=["POST"])
  36. async def cancel_task():
  37. try:
  38. _, body = await parse_json(CancelTaskRequest)
  39. except ValidationError as e:
  40. payload, status = validation_error_response(e)
  41. return jsonify(payload), status
  42. trace_id = body["trace_id"]
  43. scheduler = TaskScheduler(body, deps.log, deps.db, trace_id, deps.config)
  44. success = await scheduler.cancel_task(trace_id)
  45. return jsonify(
  46. {
  47. "code": 0 if success else 1,
  48. "message": "cancel requested" if success else "task not found or already finished",
  49. "trace_id": trace_id,
  50. }
  51. )
  52. return bp