generate_text_from_video.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import os
  6. import time
  7. import datetime
  8. import traceback
  9. import requests
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications.api import GoogleAIAPI
  13. from applications.const import VideoToTextConst
  14. from applications.db import DatabaseConnector
  15. from config import long_articles_config
  16. from config import apolloConfig
# A proxy is required when debugging from the office network; uncomment to enable it.
# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"

# Project constants object (provides the status codes and SLEEP_SECONDS used below).
const = VideoToTextConst()
# Configuration client bound to the "prod" environment.
config = apolloConfig(env="prod")

# Pool size, read from the "video_extract_pool_size" config key.
# Used as the default of upload_video_to_google_ai(max_processing_video_count=...).
POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))

# Batch size, read from the "video_extract_batch_size" config key.
# Used as the SQL "limit" in GenerateTextFromVideo.get_task_list().
BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  26. def generate_transforming_prompt(title):
  27. video_transforming_prompt = f"""
  28. 理解输入的视频内容
  29. 视频的标题是 {title}
  30. 你是一名视频分析专家,你非常精通视频的内容的总结,我会给出你视频及视频的标题,现在请你进行仔细的视频分析,并按照以下要求进行回答
  31. #要求
  32. 1.20个以上,30个以下的中文字符输出视频的选题,选题应该要达到使人能从选题中理解到视频主要想表达的内容,要包含这个视频的关键性内容和亮点内容,并针对你的选题进行关键信息和亮点的详细描述;
  33. 2.用中文概括视频的主要内容,需要包含该视频描述的核心事件或观点,可以包括具体事例。要求内容通顺易懂具有一定可读性,字数在180到230之间;
  34. 3.请严格控制输出的内容能够被正确解析为JSON;
  35. output in JSON format with keys:
  36. 选题(str), 用 theme 来作为 key
  37. 描述(str), 用 description 来作为 key
  38. 你需要注意
  39. 1.关注我给出的视频中的主要内容,生成的描述主要面向的是50岁以上的老年人,语言风格要适配用户群体;
  40. 2.请针对视频的内容本身输出客观、具象的回答,你的分析必须基于视频内容,不能凭空想象;
  41. 2.信息缺失和无法分析理解的部分请你忽略,不能自行编造回答
  42. 3.请只描述客观事实,不要加入任何主观评价性语言;请使用专业语言进行回答。不要出现概括性描述、主观猜测,抽象表述
  43. 4.语言表达上注意不要使用倒装句、长句、复杂句,尽量使用陈述句、简单句;
  44. 返回的结果的数据结果是字典 dict,不要做任何其他的解释或说明,不要出现```json等字段。
  45. """
  46. return video_transforming_prompt
  47. def download_file(task_id, oss_path):
  48. """
  49. 下载视频文件
  50. """
  51. video_url = "https://rescdn.yishihui.com/" + oss_path
  52. file_name = "static/{}.mp4".format(task_id)
  53. if os.path.exists(file_name):
  54. return file_name
  55. proxies = {
  56. "http": None,
  57. "https": None
  58. }
  59. with open(file_name, 'wb') as f:
  60. response = requests.get(video_url, proxies=proxies)
  61. f.write(response.content)
  62. return file_name
  63. class GenerateTextFromVideo(object):
  64. """
  65. 从视频中生成文本
  66. """
    def __init__(self):
        """
        Create the Google AI API client and the database connector.

        Only the objects are constructed here; ``connect_db()`` calls
        ``self.db.connect()`` separately.
        """
        self.google_ai_api = GoogleAIAPI()
        self.db = DatabaseConnector(db_config=long_articles_config)
    def connect_db(self):
        """
        Connect to the database by calling ``self.db.connect()``.
        """
        self.db.connect()
  75. def update_task_status(self, task_id, process, ori_status, new_status):
  76. """
  77. 回滚长时间处于处理中的任务
  78. """
  79. match process:
  80. case "upload":
  81. status = 'upload_status'
  82. update_timestamp = 'upload_status_ts'
  83. case "understanding":
  84. status = 'understanding_status'
  85. update_timestamp = 'understanding_status_ts'
  86. case "summary":
  87. status = 'summary_status'
  88. update_timestamp = 'summary_status_ts'
  89. case "rewrite":
  90. status = 'rewrite_status'
  91. update_timestamp = 'rewrite_status_ts'
  92. case _:
  93. raise ValueError(f"Unexpected task: {process}")
  94. update_sql = f"""
  95. update video_content_understanding
  96. set {status} = %s, {update_timestamp} = %s
  97. where {status} = %s and id = %s;
  98. """
  99. roll_back_rows = self.db.save(
  100. query=update_sql,
  101. params=(
  102. new_status,
  103. datetime.datetime.now(),
  104. ori_status,
  105. task_id,
  106. )
  107. )
  108. return roll_back_rows
  109. def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
  110. """
  111. 上传视频到Google AI
  112. max_processing_video_count: 处理中的最大视频数量,默认20
  113. video_content_understanding 表status字段
  114. 0: 未处理
  115. 1: 处理中
  116. 2: 处理完成
  117. """
  118. select_sql = f"""
  119. select count(1) as processing_count
  120. from video_content_understanding
  121. where understanding_status = {const.PROCESSING_STATUS};
  122. """
  123. count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
  124. rest_video_count = max_processing_video_count - count
  125. success_upload_count = 0
  126. if rest_video_count:
  127. sql = f"""
  128. select id, video_oss_path
  129. from video_content_understanding
  130. where upload_status = {const.INIT_STATUS}
  131. limit {rest_video_count};
  132. """
  133. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  134. for task in tqdm(task_list, desc="upload_video_task"):
  135. lock_rows = self.update_task_status(
  136. task_id=task['id'],
  137. process='upload',
  138. ori_status=const.INIT_STATUS,
  139. new_status=const.PROCESSING_STATUS
  140. )
  141. if not lock_rows:
  142. continue
  143. try:
  144. file_path = download_file(task['id'], task['video_oss_path'])
  145. google_upload_result = self.google_ai_api.upload_file(file_path)
  146. if google_upload_result:
  147. file_name, file_state, expire_time = google_upload_result
  148. update_sql = f"""
  149. update video_content_understanding
  150. set
  151. upload_status = %s,
  152. upload_status_ts = %s,
  153. file_name = %s,
  154. file_state = %s,
  155. file_expire_time = %s
  156. where id = %s and upload_status = %s;
  157. """
  158. self.db.save(
  159. update_sql,
  160. params=(
  161. const.SUCCESS_STATUS,
  162. datetime.datetime.now(),
  163. file_name,
  164. file_state,
  165. expire_time,
  166. task['id'],
  167. const.PROCESSING_STATUS
  168. )
  169. )
  170. success_upload_count += 1
  171. except Exception as e:
  172. print("task upload failed because of {}".format(e))
  173. print("trace_back: ", traceback.format_exc())
  174. # roll back status
  175. self.update_task_status(
  176. task_id=task['id'],
  177. process='upload',
  178. ori_status=const.PROCESSING_STATUS,
  179. new_status=const.FAIL_STATUS
  180. )
  181. return success_upload_count
    def delete_video_from_google(self, file_name):
        """
        Delete an uploaded video file by delegating to
        ``self.google_ai_api.delete_video(file_name)``.
        """
        self.google_ai_api.delete_video(file_name)
  187. def get_task_list(self):
  188. """
  189. 获取处理视频转文本任务
  190. """
  191. sql = f"""
  192. select id, file_name, video_ori_title
  193. from video_content_understanding
  194. where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
  195. order by file_expire_time
  196. limit {BATCH_SIZE};
  197. """
  198. task_list = self.db.fetch(sql, cursor_type=DictCursor)
  199. return task_list
  200. def convert_video_to_text_with_google_ai(self):
  201. """
  202. 处理视频转文本任务
  203. """
  204. task_list = self.get_task_list()
  205. while task_list:
  206. for task in tqdm(task_list, desc="convert video to text"):
  207. # LOCK TASK
  208. lock_row = self.update_task_status(
  209. task_id=task['id'],
  210. process='understanding',
  211. ori_status=const.INIT_STATUS,
  212. new_status=const.PROCESSING_STATUS
  213. )
  214. if not lock_row:
  215. print("Lock")
  216. continue
  217. file_name = task['file_name']
  218. video_local_path = "static/{}.mp4".format(task['id'])
  219. google_file = self.google_ai_api.get_google_file(file_name)
  220. state = google_file.state.name
  221. match state:
  222. case 'ACTIVE':
  223. try:
  224. video_text = self.google_ai_api.get_video_text(
  225. prompt=generate_transforming_prompt(task['video_ori_title']),
  226. video_file=google_file
  227. )
  228. if video_text:
  229. print(type(video_text))
  230. print(video_text)
  231. if type(video_text) == dict:
  232. video_text = json.dumps(video_text, ensure_ascii=False)
  233. update_sql = f"""
  234. update video_content_understanding
  235. set understanding_status = %s, video_text = %s, file_state = %s
  236. where id = %s and understanding_status = %s;
  237. """
  238. self.db.save(
  239. update_sql,
  240. params=(
  241. const.SUCCESS_STATUS,
  242. video_text,
  243. state,
  244. task['id'],
  245. const.PROCESSING_STATUS
  246. )
  247. )
  248. # # delete local file and google file
  249. # if os.path.exists(video_local_path):
  250. # os.remove(video_local_path)
  251. #
  252. # tqdm.write("video transform to text success, delete local file")
  253. task_list.remove(task)
  254. #
  255. # self.google_ai_api.delete_video(file_name)
  256. tqdm.write("delete video from google success: {}".format(file_name))
  257. else:
  258. # roll back status
  259. self.update_task_status(
  260. task_id=task['id'],
  261. process='understanding',
  262. ori_status=const.PROCESSING_STATUS,
  263. new_status=const.INIT_STATUS
  264. )
  265. except Exception as e:
  266. # roll back status
  267. self.update_task_status(
  268. task_id=task['id'],
  269. process='understanding',
  270. ori_status=const.PROCESSING_STATUS,
  271. new_status=const.FAIL_STATUS
  272. )
  273. tqdm.write(str(e))
  274. continue
  275. case 'PROCESSING':
  276. tqdm.write("video is still processing")
  277. case 'FAILED':
  278. self.update_task_status(
  279. task_id=task['id'],
  280. process='understanding',
  281. ori_status=const.PROCESSING_STATUS,
  282. new_status=const.FAIL_STATUS
  283. )
  284. if os.path.exists(video_local_path):
  285. os.remove(video_local_path)
  286. self.google_ai_api.delete_video(file_name)
  287. task_list.remove(task)
  288. tqdm.write("video process failed, delete local file")
  289. time.sleep(const.SLEEP_SECONDS)
  290. tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
  291. time.sleep(const.SLEEP_SECONDS)