Kaynağa Gözat

add const, apollo_config, lock
to video_extract_task

luojunhui 3 ay önce
ebeveyn
işleme
f0959e21bc
1 değiştirilmiş dosya ile 59 ekleme ve 28 silme
  1. 59 28
      tasks/article_summary_task.py

+ 59 - 28
tasks/article_summary_task.py

@@ -5,9 +5,12 @@ from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
 from applications.api import deep_seek_api
+from applications.const import VideoToTextConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
+const = VideoToTextConst()
+
 
 def generate_prompt(text):
     """
@@ -45,57 +48,85 @@ class ArticleSummaryTask(object):
         """
         select_sql = f"""
             select t1.video_text, t2.audit_video_id
-            from video_content_understanding t1 join publish_single_video_source t2 on t1.pq_vid = t2.audit_video_id
-            where t1.status = 2 and t2.bad_status = 0 and t2.extract_status = 0;
+            from video_content_understanding t1 
+                join publish_single_video_source t2 
+                on t1.pq_vid = t2.audit_video_id
+            where t1.status = {const.VIDEO_UNDERSTAND_SUCCESS_STATUS} 
+                and t2.bad_status = {const.ARTICLE_GOOD_STATUS} 
+                and t2.extract_status = {const.EXTRACT_INIT_STATUS};
         """
         task_list = self.db_client.fetch(select_sql, cursor_type=DictCursor)
         return task_list
 
     def process_each_task(self, task):
         """
-        task: {
-            "video_text": "视频内容",
-            "audit_video_id": "视频id"
-        }
+        处理每个任务
         """
         video_text = task["video_text"]
         audit_video_id = task["audit_video_id"]
         # 开始处理,将extract_status更新为101
         update_sql = f"""
-            update publish_single_video_source set extract_status = %s where audit_video_id = %s and extract_status = %s
+            update publish_single_video_source 
+            set extract_status = %s 
+            where audit_video_id = %s and extract_status = %s;
         """
         affected_rows = self.db_client.save(
             query=update_sql,
-            params=(101, audit_video_id, 0)
+            params=(const.EXTRACT_PROCESSING_STATUS, audit_video_id, const.EXTRACT_INIT_STATUS)
         )
         if not affected_rows:
             return
-
-        # 生成prompt
-        prompt = generate_prompt(video_text)
-        response = deep_seek_api(model="DeepSeek-R1", prompt=prompt)
-        if response:
-            update_sql = f"""
-                update publish_single_video_source 
-                set extract_status = %s, summary_text = %s
-                where audit_video_id = %s and extract_status = %s;
-            """
-            affected_rows = self.db_client.save(
-                query=update_sql,
-                params=(2, response.strip(), audit_video_id, 101)
-            )
-            print(affected_rows)
-        else:
+        try:
+            # 生成prompt
+            prompt = generate_prompt(video_text)
+            response = deep_seek_api(model="DeepSeek-R1", prompt=prompt)
+            if response:
+                update_sql = f"""
+                    update publish_single_video_source 
+                    set extract_status = %s, summary_text = %s
+                    where audit_video_id = %s and extract_status = %s;
+                """
+                affected_rows = self.db_client.save(
+                    query=update_sql,
+                    params=(
+                        const.EXTRACT_SUCCESS_STATUS,
+                        response.strip(),
+                        audit_video_id,
+                        const.EXTRACT_PROCESSING_STATUS
+                    )
+                )
+                print(affected_rows)
+            else:
+                update_sql = f"""
+                    update publish_single_video_source 
+                    set extract_status = %s
+                    where audit_video_id = %s and extract_status = %s;
+                """
+                affected_rows = self.db_client.save(
+                    query=update_sql,
+                    params=(
+                        const.EXTRACT_FAIL_STATUS,
+                        audit_video_id,
+                        const.EXTRACT_PROCESSING_STATUS
+                    )
+                )
+                print(affected_rows)
+        except Exception as e:
+            print(e)
+            # set as fail
             update_sql = f"""
-                update publish_single_video_source 
+                update publish_single_video_source
                 set extract_status = %s
                 where audit_video_id = %s and extract_status = %s;
             """
-            affected_rows = self.db_client.save(
+            self.db_client.save(
                 query=update_sql,
-                params=(99, audit_video_id, 101)
+                params=(
+                    const.EXTRACT_FAIL_STATUS,
+                    audit_video_id,
+                    const.EXTRACT_PROCESSING_STATUS
+                )
             )
-            print(affected_rows)
 
     def deal(self):
         """