""" @author: luojunhui """ import os import time import traceback import requests from pymysql.cursors import DictCursor from tqdm import tqdm from applications.api import GoogleAIAPI from applications.const import VideoToTextConst from applications.db import DatabaseConnector from config import long_articles_config from config import apolloConfig # 办公室网络调试需要打开代理 # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087" # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087" const = VideoToTextConst() config = apolloConfig(env="prod") # pool_size POOL_SIZE = int(config.getConfigValue("video_extract_pool_size")) # batch_size BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size")) def download_file(pq_vid, video_url): """ 下载视频文件 """ file_name = "static/{}.mp4".format(pq_vid) if os.path.exists(file_name): return file_name proxies = { "http": None, "https": None } with open(file_name, 'wb') as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name class GenerateTextFromVideo(object): """ 从视频中生成文本 """ def __init__(self): self.google_ai_api = GoogleAIAPI() self.db = DatabaseConnector(db_config=long_articles_config) def connect_db(self): """ 连接数据库 """ self.db.connect() def update_task_status(self, task_id, process, ori_status, new_status): """ 回滚长时间处于处理中的任务 """ match process: case "upload": status = 'upload_status' update_timestamp = 'upload_status_ts' case "understanding": status = 'understanding_status' update_timestamp = 'understanding_status_ts' case "summary": status = 'summary_status' update_timestamp = 'summary_status_ts' case "rewrite": status = 'rewrite_status' update_timestamp = 'rewrite_status_ts' case _: raise ValueError(f"Unexpected task: {process}") update_sql = f""" update video_content_understanding set {status} = %s, {update_timestamp} = %s where {status} = %s and id = %s; """ roll_back_rows = self.db.save( query=update_sql, params=( new_status, int(time.time()), ori_status, task_id, ) ) return roll_back_rows def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE): """ 上传视频到Google AI max_processing_video_count: 处理中的最大视频数量,默认20 video_content_understanding 表status字段 0: 未处理 1: 处理中 2: 处理完成 """ select_sql = f""" select count(1) as processing_count from video_content_understanding where understanding_status = {const.PROCESSING_STATUS}; """ count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count'] rest_video_count = max_processing_video_count - count success_upload_count = 0 if rest_video_count: sql = f""" select id, video_oss_path from video_content_understanding where upload_status = {const.INIT_STATUS} limit {rest_video_count}; """ task_list = self.db.fetch(sql, cursor_type=DictCursor) for task in tqdm(task_list, desc="upload_video_task"): lock_rows = self.update_task_status( task_id=task['id'], process='upload', ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS ) if not lock_rows: continue try: file_path = download_file(task['id'], task['video_oss_path']) google_upload_result = self.google_ai_api.upload_file(file_path) if google_upload_result: file_name, file_state, expire_time = google_upload_result update_sql = f""" update video_content_understanding set upload_status = %s, upload_status_ts = %s, file_name = %s, file_state = %s, file_expire_time = %s where id = %s and upload_status = %s; """ self.db.save( update_sql, params=( const.SUCCESS_STATUS, int(time.time()), file_name, file_state, expire_time, task['id'], const.PROCESSING_STATUS ) ) success_upload_count += 1 except Exception as e: print("task upload failed because of {}".format(e)) print("trace_back: ", traceback.format_exc()) # roll back status self.update_task_status( task_id=task['id'], process='upload', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) return success_upload_count def delete_video_from_google(self, file_name): """ 删除视频文件 """ self.google_ai_api.delete_video(file_name) def get_task_list(self): """ 获取处理视频转文本任务 """ sql = f""" select id, file_name from video_content_understanding where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} order by file_expire_time limit {BATCH_SIZE}; """ task_list = self.db.fetch(sql, cursor_type=DictCursor) return task_list def convert_video_to_text_with_google_ai(self): """ 处理视频转文本任务 """ task_list = self.get_task_list() while task_list: for task in tqdm(task_list, desc="convert video to text"): print(task['pq_vid'], task['file_name']) # LOCK TASK lock_row = self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS ) if not lock_row: print("Lock") continue file_name = task['file_name'] video_local_path = "static/{}.mp4".format(task['id']) google_file = self.google_ai_api.get_google_file(file_name) state = google_file.state.name match state: case 'ACTIVE': try: video_text = self.google_ai_api.get_video_text( prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。", video_file=google_file ) if video_text: update_sql = f""" update video_content_understanding set understanding_status = %s, video_text = %s, file_state = %s where id = %s and understanding_status = %s; """ self.db.save( update_sql, params=( const.SUCCESS_STATUS, video_text, state, task['id'], const.PROCESSING_STATUS ) ) # delete local file and google file if os.path.exists(video_local_path): os.remove(video_local_path) tqdm.write("video transform to text success, delete local file") task_list.remove(task) self.google_ai_api.delete_video(file_name) tqdm.write("delete video from google success: {}".format(file_name)) else: # roll back status self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.INIT_STATUS ) except Exception as e: # roll back status self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) tqdm.write(str(e)) continue case 'PROCESSING': tqdm.write("video is still processing") case 'FAILED': self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) if os.path.exists(video_local_path): os.remove(video_local_path) self.google_ai_api.delete_video(file_name) task_list.remove(task) tqdm.write("video process failed, delete local file") time.sleep(const.SLEEP_SECONDS) tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list))) time.sleep(const.SLEEP_SECONDS)