|
@@ -0,0 +1,157 @@
|
|
|
+"""
|
|
|
+@author: luojunhui
|
|
|
+"""
|
|
|
+import os
|
|
|
+import time
|
|
|
+
|
|
|
+import requests
|
|
|
+
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
+from tqdm import tqdm
|
|
|
+
|
|
|
+from applications.api import GoogleAIAPI
|
|
|
+from applications.db import DatabaseConnector
|
|
|
+from config import long_articles_config
|
|
|
+
|
|
|
+# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
|
|
|
+# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
|
|
|
+
|
|
|
+PROCESSING_MAX_VIDEO_COUNT = 10  # Cap on rows with status = 1 in video_content_understanding; upload_video_to_google_ai only uploads (cap - current count) new videos per call.
|
|
|
+
|
|
|
+
|
|
|
def download_file(pq_vid, video_url, timeout=(10, 120)):
    """
    Download a video to ``static/<pq_vid>.mp4`` and return the local path.

    If the target file already exists it is reused and no request is made.
    Otherwise the response is streamed into a temporary ``.part`` file that
    is renamed to the final name only after the download completed, so an
    interrupted or failed download can never be mistaken for a cached video
    by a later call.

    Args:
        pq_vid: Video id; used to build the local file name.
        video_url: URL the video is fetched from.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.get``.

    Returns:
        The local file path, ``"static/<pq_vid>.mp4"``.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status (previously an error body would have been
            written to disk as if it were the video).
        OSError: If the file cannot be written.
    """
    file_name = "static/{}.mp4".format(pq_vid)
    if os.path.exists(file_name):
        return file_name

    os.makedirs(os.path.dirname(file_name), exist_ok=True)

    # NOTE(review): explicit None entries are presumably meant to bypass any
    # configured proxy for this download -- confirm against the deployment.
    proxies = {
        "http": None,
        "https": None
    }
    temp_name = file_name + ".part"
    try:
        with requests.get(
            video_url, proxies=proxies, stream=True, timeout=timeout
        ) as response:
            response.raise_for_status()
            with open(temp_name, 'wb') as f:
                # Stream in 1 MiB chunks instead of holding the whole video
                # in memory.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        # Publish the file only once it is complete.
        os.replace(temp_name, file_name)
    finally:
        # Still present only when the download did not finish.
        if os.path.exists(temp_name):
            os.remove(temp_name)
    return file_name
|
|
|
+
|
|
|
+
|
|
|
+class GenerateTextFromVideo(object):
|
|
|
+ """
|
|
|
+ 从视频中生成文本
|
|
|
+ """
|
|
|
+ def __init__(self):
|
|
|
+ self.google_ai_api = GoogleAIAPI()
|
|
|
+ self.db = DatabaseConnector(db_config=long_articles_config)
|
|
|
+
|
|
|
+ def connect_db(self):
|
|
|
+ """
|
|
|
+ 连接数据库
|
|
|
+ """
|
|
|
+ self.db.connect()
|
|
|
+
|
|
|
+ def input_task_list(self):
|
|
|
+ """
|
|
|
+ 输入任务列表, 从single_video_pool中获取
|
|
|
+ """
|
|
|
+ sql = f"""
|
|
|
+ select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
|
|
|
+ from publish_single_video_source
|
|
|
+ where audit_status = 1 and bad_status = 0 and extract_status = 0
|
|
|
+ order by id desc;
|
|
|
+ """
|
|
|
+ task_list = self.db.fetch(sql, cursor_type=DictCursor)
|
|
|
+ insert_sql = f"""
|
|
|
+ insert ignore into video_content_understanding
|
|
|
+ (pq_vid, video_ori_title, video_oss_path)
|
|
|
+ values (%s, %s, %s)
|
|
|
+ """
|
|
|
+ affected_rows = self.db.save_many(
|
|
|
+ insert_sql,
|
|
|
+ params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list]
|
|
|
+ )
|
|
|
+ print(affected_rows)
|
|
|
+
|
|
|
+ def upload_video_to_google_ai(self):
|
|
|
+ """
|
|
|
+ 上传视频到Google AI
|
|
|
+ """
|
|
|
+ # 查询出在视频处于PROCESSING状态的视频数量
|
|
|
+ select_sql = "select count(1) as processing_count from video_content_understanding where status = 1;"
|
|
|
+ count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
|
|
|
+ rest_video_count = PROCESSING_MAX_VIDEO_COUNT - count
|
|
|
+ if rest_video_count:
|
|
|
+ sql = f"""select pq_vid, video_oss_path from video_content_understanding where status = 0 limit {rest_video_count};"""
|
|
|
+ task_list = self.db.fetch(sql, cursor_type=DictCursor)
|
|
|
+ for task in tqdm(task_list):
|
|
|
+ file_path = download_file(task['pq_vid'], task['video_oss_path'])
|
|
|
+ google_upload_result = self.google_ai_api.upload_file(file_path)
|
|
|
+
|
|
|
+ if google_upload_result:
|
|
|
+ file_name, file_state, expire_time = google_upload_result
|
|
|
+ update_sql = f"""
|
|
|
+ update video_content_understanding
|
|
|
+ set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
|
|
|
+ where pq_vid = %s;
|
|
|
+ """
|
|
|
+ self.db.save(
|
|
|
+ update_sql,
|
|
|
+ params=(1, file_name, file_state, expire_time, task['pq_vid'])
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ continue
|
|
|
+
|
|
|
+ def get_tasks(self):
|
|
|
+ """
|
|
|
+ 获取处理视频转文本任务
|
|
|
+ """
|
|
|
+ sql = "select pq_vid, file_name from video_content_understanding where status = 1 order by file_expire_time limit 5;"
|
|
|
+ task_list = self.db.fetch(sql, cursor_type=DictCursor)
|
|
|
+ return task_list
|
|
|
+
|
|
|
+ def convert_video_to_text_with_google_ai(self):
|
|
|
+ """
|
|
|
+ 处理视频转文本任务
|
|
|
+ """
|
|
|
+ task_list = self.get_tasks()
|
|
|
+ while task_list:
|
|
|
+ for task in tqdm(task_list, desc="处理视频理解任务"):
|
|
|
+ file_name = task['file_name']
|
|
|
+ google_file = self.google_ai_api.get_google_file(file_name)
|
|
|
+ state = google_file.state.name
|
|
|
+ match state:
|
|
|
+ case 'ACTIVE':
|
|
|
+ try:
|
|
|
+ video_text = self.google_ai_api.get_video_text(
|
|
|
+ prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本。",
|
|
|
+ video_file=google_file
|
|
|
+ )
|
|
|
+ if video_text:
|
|
|
+ update_sql = f"""
|
|
|
+ update video_content_understanding
|
|
|
+ set status = %s, video_text = %s, file_state = %s
|
|
|
+ where pq_vid = %s;
|
|
|
+ """
|
|
|
+ self.db.save(
|
|
|
+ update_sql,
|
|
|
+ params=(2, video_text, state, task['pq_vid'])
|
|
|
+ )
|
|
|
+ os.remove("static/{}.mp4".format(task['pq_vid']))
|
|
|
+ tqdm.write("识别完成,删除本地文件, 等待10s执行下一个任务")
|
|
|
+ task_list.remove(task)
|
|
|
+ except Exception as e:
|
|
|
+ tqdm.write(str(e))
|
|
|
+ continue
|
|
|
+
|
|
|
+ case 'PROCESSING':
|
|
|
+ print("video is still processing")
|
|
|
+ continue
|
|
|
+
|
|
|
+ case 'FAILED':
|
|
|
+ print("video process failed")
|
|
|
+ continue
|
|
|
+ time.sleep(10)
|
|
|
+
|
|
|
+ print("执行完一轮任务,剩余数量:{}".format(len(task_list)))
|
|
|
+ time.sleep(10)
|