""" @author: luojunhui """ import time import uuid import asyncio from quart import Blueprint, jsonify, request from deal import RequestDeal, insert_text_mysql, get_text_by_id from applications.functions import whisper bp = Blueprint('VideosToArticle', __name__) def VTARoutes(mysql_client): """ :param mysql_client: :return: """ @bp.route('/videos', methods=["POST"]) async def find_videos(): """ 更具接口获取视频信息 :return: """ params = await request.get_json() RD = RequestDeal(params, mysql_client) result = await RD.deal() return jsonify(result) @bp.route('/whisper', methods=["POST"]) async def video_extracting(): """ whisper 处理文本 :return: """ params = await request.get_json() video_id = params['vid'] video_title = params['title'] try: response = whisper(video_id) await insert_text_mysql(mysql_client, video_id, response['text'], video_title) result = {"info": "success insert text into mysql", "vid": video_id} except Exception as e: result = {"error": str(e), "vid": video_id} return jsonify(result) @bp.route('/get_text', methods=["POST"]) async def get_video_text(): """ 获取视频文本 :return: """ params = await request.get_json() video_id = params['vid'] text = await get_text_by_id(mysql_client, video_id) if text: result = {"text": text} else: result = {"text": None} return jsonify(result) @bp.route('/publish', methods=["POST"]) async def auto_publish(): """ auto publish article info to aigc system :return: """ res = { "info": "this api is developing" } return jsonify(res) return bp