generate_text_from_video.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import os
  6. import time
  7. import datetime
  8. import traceback
  9. import requests
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications.api import GoogleAIAPI
  13. from applications.const import VideoToTextConst
  14. from applications.db import DatabaseConnector
  15. from config import long_articles_config
  16. from config import apolloConfig
  17. # 办公室网络调试需要打开代理
  18. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  19. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
  20. const = VideoToTextConst()
  21. # config = apolloConfig(env="prod")
  22. # pool_size
  23. # POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
  24. POOL_SIZE = 20
  25. # batch_size
  26. # BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  27. BATCH_SIZE = 10
  28. def download_file(task_id, oss_path):
  29. """
  30. 下载视频文件
  31. """
  32. video_url = "https://rescdn.yishihui.com/" + oss_path
  33. file_name = "static/{}.mp4".format(task_id)
  34. if os.path.exists(file_name):
  35. return file_name
  36. proxies = {
  37. "http": None,
  38. "https": None
  39. }
  40. with open(file_name, 'wb') as f:
  41. response = requests.get(video_url, proxies=proxies)
  42. f.write(response.content)
  43. return file_name
  44. class GenerateTextFromVideo(object):
  45. """
  46. 从视频中生成文本
  47. """
  48. def __init__(self):
  49. self.google_ai_api = GoogleAIAPI()
  50. self.db = DatabaseConnector(db_config=long_articles_config)
  51. def connect_db(self):
  52. """
  53. 连接数据库
  54. """
  55. self.db.connect()
  56. def update_task_status(self, task_id, process, ori_status, new_status):
  57. """
  58. 回滚长时间处于处理中的任务
  59. """
  60. match process:
  61. case "upload":
  62. status = 'upload_status'
  63. update_timestamp = 'upload_status_ts'
  64. case "understanding":
  65. status = 'understanding_status'
  66. update_timestamp = 'understanding_status_ts'
  67. case "summary":
  68. status = 'summary_status'
  69. update_timestamp = 'summary_status_ts'
  70. case "rewrite":
  71. status = 'rewrite_status'
  72. update_timestamp = 'rewrite_status_ts'
  73. case _:
  74. raise ValueError(f"Unexpected task: {process}")
  75. update_sql = f"""
  76. update video_content_understanding
  77. set {status} = %s, {update_timestamp} = %s
  78. where {status} = %s and id = %s;
  79. """
  80. roll_back_rows = self.db.save(
  81. query=update_sql,
  82. params=(
  83. new_status,
  84. datetime.datetime.now(),
  85. ori_status,
  86. task_id,
  87. )
  88. )
  89. return roll_back_rows
  90. def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
  91. """
  92. 上传视频到Google AI
  93. max_processing_video_count: 处理中的最大视频数量,默认20
  94. video_content_understanding 表status字段
  95. 0: 未处理
  96. 1: 处理中
  97. 2: 处理完成
  98. """
  99. select_sql = f"""
  100. select count(1) as processing_count
  101. from video_content_understanding
  102. where understanding_status = {const.PROCESSING_STATUS};
  103. """
  104. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  105. rest_video_count = max_processing_video_count - count
  106. success_upload_count = 0
  107. if rest_video_count:
  108. sql = f"""
  109. select t1.id, t1.video_oss_path
  110. from video_content_understanding t1
  111. join publish_single_video_source t2
  112. on t1.content_trace_id = t2.content_trace_id
  113. where t1.upload_status = {const.INIT_STATUS}
  114. and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS}
  115. and t2.bad_status = 0
  116. order by t2.flow_pool_level
  117. limit {rest_video_count};
  118. """
  119. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  120. for task in tqdm(task_list, desc="upload_video_task"):
  121. lock_rows = self.update_task_status(
  122. task_id=task['id'],
  123. process='upload',
  124. ori_status=const.INIT_STATUS,
  125. new_status=const.PROCESSING_STATUS
  126. )
  127. if not lock_rows:
  128. continue
  129. try:
  130. file_path = download_file(task['id'], task['video_oss_path'])
  131. google_upload_result = self.google_ai_api.upload_file(file_path)
  132. if google_upload_result:
  133. file_name, file_state, expire_time = google_upload_result
  134. update_sql = f"""
  135. update video_content_understanding
  136. set
  137. upload_status = %s,
  138. upload_status_ts = %s,
  139. file_name = %s,
  140. file_state = %s,
  141. file_expire_time = %s
  142. where id = %s and upload_status = %s;
  143. """
  144. self.db.save(
  145. update_sql,
  146. params=(
  147. const.SUCCESS_STATUS,
  148. datetime.datetime.now(),
  149. file_name,
  150. file_state,
  151. expire_time,
  152. task['id'],
  153. const.PROCESSING_STATUS
  154. )
  155. )
  156. success_upload_count += 1
  157. except Exception as e:
  158. print("task upload failed because of {}".format(e))
  159. print("trace_back: ", traceback.format_exc())
  160. # roll back status
  161. self.update_task_status(
  162. task_id=task['id'],
  163. process='upload',
  164. ori_status=const.PROCESSING_STATUS,
  165. new_status=const.FAIL_STATUS
  166. )
  167. return success_upload_count
  168. def delete_video_from_google(self, file_name):
  169. """
  170. 删除视频文件
  171. """
  172. self.google_ai_api.delete_video(file_name)
  173. def get_task_list(self):
  174. """
  175. 获取处理视频转文本任务
  176. """
  177. sql = f"""
  178. select id, file_name, video_ori_title
  179. from video_content_understanding
  180. where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
  181. order by file_expire_time
  182. limit {BATCH_SIZE};
  183. """
  184. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  185. return task_list
  186. def convert_video_to_text_with_google_ai(self):
  187. """
  188. 处理视频转文本任务
  189. """
  190. task_list = self.get_task_list()
  191. while task_list:
  192. for task in tqdm(task_list, desc="convert video to text"):
  193. # LOCK TASK
  194. lock_row = self.update_task_status(
  195. task_id=task['id'],
  196. process='understanding',
  197. ori_status=const.INIT_STATUS,
  198. new_status=const.PROCESSING_STATUS
  199. )
  200. if not lock_row:
  201. print("Lock")
  202. continue
  203. file_name = task['file_name']
  204. video_local_path = "static/{}.mp4".format(task['id'])
  205. google_file = self.google_ai_api.get_google_file(file_name)
  206. state = google_file.state.name
  207. match state:
  208. case 'ACTIVE':
  209. try:
  210. video_text = self.google_ai_api.get_video_text(
  211. prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
  212. video_file=google_file
  213. )
  214. if video_text:
  215. print(type(video_text))
  216. print(video_text)
  217. update_sql = f"""
  218. update video_content_understanding
  219. set understanding_status = %s, video_text = %s, file_state = %s
  220. where id = %s and understanding_status = %s;
  221. """
  222. self.db.save(
  223. update_sql,
  224. params=(
  225. const.SUCCESS_STATUS,
  226. video_text,
  227. state,
  228. task['id'],
  229. const.PROCESSING_STATUS
  230. )
  231. )
  232. # # delete local file and google file
  233. # if os.path.exists(video_local_path):
  234. # os.remove(video_local_path)
  235. #
  236. # tqdm.write("video transform to text success, delete local file")
  237. task_list.remove(task)
  238. #
  239. # self.google_ai_api.delete_video(file_name)
  240. tqdm.write("delete video from google success: {}".format(file_name))
  241. else:
  242. # roll back status
  243. self.update_task_status(
  244. task_id=task['id'],
  245. process='understanding',
  246. ori_status=const.PROCESSING_STATUS,
  247. new_status=const.INIT_STATUS
  248. )
  249. except Exception as e:
  250. # roll back status
  251. self.update_task_status(
  252. task_id=task['id'],
  253. process='understanding',
  254. ori_status=const.PROCESSING_STATUS,
  255. new_status=const.FAIL_STATUS
  256. )
  257. tqdm.write(str(e))
  258. continue
  259. case 'PROCESSING':
  260. tqdm.write("video is still processing")
  261. case 'FAILED':
  262. self.update_task_status(
  263. task_id=task['id'],
  264. process='understanding',
  265. ori_status=const.PROCESSING_STATUS,
  266. new_status=const.FAIL_STATUS
  267. )
  268. if os.path.exists(video_local_path):
  269. os.remove(video_local_path)
  270. self.google_ai_api.delete_video(file_name)
  271. task_list.remove(task)
  272. tqdm.write("video process failed, delete local file")
  273. time.sleep(const.SLEEP_SECONDS)
  274. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  275. time.sleep(const.SLEEP_SECONDS)