generate_text_from_video.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import time
  6. import datetime
  7. import traceback
  8. import requests
  9. from pymysql.cursors import DictCursor
  10. from tqdm import tqdm
  11. from applications.api import GoogleAIAPI
  12. from applications.const import VideoToTextConst
  13. from applications.db import DatabaseConnector
  14. from config import long_articles_config
  15. from config import apolloConfig
  16. # 办公室网络调试需要打开代理
  17. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  18. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
  19. const = VideoToTextConst()
  20. config = apolloConfig(env="prod")
  21. # pool_size
  22. POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
  23. # batch_size
  24. BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  25. def generate_transforming_prompt(title):
  26. video_transforming_prompt = f"""
  27. 理解视频内容
  28. 视频的标题是 {title}
  29. 你是一名视频分析专家,你非常精通视频的内容的总结,我会给出你视频及视频的标题,现在请你进行仔细的视频分析,并按照以下要求进行回答
  30. #要求
  31. 1.输出20个以上,30个以下的中文字符输出视频的选题,选题应该要达到使人能从选题中理解到视频主要想表达的内容,要包含这个视频的关键性内容和亮点内容,并针对你的选题进行关键信息和亮点的详细描述;
  32. 2.用中文字符精简的结构性输出视频的主要内容,需要包含该视频描述的核心事件或观点,字符数量要求在180到230之间;
  33. 3.请严格控制输出的内容能够被正确解析为JSON;
  34. output in JSON format with keys:
  35. 选题(str)
  36. 描述(str)//主要内容
  37. 你需要注意
  38. 1.关注我给出的视频中的主要内容,生成的描述主要面向的是50岁以上的老年人,语言风格要适配用户群体;
  39. 2.请针对视频的内容本身输出客观、具象的回答,你的分析必须基于视频内容,不能凭空想象;
  40. 2.信息缺失和无法分析理解的部分请你忽略,不能自行编造回答
  41. 3.请只描述客观事实,不要加入任何主观评价性语言;请使用专业语言进行回答。不要出现概括性描述、主观猜测,抽象表述
  42. 4.语言表达上注意不要使用倒装句、长句、复杂句,尽量使用陈述句、简单句;
  43. 直接用json格式直接输出结论,不要做任何其他的解释或说明
  44. """
  45. return video_transforming_prompt
  46. def download_file(task_id, oss_path):
  47. """
  48. 下载视频文件
  49. """
  50. video_url = "https://rescdn.yishihui.com/" + oss_path
  51. file_name = "static/{}.mp4".format(task_id)
  52. if os.path.exists(file_name):
  53. return file_name
  54. proxies = {
  55. "http": None,
  56. "https": None
  57. }
  58. with open(file_name, 'wb') as f:
  59. response = requests.get(video_url, proxies=proxies)
  60. f.write(response.content)
  61. return file_name
  62. class GenerateTextFromVideo(object):
  63. """
  64. 从视频中生成文本
  65. """
  66. def __init__(self):
  67. self.google_ai_api = GoogleAIAPI()
  68. self.db = DatabaseConnector(db_config=long_articles_config)
  69. def connect_db(self):
  70. """
  71. 连接数据库
  72. """
  73. self.db.connect()
  74. def update_task_status(self, task_id, process, ori_status, new_status):
  75. """
  76. 回滚长时间处于处理中的任务
  77. """
  78. match process:
  79. case "upload":
  80. status = 'upload_status'
  81. update_timestamp = 'upload_status_ts'
  82. case "understanding":
  83. status = 'understanding_status'
  84. update_timestamp = 'understanding_status_ts'
  85. case "summary":
  86. status = 'summary_status'
  87. update_timestamp = 'summary_status_ts'
  88. case "rewrite":
  89. status = 'rewrite_status'
  90. update_timestamp = 'rewrite_status_ts'
  91. case _:
  92. raise ValueError(f"Unexpected task: {process}")
  93. update_sql = f"""
  94. update video_content_understanding
  95. set {status} = %s, {update_timestamp} = %s
  96. where {status} = %s and id = %s;
  97. """
  98. roll_back_rows = self.db.save(
  99. query=update_sql,
  100. params=(
  101. new_status,
  102. datetime.datetime.now(),
  103. ori_status,
  104. task_id,
  105. )
  106. )
  107. return roll_back_rows
  108. def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
  109. """
  110. 上传视频到Google AI
  111. max_processing_video_count: 处理中的最大视频数量,默认20
  112. video_content_understanding 表status字段
  113. 0: 未处理
  114. 1: 处理中
  115. 2: 处理完成
  116. """
  117. select_sql = f"""
  118. select count(1) as processing_count
  119. from video_content_understanding
  120. where understanding_status = {const.PROCESSING_STATUS};
  121. """
  122. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  123. rest_video_count = max_processing_video_count - count
  124. success_upload_count = 0
  125. if rest_video_count:
  126. sql = f"""
  127. select id, video_oss_path
  128. from video_content_understanding
  129. where upload_status = {const.INIT_STATUS}
  130. limit {rest_video_count};
  131. """
  132. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  133. for task in tqdm(task_list, desc="upload_video_task"):
  134. lock_rows = self.update_task_status(
  135. task_id=task['id'],
  136. process='upload',
  137. ori_status=const.INIT_STATUS,
  138. new_status=const.PROCESSING_STATUS
  139. )
  140. if not lock_rows:
  141. continue
  142. try:
  143. file_path = download_file(task['id'], task['video_oss_path'])
  144. google_upload_result = self.google_ai_api.upload_file(file_path)
  145. if google_upload_result:
  146. file_name, file_state, expire_time = google_upload_result
  147. update_sql = f"""
  148. update video_content_understanding
  149. set
  150. upload_status = %s,
  151. upload_status_ts = %s,
  152. file_name = %s,
  153. file_state = %s,
  154. file_expire_time = %s
  155. where id = %s and upload_status = %s;
  156. """
  157. self.db.save(
  158. update_sql,
  159. params=(
  160. const.SUCCESS_STATUS,
  161. datetime.datetime.now(),
  162. file_name,
  163. file_state,
  164. expire_time,
  165. task['id'],
  166. const.PROCESSING_STATUS
  167. )
  168. )
  169. success_upload_count += 1
  170. except Exception as e:
  171. print("task upload failed because of {}".format(e))
  172. print("trace_back: ", traceback.format_exc())
  173. # roll back status
  174. self.update_task_status(
  175. task_id=task['id'],
  176. process='upload',
  177. ori_status=const.PROCESSING_STATUS,
  178. new_status=const.FAIL_STATUS
  179. )
  180. return success_upload_count
  181. def delete_video_from_google(self, file_name):
  182. """
  183. 删除视频文件
  184. """
  185. self.google_ai_api.delete_video(file_name)
  186. def get_task_list(self):
  187. """
  188. 获取处理视频转文本任务
  189. """
  190. sql = f"""
  191. select id, file_name, video_ori_title
  192. from video_content_understanding
  193. where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
  194. order by file_expire_time
  195. limit {BATCH_SIZE};
  196. """
  197. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  198. return task_list
  199. def convert_video_to_text_with_google_ai(self):
  200. """
  201. 处理视频转文本任务
  202. """
  203. task_list = self.get_task_list()
  204. while task_list:
  205. for task in tqdm(task_list, desc="convert video to text"):
  206. # LOCK TASK
  207. lock_row = self.update_task_status(
  208. task_id=task['id'],
  209. process='understanding',
  210. ori_status=const.INIT_STATUS,
  211. new_status=const.PROCESSING_STATUS
  212. )
  213. if not lock_row:
  214. print("Lock")
  215. continue
  216. file_name = task['file_name']
  217. video_local_path = "static/{}.mp4".format(task['id'])
  218. google_file = self.google_ai_api.get_google_file(file_name)
  219. state = google_file.state.name
  220. match state:
  221. case 'ACTIVE':
  222. try:
  223. video_text = self.google_ai_api.get_video_text(
  224. prompt=generate_transforming_prompt(task['video_ori_title']),
  225. video_file=google_file
  226. )
  227. if video_text:
  228. update_sql = f"""
  229. update video_content_understanding
  230. set understanding_status = %s, video_text = %s, file_state = %s
  231. where id = %s and understanding_status = %s;
  232. """
  233. self.db.save(
  234. update_sql,
  235. params=(
  236. const.SUCCESS_STATUS,
  237. video_text,
  238. state,
  239. task['id'],
  240. const.PROCESSING_STATUS
  241. )
  242. )
  243. # # delete local file and google file
  244. # if os.path.exists(video_local_path):
  245. # os.remove(video_local_path)
  246. #
  247. # tqdm.write("video transform to text success, delete local file")
  248. task_list.remove(task)
  249. #
  250. # self.google_ai_api.delete_video(file_name)
  251. tqdm.write("delete video from google success: {}".format(file_name))
  252. else:
  253. # roll back status
  254. self.update_task_status(
  255. task_id=task['id'],
  256. process='understanding',
  257. ori_status=const.PROCESSING_STATUS,
  258. new_status=const.INIT_STATUS
  259. )
  260. except Exception as e:
  261. # roll back status
  262. self.update_task_status(
  263. task_id=task['id'],
  264. process='understanding',
  265. ori_status=const.PROCESSING_STATUS,
  266. new_status=const.FAIL_STATUS
  267. )
  268. tqdm.write(str(e))
  269. continue
  270. case 'PROCESSING':
  271. tqdm.write("video is still processing")
  272. case 'FAILED':
  273. self.update_task_status(
  274. task_id=task['id'],
  275. process='understanding',
  276. ori_status=const.PROCESSING_STATUS,
  277. new_status=const.FAIL_STATUS
  278. )
  279. if os.path.exists(video_local_path):
  280. os.remove(video_local_path)
  281. self.google_ai_api.delete_video(file_name)
  282. task_list.remove(task)
  283. tqdm.write("video process failed, delete local file")
  284. time.sleep(const.SLEEP_SECONDS)
  285. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  286. time.sleep(const.SLEEP_SECONDS)