123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- """
- @author: luojunhui
- """
- import os
- import time
- import traceback
- import requests
- from pymysql.cursors import DictCursor
- from torch.fft import ifftshift
- from tqdm import tqdm
- from applications.api import GoogleAIAPI
- from applications.const import VideoToTextConst
- from applications.db import DatabaseConnector
- from config import long_articles_config
- from config import apolloConfig
# Debugging from the office network needs the proxy below; uncomment it only
# in that situation.
# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"

const = VideoToTextConst()
config = apolloConfig(env="prod")

# Integer settings read from the env="prod" config, in this order:
#   POOL_SIZE  - default cap on PROCESSING rows used by upload_video_to_google_ai
#   BATCH_SIZE - LIMIT applied when get_tasks() fetches PROCESSING rows
POOL_SIZE, BATCH_SIZE = (
    int(config.getConfigValue(setting_key))
    for setting_key in ("video_extract_pool_size", "video_extract_batch_size")
)
def download_file(pq_vid, video_url, *, timeout=60):
    """
    Download a video to ``static/<pq_vid>.mp4`` and return that path.

    A non-empty file already at that path is reused without downloading.
    Otherwise the body is streamed into a ``.part`` file and renamed into
    place only after a successful (2xx) response. A failed download therefore
    never leaves a truncated or error-page file that later calls would mistake
    for a cached copy.

    :param pq_vid: video id, used as the local file name
    :param video_url: URL to fetch the video from
    :param timeout: requests timeout in seconds (applied to connect and to
        each socket read)
    :return: relative path of the local video file
    :raises requests.RequestException: on network errors or a non-2xx status
    """
    file_name = "static/{}.mp4".format(pq_vid)
    if os.path.isfile(file_name) and os.path.getsize(file_name) > 0:
        return file_name
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    # Explicit None entries keep requests from applying HTTP(S)_PROXY from the
    # environment, presumably so the download bypasses any debug proxy.
    proxies = {
        "http": None,
        "https": None
    }
    tmp_name = file_name + ".part"
    with requests.get(video_url, proxies=proxies, timeout=timeout, stream=True) as response:
        response.raise_for_status()
        with open(tmp_name, "wb") as f:
            # Stream in chunks instead of holding the whole video in memory.
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
    # Atomic rename: the final path only ever holds a complete download.
    os.replace(tmp_name, file_name)
    return file_name
- class GenerateTextFromVideo(object):
- """
- 从视频中生成文本
- """
- def __init__(self):
- self.google_ai_api = GoogleAIAPI()
- self.db = DatabaseConnector(db_config=long_articles_config)
- def connect_db(self):
- """
- 连接数据库
- """
- self.db.connect()
- def input_task_list(self):
- """
- 输入任务列表, 从single_video_pool中获取
- """
- sql = f"""
- select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
- from publish_single_video_source
- where audit_status = {const.AUDIT_SUCCESS_STATUS} and bad_status = {const.ARTICLE_GOOD_STATUS} and extract_status = {const.EXTRACT_INIT_STATUS}
- order by id desc;
- """
- task_list = self.db.fetch(sql, cursor_type=DictCursor)
- insert_sql = f"""
- insert ignore into video_content_understanding
- (pq_vid, video_ori_title, video_oss_path)
- values (%s, %s, %s);
- """
- affected_rows = self.db.save_many(
- insert_sql,
- params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list]
- )
- print(affected_rows)
- def roll_back_processing_videos(self):
- """
- 回滚长时间处于处理中的视频
- """
- sql = f"""
- select id, status_update_timestamp
- from video_content_understanding
- where status in ({const.VIDEO_UNDERSTAND_PROCESSING_STATUS, const.VIDEO_LOCK});
- """
- task_list = self.db.fetch(sql, cursor_type=DictCursor)
- now_timestamp = int(time.time())
- id_list = []
- for task in tqdm(task_list):
- if now_timestamp - task['status_update_timestamp'] >= const.MAX_PROCESSING_TIME:
- id_list.append(task['id'])
- if id_list:
- update_sql = f"""
- update video_content_understanding
- set status = %s
- where id in %s;
- """
- self.db.save(
- query=update_sql,
- params=(
- const.VIDEO_UNDERSTAND_INIT_STATUS,
- tuple(id_list)
- )
- )
- def update_video_status(self, ori_status, new_status, pq_vid):
- """
- 更新视频状态
- """
- sql = f"""
- update video_content_understanding
- set status = %s, status_update_timestamp = %s
- WHERE pq_vid = %s and status = %s;
- """
- affected_rows = self.db.save(
- query=sql,
- params=(new_status, pq_vid, ori_status, int(time.time()))
- )
- return affected_rows
- def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
- """
- 上传视频到Google AI
- max_processing_video_count: 处理中的最大视频数量,默认20
- video_content_understanding 表status字段
- 0: 未处理
- 1: 处理中
- 2: 处理完成
- """
- # 查询出在视频处于PROCESSING状态的视频数量
- select_sql = f"""
- select count(1) as processing_count
- from video_content_understanding
- where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS};
- """
- count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
- rest_video_count = max_processing_video_count - count
- success_upload_count = 0
- if rest_video_count:
- sql = f"""
- select pq_vid, video_oss_path
- from video_content_understanding
- where status = {const.VIDEO_UNDERSTAND_INIT_STATUS}
- limit {rest_video_count};
- """
- task_list = self.db.fetch(sql, cursor_type=DictCursor)
- for task in tqdm(task_list, desc="upload_video_task"):
- lock_rows = self.update_video_status(
- ori_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
- new_status=const.VIDEO_LOCK,
- pq_vid=task['pq_vid'],
- )
- if not lock_rows:
- continue
- try:
- file_path = download_file(task['pq_vid'], task['video_oss_path'])
- google_upload_result = self.google_ai_api.upload_file(file_path)
- if google_upload_result:
- file_name, file_state, expire_time = google_upload_result
- update_sql = f"""
- update video_content_understanding
- set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
- where pq_vid = %s and status = %s;
- """
- self.db.save(
- update_sql,
- params=(
- const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
- file_name,
- file_state,
- expire_time,
- task['pq_vid'],
- const.VIDEO_LOCK
- )
- )
- success_upload_count += 1
- except Exception as e:
- print("task upload failed because of {}".format(e))
- print("trace_back: ", traceback.format_exc())
- # roll back status
- self.update_video_status(
- ori_status=const.VIDEO_LOCK,
- new_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
- pq_vid=task['pq_vid'],
- )
- return success_upload_count
- def delete_video_from_google(self, file_name):
- """
- 删除视频文件
- """
- self.google_ai_api.delete_video(file_name)
- def get_tasks(self):
- """
- 获取处理视频转文本任务
- """
- sql = f"""
- select pq_vid, file_name
- from video_content_understanding
- where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS}
- order by file_expire_time
- limit {BATCH_SIZE};
- """
- task_list = self.db.fetch(sql, cursor_type=DictCursor)
- return task_list
- def convert_video_to_text_with_google_ai(self):
- """
- 处理视频转文本任务
- """
- self.roll_back_processing_videos()
- task_list = self.get_tasks()
- while task_list:
- for task in tqdm(task_list, desc="convert video to text"):
- print(task['pq_vid'], task['file_name'])
- # LOCK TASK
- lock_row = self.update_video_status(
- ori_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
- new_status=const.VIDEO_LOCK,
- pq_vid=task['pq_vid'],
- )
- if not lock_row:
- continue
- file_name = task['file_name']
- video_local_path = "static/{}.mp4".format(task['pq_vid'])
- google_file = self.google_ai_api.get_google_file(file_name)
- state = google_file.state.name
- match state:
- case 'ACTIVE':
- try:
- video_text = self.google_ai_api.get_video_text(
- prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。",
- video_file=google_file
- )
- if video_text:
- update_sql = f"""
- update video_content_understanding
- set status = %s, video_text = %s, file_state = %s
- where pq_vid = %s and status = %s;
- """
- self.db.save(
- update_sql,
- params=(
- const.VIDEO_UNDERSTAND_SUCCESS_STATUS,
- video_text,
- state,
- task['pq_vid'],
- const.VIDEO_LOCK
- )
- )
- # delete local file and google file
- if os.path.exists(video_local_path):
- os.remove(video_local_path)
- tqdm.write("video transform to text success, delete local file")
- task_list.remove(task)
- self.google_ai_api.delete_video(file_name)
- tqdm.write("delete video from google success: {}".format(file_name))
- else:
- # roll back status
- self.update_video_status(
- ori_status=const.VIDEO_LOCK,
- new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
- pq_vid=task['pq_vid'],
- )
- except Exception as e:
- # roll back status
- self.update_video_status(
- ori_status=const.VIDEO_LOCK,
- new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
- pq_vid=task['pq_vid'],
- )
- tqdm.write(str(e))
- continue
- case 'PROCESSING':
- tqdm.write("video is still processing")
- # roll back status
- self.update_video_status(
- ori_status=const.VIDEO_LOCK,
- new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
- pq_vid=task['pq_vid'],
- )
- case 'FAILED':
- self.update_video_status(
- ori_status=const.VIDEO_LOCK,
- new_status=const.VIDEO_UNDERSTAND_FAIL_STATUS,
- pq_vid=task['pq_vid']
- )
- if os.path.exists(video_local_path):
- os.remove(video_local_path)
- self.google_ai_api.delete_video(file_name)
- tqdm.write("video process failed, delete local file")
- time.sleep(const.SLEEP_SECONDS)
- tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
- time.sleep(const.SLEEP_SECONDS)
|