"""
@author: luojunhui
"""
from quart import Blueprint, jsonify, request

from applications.functions import ParamProcess
# NOTE(review): the wildcard import supplies upload_to_oss / upload_to_pq used
# below; it is kept as-is because other names it exports may be relied upon.
from applications.upload import *

video_score_blueprint = Blueprint('light_gbm_score', __name__)

# Fields the /etl endpoint reads from the request body.
_ETL_REQUIRED_FIELDS = ('video_id', 'video_url', 'video_title')


@video_score_blueprint.route('/healthcheck')
async def hello():
    """
    Liveness probe endpoint.

    :return: JSON response ``{"message": "Hello, World!"}``
    """
    return jsonify({'message': 'Hello, World!'})


@video_score_blueprint.route('/lightgbm_score', methods=['POST'])
async def post_data():
    """
    Scoring endpoint: hand the JSON request body to ``ParamProcess.process``
    and return whatever it produces as JSON.

    :return: JSON response built from the processed data
    """
    p = ParamProcess()
    # May be None when the request does not carry a JSON body; it is passed
    # through unchanged, exactly as before.
    data = await request.get_json()
    processed_data = await p.process(data)
    return jsonify(processed_data)


@video_score_blueprint.route('/etl', methods=['POST'])
async def download_upload_publish():
    """
    ETL endpoint: upload a video to OSS and, on success, publish it.

    Expects a JSON object with the keys ``video_id``, ``video_url`` and
    ``video_title``.

    :return: ``{"status": "successfully uploaded"}`` when ``upload_to_oss``
        returns a truthy value, ``{"status": "failed"}`` when it does not,
        or a 400 response with ``status`` and ``error`` keys when the request
        body is not a JSON object or lacks a required field.
    """
    data = await request.get_json()

    # get_json() yields None for a non-JSON request, and a JSON array/scalar
    # is also possible; reject both instead of failing with a TypeError (500).
    if not isinstance(data, dict):
        return jsonify({
            "status": "failed",
            "error": "request body must be a JSON object",
        }), 400

    # Report absent fields explicitly instead of failing with a KeyError (500).
    missing = [field for field in _ETL_REQUIRED_FIELDS if field not in data]
    if missing:
        return jsonify({
            "status": "failed",
            "error": "missing required field(s): " + ", ".join(missing),
        }), 400

    result = await upload_to_oss(
        video_id=data['video_id'],
        video_url=data['video_url'],
    )
    if not result:
        return jsonify({"status": "failed"})

    # NOTE(review): upload_to_pq is called without ``await``, as in the
    # original. If it is a coroutine function this call would never actually
    # run -- confirm against applications.upload.
    upload_to_pq(oss_object_key=result, title=data['video_title'])
    return jsonify({"status": "successfully uploaded"})