소스 검색

add const, apollo_config, lock
to video_extract_task

luojunhui 4 달 전
부모
커밋
82a619ff4a
2개의 변경된 파일163개의 추가작업 그리고 39개의 파일을 삭제
  1. 28 0
      applications/const/__init__.py
  2. 135 39
      coldStartTasks/multi_modal/generate_text_from_video.py

+ 28 - 0
applications/const/__init__.py

@@ -205,3 +205,31 @@ class ArticleCollectorConst:
     ARTICLE_SUCCESS_CODE = 0
     ARTICLE_UNKNOWN_CODE = 10000
 
+
+# 视频转文本任务
+class VideoToTextConst:
+    """
+    视频转文本任务常量配置
+    """
+    # extract_status 是否提取文本状态
+    EXTRACT_INIT_STATUS = 0
+    EXTRACT_PROCESSING_STATUS = 101
+    EXTRACT_SUCCESS_STATUS = 2
+    EXTRACT_FAIL_STATUS = 99
+
+    # bad_status 文章质量状态
+    ARTICLE_GOOD_STATUS = 0
+
+    # audit_status 文章审核状态
+    AUDIT_SUCCESS_STATUS = 1
+
+    # video understand status
+    VIDEO_UNDERSTAND_INIT_STATUS = 0
+    VIDEO_UNDERSTAND_PROCESSING_STATUS = 1
+    VIDEO_UNDERSTAND_SUCCESS_STATUS = 2
+    VIDEO_UNDERSTAND_FAIL_STATUS = 99
+    VIDEO_LOCK = 101
+
+    # sleep seconds
+    SLEEP_SECONDS = 60
+

+ 135 - 39
coldStartTasks/multi_modal/generate_text_from_video.py

@@ -1,9 +1,9 @@
 """
 @author: luojunhui
-todo: 加上多进程锁
 """
 import os
 import time
+import traceback
 
 import requests
 
@@ -11,13 +11,23 @@ from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
 from applications.api import GoogleAIAPI
+from applications.const import VideoToTextConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
+from config import apolloConfig
 
 # 办公室网络调试需要打开代理
 # os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
 # os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
 
+const = VideoToTextConst()
+config = apolloConfig(env="prod")
+
+# pool_size
+POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
+# batch_size
+BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
+
 
 def download_file(pq_vid, video_url):
     """
@@ -58,14 +68,14 @@ class GenerateTextFromVideo(object):
         sql = f"""
         select article_title, concat('https://rescdn.yishihui.com/', video_oss_path ) as video_url, audit_video_id
         from publish_single_video_source 
-        where audit_status = 1 and bad_status = 0 and extract_status = 0
+        where audit_status = {const.AUDIT_SUCCESS_STATUS} and bad_status = {const.ARTICLE_GOOD_STATUS} and extract_status = {const.EXTRACT_INIT_STATUS}
         order by id desc;
         """
         task_list = self.db.fetch(sql, cursor_type=DictCursor)
         insert_sql = f"""
         insert ignore into video_content_understanding
             (pq_vid, video_ori_title, video_oss_path)
-        values (%s, %s, %s)
+        values (%s, %s, %s);
         """
         affected_rows = self.db.save_many(
             insert_sql,
@@ -73,7 +83,22 @@ class GenerateTextFromVideo(object):
         )
         print(affected_rows)
 
-    def upload_video_to_google_ai(self, max_processing_video_count=20):
+    def update_video_status(self, ori_status, new_status, pq_vid):
+        """
+        更新视频状态
+        """
+        sql = f"""
+            update video_content_understanding
+            set status = %s
+            WHERE pq_vid = %s and status = %s;
+        """
+        affected_rows = self.db.save(
+            query=sql,
+            params=(new_status, pq_vid, ori_status)
+        )
+        return affected_rows
+
+    def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
         """
         上传视频到Google AI
         max_processing_video_count: 处理中的最大视频数量,默认20
@@ -83,31 +108,63 @@ class GenerateTextFromVideo(object):
         2: 处理完成
         """
         # 查询出在视频处于PROCESSING状态的视频数量
-        select_sql = "select count(1) as processing_count from video_content_understanding where status = 1;"
+        select_sql = f"""
+            select count(1) as processing_count 
+            from video_content_understanding 
+            where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS};
+        """
         count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
         rest_video_count = max_processing_video_count - count
         success_upload_count = 0
         if rest_video_count:
-            sql = f"""select pq_vid, video_oss_path from video_content_understanding where status = 0 limit {rest_video_count};"""
+            sql = f"""
+                select pq_vid, video_oss_path 
+                from video_content_understanding 
+                where status = {const.VIDEO_UNDERSTAND_INIT_STATUS} 
+                limit {rest_video_count};
+            """
+
             task_list = self.db.fetch(sql, cursor_type=DictCursor)
             for task in tqdm(task_list, desc="upload_video_task"):
-                file_path = download_file(task['pq_vid'], task['video_oss_path'])
-                google_upload_result = self.google_ai_api.upload_file(file_path)
-
-                if google_upload_result:
-                    file_name, file_state, expire_time = google_upload_result
-                    update_sql = f"""
-                        update video_content_understanding
-                        set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
-                        where pq_vid = %s;
-                    """
-                    self.db.save(
-                        update_sql,
-                        params=(1, file_name, file_state, expire_time, task['pq_vid'])
-                    )
-                    success_upload_count += 1
-                else:
+                lock_rows = self.update_video_status(
+                    ori_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
+                    new_status=const.VIDEO_LOCK,
+                    pq_vid=task['pq_vid'],
+                )
+                if not lock_rows:
                     continue
+                try:
+                    file_path = download_file(task['pq_vid'], task['video_oss_path'])
+                    google_upload_result = self.google_ai_api.upload_file(file_path)
+                    if google_upload_result:
+                        file_name, file_state, expire_time = google_upload_result
+                        update_sql = f"""
+                            update video_content_understanding
+                            set status = %s, file_name = %s, file_state = %s, file_expire_time = %s
+                            where pq_vid = %s and status = %s;
+                        """
+                        self.db.save(
+                            update_sql,
+                            params=(
+                                const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                                file_name,
+                                file_state,
+                                expire_time,
+                                task['pq_vid'],
+                                const.VIDEO_LOCK
+                            )
+                        )
+                        success_upload_count += 1
+
+                except Exception as e:
+                    print("task upload failed because of {}".format(e))
+                    print("trace_back: ", traceback.format_exc())
+                    # roll back status
+                    self.update_video_status(
+                        ori_status=const.VIDEO_LOCK,
+                        new_status=const.VIDEO_UNDERSTAND_INIT_STATUS,
+                        pq_vid=task['pq_vid'],
+                    )
 
         return success_upload_count
 
@@ -121,7 +178,13 @@ class GenerateTextFromVideo(object):
         """
         获取处理视频转文本任务
         """
-        sql = "select pq_vid, file_name from video_content_understanding where status = 1 order by file_expire_time limit 10;"
+        sql = f"""
+            select pq_vid, file_name 
+            from video_content_understanding 
+            where status = {const.VIDEO_UNDERSTAND_PROCESSING_STATUS} 
+            order by file_expire_time 
+            limit {BATCH_SIZE};
+        """
         task_list = self.db.fetch(sql, cursor_type=DictCursor)
         return task_list
 
@@ -132,6 +195,14 @@ class GenerateTextFromVideo(object):
         task_list = self.get_tasks()
         while task_list:
             for task in tqdm(task_list, desc="convert video to text"):
+                # LOCK TASK
+                lock_row = self.update_video_status(
+                    ori_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                    new_status=const.VIDEO_LOCK,
+                    pq_vid=task['pq_vid'],
+                )
+                if not lock_row:
+                    continue
                 file_name = task['file_name']
                 video_local_path = "static/{}.mp4".format(task['pq_vid'])
                 google_file = self.google_ai_api.get_google_file(file_name)
@@ -147,42 +218,67 @@ class GenerateTextFromVideo(object):
                                 update_sql = f"""
                                     update video_content_understanding
                                     set status = %s, video_text = %s, file_state = %s
-                                    where pq_vid = %s;
+                                    where pq_vid = %s and status = %s;
                                 """
                                 self.db.save(
                                     update_sql,
-                                    params=(2, video_text, state, task['pq_vid'])
+                                    params=(
+                                        const.VIDEO_UNDERSTAND_SUCCESS_STATUS,
+                                        video_text,
+                                        state,
+                                        task['pq_vid'],
+                                        const.VIDEO_LOCK
+                                    )
                                 )
+                                # delete local file and google file
                                 if os.path.exists(video_local_path):
                                     os.remove(video_local_path)
-                                tqdm.write("video transform to text success, delete local file, sleep 1 min...")
+
+                                tqdm.write("video transform to text success, delete local file")
                                 task_list.remove(task)
+
                                 self.google_ai_api.delete_video(file_name)
-                                print("delete video from google success: {}".format(file_name))
+                                tqdm.write("delete video from google success: {}".format(file_name))
+                            else:
+                                # roll back status
+                                self.update_video_status(
+                                    ori_status=const.VIDEO_LOCK,
+                                    new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                                    pq_vid=task['pq_vid'],
+                                )
                         except Exception as e:
+                            # roll back status
+                            self.update_video_status(
+                                ori_status=const.VIDEO_LOCK,
+                                new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                                pq_vid=task['pq_vid'],
+                            )
                             tqdm.write(str(e))
                             continue
 
                     case 'PROCESSING':
                         tqdm.write("video is still processing")
-                        continue
+                        # roll back status
+                        self.update_video_status(
+                            ori_status=const.VIDEO_LOCK,
+                            new_status=const.VIDEO_UNDERSTAND_PROCESSING_STATUS,
+                            pq_vid=task['pq_vid'],
+                        )
 
                     case 'FAILED':
-                        update_sql = f"""
-                            update video_content_understanding
-                            set status = %s, file_state = %s
-                            where pq_vid = %s;
-                        """
-                        self.db.save(
-                            update_sql,
-                            params=(99, state, task['pq_vid'])
+                        self.update_video_status(
+                            ori_status=const.VIDEO_LOCK,
+                            new_status=const.VIDEO_UNDERSTAND_FAIL_STATUS,
+                            pq_vid=task['pq_vid']
                         )
+
                         if os.path.exists(video_local_path):
                             os.remove(video_local_path)
+
                         self.google_ai_api.delete_video(file_name)
                         tqdm.write("video process failed, delete local file")
-                        continue
-                time.sleep(10)
+
+                time.sleep(const.SLEEP_SECONDS)
 
             tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
-            time.sleep(60)
+            time.sleep(const.SLEEP_SECONDS)