luojunhui 1 месяц назад
Родитель
Сommit
148fa08113
2 измененных файлов с 91 добавлено и 26 удалено
  1. 18 0
      applications/const/__init__.py
  2. 73 26
      tasks/title_rewrite_task.py

+ 18 - 0
applications/const/__init__.py

@@ -268,6 +268,24 @@ class BaiduVideoCrawlerConst:
     LOCAL_PATH_DIR = "static"
 
 
+class TitleRewriteTaskConst:
+    """
+    Status codes used by the title rewrite task.
+    """
+    # values of publish_single_video_source.title_rewrite_status
+    # INIT: rows in this status are selected for rewriting
+    TITLE_REWRITE_INIT_STATUS = 0
+    # SUCCESS: set after the rewritten title has been stored
+    TITLE_REWRITE_SUCCESS_STATUS = 1
+    # FAIL: set when rewriting an article raises an exception
+    TITLE_REWRITE_FAIL_STATUS = 99
+    # LOCK: set on a row right before it is processed
+    TITLE_REWRITE_LOCK_STATUS = 101
+
+    # article filters used when selecting rows to rewrite
+    # compared against the audit_status column
+    ARTICLE_AUDIT_PASSED_STATUS = 1
+    # compared against the bad_status column
+    # (presumably "not flagged as bad" - TODO confirm)
+    ARTICLE_POSITIVE_STATUS = 0
+
+    # written to video_title_rewrite.status for every inserted title
+    TITLE_USEFUL_STATUS = 1
+
+
 
 
 

+ 73 - 26
tasks/title_rewrite_task.py

@@ -1,13 +1,19 @@
 """
 @author: luojunhui
 """
+import traceback
+
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
+from applications import log
 from applications.api import fetch_deepseek_response
+from applications.const import TitleRewriteTaskConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
+# module-level instance; the task reads its status codes as attributes
+const = TitleRewriteTaskConst()
+
 
 def generate_prompt(ori_title):
     """
@@ -107,7 +113,9 @@ class TitleRewriteTask:
         sql = f"""
             select content_trace_id, article_title 
             from publish_single_video_source 
-            where bad_status = 0 and audit_status = 1 and title_rewrite_status = 0
+            where bad_status = {const.ARTICLE_POSITIVE_STATUS} 
+                and audit_status = {const.ARTICLE_AUDIT_PASSED_STATUS} 
+                and title_rewrite_status = {const.TITLE_REWRITE_INIT_STATUS}
             limit {batch_size};
         """
         res = self.db.fetch(query=sql, cursor_type=DictCursor)
@@ -120,42 +128,81 @@ class TitleRewriteTask:
         sql = f"""
             update publish_single_video_source
             set title_rewrite_status = %s
-            where content_trace_id = %s and ori_status = %s;
+            where content_trace_id = %s and title_rewrite_status= %s;
+        """
+        affected_rows = self.db.save(query=sql, params=(new_status, content_trace_id, ori_status))
+        return affected_rows
+
+    def insert_into_rewrite_table(self, content_trace_id, new_title, prompt_version='xx_250228'):
         """
-        self.db.save(query=sql, params=(new_status, content_trace_id, ori_status))
+        Insert one rewritten title into the video_title_rewrite table.
+
+        The row is stored with the "useful" title status.
+
+        :param content_trace_id: id of the article the title belongs to
+        :param new_title: rewritten title to store
+        :param prompt_version: tag of the prompt that produced the title;
+            defaults to the version this task currently uses
+        :return: whatever ``self.db.save`` returns (the sibling status
+            update treats that value as the affected row count)
+        """
+        # plain string: the only placeholders are the driver's %s markers
+        insert_sql = """
+            insert into video_title_rewrite
+            (content_trace_id, new_title, status, prompt_version)
+            values (%s, %s, %s, %s);
+        """
+        return self.db.save(
+            query=insert_sql,
+            params=(content_trace_id, new_title, const.TITLE_USEFUL_STATUS, prompt_version)
+        )
 
-    def change_each_article(self, article):
+    def rewrite_each_article(self, article):
         """
-        对每个标题进行修改
+        Rewrite the title of one article and record the outcome.
+
+        The row is first moved from the init status to the lock status; when
+        that update affects no rows the article is skipped. The rewritten
+        title is then stored and the row is marked as success. Any exception
+        raised in between is logged and the row is marked as failed.
+
+        :param article: mapping with the keys ``content_trace_id`` and
+            ``article_title``
+        :return: None
         """
         content_trace_id = article['content_trace_id']
         article_title = article['article_title']
+
+        # lock each task; zero affected rows means no row with this id is
+        # still in the init status, so there is nothing to do here
+        affected_rows = self.update_title_rewrite_status(
+            content_trace_id=content_trace_id,
+            ori_status=const.TITLE_REWRITE_INIT_STATUS,
+            new_status=const.TITLE_REWRITE_LOCK_STATUS
+        )
+        if not affected_rows:
+            return
+
         try:
             prompt = generate_prompt(article_title)
-            new_article_title = fetch_deepseek_response(model='default', prompt=prompt)
-            insert_sql = f"""
-                insert into video_title_rewrite
-                (content_trace_id, new_title, status, prompt_version)
-                values (%s, %s, %s, %s)
-            """
-            self.db.save(query=insert_sql, params=(content_trace_id, new_article_title, 1, 'xx_250228'))
-            update_sql = f"""
-                update publish_single_video_source
-                set title_rewrite_status = %s
-                where content_trace_id = %s;
-            """
-            self.db.save(query=update_sql, params=(1, content_trace_id))
-        except:
-            return
+            new_title = fetch_deepseek_response(model='default', prompt=prompt)
+            # an empty or missing response must not be stored as a useful
+            # title; raising routes it through the failure path below
+            if not new_title:
+                raise ValueError("empty title returned by the model")
+
+            # insert into rewrite table
+            self.insert_into_rewrite_table(
+                content_trace_id=content_trace_id,
+                new_title=new_title
+            )
+
+            # unlock: lock -> success
+            self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=const.TITLE_REWRITE_LOCK_STATUS,
+                new_status=const.TITLE_REWRITE_SUCCESS_STATUS
+            )
+        except Exception as e:
+            # log under this task's own name so the record can be found
+            log(
+                task="title_rewrite_task",
+                function="rewrite_each_article",
+                message=content_trace_id,
+                data={
+                    "error_message": str(e),
+                    "error_type": type(e).__name__,
+                    "traceback": traceback.format_exc(),
+                }
+            )
+            # unlock: lock -> fail
+            self.update_title_rewrite_status(
+                content_trace_id=content_trace_id,
+                ori_status=const.TITLE_REWRITE_LOCK_STATUS,
+                new_status=const.TITLE_REWRITE_FAIL_STATUS
+            )
 
     def deal(self):
+        """
+        Fetch one batch of articles and rewrite the title of each.
+
+        Articles come from ``get_articles_batch`` and are handed one by one
+        to ``rewrite_each_article`` while a tqdm progress bar advances.
+        """
         articles = self.get_articles_batch()
         bar = tqdm(articles, desc="title rewrite task")
-        for article in articles:
-            try:
-                self.change_each_article(article)
-                bar.set_postfix({"content_id": article['content_trace_id']})
-            except Exception as e:
-                print(e)
+        for article in bar:
+            self.rewrite_each_article(article)
+            # show the id of the article that was just handled
+            bar.set_postfix({"content_trace_id": article['content_trace_id']})