Browse Source

history_task.py 单个任务异常不影响全局

罗俊辉 6 months ago
parent
commit
27e93161b7
1 changed files with 44 additions and 7 deletions
  1. 44 7
      tasks/history_task.py

+ 44 - 7
tasks/history_task.py

@@ -205,6 +205,32 @@ class historyContentIdTask(object):
             trace_id=trace_id
         )
 
+    async def roll_back_content_status_when_fails(self, process_times, trace_id):
+        """
+        处理失败,回滚至初始状态,处理次数加 1
+        :param process_times:
+        :param trace_id:
+        :return:
+        """
+        update_article_sql = f"""
+                            UPDATE {self.article_match_video_table}
+                            SET
+                                content_status = %s, 
+                                content_status_update_time = %s,
+                                process_times = %s
+                            WHERE trace_id = %s and content_status = %s;
+                        """
+        await self.mysql_client.async_insert(
+            sql=update_article_sql,
+            params=(
+                self.TASK_INIT_STATUS,
+                int(time.time()),
+                process_times + 1,
+                trace_id,
+                self.TASK_PROCESSING_STATUS
+            )
+        )
+
     async def process_task(self, params):
         """
         异步执行
@@ -219,7 +245,7 @@ class historyContentIdTask(object):
         download_videos = await self.get_video_list(content_id=content_id)
         # time.sleep(3)
         if download_videos:
-            # 把状态修改为 4
+            # 修改状态为执行状态,获取该任务的锁
             affected_rows = await self.update_content_status(
                 trace_id=trace_id,
                 new_content_status=self.TASK_PROCESSING_STATUS,
@@ -228,13 +254,24 @@ class historyContentIdTask(object):
             if affected_rows == 0:
                 print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                 return
-            kimi_title = await self.get_kimi_title(content_id)
-            await self.publish_videos_to_pq(
-                flow_pool_level=flow_pool_level,
-                kimi_title=kimi_title,
-                gh_id=gh_id,
+            try:
+                kimi_title = await self.get_kimi_title(content_id)
+                await self.publish_videos_to_pq(
+                    flow_pool_level=flow_pool_level,
+                    kimi_title=kimi_title,
+                    gh_id=gh_id,
+                    trace_id=trace_id,
+                    download_videos=download_videos,
+                    process_times=process_times
+                )
+            except Exception as e:
+                logging(
+                    code="5003",
+                    info="history task 在发布的时候出现异常, error = {}".format(e),
+                    trace_id=trace_id
+                )
+            await self.roll_back_content_status_when_fails(
                 trace_id=trace_id,
-                download_videos=download_videos,
                 process_times=process_times
             )
         else: