Browse Source

修复异常卡死

luojunhui 4 months ago
parent
commit
50f946ece0
2 changed files with 60 additions and 37 deletions
  1. 3 3
      applications/functions/common.py
  2. 57 34
      tasks/new_contentId_task.py

+ 3 - 3
applications/functions/common.py

@@ -52,9 +52,9 @@ def sensitive_flag(s_words, ori_title):
     :param ori_title:
     :return:
     """
-    # for word in s_words:
-    #     if str(word) in ori_title:
-    #         return False
+    for word in s_words:
+        if str(word) in ori_title:
+            return False
     return True
 
 

+ 57 - 34
tasks/new_contentId_task.py

@@ -36,39 +36,11 @@ class NewContentIdTask(object):
         获取 task
         :return:
         """
-        # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
-        select_processing_sql = f"""
-            SELECT 
-                trace_id, content_status_update_time, process_times, content_status
-            FROM 
-                {self.article_match_video_table}
-            WHERE 
-                content_status in (
-                    {self.const.TASK_PROCESSING_STATUS},
-                    {self.const.TASK_KIMI_FINISHED_STATUS},
-                    {self.const.TASK_SPIDER_FINISHED_STATUS}
-                    )
-                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}; 
-        """
-        processing_articles = await self.mysql_client.async_select(select_processing_sql)
-        if processing_articles:
-            processing_list = [
-                {
-                    "trace_id": item[0],
-                    "content_status_update_time": item[1],
-                    "process_times": item[2],
-                    "content_status": item[3]
-                }
-                for item in processing_articles
-            ]
-            for obj in processing_list:
-                if int(time.time()) - obj['content_status_update_time'] >= self.const.TASK_PROCESSING_TIMEOUT:
-                    # 认为该任务失败
-                    await self.roll_back_content_status_when_fails(
-                        process_times=obj['process_times'] + 1,
-                        trace_id=obj['trace_id'],
-                        ori_content_status=obj['content_status']
-                    )
+        # 处理未托管的任务
+        await self.roll_back_unfinished_tasks(publish_flag=self.const.NEED_PUBLISH)
+
+        # 处理托管任务
+        await self.roll_back_unfinished_tasks(publish_flag=self.const.DO_NOT_NEED_PUBLISH)
         # 将  process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
         update_status_sql = f"""
             UPDATE 
@@ -115,6 +87,56 @@ class NewContentIdTask(object):
         else:
             return []
 
+    async def roll_back_unfinished_tasks(self, publish_flag):
+        """
+        将长时间处于中间状态的任务回滚
+        """
+        # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
+        if publish_flag == self.const.NEED_PUBLISH:
+            processing_status_tuple = (
+                                self.const.TASK_PROCESSING_STATUS,
+                                self.const.TASK_KIMI_FINISHED_STATUS,
+                                self.const.TASK_SPIDER_FINISHED_STATUS,
+                                self.const.TASK_ETL_COMPLETE_STATUS
+                                )
+        elif publish_flag == self.const.DO_NOT_NEED_PUBLISH:
+            processing_status_tuple = (
+                self.const.TASK_PROCESSING_STATUS,
+                self.const.TASK_KIMI_FINISHED_STATUS,
+                self.const.TASK_SPIDER_FINISHED_STATUS
+            )
+        else:
+            return
+        select_processing_sql = f"""
+            SELECT 
+                trace_id, content_status_update_time, process_times, content_status
+            FROM 
+                {self.article_match_video_table}
+            WHERE 
+                content_status in {processing_status_tuple}
+                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+                and publish_flag = {publish_flag}; 
+                                        """
+        processing_articles = await self.mysql_client.async_select(select_processing_sql)
+        if processing_articles:
+            processing_list = [
+                {
+                    "trace_id": item[0],
+                    "content_status_update_time": item[1],
+                    "process_times": item[2],
+                    "content_status": item[3]
+                }
+                for item in processing_articles
+            ]
+            for obj in processing_list:
+                if int(time.time()) - obj['content_status_update_time'] >= self.const.TASK_PROCESSING_TIMEOUT:
+                    # 认为该任务失败
+                    await self.roll_back_content_status_when_fails(
+                        process_times=obj['process_times'] + 1,
+                        trace_id=obj['trace_id'],
+                        ori_content_status=obj['content_status']
+                    )
+
     async def get_video_list(self, content_id):
         """
         判断该文章是否存在历史匹配视频
@@ -337,7 +359,8 @@ class NewContentIdTask(object):
                         WHERE content_id = %s;"""
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
-                    params=(kimi_title, content_title, content_keys, self.const.KIMI_SUCCESS_STATUS, params['content_id'])
+                    params=(
+                    kimi_title, content_title, content_keys, self.const.KIMI_SUCCESS_STATUS, params['content_id'])
                 )
                 await self.update_content_status(
                     new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,