vta_routes.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import uuid
  6. import asyncio
  7. from quart import Blueprint, jsonify, request
  8. from deal import RequestDeal, insert_text_mysql, get_text_by_id
  9. from applications.functions import whisper
  10. bp = Blueprint('VideosToArticle', __name__)
  11. def VTARoutes(mysql_client):
  12. """
  13. :param mysql_client:
  14. :return:
  15. """
  16. @bp.route('/videos', methods=["POST"])
  17. async def find_videos():
  18. """
  19. 更具接口获取视频信息
  20. :return:
  21. """
  22. params = await request.get_json()
  23. RD = RequestDeal(params, mysql_client)
  24. result = await RD.deal()
  25. return jsonify(result)
  26. @bp.route('/whisper', methods=["POST"])
  27. async def video_extracting():
  28. """
  29. whisper 处理文本
  30. :return:
  31. """
  32. params = await request.get_json()
  33. video_id = params['vid']
  34. video_title = params['title']
  35. try:
  36. response = whisper(video_id)
  37. await insert_text_mysql(mysql_client, video_id, response['text'], video_title)
  38. result = {"info": "success insert text into mysql", "vid": video_id}
  39. except Exception as e:
  40. result = {"error": str(e), "vid": video_id}
  41. return jsonify(result)
  42. @bp.route('/get_text', methods=["POST"])
  43. async def get_video_text():
  44. """
  45. 获取视频文本
  46. :return:
  47. """
  48. params = await request.get_json()
  49. video_id = params['vid']
  50. text = await get_text_by_id(mysql_client, video_id)
  51. if text:
  52. result = {"text": text}
  53. else:
  54. result = {"text": None}
  55. return jsonify(result)
  56. @bp.route('/publish', methods=["POST"])
  57. async def auto_publish():
  58. """
  59. auto publish article info to aigc system
  60. :return:
  61. """
  62. res = {
  63. "info": "this api is developing"
  64. }
  65. return jsonify(res)
  66. return bp