luojunhui преди 7 месеца
родител
ревизия
f040c06a61
променени са 2 файла, в които са добавени 57 реда и са изтрити 1 реда
  1. 3 0
      applications/const/__init__.py
  2. 54 1
      tasks/title_rewrite_task.py

+ 3 - 0
applications/const/__init__.py

@@ -288,6 +288,9 @@ class TitleRewriteTaskConst:
     # prompt version
     PROMPT_VERSION = "xx_250228"  # 信欣2025-02-28提供
 
+    # block expire time 1h
+    TITLE_REWRITE_LOCK_TIME = 60 * 60
+
 
 
 

+ 54 - 1
tasks/title_rewrite_task.py

@@ -2,6 +2,7 @@
 @author: luojunhui
 """
 
+import time
 import traceback
 
 from pymysql.cursors import DictCursor
@@ -108,12 +109,47 @@ class TitleRewriteTask:
         self.db = DatabaseConnector(db_config=long_articles_config)
         self.db.connect()
 
+    def roll_back_blocked_tasks(self):
+        """
+        rollback blocked tasks
+        """
+        sql = f"""
+            select id, title_rewrite_status_update_timestamp
+            from publish_single_video_source
+            where title_rewrite_status = {const.TITLE_REWRITE_LOCK_STATUS};
+        """
+        article_list = self.db.fetch(query=sql, cursor_type=DictCursor)
+        if article_list:
+            blocked_id_list = [
+                i["id"]
+                for i in article_list
+                if (
+                    int(time.time())
+                    - i["title_rewrite_status_update_timestamp"]
+                )
+                > const.TITLE_REWRITE_LOCK_TIME
+            ]
+            if blocked_id_list:
+                update_sql = f"""
+                    update publish_single_video_source
+                    set title_rewrite_status = %s
+                    where id in %s and title_rewrite_status = %s;
+                """
+                self.db.save(
+                    query=update_sql,
+                    params=(
+                        const.TITLE_REWRITE_INIT_STATUS,
+                        tuple(blocked_id_list),
+                        const.TITLE_REWRITE_LOCK_STATUS,
+                    )
+                )
+
     def get_articles_batch(self, batch_size=1000):
         """
         从数据库中获取文章
         """
         sql = f"""
-            select content_trace_id, article_title 
+            select content_trace_id, article_title
             from publish_single_video_source 
             where bad_status = {const.ARTICLE_POSITIVE_STATUS} 
                 and audit_status = {const.ARTICLE_AUDIT_PASSED_STATUS} 
@@ -209,6 +245,23 @@ class TitleRewriteTask:
         """
         get tasks && deal tasks
         """
+        # rollback blocked tasks
+        try:
+            self.roll_back_blocked_tasks()
+        except Exception as e:
+            log(
+                task="title rewrite task",
+                function="roll_back_blocked_tasks",
+                message="roll back blocked tasks fail",
+                status="fail",
+                data={
+                    "error_message": str(e),
+                    "error_type": type(e).__name__,
+                    "traceback": traceback.format_exc()
+                }
+            )
+
+        # process tasks
         articles = self.get_articles_batch()
         bar = tqdm(articles, desc="title rewrite task")
         for article in bar: