generate_text_from_video.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import time
  6. import traceback
  7. import requests
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications.api import GoogleAIAPI
  11. from applications.const import VideoToTextConst
  12. from applications.db import DatabaseConnector
  13. from config import long_articles_config
  14. from config import apolloConfig
  15. # 办公室网络调试需要打开代理
  16. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  17. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
  18. const = VideoToTextConst()
  19. config = apolloConfig(env="prod")
  20. # pool_size
  21. POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
  22. # batch_size
  23. BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  24. def generate_transforming_prompt(title):
  25. video_transforming_prompt = f"""
  26. 视频的标题是: {title}
  27. 你是一名视频分析专家,你非常精通视频的内容的总结,我会给出你视频及视频的标题,现在请你进行仔细的视频分析,并按照以下要求进行回答
  28. #要求
  29. 1.30个以下的中文字符输出视频的选题,选题应该要达到使人能从选题中理解到视频主要想表达的内容,要包含这个视频的关键性内容和亮点内容,并针对你的选题进行关键信息和亮点的详细描述
  30. 2.用200个以内的中文字符精简的结构性输出视频的主要内容,需要包含该视频描述的核心事件或观点
  31. 3.请严格控制输出的内容能够被正确解析为JSON;
  32. output in JSON format with keys:
  33. "theme": 选题
  34. "summary": 主要内容
  35. 你需要注意
  36. 1.关注我给出的视频中的主要内容,生成的描述主要面向的是50岁以上的老年人,语言风格要适配用户群体;
  37. 2.请针对视频的内容本身输出客观、具象的回答,你的分析必须基于视频内容,不能凭空想象;
  38. 2.信息缺失和无法分析理解的部分请你忽略,不能自行编造回答
  39. 3.请只描述客观事实,不要加入任何主观评价性语言;请使用专业语言进行回答。不要出现概括性描述、主观猜测,抽象表述
  40. 4.语言表达上注意不要使用倒装句、长句、复杂句,尽量使用陈述句、简单句;
  41. 直接用json格式直接输出结论,不要做任何其他的解释或说明
  42. """
  43. return video_transforming_prompt
  44. def download_file(task_id, oss_path):
  45. """
  46. 下载视频文件
  47. """
  48. video_url = "https://rescdn.yishihui.com/" + oss_path
  49. file_name = "static/{}.mp4".format(task_id)
  50. if os.path.exists(file_name):
  51. return file_name
  52. proxies = {
  53. "http": None,
  54. "https": None
  55. }
  56. with open(file_name, 'wb') as f:
  57. response = requests.get(video_url, proxies=proxies)
  58. f.write(response.content)
  59. return file_name
  60. class GenerateTextFromVideo(object):
  61. """
  62. 从视频中生成文本
  63. """
  64. def __init__(self):
  65. self.google_ai_api = GoogleAIAPI()
  66. self.db = DatabaseConnector(db_config=long_articles_config)
  67. def connect_db(self):
  68. """
  69. 连接数据库
  70. """
  71. self.db.connect()
  72. def update_task_status(self, task_id, process, ori_status, new_status):
  73. """
  74. 回滚长时间处于处理中的任务
  75. """
  76. match process:
  77. case "upload":
  78. status = 'upload_status'
  79. update_timestamp = 'upload_status_ts'
  80. case "understanding":
  81. status = 'understanding_status'
  82. update_timestamp = 'understanding_status_ts'
  83. case "summary":
  84. status = 'summary_status'
  85. update_timestamp = 'summary_status_ts'
  86. case "rewrite":
  87. status = 'rewrite_status'
  88. update_timestamp = 'rewrite_status_ts'
  89. case _:
  90. raise ValueError(f"Unexpected task: {process}")
  91. update_sql = f"""
  92. update video_content_understanding
  93. set {status} = %s, {update_timestamp} = %s
  94. where {status} = %s and id = %s;
  95. """
  96. roll_back_rows = self.db.save(
  97. query=update_sql,
  98. params=(
  99. new_status,
  100. int(time.time()),
  101. ori_status,
  102. task_id,
  103. )
  104. )
  105. return roll_back_rows
  106. def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
  107. """
  108. 上传视频到Google AI
  109. max_processing_video_count: 处理中的最大视频数量,默认20
  110. video_content_understanding 表status字段
  111. 0: 未处理
  112. 1: 处理中
  113. 2: 处理完成
  114. """
  115. select_sql = f"""
  116. select count(1) as processing_count
  117. from video_content_understanding
  118. where understanding_status = {const.PROCESSING_STATUS};
  119. """
  120. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  121. rest_video_count = max_processing_video_count - count
  122. success_upload_count = 0
  123. if rest_video_count:
  124. sql = f"""
  125. select id, video_oss_path
  126. from video_content_understanding
  127. where upload_status = {const.INIT_STATUS}
  128. limit {rest_video_count};
  129. """
  130. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  131. for task in tqdm(task_list, desc="upload_video_task"):
  132. lock_rows = self.update_task_status(
  133. task_id=task['id'],
  134. process='upload',
  135. ori_status=const.INIT_STATUS,
  136. new_status=const.PROCESSING_STATUS
  137. )
  138. if not lock_rows:
  139. continue
  140. try:
  141. file_path = download_file(task['id'], task['video_oss_path'])
  142. google_upload_result = self.google_ai_api.upload_file(file_path)
  143. if google_upload_result:
  144. file_name, file_state, expire_time = google_upload_result
  145. update_sql = f"""
  146. update video_content_understanding
  147. set
  148. upload_status = %s,
  149. upload_status_ts = %s,
  150. file_name = %s,
  151. file_state = %s,
  152. file_expire_time = %s
  153. where id = %s and upload_status = %s;
  154. """
  155. self.db.save(
  156. update_sql,
  157. params=(
  158. const.SUCCESS_STATUS,
  159. int(time.time()),
  160. file_name,
  161. file_state,
  162. expire_time,
  163. task['id'],
  164. const.PROCESSING_STATUS
  165. )
  166. )
  167. success_upload_count += 1
  168. except Exception as e:
  169. print("task upload failed because of {}".format(e))
  170. print("trace_back: ", traceback.format_exc())
  171. # roll back status
  172. self.update_task_status(
  173. task_id=task['id'],
  174. process='upload',
  175. ori_status=const.PROCESSING_STATUS,
  176. new_status=const.FAIL_STATUS
  177. )
  178. return success_upload_count
  179. def delete_video_from_google(self, file_name):
  180. """
  181. 删除视频文件
  182. """
  183. self.google_ai_api.delete_video(file_name)
  184. def get_task_list(self):
  185. """
  186. 获取处理视频转文本任务
  187. """
  188. sql = f"""
  189. select id, file_name, video_ori_title
  190. from video_content_understanding
  191. where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
  192. order by file_expire_time
  193. limit {BATCH_SIZE};
  194. """
  195. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  196. return task_list
  197. def convert_video_to_text_with_google_ai(self):
  198. """
  199. 处理视频转文本任务
  200. """
  201. task_list = self.get_task_list()
  202. while task_list:
  203. for task in tqdm(task_list, desc="convert video to text"):
  204. # LOCK TASK
  205. lock_row = self.update_task_status(
  206. task_id=task['id'],
  207. process='understanding',
  208. ori_status=const.INIT_STATUS,
  209. new_status=const.PROCESSING_STATUS
  210. )
  211. if not lock_row:
  212. print("Lock")
  213. continue
  214. file_name = task['file_name']
  215. video_local_path = "static/{}.mp4".format(task['id'])
  216. google_file = self.google_ai_api.get_google_file(file_name)
  217. state = google_file.state.name
  218. match state:
  219. case 'ACTIVE':
  220. try:
  221. video_text = self.google_ai_api.get_video_text(
  222. prompt=generate_transforming_prompt(task['video_ori_title']),
  223. video_file=google_file
  224. )
  225. if video_text:
  226. update_sql = f"""
  227. update video_content_understanding
  228. set understanding_status = %s, video_text = %s, file_state = %s
  229. where id = %s and understanding_status = %s;
  230. """
  231. self.db.save(
  232. update_sql,
  233. params=(
  234. const.SUCCESS_STATUS,
  235. video_text,
  236. state,
  237. task['id'],
  238. const.PROCESSING_STATUS
  239. )
  240. )
  241. # delete local file and google file
  242. if os.path.exists(video_local_path):
  243. os.remove(video_local_path)
  244. tqdm.write("video transform to text success, delete local file")
  245. task_list.remove(task)
  246. self.google_ai_api.delete_video(file_name)
  247. tqdm.write("delete video from google success: {}".format(file_name))
  248. else:
  249. # roll back status
  250. self.update_task_status(
  251. task_id=task['id'],
  252. process='understanding',
  253. ori_status=const.PROCESSING_STATUS,
  254. new_status=const.INIT_STATUS
  255. )
  256. except Exception as e:
  257. # roll back status
  258. self.update_task_status(
  259. task_id=task['id'],
  260. process='understanding',
  261. ori_status=const.PROCESSING_STATUS,
  262. new_status=const.FAIL_STATUS
  263. )
  264. tqdm.write(str(e))
  265. continue
  266. case 'PROCESSING':
  267. tqdm.write("video is still processing")
  268. case 'FAILED':
  269. self.update_task_status(
  270. task_id=task['id'],
  271. process='understanding',
  272. ori_status=const.PROCESSING_STATUS,
  273. new_status=const.FAIL_STATUS
  274. )
  275. if os.path.exists(video_local_path):
  276. os.remove(video_local_path)
  277. self.google_ai_api.delete_video(file_name)
  278. task_list.remove(task)
  279. tqdm.write("video process failed, delete local file")
  280. time.sleep(const.SLEEP_SECONDS)
  281. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  282. time.sleep(const.SLEEP_SECONDS)