Quellcode durchsuchen

video_generate_text

luojunhui vor 4 Monaten
Ursprung
Commit
9668b2fb26

+ 5 - 4
applications/api/google_ai_api.py

@@ -3,6 +3,7 @@
 """
 
 from google import genai
+from tqdm import tqdm
 
 
 class GoogleAIAPI(object):
@@ -17,16 +18,16 @@ class GoogleAIAPI(object):
         """
         file_path: 文件路径
         """
-        print("start uploading file: {}".format(file_path))
+        tqdm.write("start uploading file: {}".format(file_path))
         try:
             video_file = self.client.files.upload(file=file_path)
             file_name = video_file.name
             file_state = video_file.state.name
             expire_time = video_file.expiration_time
-            print("success uploaded file: {}".format(file_path))
+            tqdm.write("success uploaded file: {}".format(file_path))
             return file_name, file_state, expire_time
         except Exception as e:
-            print("fail to upload file: {} because {}".format(file_path, e))
+            tqdm.write("fail to upload file: {} because {}".format(file_path, e))
             return None
 
     def get_file_status(self, file_name: str,):
@@ -61,7 +62,7 @@ class GoogleAIAPI(object):
         video_file: <class 'google.genai.types.File'>
         """
         response = self.client.models.generate_content(
-            model='gemini-1.5-pro',
+            model='gemini-2.0-flash',
             contents=[
                 video_file,
                 prompt

+ 13 - 9
coldStartTasks/multi_modal/generate_text_from_video.py

@@ -13,8 +13,8 @@ from applications.api import GoogleAIAPI
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
-# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
-# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
+os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
+os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
 
 PROCESSING_MAX_VIDEO_COUNT = 10
 
@@ -81,10 +81,11 @@ class GenerateTextFromVideo(object):
         select_sql = "select count(1) as processing_count from video_content_understanding where status = 1;"
         count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
         rest_video_count = PROCESSING_MAX_VIDEO_COUNT - count
+        success_upload_count = 0
         if rest_video_count:
             sql = f"""select pq_vid, video_oss_path from video_content_understanding where status = 0 limit {rest_video_count};"""
             task_list = self.db.fetch(sql, cursor_type=DictCursor)
-            for task in tqdm(task_list):
+            for task in tqdm(task_list, desc="upload_video_task"):
                 file_path = download_file(task['pq_vid'], task['video_oss_path'])
                 google_upload_result = self.google_ai_api.upload_file(file_path)
 
@@ -99,9 +100,12 @@ class GenerateTextFromVideo(object):
                         update_sql,
                         params=(1, file_name, file_state, expire_time, task['pq_vid'])
                     )
+                    success_upload_count += 1
                 else:
                     continue
 
+        return success_upload_count
+
     def get_tasks(self):
         """
         获取处理视频转文本任务
@@ -116,7 +120,7 @@ class GenerateTextFromVideo(object):
         """
         task_list = self.get_tasks()
         while task_list:
-            for task in tqdm(task_list, desc="处理视频理解任务"):
+            for task in tqdm(task_list, desc="convert video to text"):
                 file_name = task['file_name']
                 google_file = self.google_ai_api.get_google_file(file_name)
                 state = google_file.state.name
@@ -138,20 +142,20 @@ class GenerateTextFromVideo(object):
                                     params=(2, video_text, state, task['pq_vid'])
                                 )
                                 os.remove("static/{}.mp4".format(task['pq_vid']))
-                                tqdm.write("识别完成,删除本地文件, 等待10s执行下一个任务")
+                                tqdm.write("video transform to text success, delete local file, sleep 1 min...")
                                 task_list.remove(task)
                         except Exception as e:
                             tqdm.write(str(e))
                             continue
 
                     case 'PROCESSING':
-                        print("video is still processing")
+                        tqdm.write("video is still processing")
                         continue
 
                     case 'FAILED':
-                        print("video process failed")
+                        tqdm.write("video process failed")
                         continue
                 time.sleep(10)
 
-            print("执行完一轮任务,剩余数量:{}".format(len(task_list)))
-            time.sleep(10)
+            tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
+            time.sleep(60)

+ 80 - 4
run_video_extract_text.py

@@ -1,11 +1,87 @@
 """
 @author: luojunhui
+@tools: PyCharm MarsCodeAI && DeepSeek
 """
+import sys
+import signal
+import time
+import threading
+
+from tqdm import tqdm
+
 from coldStartTasks.multi_modal import GenerateTextFromVideo
 
 
+class VideoProcessing:
+    """
+    video processing task
+    """
+    def __init__(self):
+        self.generate_text_from_video = GenerateTextFromVideo()
+        self.generate_text_from_video.connect_db()
+        self.running = True
+        self.lock = threading.Lock()
+        self.upload_thread = None
+        self.convert_thread = None
+
+    def upload_task(self):
+        """
+        upload task to google cloud storage
+        """
+        while self.running:
+            with self.lock:
+                tqdm.write("start upload_video_to_google_ai task...")
+                upload_videos_count = self.generate_text_from_video.upload_video_to_google_ai()
+                tqdm.write("upload_video_to_google_ai task completed, total upload_videos_count: {}".format(upload_videos_count))
+            time.sleep(600)
+
+    def convert_task(self):
+        """
+        convert video to text
+        """
+        while self.running:
+            with self.lock:
+                tqdm.write("Starting convert_video_to_text_with_google_ai task...")
+                self.generate_text_from_video.convert_video_to_text_with_google_ai()
+                tqdm.write("convert_video_to_text_with_google_ai() task completed.")
+            time.sleep(10)
+
+    def stop(self):
+        """停止所有线程"""
+        with self.lock:
+            tqdm.write("Stopping threads...")
+            self.running = False  # 设置标志为 False,通知线程退出
+        # 等待线程结束
+        if self.upload_thread:
+            self.upload_thread.join()
+        if self.convert_thread:
+            self.convert_thread.join()
+        tqdm.write("All threads stopped.")
+
+
+def signal_handler(sig, frame):
+    """捕获信号,优雅退出"""
+    video_processing.stop()
+    sys.exit(0)
+
+
 if __name__ == '__main__':
-    generate_text_from_video = GenerateTextFromVideo()
-    generate_text_from_video.connect_db()
-    # generate_text_from_video.upload_video_to_google_ai()
-    generate_text_from_video.convert_video_to_text_with_google_ai()
+    video_processing = VideoProcessing()
+
+    # 创建并启动第一个线程(上传任务)
+    video_processing.upload_thread = threading.Thread(target=video_processing.upload_task)
+    video_processing.upload_thread.daemon = False
+    video_processing.upload_thread.start()
+
+    # 创建并启动第二个线程(转换任务)
+    video_processing.convert_thread = threading.Thread(target=video_processing.convert_task)
+    video_processing.convert_thread.daemon = False
+    video_processing.convert_thread.start()
+
+    # 注册信号处理函数
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    # 主线程保持运行,防止程序退出
+    while video_processing.running:
+        time.sleep(1)