""" @author: luojunhui """ import json import os import time import datetime import traceback import requests from pymysql.cursors import DictCursor from tqdm import tqdm from applications.api import GoogleAIAPI from applications.const import VideoToTextConst from applications.db import DatabaseConnector from config import long_articles_config from config import apolloConfig # 办公室网络调试需要打开代理 # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087" # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087" const = VideoToTextConst() config = apolloConfig(env="prod") # pool_size POOL_SIZE = int(config.getConfigValue("video_extract_pool_size")) # batch_size BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size")) def generate_transforming_prompt(title): video_transforming_prompt = f""" 理解输入的视频内容 视频的标题是 {title} 你是一名视频分析专家,你非常精通视频的内容的总结,我会给出你视频及视频的标题,现在请你进行仔细的视频分析,并按照以下要求进行回答 #要求 1.20个以上,30个以下的中文字符输出视频的选题,选题应该要达到使人能从选题中理解到视频主要想表达的内容,要包含这个视频的关键性内容和亮点内容,并针对你的选题进行关键信息和亮点的详细描述; 2.用中文概括视频的主要内容,需要包含该视频描述的核心事件或观点,可以包括具体事例。要求内容通顺易懂具有一定可读性,字数在180到230之间; 3.请严格控制输出的内容能够被正确解析为JSON; output in JSON format with keys: 选题(str), 用 theme 来作为 key 描述(str), 用 description 来作为 key 你需要注意 1.关注我给出的视频中的主要内容,生成的描述主要面向的是50岁以上的老年人,语言风格要适配用户群体; 2.请针对视频的内容本身输出客观、具象的回答,你的分析必须基于视频内容,不能凭空想象; 2.信息缺失和无法分析理解的部分请你忽略,不能自行编造回答 3.请只描述客观事实,不要加入任何主观评价性语言;请使用专业语言进行回答。不要出现概括性描述、主观猜测,抽象表述 4.语言表达上注意不要使用倒装句、长句、复杂句,尽量使用陈述句、简单句; 返回的结果的数据结果是字典 dict,不要做任何其他的解释或说明,不要出现```json等字段。 """ return video_transforming_prompt def download_file(task_id, oss_path): """ 下载视频文件 """ video_url = "https://rescdn.yishihui.com/" + oss_path file_name = "static/{}.mp4".format(task_id) if os.path.exists(file_name): return file_name proxies = { "http": None, "https": None } with open(file_name, 'wb') as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name class GenerateTextFromVideo(object): """ 从视频中生成文本 """ def __init__(self): self.google_ai_api = GoogleAIAPI() self.db = DatabaseConnector(db_config=long_articles_config) def connect_db(self): """ 连接数据库 """ self.db.connect() def update_task_status(self, task_id, process, ori_status, new_status): """ 回滚长时间处于处理中的任务 """ match process: case "upload": status = 'upload_status' update_timestamp = 'upload_status_ts' case "understanding": status = 'understanding_status' update_timestamp = 'understanding_status_ts' case "summary": status = 'summary_status' update_timestamp = 'summary_status_ts' case "rewrite": status = 'rewrite_status' update_timestamp = 'rewrite_status_ts' case _: raise ValueError(f"Unexpected task: {process}") update_sql = f""" update video_content_understanding set {status} = %s, {update_timestamp} = %s where {status} = %s and id = %s; """ roll_back_rows = self.db.save( query=update_sql, params=( new_status, datetime.datetime.now(), ori_status, task_id, ) ) return roll_back_rows def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE): """ 上传视频到Google AI max_processing_video_count: 处理中的最大视频数量,默认20 video_content_understanding 表status字段 0: 未处理 1: 处理中 2: 处理完成 """ select_sql = f""" select count(1) as processing_count from video_content_understanding where understanding_status = {const.PROCESSING_STATUS}; """ count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count'] rest_video_count = max_processing_video_count - count success_upload_count = 0 if rest_video_count: sql = f""" select id, video_oss_path from video_content_understanding where upload_status = {const.INIT_STATUS} limit {rest_video_count}; """ task_list = self.db.fetch(sql, cursor_type=DictCursor) for task in tqdm(task_list, desc="upload_video_task"): lock_rows = self.update_task_status( task_id=task['id'], process='upload', ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS ) if not lock_rows: continue try: file_path = download_file(task['id'], task['video_oss_path']) google_upload_result = self.google_ai_api.upload_file(file_path) if google_upload_result: file_name, file_state, expire_time = google_upload_result update_sql = f""" update video_content_understanding set upload_status = %s, upload_status_ts = %s, file_name = %s, file_state = %s, file_expire_time = %s where id = %s and upload_status = %s; """ self.db.save( update_sql, params=( const.SUCCESS_STATUS, datetime.datetime.now(), file_name, file_state, expire_time, task['id'], const.PROCESSING_STATUS ) ) success_upload_count += 1 except Exception as e: print("task upload failed because of {}".format(e)) print("trace_back: ", traceback.format_exc()) # roll back status self.update_task_status( task_id=task['id'], process='upload', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) return success_upload_count def delete_video_from_google(self, file_name): """ 删除视频文件 """ self.google_ai_api.delete_video(file_name) def get_task_list(self): """ 获取处理视频转文本任务 """ sql = f""" select id, file_name, video_ori_title from video_content_understanding where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} order by file_expire_time limit {BATCH_SIZE}; """ task_list = self.db.fetch(sql, cursor_type=DictCursor) return task_list def convert_video_to_text_with_google_ai(self): """ 处理视频转文本任务 """ task_list = self.get_task_list() while task_list: for task in tqdm(task_list, desc="convert video to text"): # LOCK TASK lock_row = self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.INIT_STATUS, new_status=const.PROCESSING_STATUS ) if not lock_row: print("Lock") continue file_name = task['file_name'] video_local_path = "static/{}.mp4".format(task['id']) google_file = self.google_ai_api.get_google_file(file_name) state = google_file.state.name match state: case 'ACTIVE': try: video_text = self.google_ai_api.get_video_text( prompt=generate_transforming_prompt(task['video_ori_title']), video_file=google_file ) if video_text: print(type(video_text)) print(video_text) if type(video_text) == dict: video_text = json.dumps(video_text, ensure_ascii=False) update_sql = f""" update video_content_understanding set understanding_status = %s, video_text = %s, file_state = %s where id = %s and understanding_status = %s; """ self.db.save( update_sql, params=( const.SUCCESS_STATUS, video_text, state, task['id'], const.PROCESSING_STATUS ) ) # # delete local file and google file # if os.path.exists(video_local_path): # os.remove(video_local_path) # # tqdm.write("video transform to text success, delete local file") task_list.remove(task) # # self.google_ai_api.delete_video(file_name) tqdm.write("delete video from google success: {}".format(file_name)) else: # roll back status self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.INIT_STATUS ) except Exception as e: # roll back status self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) tqdm.write(str(e)) continue case 'PROCESSING': tqdm.write("video is still processing") case 'FAILED': self.update_task_status( task_id=task['id'], process='understanding', ori_status=const.PROCESSING_STATUS, new_status=const.FAIL_STATUS ) if os.path.exists(video_local_path): os.remove(video_local_path) self.google_ai_api.delete_video(file_name) task_list.remove(task) tqdm.write("video process failed, delete local file") time.sleep(const.SLEEP_SECONDS) tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list))) time.sleep(const.SLEEP_SECONDS)