""" @author: luojunhui """ import os import time import requests from pymysql.cursors import DictCursor from tqdm import tqdm from applications.api import GoogleAIAPI from applications.db import DatabaseConnector from config import long_articles_config # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087" # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087" PROCESSING_MAX_VIDEO_COUNT = 10 def download_file(pq_vid, video_url): """ 下载视频文件 """ file_name = "static/{}.mp4".format(pq_vid) if os.path.exists(file_name): return file_name proxies = { "http": None, "https": None } with open(file_name, 'wb') as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name class GenerateTextFromVideo(object): """ 从视频中生成文本 """ def __init__(self): self.google_ai_api = GoogleAIAPI() self.db = DatabaseConnector(db_config=long_articles_config) def connect_db(self): """ 连接数据库 """ self.db.connect() def input_task_list(self): """ 输入任务列表, 从single_video_pool中获取 """ sql = f""" select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id from publish_single_video_source where audit_status = 1 and bad_status = 0 and extract_status = 0 order by id desc; """ task_list = self.db.fetch(sql, cursor_type=DictCursor) insert_sql = f""" insert ignore into video_content_understanding (pq_vid, video_ori_title, video_oss_path) values (%s, %s, %s) """ affected_rows = self.db.save_many( insert_sql, params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list] ) print(affected_rows) def upload_video_to_google_ai(self): """ 上传视频到Google AI """ # 查询出在视频处于PROCESSING状态的视频数量 select_sql = "select count(1) as processing_count from video_content_understanding where status = 1;" count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count'] rest_video_count = PROCESSING_MAX_VIDEO_COUNT - count success_upload_count = 0 if rest_video_count: sql = f"""select pq_vid, video_oss_path from video_content_understanding where status = 0 limit {rest_video_count};""" task_list = self.db.fetch(sql, cursor_type=DictCursor) for task in tqdm(task_list, desc="upload_video_task"): file_path = download_file(task['pq_vid'], task['video_oss_path']) google_upload_result = self.google_ai_api.upload_file(file_path) if google_upload_result: file_name, file_state, expire_time = google_upload_result update_sql = f""" update video_content_understanding set status = %s, file_name = %s, file_state = %s, file_expire_time = %s where pq_vid = %s; """ self.db.save( update_sql, params=(1, file_name, file_state, expire_time, task['pq_vid']) ) success_upload_count += 1 else: continue return success_upload_count def get_tasks(self): """ 获取处理视频转文本任务 """ sql = "select pq_vid, file_name from video_content_understanding where status = 1 order by file_expire_time limit 5;" task_list = self.db.fetch(sql, cursor_type=DictCursor) return task_list def convert_video_to_text_with_google_ai(self): """ 处理视频转文本任务 """ task_list = self.get_tasks() while task_list: for task in tqdm(task_list, desc="convert video to text"): file_name = task['file_name'] google_file = self.google_ai_api.get_google_file(file_name) state = google_file.state.name match state: case 'ACTIVE': try: video_text = self.google_ai_api.get_video_text( prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。", video_file=google_file ) if video_text: update_sql = f""" update video_content_understanding set status = %s, video_text = %s, file_state = %s where pq_vid = %s; """ self.db.save( update_sql, params=(2, video_text, state, task['pq_vid']) ) os.remove("static/{}.mp4".format(task['pq_vid'])) tqdm.write("video transform to text success, delete local file, sleep 1 min...") task_list.remove(task) except Exception as e: tqdm.write(str(e)) continue case 'PROCESSING': tqdm.write("video is still processing") continue case 'FAILED': tqdm.write("video process failed") continue time.sleep(10) tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list))) time.sleep(60)