video_to_text.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import time
  6. import datetime
  7. import traceback
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.api import GoogleAIAPI
  12. from applications.const import VideoToTextConst
  13. from applications.db import DatabaseConnector
  14. from config import long_articles_config
  15. from config import apolloConfig
  16. from coldStartTasks.ai_pipeline.basic import download_file
  17. from coldStartTasks.ai_pipeline.basic import update_task_queue_status
  18. from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
  19. # 办公室网络调试需要打开代理
  20. # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
  21. # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
  22. const = VideoToTextConst()
  23. config = apolloConfig(env="prod")
  24. # pool_size
  25. POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
  26. # batch_size
  27. BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
  28. class GenerateTextFromVideo(object):
  29. """
  30. 从视频中生成文本
  31. """
  32. def __init__(self):
  33. self.google_ai_api = GoogleAIAPI()
  34. self.db = DatabaseConnector(db_config=long_articles_config)
  35. self.db.connect()
  36. def get_upload_task_list(self, task_length: int) -> list[dict]:
  37. """
  38. 获取上传视频任务,优先处理高流量池视频内容
  39. """
  40. fetch_query = f"""
  41. select t1.id, t1.video_oss_path
  42. from video_content_understanding t1
  43. join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
  44. where t1.upload_status = {const.INIT_STATUS}
  45. and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS}
  46. and t2.bad_status = {const.ARTICLE_GOOD_STATUS}
  47. and t1.id in (25402, 25286, 25391, 25919, 27410, 25719, 25272, 27391, 26058, 25203, 27440, 25602, 25649, 25537, 28150, 25733, 25397, 28139, 25965, 29351, 25293, 25503, 25822, 28072, 28409, 28098, 25266, 26191, 25928, 25278, 25396, 28100, 25452, 28146, 25384, 25595, 25938, 25657, 25407, 26281)
  48. order by t2.flow_pool_level
  49. limit {task_length};
  50. """
  51. task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
  52. return task_list
  53. def get_extract_task_list(self) -> list[dict]:
  54. """
  55. 获取处理视频转文本任务
  56. """
  57. fetch_query = f"""
  58. select id, file_name, video_ori_title
  59. from video_content_understanding
  60. where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS}
  61. order by file_expire_time
  62. limit {BATCH_SIZE};
  63. """
  64. task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
  65. return task_list
  66. def get_processing_task_num(self) -> int:
  67. """
  68. get the number of processing task
  69. """
  70. select_query = f"""
  71. select count(1) as processing_count
  72. from video_content_understanding
  73. where file_state = 'PROCESSING' and upload_status = {const.SUCCESS_STATUS};
  74. """
  75. fetch_response = self.db.fetch(query=select_query, cursor_type=DictCursor)[0][
  76. "processing_count"
  77. ]
  78. processing_task_num = (
  79. fetch_response[0]["processing_count"] if fetch_response else 0
  80. )
  81. return processing_task_num
  82. def set_upload_result_for_task(
  83. self, task_id: str, file_name: str, file_state: str, expire_time: str
  84. ) -> int:
  85. """
  86. set upload result for task
  87. """
  88. update_query = f"""
  89. update video_content_understanding
  90. set upload_status = %s, upload_status_ts = %s,
  91. file_name = %s, file_state = %s, file_expire_time = %s
  92. where id = %s and upload_status = %s;
  93. """
  94. affected_rows = self.db.save(
  95. query=update_query,
  96. params=(
  97. const.SUCCESS_STATUS,
  98. datetime.datetime.now(),
  99. file_name,
  100. file_state,
  101. expire_time,
  102. task_id,
  103. const.PROCESSING_STATUS,
  104. ),
  105. )
  106. return affected_rows
  107. def set_understanding_result_for_task(
  108. self, task_id: str, state: str, text: str
  109. ) -> int:
  110. update_query = f"""
  111. update video_content_understanding
  112. set understanding_status = %s, video_text = %s, file_state = %s
  113. where id = %s and understanding_status = %s;
  114. """
  115. affected_rows = self.db.save(
  116. query=update_query,
  117. params=(
  118. const.SUCCESS_STATUS,
  119. text,
  120. state,
  121. task_id,
  122. const.PROCESSING_STATUS,
  123. ),
  124. )
  125. return affected_rows
  126. def upload_video_to_google_ai_task(
  127. self, max_processing_video_count: int = POOL_SIZE
  128. ):
  129. """
  130. upload video to google AI and wait for processing
  131. """
  132. # rollback lock tasks
  133. rollback_rows = roll_back_lock_tasks(
  134. db_client=self.db,
  135. process="upload",
  136. init_status=const.INIT_STATUS,
  137. processing_status=const.PROCESSING_STATUS,
  138. max_process_time=const.MAX_PROCESSING_TIME,
  139. )
  140. tqdm.write("upload rollback_lock_tasks: {}".format(rollback_rows))
  141. processing_task_num = self.get_processing_task_num()
  142. rest_video_count = max_processing_video_count - processing_task_num
  143. if rest_video_count:
  144. task_list = self.get_upload_task_list(rest_video_count)
  145. for task in tqdm(task_list, desc="upload_video_task"):
  146. lock_rows = update_task_queue_status(
  147. db_client=self.db,
  148. task_id=task["id"],
  149. process="upload",
  150. ori_status=const.INIT_STATUS,
  151. new_status=const.PROCESSING_STATUS,
  152. )
  153. if not lock_rows:
  154. continue
  155. try:
  156. file_path = download_file(task["id"], task["video_oss_path"])
  157. google_upload_result = self.google_ai_api.upload_file(file_path)
  158. if google_upload_result:
  159. file_name, file_state, expire_time = google_upload_result
  160. self.set_upload_result_for_task(
  161. task_id=task["id"],
  162. file_name=file_name,
  163. file_state=file_state,
  164. expire_time=expire_time,
  165. )
  166. else:
  167. # roll back status
  168. update_task_queue_status(
  169. db_client=self.db,
  170. task_id=task["id"],
  171. process="upload",
  172. ori_status=const.PROCESSING_STATUS,
  173. new_status=const.FAIL_STATUS,
  174. )
  175. log(
  176. task="video_to_text",
  177. function="upload_video_to_google_ai_task",
  178. message="upload_video_to_google_ai_task failed",
  179. data={
  180. "task_id": task["id"],
  181. },
  182. )
  183. except Exception as e:
  184. log(
  185. task="video_to_text",
  186. function="upload_video_to_google_ai_task",
  187. message="upload_video_to_google_ai_task failed",
  188. data={
  189. "error": str(e),
  190. "traceback": traceback.format_exc(),
  191. "task_id": task["id"],
  192. },
  193. )
  194. # roll back status
  195. update_task_queue_status(
  196. db_client=self.db,
  197. task_id=task["id"],
  198. process="upload",
  199. ori_status=const.PROCESSING_STATUS,
  200. new_status=const.FAIL_STATUS,
  201. )
  202. else:
  203. log(
  204. task="video_to_text",
  205. function="upload_video_to_google_ai_task",
  206. message="task pool is full",
  207. )
  208. def convert_video_to_text_with_google_ai_task(self):
  209. """
  210. 处理视频转文本任务
  211. """
  212. rollback_rows = roll_back_lock_tasks(
  213. db_client=self.db,
  214. process="understanding",
  215. init_status=const.INIT_STATUS,
  216. processing_status=const.PROCESSING_STATUS,
  217. max_process_time=const.MAX_PROCESSING_TIME,
  218. )
  219. tqdm.write("extract rollback_lock_tasks: {}".format(rollback_rows))
  220. task_list = self.get_extract_task_list()
  221. for task in tqdm(task_list, desc="convert video to text"):
  222. # LOCK TASK
  223. lock_row = update_task_queue_status(
  224. db_client=self.db,
  225. task_id=task["id"],
  226. process="understanding",
  227. ori_status=const.INIT_STATUS,
  228. new_status=const.PROCESSING_STATUS,
  229. )
  230. if not lock_row:
  231. print("Task has benn locked by other process")
  232. continue
  233. file_name = task["file_name"]
  234. video_local_path = "static/{}.mp4".format(task["id"])
  235. try:
  236. google_file = self.google_ai_api.get_google_file(file_name)
  237. state = google_file.state.name
  238. match state:
  239. case "ACTIVE":
  240. try:
  241. video_text = self.google_ai_api.get_video_text(
  242. prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
  243. video_file=google_file,
  244. )
  245. if video_text:
  246. self.set_understanding_result_for_task(
  247. task_id=task["id"], state=state, text=video_text
  248. )
  249. # delete local file and google file
  250. if os.path.exists(video_local_path):
  251. os.remove(video_local_path)
  252. tqdm.write(
  253. "video transform to text success, delete local file"
  254. )
  255. task_list.remove(task)
  256. self.google_ai_api.delete_video(file_name)
  257. tqdm.write(
  258. "delete video from google success: {}".format(
  259. file_name
  260. )
  261. )
  262. else:
  263. # roll back status and wait for next process
  264. update_task_queue_status(
  265. db_client=self.db,
  266. task_id=task["id"],
  267. process="understanding",
  268. ori_status=const.PROCESSING_STATUS,
  269. new_status=const.INIT_STATUS,
  270. )
  271. except Exception as e:
  272. # roll back status
  273. update_task_queue_status(
  274. db_client=self.db,
  275. task_id=task["id"],
  276. process="understanding",
  277. ori_status=const.PROCESSING_STATUS,
  278. new_status=const.FAIL_STATUS,
  279. )
  280. tqdm.write(str(e))
  281. continue
  282. case "PROCESSING":
  283. update_task_queue_status(
  284. db_client=self.db,
  285. task_id=task["id"],
  286. process="understanding",
  287. ori_status=const.PROCESSING_STATUS,
  288. new_status=const.INIT_STATUS,
  289. )
  290. tqdm.write("video is still processing")
  291. case "FAILED":
  292. update_sql = f"""
  293. update video_content_understanding
  294. set file_state = %s, understanding_status = %s, understanding_status_ts = %s
  295. where id = %s and understanding_status = %s;
  296. """
  297. self.db.save(
  298. query=update_sql,
  299. params=(
  300. state,
  301. const.FAIL_STATUS,
  302. datetime.datetime.now(),
  303. task["id"],
  304. const.PROCESSING_STATUS,
  305. ),
  306. )
  307. # delete local file and google file
  308. if os.path.exists(video_local_path):
  309. os.remove(video_local_path)
  310. self.google_ai_api.delete_video(file_name)
  311. task_list.remove(task)
  312. tqdm.write("video process failed, delete local file")
  313. time.sleep(const.SLEEP_SECONDS)
  314. except Exception as e:
  315. log(
  316. task="video_to_text",
  317. function="extract_video_to_text_task",
  318. message="extract video to text task failed",
  319. data={
  320. "error": str(e),
  321. "traceback": traceback.format_exc(),
  322. "task_id": task["id"],
  323. },
  324. )
  325. update_task_queue_status(
  326. db_client=self.db,
  327. task_id=task["id"],
  328. process="understanding",
  329. ori_status=const.PROCESSING_STATUS,
  330. new_status=const.FAIL_STATUS,
  331. )