luojunhui 1 mês atrás
pai
commit
104e850b46

+ 5 - 0
coldStartTasks/ai_pipeline/__init__.py

@@ -0,0 +1,5 @@
+"""
+@author: luojunhui
+"""
+
+from .video_to_text import GenerateTextFromVideo

+ 117 - 0
coldStartTasks/ai_pipeline/basic.py

@@ -0,0 +1,117 @@
+import os
+import time
+import datetime
+import requests
+
+
+def get_status_field_by_process(process):
+    """
+    Map a pipeline stage name to its status column and status-timestamp column.
+
+    The returned names are interpolated directly into SQL statements by the
+    callers in this module, so only the fixed set of stage names below is
+    accepted.
+
+    :param process: one of "upload", "understanding", "summary", "rewrite"
+    :return: tuple (status_column, status_timestamp_column)
+    :raises ValueError: if process is not one of the known stage names
+    """
+    known_processes = ("upload", "understanding", "summary", "rewrite")
+    if process not in known_processes:
+        raise ValueError(f"Unexpected task: {process}")
+    return f"{process}_status", f"{process}_status_ts"
+
+
+def roll_back_lock_tasks(
+    db_client, process, max_process_time, init_status, processing_status
+) -> int:
+    """
+    Release tasks that have been locked in the processing state for too long.
+
+    :param db_client: database client exposing save(query=..., params=...)
+    :param process: stage name accepted by get_status_field_by_process
+    :param max_process_time: maximum allowed processing time, in seconds
+    :param init_status: status value the stuck tasks are reset to
+    :param processing_status: status value that marks a task as locked
+    :return: value returned by db_client.save (presumably the affected row count)
+    :raises ValueError: if process is not a known stage name
+    """
+    status, update_timestamp = get_status_field_by_process(process)
+    # update_task_queue_status stores datetime.datetime.now() in the
+    # *_status_ts columns, so the cut-off has to be a datetime as well; a raw
+    # epoch integer does not compare meaningfully with those values.
+    # NOTE(review): assumes the columns are DATETIME/TIMESTAMP - confirm against the schema.
+    timestamp_threshold = datetime.datetime.now() - datetime.timedelta(
+        seconds=max_process_time
+    )
+    update_query = f"""
+        update video_content_understanding
+        set {status} = %s
+        where {status} = %s and {update_timestamp} < %s;
+    """
+    rollback_rows = db_client.save(
+        query=update_query, params=(init_status, processing_status, timestamp_threshold)
+    )
+    return rollback_rows
+
+
+def download_file(task_id, oss_path):
+    """
+    Download a video from the CDN into the local static directory.
+
+    The data is streamed into a temporary ".part" file which is renamed to the
+    final name only after the whole download succeeded. A failed or
+    interrupted download therefore never leaves an empty or truncated file
+    behind that a later call would mistake for an already downloaded video.
+
+    :param task_id: task id, used as the local file name
+    :param oss_path: object path appended to the CDN base url
+    :return: local path of the downloaded (or already existing) file
+    :raises requests.RequestException: if the request fails, times out or
+        answers with an HTTP error status
+    """
+    video_url = "https://rescdn.yishihui.com/" + oss_path
+    file_name = "static/{}.mp4".format(task_id)
+    if os.path.exists(file_name):
+        return file_name
+
+    # presumably meant to keep the download off any configured proxy
+    proxies = {"http": None, "https": None}
+    temp_file_name = file_name + ".part"
+    try:
+        # timeout is (connect, read-per-chunk) so a dead connection cannot hang the task forever
+        with requests.get(
+            video_url, proxies=proxies, stream=True, timeout=(10, 300)
+        ) as response:
+            # an error page must not be stored (and later uploaded) as a video
+            response.raise_for_status()
+            with open(temp_file_name, "wb") as f:
+                for chunk in response.iter_content(chunk_size=1024 * 1024):
+                    f.write(chunk)
+        os.replace(temp_file_name, file_name)
+    except BaseException:
+        # drop the partial file, then let the caller see the original error
+        if os.path.exists(temp_file_name):
+            os.remove(temp_file_name)
+        raise
+    return file_name
+
+
+def generate_summary_prompt(text):
+    """
+    Build the prompt that asks the model to turn a video summary into a short
+    article teaser.
+
+    :param text: video summary text inserted into the prompt
+    :return: the complete prompt string
+    """
+    return f"""
+        你是1个优秀的公众号文章写作大师,我对你有以下要求
+        视频总结:{text}
+
+        第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
+        句子段落之间以悬念承接,可以吸引读者往下读第二句。
+
+        第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。
+
+        你最终输出一段总结内容,将第一段和第二段之间空格一行。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
+        """
+
+
+def update_task_queue_status(db_client, task_id, process, ori_status, new_status):
+    """
+    Move one task of video_content_understanding from ori_status to new_status
+    for the given stage and refresh that stage's status timestamp.
+
+    The update only matches while the task still holds ori_status, which is
+    why callers treat a falsy result as "somebody else already took the task".
+
+    :param db_client: database client exposing save(query=..., params=...)
+    :param task_id: id of the task row
+    :param process: stage name accepted by get_status_field_by_process
+    :param ori_status: status the task is expected to hold right now
+    :param new_status: status to write
+    :return: value returned by db_client.save (presumably the affected row count)
+    """
+    status_column, timestamp_column = get_status_field_by_process(process)
+    update_query = f"""
+        update video_content_understanding 
+        set {status_column} = %s, {timestamp_column} = %s
+        where {status_column} = %s and id = %s;
+    """
+    query_params = (new_status, datetime.datetime.now(), ori_status, task_id)
+    return db_client.save(query=update_query, params=query_params)
+
+
+def update_video_pool_status(db_client, content_trace_id, ori_status, new_status):
+    """
+    Change the status of a video in publish_single_video_source.
+
+    The row is only updated while it still holds ori_status.
+
+    :param db_client: database client exposing save(query=..., params=...)
+    :param content_trace_id: content_trace_id of the video row
+    :param ori_status: status the row is expected to hold right now
+    :param new_status: status to write
+    :return: value returned by db_client.save (presumably the affected row count)
+    """
+    update_query = """
+                update publish_single_video_source
+                set status = %s
+                where content_trace_id = %s and status = %s
+            """
+    affected_rows = db_client.save(
+        query=update_query, params=(new_status, content_trace_id, ori_status)
+    )
+    return affected_rows

+ 54 - 52
tasks/article_summary_task.py → coldStartTasks/ai_pipeline/summary_text.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import time
 import datetime
 import traceback
@@ -8,32 +9,18 @@ import traceback
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
+from applications import log
 from applications.api import fetch_deepseek_response
 from applications.const import VideoToTextConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
+from coldStartTasks.ai_pipeline.basic import generate_summary_prompt
+from coldStartTasks.ai_pipeline.basic import update_video_pool_status
+from coldStartTasks.ai_pipeline.basic import update_task_queue_status
 
 const = VideoToTextConst()
 
 
-def generate_prompt(text):
-    """
-    生成prompt
-    """
-    prompt = f"""
-    你是1个优秀的公众号文章写作大师,我对你有以下要求
-    视频总结:{text}
-    
-    第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
-    句子段落之间以悬念承接,可以吸引读者往下读第二句。
-    
-    第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。
-    
-    你最终输出一段总结内容,将第一段和第二段之间空格一行,并且对所有文字进行加粗处理。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
-    """
-    return prompt
-
-
 class ArticleSummaryTask(object):
     """
     文章总结任务
@@ -43,20 +30,20 @@ class ArticleSummaryTask(object):
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
-    def get_task_list(self):
+    def get_summary_task_list(self) -> list[dict]:
         """
         获取任务列表
         """
-        select_sql = f"""
-            select id, video_text
+        fetch_query = f"""
+            select id, content_trace_id, video_text
             from video_content_understanding
             where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
             limit {const.SUMMARY_BATCH_SIZE};
         """
-        task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
+        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         return task_list
 
-    def rollback_lock_tasks(self):
+    def rollback_lock_tasks(self) -> int:
         """
         rollback tasks which have been locked for a long time
         """
@@ -79,37 +66,62 @@ class ArticleSummaryTask(object):
         :param task: keys: [id, video_text]
         """
         task_id = task["id"]
+        content_trace_id = task["content_trace_id"]
         video_text = task["video_text"]
 
         # Lock Task
-        affected_rows = self.update_task_status(
-            task_id, const.INIT_STATUS, const.PROCESSING_STATUS
+        affected_rows = update_task_queue_status(
+            db_client=self.db_client,
+            task_id=task_id,
+            process="summary",
+            ori_status=const.INIT_STATUS,
+            new_status=const.PROCESSING_STATUS,
         )
         if not affected_rows:
             return
 
+        # fetch summary text from AI
         try:
             # generate prompt
-            prompt = generate_prompt(video_text)
-
+            prompt = generate_summary_prompt(video_text)
             # get result from deep seek AI
             result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
             if result:
                 # set as success and update summary text
                 self.set_summary_text_for_task(task_id, result.strip())
+                task_status = const.SUCCESS_STATUS
             else:
                 # set as fail
-                self.update_task_status(
-                    task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
+                update_task_queue_status(
+                    db_client=self.db_client,
+                    task_id=task_id,
+                    process="summary",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
                 )
+                task_status = const.FAIL_STATUS
         except Exception as e:
-            print(e)
-            print(traceback.format_exc())
             # set as fail
-            self.update_task_status(
-                task_id, const.PROCESSING_STATUS, const.FAIL_STATUS
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task_id,
+                process="summary",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+            task_status = const.FAIL_STATUS
+            log(
+                task="article_summary_task",
+                function="fetch_deepseek_response",
+                message="fetch_deepseek_response failed",
+                data={"error": str(e), "trace_back": traceback.format_exc()},
             )
 
+        # update video pool status
+        update_video_pool_status(
+            self.db_client, content_trace_id, const.PROCESSING_STATUS, task_status
+        )
+
     def set_summary_text_for_task(self, task_id, text):
         """
         successfully get summary text and update summary text to database
@@ -126,38 +138,28 @@ class ArticleSummaryTask(object):
                 text,
                 datetime.datetime.now(),
                 task_id,
-                const.PROCESSING_STATUS
+                const.PROCESSING_STATUS,
             ),
         )
         return affected_rows
 
-    def update_task_status(self, task_id, ori_status, new_status):
-        """
-        修改任务状态
-        """
-        update_sql = f"""
-            update video_content_understanding
-            set summary_status = %s, summary_status_ts = %s
-            where id = %s and summary_status = %s;
-        """
-        update_rows = self.db_client.save(
-            update_sql, (new_status, datetime.datetime.now(), task_id, ori_status)
-        )
-        return update_rows
-
     def deal(self):
         """
         entrance function for this class
         """
         # first of all rollback tasks which have been locked for a long time
         rollback_rows = self.rollback_lock_tasks()
-        print("rollback_lock_tasks: {}".format(rollback_rows))
+        tqdm.write("rollback_lock_tasks: {}".format(rollback_rows))
 
         # get task list
-        task_list = self.get_task_list()
+        task_list = self.get_summary_task_list()
         for task in tqdm(task_list, desc="handle each task"):
             try:
                 self.handle_task_execution(task=task)
             except Exception as e:
-                print("error: {}".format(e))
-                print(traceback.format_exc())
+                log(
+                    task="article_summary_task",
+                    function="deal",
+                    message="fetch_deepseek_response",
+                    data={"error": str(e), "trace_back": traceback.format_exc()},
+                )

+ 356 - 0
coldStartTasks/ai_pipeline/video_to_text.py

@@ -0,0 +1,356 @@
+"""
+@author: luojunhui
+"""
+
+import os
+import time
+import datetime
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.api import GoogleAIAPI
+from applications.const import VideoToTextConst
+from applications.db import DatabaseConnector
+from config import long_articles_config
+from config import apolloConfig
+from coldStartTasks.ai_pipeline.basic import download_file
+from coldStartTasks.ai_pipeline.basic import update_task_queue_status
+from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
+
+# Enable the proxy settings below when debugging from the office network
+# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
+# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
+
+const = VideoToTextConst()
+config = apolloConfig(env="prod")
+
+# pool_size: default for max_processing_video_count in upload_video_to_google_ai_task
+POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
+# batch_size: row limit used by get_extract_task_list
+BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
+
+
+class GenerateTextFromVideo(object):
+    """
+    Generate text from videos with the Google AI API.
+
+    Two entry points work on the video_content_understanding table:
+    upload_video_to_google_ai_task uploads pending videos, and
+    convert_video_to_text_with_google_ai_task turns uploaded videos into text.
+    """
+
+    def __init__(self):
+        self.google_ai_api = GoogleAIAPI()
+        self.db = DatabaseConnector(db_config=long_articles_config)
+        self.db.connect()
+
+    def get_upload_task_list(self, task_length: int) -> list[dict]:
+        """
+        Fetch up to task_length videos that still have to be uploaded.
+
+        Only videos that passed the pool audit and are not flagged as bad are
+        returned, ordered by flow_pool_level (per the original author this
+        puts high-traffic pool videos first).
+        """
+        fetch_query = f"""
+            select t1.id, t1.video_oss_path
+            from video_content_understanding t1
+            join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
+            where t1.upload_status = {const.INIT_STATUS} 
+                and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS} 
+                and t2.bad_status = {const.ARTICLE_GOOD_STATUS}
+            order by t2.flow_pool_level
+            limit {task_length};
+            """
+        task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def get_extract_task_list(self) -> list[dict]:
+        """
+        Fetch up to BATCH_SIZE uploaded videos that still need text extraction,
+        the ones whose uploaded file expires first at the front.
+        """
+        fetch_query = f"""
+            select id, file_name, video_ori_title
+            from video_content_understanding 
+            where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} 
+            order by file_expire_time
+            limit {BATCH_SIZE};
+        """
+        task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def get_processing_task_num(self) -> int:
+        """
+        Count the uploaded videos whose file_state is still 'PROCESSING'.
+
+        :return: the count, or 0 when the query returns no row
+        """
+        select_query = f"""
+            select count(1) as processing_count 
+            from video_content_understanding 
+            where file_state = 'PROCESSING' and upload_status = {const.SUCCESS_STATUS};
+        """
+        fetch_response = self.db.fetch(query=select_query, cursor_type=DictCursor)
+        # index the row list exactly once; indexing the extracted integer again
+        # raised TypeError as soon as the count was non-zero
+        processing_task_num = (
+            fetch_response[0]["processing_count"] if fetch_response else 0
+        )
+        return processing_task_num
+
+    def set_upload_result_for_task(
+        self, task_id: str, file_name: str, file_state: str, expire_time: str
+    ) -> int:
+        """
+        Store the upload result and mark the upload stage as successful.
+
+        Only a task that is currently in the processing state is updated.
+
+        :return: value returned by self.db.save (presumably the affected row count)
+        """
+        update_query = f"""
+            update video_content_understanding
+            set upload_status = %s, upload_status_ts = %s, 
+                file_name = %s, file_state = %s, file_expire_time = %s
+            where id = %s and upload_status = %s;
+        """
+        affected_rows = self.db.save(
+            query=update_query,
+            params=(
+                const.SUCCESS_STATUS,
+                datetime.datetime.now(),
+                file_name,
+                file_state,
+                expire_time,
+                task_id,
+                const.PROCESSING_STATUS,
+            ),
+        )
+        return affected_rows
+
+    def set_understanding_result_for_task(
+        self, task_id: str, state: str, text: str
+    ) -> int:
+        """
+        Store the generated text and mark the understanding stage as successful.
+
+        Only a task that is currently in the processing state is updated. The
+        status timestamp is refreshed together with the status, like in every
+        other status update of this table.
+
+        :return: value returned by self.db.save (presumably the affected row count)
+        """
+        update_query = f"""
+            update video_content_understanding
+            set understanding_status = %s, understanding_status_ts = %s,
+                video_text = %s, file_state = %s
+            where id = %s and understanding_status = %s;
+        """
+        affected_rows = self.db.save(
+            query=update_query,
+            params=(
+                const.SUCCESS_STATUS,
+                datetime.datetime.now(),
+                text,
+                state,
+                task_id,
+                const.PROCESSING_STATUS,
+            ),
+        )
+        return affected_rows
+
+    def upload_video_to_google_ai_task(
+        self, max_processing_video_count: int = POOL_SIZE
+    ):
+        """
+        Upload pending videos to Google AI until the processing pool is full.
+
+        :param max_processing_video_count: maximum number of videos that may be
+            in the 'PROCESSING' file state at the same time
+        """
+        # rollback lock tasks
+        rollback_rows = roll_back_lock_tasks(
+            db_client=self.db,
+            process="upload",
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
+        )
+        tqdm.write("upload rollback_lock_tasks: {}".format(rollback_rows))
+
+        processing_task_num = self.get_processing_task_num()
+        rest_video_count = max_processing_video_count - processing_task_num
+        # a negative remainder (pool over-full) must not reach the query as "limit -N"
+        if rest_video_count > 0:
+            task_list = self.get_upload_task_list(rest_video_count)
+            for task in tqdm(task_list, desc="upload_video_task"):
+                # lock the task; a falsy result means another worker took it
+                lock_rows = update_task_queue_status(
+                    db_client=self.db,
+                    task_id=task["id"],
+                    process="upload",
+                    ori_status=const.INIT_STATUS,
+                    new_status=const.PROCESSING_STATUS,
+                )
+                if not lock_rows:
+                    continue
+                try:
+                    file_path = download_file(task["id"], task["video_oss_path"])
+                    google_upload_result = self.google_ai_api.upload_file(file_path)
+                    if google_upload_result:
+                        file_name, file_state, expire_time = google_upload_result
+                        self.set_upload_result_for_task(
+                            task_id=task["id"],
+                            file_name=file_name,
+                            file_state=file_state,
+                            expire_time=expire_time,
+                        )
+                    else:
+                        # mark the upload as failed
+                        update_task_queue_status(
+                            db_client=self.db,
+                            task_id=task["id"],
+                            process="upload",
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.FAIL_STATUS,
+                        )
+                        log(
+                            task="video_to_text",
+                            function="upload_video_to_google_ai_task",
+                            message="upload_video_to_google_ai_task failed",
+                            data={
+                                "task_id": task["id"],
+                            },
+                        )
+                except Exception as e:
+                    log(
+                        task="video_to_text",
+                        function="upload_video_to_google_ai_task",
+                        message="upload_video_to_google_ai_task failed",
+                        data={
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                            "task_id": task["id"],
+                        },
+                    )
+                    # mark the upload as failed
+                    update_task_queue_status(
+                        db_client=self.db,
+                        task_id=task["id"],
+                        process="upload",
+                        ori_status=const.PROCESSING_STATUS,
+                        new_status=const.FAIL_STATUS,
+                    )
+        else:
+            log(
+                task="video_to_text",
+                function="upload_video_to_google_ai_task",
+                message="task pool is full",
+            )
+
+    def convert_video_to_text_with_google_ai_task(self):
+        """
+        Turn uploaded videos into text, one batch per call.
+
+        Each task is locked first and then handled according to the state of
+        its Google file: ACTIVE files are converted, PROCESSING files are
+        released for a later run, FAILED files are marked as failed.
+        """
+        rollback_rows = roll_back_lock_tasks(
+            db_client=self.db,
+            process="understanding",
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
+        )
+        tqdm.write("extract rollback_lock_tasks: {}".format(rollback_rows))
+
+        task_list = self.get_extract_task_list()
+        # task_list must not be modified inside this loop: removing the current
+        # element while iterating makes the loop skip the following task
+        for task in tqdm(task_list, desc="convert video to text"):
+            # LOCK TASK
+            lock_row = update_task_queue_status(
+                db_client=self.db,
+                task_id=task["id"],
+                process="understanding",
+                ori_status=const.INIT_STATUS,
+                new_status=const.PROCESSING_STATUS,
+            )
+            if not lock_row:
+                # tqdm.write keeps the progress bar intact, unlike print
+                tqdm.write("Task has been locked by other process")
+                continue
+            file_name = task["file_name"]
+            video_local_path = "static/{}.mp4".format(task["id"])
+            try:
+                google_file = self.google_ai_api.get_google_file(file_name)
+                state = google_file.state.name
+                # NOTE(review): a state other than the three handled below
+                # leaves the task locked until roll_back_lock_tasks releases it
+                match state:
+                    case "ACTIVE":
+                        try:
+                            video_text = self.google_ai_api.get_video_text(
+                                prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
+                                video_file=google_file,
+                            )
+                            if video_text:
+                                self.set_understanding_result_for_task(
+                                    task_id=task["id"], state=state, text=video_text
+                                )
+
+                                # delete local file and google file
+                                if os.path.exists(video_local_path):
+                                    os.remove(video_local_path)
+
+                                tqdm.write(
+                                    "video transform to text success, delete local file"
+                                )
+
+                                self.google_ai_api.delete_video(file_name)
+                                tqdm.write(
+                                    "delete video from google success: {}".format(
+                                        file_name
+                                    )
+                                )
+                            else:
+                                # roll back status and wait for next process
+                                update_task_queue_status(
+                                    db_client=self.db,
+                                    task_id=task["id"],
+                                    process="understanding",
+                                    ori_status=const.PROCESSING_STATUS,
+                                    new_status=const.INIT_STATUS,
+                                )
+
+                        except Exception as e:
+                            # mark as failed; this matches no row when the
+                            # result has already been stored successfully
+                            update_task_queue_status(
+                                db_client=self.db,
+                                task_id=task["id"],
+                                process="understanding",
+                                ori_status=const.PROCESSING_STATUS,
+                                new_status=const.FAIL_STATUS,
+                            )
+                            tqdm.write(str(e))
+                            continue
+
+                    case "PROCESSING":
+                        # release the lock so a later run picks the task up again
+                        update_task_queue_status(
+                            db_client=self.db,
+                            task_id=task["id"],
+                            process="understanding",
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.INIT_STATUS,
+                        )
+                        tqdm.write("video is still processing")
+
+                    case "FAILED":
+                        update_sql = f"""
+                            update video_content_understanding
+                            set file_state = %s, understanding_status = %s, understanding_status_ts = %s
+                            where id = %s and understanding_status = %s;
+                        """
+                        self.db.save(
+                            query=update_sql,
+                            params=(
+                                state,
+                                const.FAIL_STATUS,
+                                datetime.datetime.now(),
+                                task["id"],
+                                const.PROCESSING_STATUS,
+                            ),
+                        )
+                        # delete local file and google file
+                        if os.path.exists(video_local_path):
+                            os.remove(video_local_path)
+
+                        self.google_ai_api.delete_video(file_name)
+                        tqdm.write("video process failed, delete local file")
+
+                time.sleep(const.SLEEP_SECONDS)
+            except Exception as e:
+                log(
+                    task="video_to_text",
+                    function="extract_video_to_text_task",
+                    message="extract video to text task failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "task_id": task["id"],
+                    },
+                )
+                update_task_queue_status(
+                    db_client=self.db,
+                    task_id=task["id"],
+                    process="understanding",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
+                )

+ 0 - 4
coldStartTasks/multi_modal/__init__.py

@@ -1,4 +0,0 @@
-"""
-@author: luojunhui
-"""
-from .generate_text_from_video import GenerateTextFromVideo

+ 0 - 303
coldStartTasks/multi_modal/generate_text_from_video.py

@@ -1,303 +0,0 @@
-"""
-@author: luojunhui
-"""
-import json
-import os
-import time
-import datetime
-import traceback
-
-import requests
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications.api import GoogleAIAPI
-from applications.const import VideoToTextConst
-from applications.db import DatabaseConnector
-from config import long_articles_config
-from config import apolloConfig
-
-# 办公室网络调试需要打开代理
-# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
-# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
-
-const = VideoToTextConst()
-# config = apolloConfig(env="prod")
-
-# pool_size
-# POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
-POOL_SIZE = 20
-# batch_size
-# BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
-BATCH_SIZE = 10
-
-
-def download_file(task_id, oss_path):
-    """
-    下载视频文件
-    """
-    video_url = "https://rescdn.yishihui.com/" + oss_path
-    file_name = "static/{}.mp4".format(task_id)
-    if os.path.exists(file_name):
-        return file_name
-
-    proxies = {
-        "http": None,
-        "https": None
-    }
-    with open(file_name, 'wb') as f:
-        response = requests.get(video_url, proxies=proxies)
-        f.write(response.content)
-    return file_name
-
-
-class GenerateTextFromVideo(object):
-    """
-    从视频中生成文本
-    """
-    def __init__(self):
-        self.google_ai_api = GoogleAIAPI()
-        self.db = DatabaseConnector(db_config=long_articles_config)
-        self.db.connect()
-
-    def update_task_status(self, task_id, process, ori_status, new_status):
-        """
-        回滚长时间处于处理中的任务
-        """
-        match process:
-            case "upload":
-                status = 'upload_status'
-                update_timestamp = 'upload_status_ts'
-            case "understanding":
-                status = 'understanding_status'
-                update_timestamp = 'understanding_status_ts'
-            case "summary":
-                status = 'summary_status'
-                update_timestamp = 'summary_status_ts'
-            case "rewrite":
-                status = 'rewrite_status'
-                update_timestamp = 'rewrite_status_ts'
-            case _:
-                raise ValueError(f"Unexpected task: {process}")
-
-        update_sql = f"""
-            update video_content_understanding 
-            set {status} = %s, {update_timestamp} = %s
-            where {status} = %s and id = %s;
-        """
-        roll_back_rows = self.db.save(
-            query=update_sql,
-            params=(
-                new_status,
-                datetime.datetime.now(),
-                ori_status,
-                task_id,
-            )
-        )
-        return roll_back_rows
-
-    def upload_video_to_google_ai(self, max_processing_video_count=POOL_SIZE):
-        """
-        上传视频到Google AI
-        max_processing_video_count: 处理中的最大视频数量,默认20
-        video_content_understanding 表status字段
-        0: 未处理
-        1: 处理中
-        2: 处理完成
-        """
-        select_sql = f"""
-            select count(1) as processing_count 
-            from video_content_understanding 
-            where file_state = 'PROCESSING' and upload_status = {const.SUCCESS_STATUS};
-        """
-        count = self.db.fetch(select_sql, cursor_type=DictCursor)[0]['processing_count']
-        rest_video_count = max_processing_video_count - count
-        success_upload_count = 0
-        if rest_video_count:
-            sql = f"""
-                select t1.id, t1.video_oss_path
-                from video_content_understanding t1
-                join publish_single_video_source t2
-                on t1.content_trace_id = t2.content_trace_id
-                where t1.upload_status = {const.INIT_STATUS} 
-                    and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS} 
-                    and t2.bad_status = 0
-                order by t2.flow_pool_level
-                limit {rest_video_count};
-            """
-            task_list = self.db.fetch(sql, cursor_type=DictCursor)
-            for task in tqdm(task_list, desc="upload_video_task"):
-                lock_rows = self.update_task_status(
-                    task_id=task['id'],
-                    process='upload',
-                    ori_status=const.INIT_STATUS,
-                    new_status=const.PROCESSING_STATUS
-                )
-                if not lock_rows:
-                    continue
-                try:
-                    file_path = download_file(task['id'], task['video_oss_path'])
-                    google_upload_result = self.google_ai_api.upload_file(file_path)
-                    if google_upload_result:
-                        file_name, file_state, expire_time = google_upload_result
-                        update_sql = f"""
-                            update video_content_understanding
-                            set 
-                                upload_status = %s, 
-                                upload_status_ts = %s, 
-                                file_name = %s, 
-                                file_state = %s, 
-                                file_expire_time = %s
-                            where id = %s and upload_status = %s;
-                        """
-                        self.db.save(
-                            update_sql,
-                            params=(
-                                const.SUCCESS_STATUS,
-                                datetime.datetime.now(),
-                                file_name,
-                                file_state,
-                                expire_time,
-                                task['id'],
-                                const.PROCESSING_STATUS
-                            )
-                        )
-                        success_upload_count += 1
-
-                except Exception as e:
-                    print("task upload failed because of {}".format(e))
-                    print("trace_back: ", traceback.format_exc())
-                    # roll back status
-                    self.update_task_status(
-                        task_id=task['id'],
-                        process='upload',
-                        ori_status=const.PROCESSING_STATUS,
-                        new_status=const.FAIL_STATUS
-                    )
-
-        return success_upload_count
-
-    def delete_video_from_google(self, file_name):
-        """
-        Delete a video file.
-
-        Thin wrapper that forwards ``file_name`` unchanged to
-        ``self.google_ai_api.delete_video``; nothing is returned.
-
-        :param file_name: identifier of the file to delete, passed straight
-            through to the Google AI API client
-        """
-        self.google_ai_api.delete_video(file_name)
-
-    def get_task_list(self):
-        """
-        Fetch the tasks to run through video-to-text processing.
-
-        Selects ``id``, ``file_name`` and ``video_ori_title`` from
-        ``video_content_understanding`` for rows whose ``upload_status`` equals
-        ``const.SUCCESS_STATUS`` and whose ``understanding_status`` equals
-        ``const.INIT_STATUS``, ordered by ``file_expire_time`` and capped at
-        ``BATCH_SIZE`` rows.
-
-        :return: whatever ``self.db.fetch`` returns when called with
-            ``DictCursor`` (presumably a sequence of dict rows -- confirm
-            against the db client)
-        """
-        # The status constants and BATCH_SIZE are interpolated directly into
-        # the SQL text by the f-string; no bound parameters are used here.
-        sql = f"""
-            select id, file_name, video_ori_title
-            from video_content_understanding 
-            where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} 
-            order by file_expire_time
-            limit {BATCH_SIZE};
-        """
-        task_list = self.db.fetch(sql, cursor_type=DictCursor)
-        return task_list
-
-    def convert_video_to_text_with_google_ai(self):
-        """
-        Process video-to-text tasks.
-
-        Loads one batch via ``self.get_task_list()`` and keeps looping over it
-        until the list is empty. For every task the row is first locked
-        (``understanding`` status INIT -> PROCESSING), then the state of the
-        uploaded Google file decides what happens:
-
-        * ``ACTIVE``: request the video text; on a non-empty result store it,
-          mark the row SUCCESS, delete the local and Google copies and drop
-          the task from the list. On an empty result the status is rolled
-          back to INIT; on an exception it is set to FAIL.
-        * ``PROCESSING``: only a message is written.
-        * ``FAILED``: mark the row FAIL, delete the local and Google copies
-          and drop the task from the list.
-
-        NOTE(review): the list is fetched once and never re-fetched; tasks
-        leave it only through ``task_list.remove``. Tasks that end on the
-        exception path, or that cannot be locked, stay in the list -- confirm
-        that the ``while`` loop can terminate in those cases.
-        """
-        task_list = self.get_task_list()
-        while task_list:
-            # NOTE(review): task_list is mutated (task_list.remove) while it
-            # is being iterated here; the element following a removed one can
-            # be skipped in the current pass and is only seen on the next one.
-            for task in tqdm(task_list, desc="convert video to text"):
-                # LOCK TASK
-                lock_row = self.update_task_status(
-                    task_id=task['id'],
-                    process='understanding',
-                    ori_status=const.INIT_STATUS,
-                    new_status=const.PROCESSING_STATUS
-                )
-                # A falsy result is treated as "could not lock": skip the task
-                # (note that this path also skips the per-task sleep below).
-                if not lock_row:
-                    print("Lock")
-                    continue
-                file_name = task['file_name']
-                video_local_path = "static/{}.mp4".format(task['id'])
-                google_file = self.google_ai_api.get_google_file(file_name)
-                state = google_file.state.name
-                match state:
-                    case 'ACTIVE':
-                        try:
-                            video_text = self.google_ai_api.get_video_text(
-                                prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
-                                video_file=google_file
-                            )
-                            if video_text:
-                                # Only rows still in PROCESSING are updated, so
-                                # a row changed elsewhere in the meantime is
-                                # left untouched by this statement.
-                                update_sql = f"""
-                                    update video_content_understanding
-                                    set understanding_status = %s, video_text = %s, file_state = %s
-                                    where id = %s and understanding_status = %s;
-                                """
-                                self.db.save(
-                                    update_sql,
-                                    params=(
-                                        const.SUCCESS_STATUS,
-                                        video_text,
-                                        state,
-                                        task['id'],
-                                        const.PROCESSING_STATUS
-                                    )
-                                )
-                                # delete local file and google file
-                                if os.path.exists(video_local_path):
-                                    os.remove(video_local_path)
-
-                                tqdm.write("video transform to text success, delete local file")
-                                task_list.remove(task)
-
-                                self.google_ai_api.delete_video(file_name)
-                                tqdm.write("delete video from google success: {}".format(file_name))
-                            else:
-                                # roll back status
-                                # (empty result: back to INIT so the task can
-                                # be locked again on a later pass)
-                                self.update_task_status(
-                                    task_id=task['id'],
-                                    process='understanding',
-                                    ori_status=const.PROCESSING_STATUS,
-                                    new_status=const.INIT_STATUS
-                                )
-                        except Exception as e:
-                            # roll back status
-                            # (any exception in the ACTIVE branch marks the row
-                            # as FAIL; the task is NOT removed from task_list)
-                            self.update_task_status(
-                                task_id=task['id'],
-                                process='understanding',
-                                ori_status=const.PROCESSING_STATUS,
-                                new_status=const.FAIL_STATUS
-                            )
-                            tqdm.write(str(e))
-                            continue
-
-                    case 'PROCESSING':
-                        # No status change is made in this branch, so the row
-                        # keeps the PROCESSING status set by the lock above and
-                        # the task stays in task_list.
-                        # NOTE(review): the next pass tries to lock from
-                        # INIT_STATUS again -- confirm update_task_status can
-                        # succeed for such a row, otherwise the task hits the
-                        # "Lock" branch on every later pass.
-                        tqdm.write("video is still processing")
-
-                    case 'FAILED':
-                        update_sql = f"""
-                            update video_content_understanding
-                            set file_state = %s, understanding_status = %s, understanding_status_ts = %s
-                            where id = %s and understanding_status = %s;
-                        """
-                        self.db.save(
-                            query=update_sql,
-                            params=(
-                                state,
-                                const.FAIL_STATUS,
-                                datetime.datetime.now(),
-                                task['id'],
-                                const.PROCESSING_STATUS
-                            )
-                        )
-                        # delete local file and google file
-                        if os.path.exists(video_local_path):
-                            os.remove(video_local_path)
-
-                        self.google_ai_api.delete_video(file_name)
-                        task_list.remove(task)
-                        tqdm.write("video process failed, delete local file")
-
-                # Pause between tasks (reached only when no `continue` fired).
-                time.sleep(const.SLEEP_SECONDS)
-
-            # Runtime message below reads: "finished one round of tasks,
-            # remaining count: {}".
-            tqdm.write("执行完一轮任务,剩余数量:{}".format(len(task_list)))
-            time.sleep(const.SLEEP_SECONDS)

+ 4 - 4
run_article_summary.py

@@ -1,9 +1,9 @@
 """
 @author: luojunhui
 """
-from tasks.article_summary_task import ArticleSummaryTask
 
+from coldStartTasks.ai_pipeline.summary_text import ArticleSummaryTask
 
-if __name__ == '__main__':
-    article_summary_task = ArticleSummaryTask()
-    article_summary_task.deal()
+if __name__ == "__main__":
+    task = ArticleSummaryTask()
+    task.deal()

+ 0 - 10
run_video_extract_text.py

@@ -1,10 +0,0 @@
-"""
-@author: luojunhui
-@tools: PyCharm MarsCodeAI && DeepSeek
-"""
-
-from coldStartTasks.multi_modal import GenerateTextFromVideo
-
-if __name__ == '__main__':
-    generate_text_from_video = GenerateTextFromVideo()
-    generate_text_from_video.convert_video_to_text_with_google_ai()

+ 35 - 0
run_video_understanding_with_google.py

@@ -0,0 +1,35 @@
+"""
+@author: luojunhui
+"""
+
+import datetime
+
+from applications import log
+from coldStartTasks.ai_pipeline import GenerateTextFromVideo
+
+
+def main():
+    """
+    Entry point for the Google video-understanding job.
+
+    Builds a ``GenerateTextFromVideo`` task and asks it how many tasks are
+    currently being processed. When that number is truthy, only
+    ``convert_video_to_text_with_google_ai_task`` is run. Otherwise
+    ``upload_video_to_google_ai_task`` is run first, the upload step is
+    recorded through ``log``, and the conversion step is run afterwards.
+    """
+    task = GenerateTextFromVideo()
+
+    # Check how many tasks are currently being processed
+    processing_tasks = task.get_processing_task_num()
+
+    if processing_tasks:
+        # Runtime message reads: "<now> there are currently N tasks waiting
+        # for google to process..."
+        print(
+            f"{datetime.datetime.now()} 当前有 {processing_tasks} 个任务正在等待 google 处理..."
+        )
+        task.convert_video_to_text_with_google_ai_task()
+    else:
+        # Runtime message reads: "<now> no tasks are being processed..."
+        print(f"{datetime.datetime.now()} 没有任务正在处理中...")
+        # upload video to google ai
+        task.upload_video_to_google_ai_task()
+        log(
+            task="video_understanding_with_google",
+            function="main",
+            message="upload_video_to_google_ai_task",
+        )
+        task.convert_video_to_text_with_google_ai_task()
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 17
run_video_upload_to_google.py

@@ -1,17 +0,0 @@
-"""
-@author: luojunhui
-"""
-from coldStartTasks.multi_modal import GenerateTextFromVideo
-
-
-def upload_videos_to_google_task():
-    """
-    Upload local videos to Google cloud storage.
-
-    Creates a ``GenerateTextFromVideo`` instance and calls its
-    ``upload_video_to_google_ai`` method; nothing is returned.
-    """
-    video_processing = GenerateTextFromVideo()
-    video_processing.upload_video_to_google_ai()
-
-
-if __name__ == '__main__':
-    upload_videos_to_google_task()
-

+ 1 - 1
sh/run_upload_video_to_google.sh

@@ -22,6 +22,6 @@ else
     conda activate tasks
 
     # 在后台运行 Python 脚本并重定向日志输出
-    nohup python3 run_video_upload_to_google.py >> "${LOG_FILE}" 2>&1 &
+    nohup python3 run_video_understanding_with_google.py >> "${LOG_FILE}" 2>&1 &
     echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_upload_to_google.py"
 fi

+ 0 - 26
sh/run_video_extract_task.sh

@@ -1,26 +0,0 @@
-#!/bin/bash
-
-# 获取当前日期,格式为 YYYY-MM-DD
-CURRENT_DATE=$(date +%F)
-
-# 日志文件路径,包含日期
-LOG_FILE="/root/luojunhui/logs/video_extract_log_$CURRENT_DATE.txt"
-
-# 重定向整个脚本的输出到带日期的日志文件
-exec >> "$LOG_FILE" 2>&1
-if pgrep -f "python3 run_video_extract_text.py" > /dev/null
-then
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_video_extract_text.py is running"
-else
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_video_extract_text.py"
-    # 切换到指定目录
-    cd /root/luojunhui/LongArticlesJob
-
-    # 激活 Conda 环境
-    source /root/miniconda3/etc/profile.d/conda.sh
-    conda activate tasks
-
-    # 在后台运行 Python 脚本并重定向日志输出
-    nohup python3 run_video_extract_text.py >> "${LOG_FILE}" 2>&1 &
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_extract_text.py"
-fi