blueprint.py 1.0 KB

12345678910111213141516171819202122232425262728293031
  1. from quart import Blueprint, jsonify, request
  2. from applications.ab_test import GetCoverService
  3. from applications.utils import generate_task_trace_id
  4. from applications.tasks import TaskScheduler
  5. server_blueprint = Blueprint("api", __name__, url_prefix="/api")
  6. def server_routes(pools, log_service):
  7. @server_blueprint.route("/get_cover", methods=["POST"])
  8. async def get_cover():
  9. params = await request.get_json()
  10. task = GetCoverService(pools, params)
  11. return jsonify(await task.deal())
  12. @server_blueprint.route("/run_task", methods=["POST"])
  13. async def run_task():
  14. trace_id = generate_task_trace_id()
  15. data = await request.get_json()
  16. task_scheduler = TaskScheduler(data, log_service, pools, trace_id)
  17. response = await task_scheduler.deal()
  18. return jsonify(response)
  19. @server_blueprint.route("/health", methods=["GET"])
  20. async def hello_world():
  21. # data = await request.get_json()
  22. return jsonify({"message": "hello world"})
  23. return server_blueprint