فهرست منبع

history_task.py dev

luojunhui 11 ماه پیش
والد
کامیت
2bef159980
1فایلهای تغییر یافته به همراه31 افزوده شده و 0 حذف شده
  1. 31 0
      tasks/history_task.py

+ 31 - 0
tasks/history_task.py

@@ -16,6 +16,7 @@ class historyContentIdTask(object):
     处理已经匹配过小程序的文章
     """
     TASK_PROCESSING_STATUS = 101
+    EXIT_STATUS = 97
     TASK_INIT_STATUS = 0
     TASK_PUBLISHED_STATUS = 4
 
@@ -233,6 +234,24 @@ class historyContentIdTask(object):
             )
         )
 
+    async def check_title_whether_exit(self, content_id):
+        """
+        校验文章是标题是否晋升 or 退场
+        :return:
+        """
+        sql = f"""
+            SELECT lat.article_title, cstp.status
+            FROM long_articles_text lat
+            JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title
+            WHERE lat.content_id = '{content_id}';
+        """
+        result = await self.mysql_client.async_select(sql)
+        if result:
+            return True
+        else:
+            return False
+
+
     async def process_task(self, params):
         """
         异步执行
@@ -242,6 +261,18 @@ class historyContentIdTask(object):
         content_id = params['content_id']
         trace_id = params['trace_id']
         flow_pool_level = params['flow_pool_level']
+        if flow_pool_level == "autoArticlePoolLevel4":
+            exit_status = await self.check_title_whether_exit(content_id)
+            if exit_status:
+                # 修改状态为执行状态
+                affected_rows = await self.update_content_status(
+                    trace_id=trace_id,
+                    new_content_status=self.TASK_PROCESSING_STATUS,
+                    ori_content_status=self.TASK_INIT_STATUS
+                )
+                if affected_rows == 0:
+                    print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
+                    return
         gh_id = params['gh_id']
         process_times = params['process_times']
         download_videos = await self.get_video_list(content_id=content_id)