luojunhui 3 ヶ月 前
コミット
a9b7d4e15a

+ 4 - 5
applications/const/__init__.py

@@ -228,11 +228,10 @@ class VideoToTextConst:
     AUDIT_SUCCESS_STATUS = 1
 
     # video understand status
-    VIDEO_UNDERSTAND_INIT_STATUS = 0
-    VIDEO_UNDERSTAND_PROCESSING_STATUS = 1
-    VIDEO_UNDERSTAND_SUCCESS_STATUS = 2
-    VIDEO_UNDERSTAND_FAIL_STATUS = 99
-    VIDEO_LOCK = 101
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
 
     # sleep seconds
     SLEEP_SECONDS = 60

+ 77 - 97
coldStartTasks/multi_modal/generate_text_from_video.py

@@ -61,62 +61,42 @@ class GenerateTextFromVideo(object):
         """
         self.db.connect()
 
-    def input_task_list(self):
-        """
-        暂时用于处理历史任务, 新的视频在插入publish_single_video_source后会直接插入video_content_understanding表中
-        """
-        sql = f"""
-            select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
-            from publish_single_video_source 
-            where audit_status = {const.AUDIT_SUCCESS_STATUS} and bad_status = {const.ARTICLE_GOOD_STATUS}
-            order by id desc;
-        """
-        task_list = self.db.fetch(sql, cursor_type=DictCursor)
-        insert_sql = f"""
-        insert ignore into video_content_understanding
-            (pq_vid, video_ori_title, video_oss_path)
-        values (%s, %s, %s);
-        """
-        affected_rows = self.db.save_many(
-            insert_sql,
-            params_list=[(i['audit_video_id'], i['article_title'], i['video_url']) for i in task_list]
-        )
-        print(affected_rows)
-
-    def roll_back_lock_tasks(self):
+    def update_task_status(self, task_id, process, ori_status, new_status):
         """
         回滚长时间处于处理中的任务
         """
+        match process:
+            case "upload":
+                status = 'upload_status'
+                update_timestamp = 'upload_status_ts'
+            case "understanding":
+                status = 'understanding_status'
+                update_timestamp = 'understanding_status_ts'
+            case "summary":
+                status = 'summary_status'
+                update_timestamp = 'summary_status_ts'
+            case "rewrite":
+                status = 'rewrite_status'
+                update_timestamp = 'rewrite_status_ts'
+            case _:
+                raise ValueError(f"Unexpected task: {process}")
+
         update_sql = f"""
             update video_content_understanding 
-            set status = %s
-            where status = %s and status_update_timestamp < %s;
+            set {status} = %s, {update_timestamp} = %s
+            where {status} = %s and id = %s;
         """
         roll_back_rows = self.db.save(
             query=update_sql,
             params=(
-                const.VIDEO_UNDERSTAND_INIT_STATUS,
-                const.VIDEO_LOCK,
-                int(time.time()) - const.MAX_PROCESSING_TIME
+                new_status,
+                int(time.time()),
+                ori_status,
+                task_id,
             )
         )
         return roll_back_rows
 
-    def update_video_status(self, ori_status, new_status, pq_vid):
-        """
-        更新视频状态
-        """
-        sql = f"""
-            update video_content_understanding
-            set status = %s, status_update_timestamp = %s
-            WHERE pq_vid = %s and status = %s;
-        """
-        affected_rows = self.db.save(
-            query=sql,
-            params=(new_status, int(time.time()), pq_vid, ori_status)
-        )
-        return affected_rows
-
     def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
         """
         上传视频到Google AI
@@ -126,52 +106,56 @@ class GenerateTextFromVideo(object):
         1: 处理中
         2: 处理完成
         """
-        # 查询出在视频处于PROCESSING状态的视频数量
         select_sql = f"""
             select count(1) as processing_count 
             from video_content_understanding 
-            where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS};
+            where understanding_status = {const.PROCESSING_STATUS};
         """
         count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
         rest_video_count = max_processing_video_count - count
         success_upload_count = 0
         if rest_video_count:
             sql = f"""
-                select pq_vid, video_oss_path 
+                select id, video_oss_path
                 from video_content_understanding 
-                where status = {const.VIDEO_UNDERSTAND_INIT_STATUS} 
-                order by id desc
+                where status = {const.INIT_STATUS}
                 limit {rest_video_count};
             """
-
             task_list = self.db.fetch(sql, cursor_type=DictCursor)
             for task in tqdm(task_list, desc="upload_video_task"):
-                lock_rows = self.update_video_status(
-                    ori_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
-                    new_status=const.VIDEO_LOCK,
-                    pq_vid=task['pq_vid'],
+                lock_rows = self.update_task_status(
+                    task_id=task['id'],
+                    process='upload',
+                    ori_status=const.INIT_STATUS,
+                    new_status=const.PROCESSING_STATUS
                 )
                 if not lock_rows:
                     continue
                 try:
-                    file_path = download_file(task['pq_vid'], task['video_oss_path'])
+                    file_path = download_file(task['id'], task['video_oss_path'])
                     google_upload_result = self.google_ai_api.upload_file(file_path)
                     if google_upload_result:
                         file_name, file_state, expire_time = google_upload_result
                         update_sql = f"""
                             update video_content_understanding
-                            set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
-                            where pq_vid = %s and status = %s;
+                            set 
+                                upload_status = %s, 
+                                upload_status_ts = %s, 
+                                file_name = %s, 
+                                file_state = %s, 
+                                file_expire_time = %s
+                            where id = %s and upload_status = %s;
                         """
                         self.db.save(
                             update_sql,
                             params=(
-                                const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                                const.SUCCESS_STATUS,
+                                int(time.time()),
                                 file_name,
                                 file_state,
                                 expire_time,
-                                task['pq_vid'],
-                                const.VIDEO_LOCK
+                                task['id'],
+                                const.PROCESSING_STATUS
                             )
                         )
                         success_upload_count += 1
@@ -180,10 +164,11 @@ class GenerateTextFromVideo(object):
                     print("task upload failed because of {}".format(e))
                     print("trace_back: ", traceback.format_exc())
                     # roll back status
-                    self.update_video_status(
-                        ori_status=const.VIDEO_LOCK,
-                        new_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
-                        pq_vid=task['pq_vid'],
+                    self.update_task_status(
+                        task_id=task['id'],
+                        process='upload',
+                        ori_status=const.PROCESSING_STATUS,
+                        new_status=const.FAIL_STATUS
                     )
 
         return success_upload_count
@@ -199,10 +184,10 @@ class GenerateTextFromVideo(object):
         获取处理视频转文本任务
         """
         sql = f"""
-            select pq_vid, file_name 
+            select id, file_name 
             from video_content_understanding 
-            where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS} 
-            order by file_expire_time 
+            where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} 
+            order by file_expire_time
             limit {BATCH_SIZE};
         """
         task_list = self.db.fetch(sql, cursor_type=DictCursor)
@@ -212,23 +197,22 @@ class GenerateTextFromVideo(object):
         """
         处理视频转文本任务
         """
-        self.roll_back_lock_tasks()
-
         task_list = self.get_task_list()
         while task_list:
             for task in tqdm(task_list, desc="convert video to text"):
                 print(task['pq_vid'], task['file_name'])
                 # LOCK TASK
-                lock_row = self.update_video_status(
-                    ori_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
-                    new_status=const.VIDEO_LOCK,
-                    pq_vid=task['pq_vid'],
+                lock_row = self.update_task_status(
+                    task_id=task['id'],
+                    process='understanding',
+                    ori_status=const.INIT_STATUS,
+                    new_status=const.PROCESSING_STATUS
                 )
                 if not lock_row:
                     print("Lock")
                     continue
                 file_name = task['file_name']
-                video_local_path = "static/{}.mp4".format(task['pq_vid'])
+                video_local_path = "static/{}.mp4".format(task['id'])
                 google_file = self.google_ai_api.get_google_file(file_name)
                 state = google_file.state.name
                 match state:
@@ -241,17 +225,17 @@ class GenerateTextFromVideo(object):
                             if video_text:
                                 update_sql = f"""
                                     update video_content_understanding
-                                    set status = %s, video_text = %s, file_state = %s
-                                    where pq_vid = %s and status = %s;
+                                    set understanding_status = %s, video_text = %s, file_state = %s
+                                    where id = %s and understanding_status = %s;
                                 """
                                 self.db.save(
                                     update_sql,
                                     params=(
-                                        const.VIDEO_UNDERSTAND_SUCCESS_STATUS,
+                                        const.SUCCESS_STATUS,
                                         video_text,
                                         state,
-                                        task['pq_vid'],
-                                        const.VIDEO_LOCK
+                                        task['id'],
+                                        const.PROCESSING_STATUS
                                     )
                                 )
                                 # delete local file and google file
@@ -265,37 +249,33 @@ class GenerateTextFromVideo(object):
                                 tqdm.write("delete video from google success: {}".format(file_name))
                             else:
                                 # roll back status
-                                self.update_video_status(
-                                    ori_status=const.VIDEO_LOCK,
-                                    new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
-                                    pq_vid=task['pq_vid'],
+                                self.update_task_status(
+                                    task_id=task['id'],
+                                    process='understanding',
+                                    ori_status=const.PROCESSING_STATUS,
+                                    new_status=const.INIT_STATUS
                                 )
                         except Exception as e:
                             # roll back status
-                            self.update_video_status(
-                                ori_status=const.VIDEO_LOCK,
-                                new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
-                                pq_vid=task['pq_vid'],
+                            self.update_task_status(
+                                task_id=task['id'],
+                                process='understanding',
+                                ori_status=const.PROCESSING_STATUS,
+                                new_status=const.FAIL_STATUS
                             )
                             tqdm.write(str(e))
                             continue
 
                     case 'PROCESSING':
                         tqdm.write("video is still processing")
-                        # roll back status
-                        self.update_video_status(
-                            ori_status=const.VIDEO_LOCK,
-                            new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
-                            pq_vid=task['pq_vid'],
-                        )
 
                     case 'FAILED':
-                        self.update_video_status(
-                            ori_status=const.VIDEO_LOCK,
-                            new_status=const.VIDEO_UNDERSTAND_FAIL_STATUS,
-                            pq_vid=task['pq_vid']
+                        self.update_task_status(
+                            task_id=task['id'],
+                            process='understanding',
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.FAIL_STATUS
                         )
-
                         if os.path.exists(video_local_path):
                             os.remove(video_local_path)