浏览代码

修复异常卡死

luojunhui 4 月之前
父节点
当前提交
3189fbc9cb
共有 1 个文件被更改,包括 16 次插入6 次删除
  1. 16 6
      tasks/new_contentId_task.py

+ 16 - 6
tasks/new_contentId_task.py

@@ -39,11 +39,15 @@ class NewContentIdTask(object):
         # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
         select_processing_sql = f"""
             SELECT 
-                trace_id, content_status_update_time, process_times
+                trace_id, content_status_update_time, process_times, content_status
             FROM 
                 {self.article_match_video_table}
             WHERE 
-                    content_status = {self.const.TASK_PROCESSING_STATUS} 
+                content_status in (
+                    {self.const.TASK_PROCESSING_STATUS},
+                    {self.const.TASK_KIMI_FINISHED_STATUS},
+                    {self.const.TASK_SPIDER_FINISHED_STATUS}
+                    )
                 and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}; 
         """
         processing_articles = await self.mysql_client.async_select(select_processing_sql)
@@ -52,7 +56,8 @@ class NewContentIdTask(object):
                 {
                     "trace_id": item[0],
                     "content_status_update_time": item[1],
-                    "process_times": item[2]
+                    "process_times": item[2],
+                    "content_status": item[3]
                 }
                 for item in processing_articles
             ]
@@ -61,7 +66,8 @@ class NewContentIdTask(object):
                     # 认为该任务失败
                     await self.roll_back_content_status_when_fails(
                         process_times=obj['process_times'] + 1,
-                        trace_id=obj['trace_id']
+                        trace_id=obj['trace_id'],
+                        ori_content_status=obj['content_status']
                     )
         # 将  process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
         update_status_sql = f"""
@@ -149,13 +155,17 @@ class NewContentIdTask(object):
         )
         return row_counts
 
-    async def roll_back_content_status_when_fails(self, process_times, trace_id):
+    async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=None):
         """
         处理失败,回滚至初始状态,处理次数加 1
         :param process_times:
         :param trace_id:
+        :param ori_content_status:
         :return:
         """
+        if not ori_content_status:
+            ori_content_status = self.const.TASK_PROCESSING_STATUS
+
         update_article_sql = f"""
                             UPDATE {self.article_match_video_table}
                             SET
@@ -171,7 +181,7 @@ class NewContentIdTask(object):
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.const.TASK_PROCESSING_STATUS
+                ori_content_status
             )
         )