generate_text_from_video.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import time
  6. import traceback
  7. import requests
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications.api import GoogleAIAPI
  11. from applications.const import VideoToTextConst
  12. from applications.db import DatabaseConnector
  13. from config import long_articles_config
  14. from config import apolloConfig
  15. # 办公室网络调试需要打开代理
  16. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  17. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
  18. const = VideoToTextConst()
  19. config = apolloConfig(env="prod")
  20. # pool_size
  21. POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
  22. # batch_size
  23. BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  24. def download_file(pq_vid, video_url):
  25. """
  26. 下载视频文件
  27. """
  28. file_name = "static/{}.mp4".format(pq_vid)
  29. if os.path.exists(file_name):
  30. return file_name
  31. proxies = {
  32. "http": None,
  33. "https": None
  34. }
  35. with open(file_name, 'wb') as f:
  36. response = requests.get(video_url, proxies=proxies)
  37. f.write(response.content)
  38. return file_name
  39. class GenerateTextFromVideo(object):
  40. """
  41. 从视频中生成文本
  42. """
  43. def __init__(self):
  44. self.google_ai_api = GoogleAIAPI()
  45. self.db = DatabaseConnector(db_config=long_articles_config)
  46. def connect_db(self):
  47. """
  48. 连接数据库
  49. """
  50. self.db.connect()
  51. def input_task_list(self):
  52. """
  53. 暂时用于处理历史任务, 新的视频在插入publish_single_video_source后会直接插入video_content_understanding表中
  54. """
  55. sql = f"""
  56. select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
  57. from publish_single_video_source
  58. where audit_status = {const.AUDIT_SUCCESS_STATUS} and bad_status = {const.ARTICLE_GOOD_STATUS}
  59. order by id desc;
  60. """
  61. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  62. insert_sql = f"""
  63. insert ignore into video_content_understanding
  64. (pq_vid, video_ori_title, video_oss_path)
  65. values (%s, %s, %s);
  66. """
  67. affected_rows = self.db.save_many(
  68. insert_sql,
  69. params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list]
  70. )
  71. print(affected_rows)
  72. def roll_back_lock_tasks(self):
  73. """
  74. 回滚长时间处于处理中的任务
  75. """
  76. update_sql = f"""
  77. update video_content_understanding
  78. set status = %s
  79. where status = %s and status_update_timestamp < %s;
  80. """
  81. roll_back_rows = self.db.save(
  82. query=update_sql,
  83. params=(
  84. const.VIDEO_UNDERSTAND_INIT_STATUS,
  85. const.VIDEO_LOCK,
  86. int(time.time()) - const.MAX_PROCESSING_TIME
  87. )
  88. )
  89. return roll_back_rows
  90. def update_video_status(self, ori_status, new_status, pq_vid):
  91. """
  92. 更新视频状态
  93. """
  94. sql = f"""
  95. update video_content_understanding
  96. set status = %s, status_update_timestamp = %s
  97. WHERE pq_vid = %s and status = %s;
  98. """
  99. affected_rows = self.db.save(
  100. query=sql,
  101. params=(new_status, int(time.time()), pq_vid, ori_status)
  102. )
  103. return affected_rows
  104. def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
  105. """
  106. 上传视频到Google AI
  107. max_processing_video_count: 处理中的最大视频数量,默认20
  108. video_content_understanding 表status字段
  109. 0: 未处理
  110. 1: 处理中
  111. 2: 处理完成
  112. """
  113. # 查询出在视频处于PROCESSING状态的视频数量
  114. select_sql = f"""
  115. select count(1) as processing_count
  116. from video_content_understanding
  117. where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS};
  118. """
  119. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  120. rest_video_count = max_processing_video_count - count
  121. success_upload_count = 0
  122. if rest_video_count:
  123. sql = f"""
  124. select pq_vid, video_oss_path
  125. from video_content_understanding
  126. where status = {const.VIDEO_UNDERSTAND_INIT_STATUS}
  127. order by id desc
  128. limit {rest_video_count};
  129. """
  130. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  131. for task in tqdm(task_list, desc="upload_video_task"):
  132. lock_rows = self.update_video_status(
  133. ori_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
  134. new_status=const.VIDEO_LOCK,
  135. pq_vid=task['pq_vid'],
  136. )
  137. if not lock_rows:
  138. continue
  139. try:
  140. file_path = download_file(task['pq_vid'], task['video_oss_path'])
  141. google_upload_result = self.google_ai_api.upload_file(file_path)
  142. if google_upload_result:
  143. file_name, file_state, expire_time = google_upload_result
  144. update_sql = f"""
  145. update video_content_understanding
  146. set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
  147. where pq_vid = %s and status = %s;
  148. """
  149. self.db.save(
  150. update_sql,
  151. params=(
  152. const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
  153. file_name,
  154. file_state,
  155. expire_time,
  156. task['pq_vid'],
  157. const.VIDEO_LOCK
  158. )
  159. )
  160. success_upload_count += 1
  161. except Exception as e:
  162. print("task upload failed because of {}".format(e))
  163. print("trace_back: ", traceback.format_exc())
  164. # roll back status
  165. self.update_video_status(
  166. ori_status=const.VIDEO_LOCK,
  167. new_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
  168. pq_vid=task['pq_vid'],
  169. )
  170. return success_upload_count
  171. def delete_video_from_google(self, file_name):
  172. """
  173. 删除视频文件
  174. """
  175. self.google_ai_api.delete_video(file_name)
  176. def get_task_list(self):
  177. """
  178. 获取处理视频转文本任务
  179. """
  180. sql = f"""
  181. select pq_vid, file_name
  182. from video_content_understanding
  183. where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS}
  184. order by file_expire_time
  185. limit {BATCH_SIZE};
  186. """
  187. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  188. return task_list
  189. def convert_video_to_text_with_google_ai(self):
  190. """
  191. 处理视频转文本任务
  192. """
  193. self.roll_back_lock_tasks()
  194. task_list = self.get_task_list()
  195. while task_list:
  196. for task in tqdm(task_list, desc="convert video to text"):
  197. print(task['pq_vid'], task['file_name'])
  198. # LOCK TASK
  199. lock_row = self.update_video_status(
  200. ori_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
  201. new_status=const.VIDEO_LOCK,
  202. pq_vid=task['pq_vid'],
  203. )
  204. if not lock_row:
  205. print("Lock")
  206. continue
  207. file_name = task['file_name']
  208. video_local_path = "static/{}.mp4".format(task['pq_vid'])
  209. google_file = self.google_ai_api.get_google_file(file_name)
  210. state = google_file.state.name
  211. match state:
  212. case 'ACTIVE':
  213. try:
  214. video_text = self.google_ai_api.get_video_text(
  215. prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。",
  216. video_file=google_file
  217. )
  218. if video_text:
  219. update_sql = f"""
  220. update video_content_understanding
  221. set status = %s, video_text = %s, file_state = %s
  222. where pq_vid = %s and status = %s;
  223. """
  224. self.db.save(
  225. update_sql,
  226. params=(
  227. const.VIDEO_UNDERSTAND_SUCCESS_STATUS,
  228. video_text,
  229. state,
  230. task['pq_vid'],
  231. const.VIDEO_LOCK
  232. )
  233. )
  234. # delete local file and google file
  235. if os.path.exists(video_local_path):
  236. os.remove(video_local_path)
  237. tqdm.write("video transform to text success, delete local file")
  238. task_list.remove(task)
  239. self.google_ai_api.delete_video(file_name)
  240. tqdm.write("delete video from google success: {}".format(file_name))
  241. else:
  242. # roll back status
  243. self.update_video_status(
  244. ori_status=const.VIDEO_LOCK,
  245. new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
  246. pq_vid=task['pq_vid'],
  247. )
  248. except Exception as e:
  249. # roll back status
  250. self.update_video_status(
  251. ori_status=const.VIDEO_LOCK,
  252. new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
  253. pq_vid=task['pq_vid'],
  254. )
  255. tqdm.write(str(e))
  256. continue
  257. case 'PROCESSING':
  258. tqdm.write("video is still processing")
  259. # roll back status
  260. self.update_video_status(
  261. ori_status=const.VIDEO_LOCK,
  262. new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
  263. pq_vid=task['pq_vid'],
  264. )
  265. case 'FAILED':
  266. self.update_video_status(
  267. ori_status=const.VIDEO_LOCK,
  268. new_status=const.VIDEO_UNDERSTAND_FAIL_STATUS,
  269. pq_vid=task['pq_vid']
  270. )
  271. if os.path.exists(video_local_path):
  272. os.remove(video_local_path)
  273. self.google_ai_api.delete_video(file_name)
  274. task_list.remove(task)
  275. tqdm.write("video process failed, delete local file")
  276. time.sleep(const.SLEEP_SECONDS)
  277. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  278. time.sleep(const.SLEEP_SECONDS)