generate_text_from_video.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import time
  6. import requests
  7. from pymysql.cursors import DictCursor
  8. from tqdm import tqdm
  9. from applications.api import GoogleAIAPI
  10. from applications.db import DatabaseConnector
  11. from config import long_articles_config
  12. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  13. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
# Maximum number of rows allowed in the PROCESSING state (status = 1) at the
# same time; upload_video_to_google_ai subtracts the current count from this
# value to decide how many new videos it may upload.
PROCESSING_MAX_VIDEO_COUNT = 10
  15. def download_file(pq_vid, video_url):
  16. """
  17. 下载视频文件
  18. """
  19. file_name = "static/{}.mp4".format(pq_vid)
  20. if os.path.exists(file_name):
  21. return file_name
  22. proxies = {
  23. "http": None,
  24. "https": None
  25. }
  26. with open(file_name, 'wb') as f:
  27. response = requests.get(video_url, proxies=proxies)
  28. f.write(response.content)
  29. return file_name
  30. class GenerateTextFromVideo(object):
  31. """
  32. 从视频中生成文本
  33. """
  34. def __init__(self):
  35. self.google_ai_api = GoogleAIAPI()
  36. self.db = DatabaseConnector(db_config=long_articles_config)
  37. def connect_db(self):
  38. """
  39. 连接数据库
  40. """
  41. self.db.connect()
  42. def input_task_list(self):
  43. """
  44. 输入任务列表, 从single_video_pool中获取
  45. """
  46. sql = f"""
  47. select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
  48. from publish_single_video_source
  49. where audit_status = 1 and bad_status = 0 and extract_status = 0
  50. order by id desc;
  51. """
  52. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  53. insert_sql = f"""
  54. insert ignore into video_content_understanding
  55. (pq_vid, video_ori_title, video_oss_path)
  56. values (%s, %s, %s)
  57. """
  58. affected_rows = self.db.save_many(
  59. insert_sql,
  60. params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list]
  61. )
  62. print(affected_rows)
  63. def upload_video_to_google_ai(self):
  64. """
  65. 上传视频到Google AI
  66. """
  67. # 查询出在视频处于PROCESSING状态的视频数量
  68. select_sql = "select count(1) as processing_count from video_content_understanding where status = 1;"
  69. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  70. rest_video_count = PROCESSING_MAX_VIDEO_COUNT - count
  71. success_upload_count = 0
  72. if rest_video_count:
  73. sql = f"""select pq_vid, video_oss_path from video_content_understanding where status = 0 limit {rest_video_count};"""
  74. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  75. for task in tqdm(task_list, desc="upload_video_task"):
  76. file_path = download_file(task['pq_vid'], task['video_oss_path'])
  77. google_upload_result = self.google_ai_api.upload_file(file_path)
  78. if google_upload_result:
  79. file_name, file_state, expire_time = google_upload_result
  80. update_sql = f"""
  81. update video_content_understanding
  82. set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
  83. where pq_vid = %s;
  84. """
  85. self.db.save(
  86. update_sql,
  87. params=(1, file_name, file_state, expire_time, task['pq_vid'])
  88. )
  89. success_upload_count += 1
  90. else:
  91. continue
  92. return success_upload_count
  93. def get_tasks(self):
  94. """
  95. 获取处理视频转文本任务
  96. """
  97. sql = "select pq_vid, file_name from video_content_understanding where status = 1 order by file_expire_time limit 5;"
  98. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  99. return task_list
  100. def convert_video_to_text_with_google_ai(self):
  101. """
  102. 处理视频转文本任务
  103. """
  104. task_list = self.get_tasks()
  105. while task_list:
  106. for task in tqdm(task_list, desc="convert video to text"):
  107. file_name = task['file_name']
  108. google_file = self.google_ai_api.get_google_file(file_name)
  109. state = google_file.state.name
  110. match state:
  111. case 'ACTIVE':
  112. try:
  113. video_text = self.google_ai_api.get_video_text(
  114. prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。",
  115. video_file=google_file
  116. )
  117. if video_text:
  118. update_sql = f"""
  119. update video_content_understanding
  120. set status = %s, video_text = %s, file_state = %s
  121. where pq_vid = %s;
  122. """
  123. self.db.save(
  124. update_sql,
  125. params=(2, video_text, state, task['pq_vid'])
  126. )
  127. os.remove("static/{}.mp4".format(task['pq_vid']))
  128. tqdm.write("video transform to text success, delete local file, sleep 1 min...")
  129. task_list.remove(task)
  130. except Exception as e:
  131. tqdm.write(str(e))
  132. continue
  133. case 'PROCESSING':
  134. tqdm.write("video is still processing")
  135. continue
  136. case 'FAILED':
  137. tqdm.write("video process failed")
  138. continue
  139. time.sleep(10)
  140. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  141. time.sleep(60)