luojunhui 2 weeks ago
parent
commit
e2b250bc5a

+ 6 - 0
applications/const/__init__.py

@@ -404,3 +404,9 @@ class GoogleVideoUnderstandTaskConst:
 
     # max processing time
     MAX_PROCESSING_TIME = 3600
+
+    # task info
+    TABLE_NAME = "long_articles_new_video_cover"
+    TASK_NAME = "extract_video_best_frame_as_cover"
+    DIR_NAME = "static"
+    POOL_SIZE = 15

+ 145 - 40
coldStartTasks/ai_pipeline/extract_video_best_frame.py

@@ -5,9 +5,11 @@
 
 import os
 import datetime
+import traceback
 from tqdm import tqdm
 from pymysql.cursors import DictCursor
 
+from applications import log
 from applications.api import GoogleAIAPI
 from applications.const import GoogleVideoUnderstandTaskConst
 from applications.db import DatabaseConnector
@@ -20,9 +22,6 @@ from coldStartTasks.ai_pipeline.basic import get_video_cover
 from coldStartTasks.ai_pipeline.basic import normalize_time_str
 
 const = GoogleVideoUnderstandTaskConst()
-table_name = "long_articles_new_video_cover"
-dir_name = "static"
-POOL_SIZE = 10
 google_ai = GoogleAIAPI()
 
 
@@ -35,13 +34,14 @@ class ExtractVideoBestFrame:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
-    def get_upload_task_list(self, task_num: int = POOL_SIZE) -> list[dict]:
+    def get_upload_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
         """
         get upload task list
         """
         fetch_query = f"""
-            select id, video_oss_path from {table_name} 
-            where upload_status = {const.INIT_STATUS} and priority = 1
+            select id, video_oss_path from {const.TABLE_NAME} 
+            where upload_status = {const.INIT_STATUS}
+            order by priority desc
             limit {task_num};
         """
         upload_task_list = self.db_client.fetch(
@@ -49,12 +49,12 @@ class ExtractVideoBestFrame:
         )
         return upload_task_list
 
-    def get_extract_task_list(self, task_num: int = POOL_SIZE) -> list[dict]:
+    def get_extract_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
         """
         get extract task list
         """
         fetch_query = f"""
-            select id, file_name from {table_name}
+            select id, file_name from {const.TABLE_NAME} 
             where upload_status = {const.SUCCESS_STATUS} and extract_status = {const.INIT_STATUS}
             order by file_expire_time
             limit {task_num};
@@ -69,7 +69,7 @@ class ExtractVideoBestFrame:
         get cover task list
         """
         fetch_query = f"""
-            select id, video_oss_path, best_frame_time_ms from {table_name}
+            select id, video_oss_path, best_frame_time_ms from {const.TABLE_NAME}
             where extract_status = {const.SUCCESS_STATUS} and get_cover_status = {const.INIT_STATUS};
             """
         extract_task_list = self.db_client.fetch(
@@ -82,7 +82,7 @@ class ExtractVideoBestFrame:
         get processing task pool size
         """
         fetch_query = f"""
-            select count(1) as pool_size from {table_name}
+            select count(1) as pool_size from {const.TABLE_NAME}
             where upload_status = {const.SUCCESS_STATUS} and file_state = 'PROCESSING' and extract_status = {const.INIT_STATUS};
         """
         fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
@@ -95,7 +95,7 @@ class ExtractVideoBestFrame:
         self, task_id: int, file_name: str, file_state: str, file_expire_time: str
     ) -> int:
         update_query = f"""
-            update {table_name} 
+            update {const.TABLE_NAME} 
             set upload_status = %s, upload_status_ts = %s,
                 file_name = %s, file_state = %s, file_expire_time = %s
             where id = %s and upload_status = %s;
@@ -118,7 +118,7 @@ class ExtractVideoBestFrame:
         self, task_id: int, file_state: str, best_frame_tims_ms: str
     ) -> int:
         update_query = f"""
-            update {table_name} 
+            update {const.TABLE_NAME} 
             set extract_status = %s, extract_status_ts = %s,
                 file_state = %s, best_frame_time_ms = %s
             where id = %s and extract_status = %s;
@@ -136,8 +136,26 @@ class ExtractVideoBestFrame:
         )
         return update_rows
 
+    def set_cover_result(self, task_id: int, cover_oss_path: str) -> int:  # store the cover path of one task and mark its get_cover step as successful
+        update_query = f"""
+            update {const.TABLE_NAME}
+            set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s
+            where id = %s and get_cover_status = %s;
+        """  # only the table name is interpolated; every %s value is passed separately through params
+        update_rows = self.db_client.save(
+            query=update_query,
+            params=(  # order must match the %s placeholders in update_query
+                cover_oss_path,  # new cover_oss_path
+                const.SUCCESS_STATUS,  # new get_cover_status
+                datetime.datetime.now(),  # get_cover_status_ts; naive local time
+                task_id,  # id of the row to update
+                const.PROCESSING_STATUS,  # required current get_cover_status; rows in any other status are left untouched
+            )
+        )
+        return update_rows  # result of db_client.save; presumably the affected row count, confirm against DatabaseConnector
+
     def upload_video_to_gemini_ai(
-        self, max_processing_pool_size: int = POOL_SIZE
+        self, max_processing_pool_size: int = const.POOL_SIZE
     ) -> None:
         # upload video to gemini ai
         roll_back_lock_tasks_count = roll_back_lock_tasks(
@@ -147,7 +165,11 @@ class ExtractVideoBestFrame:
             processing_status=const.PROCESSING_STATUS,
             max_process_time=const.MAX_PROCESSING_TIME,
         )
-        print("roll_back_lock_tasks_count", roll_back_lock_tasks_count)
+        log(
+            task=const.TASK_NAME,
+            function="upload_video_to_gemini_ai",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
 
         processing_task_num = self.get_processing_task_pool_size()
         res_task_num = max_processing_pool_size - processing_task_num
@@ -185,7 +207,16 @@ class ExtractVideoBestFrame:
                             new_status=const.FAIL_STATUS,
                         )
                 except Exception as e:
-                    print(f"download_file error: {e}")
+                    log(
+                        task=const.TASK_NAME,
+                        function="upload_video_to_gemini_ai",
+                        message="task_failed",
+                        data={
+                            "task_id": task["id"],
+                            "track_back": traceback.format_exc(),
+                            "error": str(e),
+                        }
+                    )
                     update_task_queue_status(
                         db_client=self.db_client,
                         task_id=task["id"],
@@ -196,7 +227,11 @@ class ExtractVideoBestFrame:
                     continue
 
         else:
-            print("Processing task pool is full")
+            log(
+                task=const.TASK_NAME,
+                function="upload_video_to_gemini_ai",
+                message="reach pool size, no more space for task to upload",
+            )
 
     def extract_best_frame_with_gemini_ai(self):
         # roll back lock tasks
@@ -207,8 +242,11 @@ class ExtractVideoBestFrame:
             processing_status=const.PROCESSING_STATUS,
             max_process_time=const.MAX_PROCESSING_TIME,
         )
-        print("roll_back_lock_tasks_count", roll_back_lock_tasks_count)
-
+        log(
+            task=const.TASK_NAME,
+            function="extract_best_frame_with_gemini_ai",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
         # do extract frame task
         task_list = self.get_extract_task_list()
         for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"):
@@ -224,7 +262,7 @@ class ExtractVideoBestFrame:
                 continue
 
             file_name = task["file_name"]
-            video_local_path = os.path.join(dir_name, "{}.mp4".format(task["id"]))
+            video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"]))
             try:
                 google_file = google_ai.get_google_file(file_name)
                 state = google_file.state.name
@@ -238,16 +276,25 @@ class ExtractVideoBestFrame:
                             ori_status=const.PROCESSING_STATUS,
                             new_status=const.INIT_STATUS,
                         )
-                        print("this video is still processing")
+                        log(
+                            task=const.TASK_NAME,
+                            function="extract_best_frame_with_gemini_ai",
+                            message="google is still processing this video",
+                            data={
+                                "task_id": task["id"],
+                                "file_name": file_name,
+                                "state": state
+                            }
+                        )
 
                     case "FAILED":
                         # google process this video failed
                         update_query = f"""
-                            update {table_name}
+                            update {const.TABLE_NAME}
                             set file_state = %s, extract_status = %s, extract_status_ts = %s
                             where id = %s and extract_status = %s;
                         """
-                        update_rows = self.db_client.save(
+                        self.db_client.save(
                             query=update_query,
                             params=(
                                 "FAILED",
@@ -257,6 +304,16 @@ class ExtractVideoBestFrame:
                                 const.PROCESSING_STATUS,
                             ),
                         )
+                        log(
+                            task=const.TASK_NAME,
+                            function="extract_best_frame_with_gemini_ai",
+                            message="google process this video failed",
+                            data={
+                                "task_id": task["id"],
+                                "file_name": file_name,
+                                "state": state
+                            }
+                        )
 
                     case "ACTIVE":
                         # video process successfully
@@ -284,8 +341,28 @@ class ExtractVideoBestFrame:
                                 os.remove(video_local_path)
 
                             google_ai.delete_video(file_name)
+                            log(
+                                task=const.TASK_NAME,
+                                function="extract_best_frame_with_gemini_ai",
+                                message="video process successfully",
+                                data={
+                                    "task_id": task["id"],
+                                    "file_name": file_name,
+                                    "state": state,
+                                    "best_frame_tims_ms": best_frame_tims_ms
+                                }
+                            )
                         except Exception as e:
-                            print(e)
+                            log(
+                                task=const.TASK_NAME,
+                                function="extract_best_frame_with_gemini_ai",
+                                message="task_failed_inside_cycle",
+                                data={
+                                    "task_id": task["id"],
+                                    "track_back": traceback.format_exc(),
+                                    "error": str(e),
+                                }
+                            )
                             update_task_queue_status(
                                 db_client=self.db_client,
                                 task_id=task["id"],
@@ -295,7 +372,16 @@ class ExtractVideoBestFrame:
                             )
 
             except Exception as e:
-                print(f"update_task_queue_status error: {e}")
+                log(
+                    task=const.TASK_NAME,
+                    function="extract_best_frame_with_gemini_ai",
+                    message="task_failed_outside_cycle",
+                    data={
+                        "task_id": task["id"],
+                        "track_back": traceback.format_exc(),
+                        "error": str(e),
+                    }
+                )
                 update_task_queue_status(
                     db_client=self.db_client,
                     task_id=task["id"],
@@ -308,6 +394,19 @@ class ExtractVideoBestFrame:
         """
         get cover with best frame
         """
+        # roll back lock tasks
+        roll_back_lock_tasks_count = roll_back_lock_tasks(
+            db_client=self.db_client,
+            task="get_cover",
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
+        )
+        log(
+            task=const.TASK_NAME,
+            function="extract_cover_with_ffmpeg",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
         # get task list
         task_list = self.get_cover_task_list()
         for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"):
@@ -327,24 +426,20 @@ class ExtractVideoBestFrame:
                 response = get_video_cover(
                     video_oss_path=task["video_oss_path"], time_millisecond_str=time_str
                 )
-                print(response)
+                log(
+                    task=const.TASK_NAME,
+                    function="extract_cover_with_ffmpeg",
+                    message="get_video_cover_with_ffmpeg",
+                    data={
+                        "task_id": task["id"],
+                        "video_oss_path": task["video_oss_path"],
+                        "time_millisecond_str": time_str,
+                        "response": response
+                    }
+                )
                 if response["success"] and response["data"]:
                     cover_oss_path = response["data"]
-                    update_query = f"""
-                        update {table_name}
-                        set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s
-                        where id = %s and get_cover_status = %s;
-                    """
-                    update_rows = self.db_client.save(
-                        query=update_query,
-                        params=(
-                            cover_oss_path,
-                            const.SUCCESS_STATUS,
-                            datetime.datetime.now(),
-                            task["id"],
-                            const.PROCESSING_STATUS,
-                        ),
-                    )
+                    self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path)
                 else:
                     update_task_queue_status(
                         db_client=self.db_client,
@@ -354,6 +449,16 @@ class ExtractVideoBestFrame:
                         new_status=const.FAIL_STATUS,
                     )
             else:
+                log(
+                    task=const.TASK_NAME,
+                    function="extract_cover_with_ffmpeg",
+                    message="time_str format is not correct",
+                    data={
+                        "task_id": task["id"],
+                        "video_oss_path": task["video_oss_path"],
+                        "time_millisecond_str": time_str
+                    }
+                )
                 update_task_queue_status(
                     db_client=self.db_client,
                     task_id=task["id"],

+ 4 - 1
run_video_understanding_with_google.py

@@ -10,6 +10,7 @@ from coldStartTasks.ai_pipeline import ExtractVideoBestFrame
 
 PROCESS_EXIT_TIMEOUT = 10 * 60
 
+
 def start_task():
     task = ExtractVideoBestFrame()
 
@@ -45,7 +46,9 @@ def main():
     process.join(PROCESS_EXIT_TIMEOUT)
 
     if process.is_alive():
-        print(f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating...")
+        print(
+            f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating..."
+        )
         process.terminate()
         process.join()