luojunhui vor 2 Wochen
Ursprung
Commit
1f532148be
1 geänderte Dateien mit 242 neuen und 253 gelöschten Zeilen
  1. 242 253
      coldStartTasks/ai_pipeline/extract_video_best_frame.py

+ 242 - 253
coldStartTasks/ai_pipeline/extract_video_best_frame.py

@@ -34,6 +34,24 @@ class ExtractVideoBestFrame:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
+    def _roll_back_lock_tasks(self, task: str) -> int:
+        return roll_back_lock_tasks(
+            db_client=self.db_client,
+            task=task,
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
+        )
+
+    def _lock_task(self, task_id: int, task_name) -> int:
+        return update_task_queue_status(
+            db_client=self.db_client,
+            task_id=task_id,
+            task=task_name,
+            ori_status=const.INIT_STATUS,
+            new_status=const.PROCESSING_STATUS,
+        )
+
     def get_upload_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
         """
         get upload task list
@@ -154,77 +172,67 @@ class ExtractVideoBestFrame:
         )
         return update_rows
 
-    def upload_video_to_gemini_ai(
-        self, max_processing_pool_size: int = const.POOL_SIZE
-    ) -> None:
+    def upload_each_video(self, task: dict) -> None:
+        lock_status = self._lock_task(task_id=task["id"], task_name="upload")
+        if not lock_status:
+            return None
+
+        try:
+            file_path = download_file(task["id"], task["video_oss_path"])
+            upload_response = google_ai.upload_file(file_path)
+            if upload_response:
+                file_name, file_state, expire_time = upload_response
+                self.set_upload_result(
+                    task_id=task["id"],
+                    file_name=file_name,
+                    file_state=file_state,
+                    file_expire_time=expire_time,
+                )
+                return None
+            else:
+                # set status as fail
+                update_task_queue_status(
+                    db_client=self.db_client,
+                    task_id=task["id"],
+                    task="upload",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
+                )
+                return None
+        except Exception as e:
+            log(
+                task=const.TASK_NAME,
+                function="upload_video_to_gemini_ai",
+                message="task_failed",
+                data={
+                    "task_id": task["id"],
+                    "track_back": traceback.format_exc(),
+                    "error": str(e),
+                }
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="upload",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+            return None
+
+    def upload_video_to_gemini_ai(self, max_processing_pool_size: int = const.POOL_SIZE) -> None:
         # upload video to gemini ai
-        roll_back_lock_tasks_count = roll_back_lock_tasks(
-            db_client=self.db_client,
-            task="upload",
-            init_status=const.INIT_STATUS,
-            processing_status=const.PROCESSING_STATUS,
-            max_process_time=const.MAX_PROCESSING_TIME,
-        )
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="upload")
         log(
             task=const.TASK_NAME,
             function="upload_video_to_gemini_ai",
             message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
         )
-
         processing_task_num = self.get_processing_task_pool_size()
         res_task_num = max_processing_pool_size - processing_task_num
         if res_task_num:
             upload_task_list = self.get_upload_task_list(task_num=res_task_num)
             for task in tqdm(upload_task_list, desc="upload_video_to_gemini_ai"):
-                lock_status = update_task_queue_status(
-                    db_client=self.db_client,
-                    task_id=task["id"],
-                    task="upload",
-                    ori_status=const.INIT_STATUS,
-                    new_status=const.PROCESSING_STATUS,
-                )
-                if not lock_status:
-                    continue
-
-                try:
-                    file_path = download_file(task["id"], task["video_oss_path"])
-                    upload_response = google_ai.upload_file(file_path)
-                    if upload_response:
-                        file_name, file_state, expire_time = upload_response
-                        self.set_upload_result(
-                            task_id=task["id"],
-                            file_name=file_name,
-                            file_state=file_state,
-                            file_expire_time=expire_time,
-                        )
-                    else:
-                        # set status as fail
-                        update_task_queue_status(
-                            db_client=self.db_client,
-                            task_id=task["id"],
-                            task="upload",
-                            ori_status=const.PROCESSING_STATUS,
-                            new_status=const.FAIL_STATUS,
-                        )
-                except Exception as e:
-                    log(
-                        task=const.TASK_NAME,
-                        function="upload_video_to_gemini_ai",
-                        message="task_failed",
-                        data={
-                            "task_id": task["id"],
-                            "track_back": traceback.format_exc(),
-                            "error": str(e),
-                        }
-                    )
-                    update_task_queue_status(
-                        db_client=self.db_client,
-                        task_id=task["id"],
-                        task="upload",
-                        ori_status=const.PROCESSING_STATUS,
-                        new_status=const.FAIL_STATUS,
-                    )
-                    continue
+                self.upload_each_video(task=task)
 
         else:
             log(
@@ -233,175 +241,211 @@ class ExtractVideoBestFrame:
                 message="reach pool size, no more space for task to upload",
             )
 
-    def extract_best_frame_with_gemini_ai(self):
-        # roll back lock tasks
-        roll_back_lock_tasks_count = roll_back_lock_tasks(
-            db_client=self.db_client,
-            task="extract",
-            init_status=const.INIT_STATUS,
-            processing_status=const.PROCESSING_STATUS,
-            max_process_time=const.MAX_PROCESSING_TIME,
-        )
-        log(
-            task=const.TASK_NAME,
-            function="extract_best_frame_with_gemini_ai",
-            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
-        )
-        # do extract frame task
-        task_list = self.get_extract_task_list()
-        for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"):
-            # lock task
-            lock_status = update_task_queue_status(
-                db_client=self.db_client,
-                task_id=task["id"],
-                task="extract",
-                ori_status=const.INIT_STATUS,
-                new_status=const.PROCESSING_STATUS,
-            )
-            if not lock_status:
-                continue
+    def extract_each_video(self, task: dict) -> None:
+        # lock task
+        lock_status = self._lock_task(task_id=task["id"], task_name="extract")
+        if not lock_status:
+            return None
 
-            file_name = task["file_name"]
-            video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"]))
-            try:
-                google_file = google_ai.get_google_file(file_name)
-                state = google_file.state.name
-                match state:
-                    case "PROCESSING":
-                        # google is still processing this video
-                        update_task_queue_status(
-                            db_client=self.db_client,
-                            task_id=task["id"],
-                            task="extract",
-                            ori_status=const.PROCESSING_STATUS,
-                            new_status=const.INIT_STATUS,
+        file_name = task["file_name"]
+        video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"]))
+        try:
+            google_file = google_ai.get_google_file(file_name)
+            state = google_file.state.name
+            match state:
+                case "PROCESSING":
+                    # google is still processing this video
+                    update_task_queue_status(
+                        db_client=self.db_client,
+                        task_id=task["id"],
+                        task="extract",
+                        ori_status=const.PROCESSING_STATUS,
+                        new_status=const.INIT_STATUS,
+                    )
+                    log(
+                        task=const.TASK_NAME,
+                        function="extract_best_frame_with_gemini_ai",
+                        message="google is still processing this video",
+                        data={
+                            "task_id": task["id"],
+                            "file_name": file_name,
+                            "state": state
+                        }
+                    )
+
+                case "FAILED":
+                    # google process this video failed
+                    update_query = f"""
+                        update {const.TABLE_NAME}
+                        set file_state = %s, extract_status = %s, extract_status_ts = %s
+                        where id = %s and extract_status = %s;
+                    """
+                    self.db_client.save(
+                        query=update_query,
+                        params=(
+                            "FAILED",
+                            const.FAIL_STATUS,
+                            datetime.datetime.now(),
+                            task["id"],
+                            const.PROCESSING_STATUS,
+                        ),
+                    )
+                    log(
+                        task=const.TASK_NAME,
+                        function="extract_best_frame_with_gemini_ai",
+                        message="google process this video failed",
+                        data={
+                            "task_id": task["id"],
+                            "file_name": file_name,
+                            "state": state
+                        }
+                    )
+
+                case "ACTIVE":
+                    # video process successfully
+                    try:
+                        best_frame_tims_ms = google_ai.fetch_info_from_google_ai(
+                            prompt=extract_best_frame_prompt(),
+                            video_file=google_file,
                         )
+                        if best_frame_tims_ms:
+                            self.set_extract_result(
+                                task_id=task["id"],
+                                file_state="ACTIVE",
+                                best_frame_tims_ms=best_frame_tims_ms.strip(),
+                            )
+                        else:
+                            update_task_queue_status(
+                                db_client=self.db_client,
+                                task_id=task["id"],
+                                task="extract",
+                                ori_status=const.PROCESSING_STATUS,
+                                new_status=const.FAIL_STATUS,
+                            )
+                        # delete local file and google file
+                        if os.path.exists(video_local_path):
+                            os.remove(video_local_path)
+
+                        google_ai.delete_video(file_name)
                         log(
                             task=const.TASK_NAME,
                             function="extract_best_frame_with_gemini_ai",
-                            message="google is still processing this video",
+                            message="video process successfully",
                             data={
                                 "task_id": task["id"],
                                 "file_name": file_name,
-                                "state": state
+                                "state": state,
+                                "best_frame_tims_ms": best_frame_tims_ms
                             }
                         )
-
-                    case "FAILED":
-                        # google process this video failed
-                        update_query = f"""
-                            update {const.TABLE_NAME}
-                            set file_state = %s, extract_status = %s, extract_status_ts = %s
-                            where id = %s and extract_status = %s;
-                        """
-                        self.db_client.save(
-                            query=update_query,
-                            params=(
-                                "FAILED",
-                                const.FAIL_STATUS,
-                                datetime.datetime.now(),
-                                task["id"],
-                                const.PROCESSING_STATUS,
-                            ),
-                        )
+                    except Exception as e:
                         log(
                             task=const.TASK_NAME,
                             function="extract_best_frame_with_gemini_ai",
-                            message="google process this video failed",
+                            message="task_failed_inside_cycle",
                             data={
                                 "task_id": task["id"],
-                                "file_name": file_name,
-                                "state": state
+                                "track_back": traceback.format_exc(),
+                                "error": str(e),
                             }
                         )
+                        update_task_queue_status(
+                            db_client=self.db_client,
+                            task_id=task["id"],
+                            task="extract",
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.FAIL_STATUS,
+                        )
 
-                    case "ACTIVE":
-                        # video process successfully
-                        try:
-                            best_frame_tims_ms = google_ai.fetch_info_from_google_ai(
-                                prompt=extract_best_frame_prompt(),
-                                video_file=google_file,
-                            )
-                            if best_frame_tims_ms:
-                                self.set_extract_result(
-                                    task_id=task["id"],
-                                    file_state="ACTIVE",
-                                    best_frame_tims_ms=best_frame_tims_ms.strip(),
-                                )
-                            else:
-                                update_task_queue_status(
-                                    db_client=self.db_client,
-                                    task_id=task["id"],
-                                    task="extract",
-                                    ori_status=const.PROCESSING_STATUS,
-                                    new_status=const.FAIL_STATUS,
-                                )
-                            # delete local file and google file
-                            if os.path.exists(video_local_path):
-                                os.remove(video_local_path)
+        except Exception as e:
+            log(
+                task=const.TASK_NAME,
+                function="extract_best_frame_with_gemini_ai",
+                message="task_failed_outside_cycle",
+                data={
+                    "task_id": task["id"],
+                    "track_back": traceback.format_exc(),
+                    "error": str(e),
+                }
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="extract",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
 
-                            google_ai.delete_video(file_name)
-                            log(
-                                task=const.TASK_NAME,
-                                function="extract_best_frame_with_gemini_ai",
-                                message="video process successfully",
-                                data={
-                                    "task_id": task["id"],
-                                    "file_name": file_name,
-                                    "state": state,
-                                    "best_frame_tims_ms": best_frame_tims_ms
-                                }
-                            )
-                        except Exception as e:
-                            log(
-                                task=const.TASK_NAME,
-                                function="extract_best_frame_with_gemini_ai",
-                                message="task_failed_inside_cycle",
-                                data={
-                                    "task_id": task["id"],
-                                    "track_back": traceback.format_exc(),
-                                    "error": str(e),
-                                }
-                            )
-                            update_task_queue_status(
-                                db_client=self.db_client,
-                                task_id=task["id"],
-                                task="extract",
-                                ori_status=const.PROCESSING_STATUS,
-                                new_status=const.FAIL_STATUS,
-                            )
+    def extract_best_frame_with_gemini_ai(self):
+        # roll back lock tasks
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="extract")
+        log(
+            task=const.TASK_NAME,
+            function="extract_best_frame_with_gemini_ai",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
+        # do extract frame task
+        task_list = self.get_extract_task_list()
+        for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"):
+            self.extract_each_video(task=task)
 
-            except Exception as e:
-                log(
-                    task=const.TASK_NAME,
-                    function="extract_best_frame_with_gemini_ai",
-                    message="task_failed_outside_cycle",
-                    data={
-                        "task_id": task["id"],
-                        "track_back": traceback.format_exc(),
-                        "error": str(e),
-                    }
-                )
+    def get_each_cover(self, task: dict) -> None:
+        lock_status = self._lock_task(task_id=task["id"], task_name="get_cover")
+        if not lock_status:
+            return None
+
+        time_str = normalize_time_str(task["best_frame_time_ms"])
+        if time_str:
+            response = get_video_cover(
+                video_oss_path=task["video_oss_path"], time_millisecond_str=time_str
+            )
+            log(
+                task=const.TASK_NAME,
+                function="extract_cover_with_ffmpeg",
+                message="get_video_cover_with_ffmpeg",
+                data={
+                    "task_id": task["id"],
+                    "video_oss_path": task["video_oss_path"],
+                    "time_millisecond_str": time_str,
+                    "response": response
+                }
+            )
+            if response["success"] and response["data"]:
+                cover_oss_path = response["data"]
+                self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path)
+            else:
                 update_task_queue_status(
                     db_client=self.db_client,
                     task_id=task["id"],
-                    task="extract",
+                    task="get_cover",
                     ori_status=const.PROCESSING_STATUS,
                     new_status=const.FAIL_STATUS,
                 )
+        else:
+            log(
+                task=const.TASK_NAME,
+                function="extract_cover_with_ffmpeg",
+                message="time_str format is not correct",
+                data={
+                    "task_id": task["id"],
+                    "video_oss_path": task["video_oss_path"],
+                    "time_millisecond_str": time_str
+                }
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="get_cover",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
 
     def get_cover_with_best_frame(self):
         """
         get cover with best frame
         """
         # roll back lock tasks
-        roll_back_lock_tasks_count = roll_back_lock_tasks(
-            db_client=self.db_client,
-            task="get_cover",
-            init_status=const.INIT_STATUS,
-            processing_status=const.PROCESSING_STATUS,
-            max_process_time=const.MAX_PROCESSING_TIME,
-        )
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="get_cover")
         log(
             task=const.TASK_NAME,
             function="extract_cover_with_ffmpeg",
@@ -410,59 +454,4 @@ class ExtractVideoBestFrame:
         # get task list
         task_list = self.get_cover_task_list()
         for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"):
-            # lock task
-            lock_status = update_task_queue_status(
-                db_client=self.db_client,
-                task_id=task["id"],
-                task="get_cover",
-                ori_status=const.INIT_STATUS,
-                new_status=const.PROCESSING_STATUS,
-            )
-            if not lock_status:
-                continue
-
-            time_str = normalize_time_str(task["best_frame_time_ms"])
-            if time_str:
-                response = get_video_cover(
-                    video_oss_path=task["video_oss_path"], time_millisecond_str=time_str
-                )
-                log(
-                    task=const.TASK_NAME,
-                    function="extract_cover_with_ffmpeg",
-                    message="get_video_cover_with_ffmpeg",
-                    data={
-                        "task_id": task["id"],
-                        "video_oss_path": task["video_oss_path"],
-                        "time_millisecond_str": time_str,
-                        "response": response
-                    }
-                )
-                if response["success"] and response["data"]:
-                    cover_oss_path = response["data"]
-                    self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path)
-                else:
-                    update_task_queue_status(
-                        db_client=self.db_client,
-                        task_id=task["id"],
-                        task="get_cover",
-                        ori_status=const.PROCESSING_STATUS,
-                        new_status=const.FAIL_STATUS,
-                    )
-            else:
-                log(
-                    task=const.TASK_NAME,
-                    function="extract_cover_with_ffmpeg",
-                    message="time_str format is not correct",
-                    data={
-                        "task_id": task["id"],
-                        "video_oss_path": task["video_oss_path"],
-                        "time_millisecond_str": time_str
-                    }
-                )
-                update_task_queue_status(
-                    db_client=self.db_client,
-                    task_id=task["id"],
-                    task="get_cover",
-                    ori_status=const.PROCESSING_STATUS,
-                    new_status=const.FAIL_STATUS,
-                )
+            self.get_each_cover(task=task)