浏览代码

Merge branch '2024-11-26-fix-match-task' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 4 月之前
父节点
当前提交
9885b0e98b
共有 3 个文件被更改,包括 125 次插入和 93 次删除
  1. +2 -0
      applications/config/const.py
  2. +3 -3
      applications/functions/common.py
  3. +120 -90
      tasks/new_contentId_task.py

+ 2 - 0
applications/config/const.py

@@ -70,3 +70,5 @@ class NewContentIdTaskConst(HistoryContentIdTaskConst):
 
     # 视频下载失败状态
     VIDEO_DOWNLOAD_FAIL_STATUS = 3
+
+new_content_id_task = NewContentIdTaskConst()

+ 3 - 3
applications/functions/common.py

@@ -52,9 +52,9 @@ def sensitive_flag(s_words, ori_title):
     :param ori_title:
     :return:
     """
-    # for word in s_words:
-    #     if str(word) in ori_title:
-    #         return False
+    for word in s_words:
+        if str(word) in ori_title:
+            return False
     return True
 
 

+ 120 - 90
tasks/new_contentId_task.py

@@ -4,7 +4,8 @@
 import json
 import time
 
-from applications.config import Config, NewContentIdTaskConst
+from applications.config import Config
+from applications.config.const import new_content_id_task as NewContentIdTaskConst
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
@@ -29,40 +30,17 @@ class NewContentIdTask(object):
         self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
         self.account_map = json.loads(self.config.get_config_value("accountMap"))
         self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
-        self.const = NewContentIdTaskConst()
 
     async def get_tasks(self):
         """
         获取 task
         :return:
         """
-        # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
-        select_processing_sql = f"""
-            SELECT 
-                trace_id, content_status_update_time, process_times
-            FROM 
-                {self.article_match_video_table}
-            WHERE 
-                    content_status = {self.const.TASK_PROCESSING_STATUS} 
-                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}; 
-        """
-        processing_articles = await self.mysql_client.async_select(select_processing_sql)
-        if processing_articles:
-            processing_list = [
-                {
-                    "trace_id": item[0],
-                    "content_status_update_time": item[1],
-                    "process_times": item[2]
-                }
-                for item in processing_articles
-            ]
-            for obj in processing_list:
-                if int(time.time()) - obj['content_status_update_time'] >= self.const.TASK_PROCESSING_TIMEOUT:
-                    # 认为该任务失败
-                    await self.roll_back_content_status_when_fails(
-                        process_times=obj['process_times'] + 1,
-                        trace_id=obj['trace_id']
-                    )
+        # 处理未托管的任务
+        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
+
+        # 处理托管任务
+        await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
         # 将  process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
         update_status_sql = f"""
             UPDATE 
@@ -75,10 +53,10 @@ class NewContentIdTask(object):
         await self.mysql_client.async_insert(
             update_status_sql,
             params=(
-                self.const.TASK_FAIL_STATUS,
-                self.const.TASK_MAX_PROCESS_TIMES,
-                self.const.TASK_ETL_COMPLETE_STATUS,
-                self.const.TASK_PUBLISHED_STATUS
+                NewContentIdTaskConst.TASK_FAIL_STATUS,
+                NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
+                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
+                NewContentIdTaskConst.TASK_PUBLISHED_STATUS
             )
         )
         # 获取  process_times <= 3 且  content_status = 0 的任务
@@ -88,8 +66,8 @@ class NewContentIdTask(object):
             FROM
                 {self.article_match_video_table}
             WHERE
-                    content_status = {self.const.TASK_INIT_STATUS}
-                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+                    content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
+                and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
             ORDER BY flow_pool_level, request_timestamp
             LIMIT {self.spider_coroutines};
         """
@@ -109,6 +87,56 @@ class NewContentIdTask(object):
         else:
             return []
 
+    async def roll_back_unfinished_tasks(self, publish_flag):
+        """Roll back tasks that have stayed in an intermediate status for too long.
+        :param publish_flag: NEED_PUBLISH or DO_NOT_NEED_PUBLISH; selects which statuses count as unfinished (any other value is a no-op)
+        """
+        # Select tasks in an intermediate content_status and check their age; once a task is older than TASK_PROCESSING_TIMEOUT (1h per the original note) it is rolled back with a higher process_times
+        if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
+            processing_status_tuple = (
+                                NewContentIdTaskConst.TASK_PROCESSING_STATUS,
+                                NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                                NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                                NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS  # only treated as unfinished when the task still needs publishing
+                                )
+        elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
+            processing_status_tuple = (
+                NewContentIdTaskConst.TASK_PROCESSING_STATUS,
+                NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
+            )
+        else:
+            return  # unrecognised publish_flag: nothing to roll back
+        select_processing_sql = f"""
+            SELECT 
+                trace_id, content_status_update_time, process_times, content_status
+            FROM 
+                {self.article_match_video_table}
+            WHERE 
+                content_status in {processing_status_tuple}
+                and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
+                and publish_flag = {publish_flag}; 
+                                        """
+        processing_articles = await self.mysql_client.async_select(select_processing_sql)
+        if processing_articles:
+            processing_list = [
+                {
+                    "trace_id": item[0],
+                    "content_status_update_time": item[1],
+                    "process_times": item[2],
+                    "content_status": item[3]
+                }
+                for item in processing_articles
+            ]
+            for obj in processing_list:
+                if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
+                    # Treat the task as failed. NOTE(review): the callee's params also use process_times + 1, so the counter may advance by 2 per rollback — confirm
+                    await self.roll_back_content_status_when_fails(
+                        process_times=obj['process_times'] + 1,
+                        trace_id=obj['trace_id'],
+                        ori_content_status=obj['content_status']
+                    )
+
     async def get_video_list(self, content_id):
         """
         判断该文章是否存在历史匹配视频
@@ -118,10 +146,10 @@ class NewContentIdTask(object):
         sql = f"""
         SELECT id
         FROM {self.article_crawler_video_table}
-        WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
+        WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
         """
         res_tuple = await self.mysql_client.async_select(sql)
-        if len(res_tuple) >= self.const.MIN_MATCH_VIDEO_NUM:
+        if len(res_tuple) >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
             return True
         else:
             return False
@@ -149,11 +177,12 @@ class NewContentIdTask(object):
         )
         return row_counts
 
-    async def roll_back_content_status_when_fails(self, process_times, trace_id):
+    async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
         """
         处理失败,回滚至初始状态,处理次数加 1
         :param process_times:
         :param trace_id:
+        :param ori_content_status:
         :return:
         """
         update_article_sql = f"""
@@ -167,11 +196,11 @@ class NewContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_article_sql,
             params=(
-                self.const.TASK_INIT_STATUS,
+                NewContentIdTaskConst.TASK_INIT_STATUS,
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.const.TASK_PROCESSING_STATUS
+                ori_content_status
             )
         )
 
@@ -196,11 +225,11 @@ class NewContentIdTask(object):
                 content_status = item[0]
                 # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
                 if content_status in {
-                    self.const.TASK_KIMI_FINISHED_STATUS,
-                    self.const.TASK_SPIDER_FINISHED_STATUS,
-                    self.const.TASK_ETL_COMPLETE_STATUS,
-                    self.const.TASK_PROCESSING_STATUS,
-                    self.const.TASK_PUBLISHED_STATUS
+                    NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
+                    NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
+                    NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
+                    NewContentIdTaskConst.TASK_PROCESSING_STATUS,
+                    NewContentIdTaskConst.TASK_PUBLISHED_STATUS
                 }:
                     return True
             return False
@@ -215,7 +244,7 @@ class NewContentIdTask(object):
         sql = f"""
                 SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
                 FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
+                WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
                 ORDER BY score DESC;
                 """
         res_tuple = await self.mysql_client.async_select(sql)
@@ -246,7 +275,7 @@ class NewContentIdTask(object):
             kimi_status = response[0][0]
             return kimi_status
         else:
-            return self.const.ARTICLE_TEXT_TABLE_ERROR
+            return NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR
 
     async def kimi_task(self, params):
         """
@@ -258,11 +287,11 @@ class NewContentIdTask(object):
         process_times = params['process_times']
         kimi_status_code = await self.get_kimi_status(content_id=content_id)
 
-        if kimi_status_code == self.const.KIMI_SUCCESS_STATUS:
+        if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
             affected_rows = await self.update_content_status(
-                new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
+                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.const.TASK_INIT_STATUS
+                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
             )
             if affected_rows == 0:
                 logging(
@@ -282,7 +311,7 @@ class NewContentIdTask(object):
                 "kimi_summary": kimi_info[0][2],
                 "kimi_keys": json.loads(kimi_info[0][3])
             }
-        elif kimi_status_code == self.const.ARTICLE_TEXT_TABLE_ERROR:
+        elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
             logging(
                 code="4000",
                 info="long_articles_text表中未找到 content_id"
@@ -290,9 +319,9 @@ class NewContentIdTask(object):
         else:
             # 开始处理,讲 content_status 从 0  改为  101
             affected_rows = await self.update_content_status(
-                new_content_status=self.const.TASK_PROCESSING_STATUS,
+                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.const.TASK_INIT_STATUS
+                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
             )
             if affected_rows == 0:
                 logging(
@@ -327,12 +356,13 @@ class NewContentIdTask(object):
                         WHERE content_id = %s;"""
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
-                    params=(kimi_title, content_title, content_keys, self.const.KIMI_SUCCESS_STATUS, params['content_id'])
+                    params=(
+                    kimi_title, content_title, content_keys, NewContentIdTaskConst.KIMI_SUCCESS_STATUS, params['content_id'])
                 )
                 await self.update_content_status(
-                    new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
+                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.const.TASK_PROCESSING_STATUS
+                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                 )
                 return {
                     "kimi_title": kimi_title,
@@ -351,7 +381,7 @@ class NewContentIdTask(object):
                 await self.mysql_client.async_insert(
                     sql=update_kimi_sql,
                     params=(
-                        self.const.KIMI_FAIL_STATUS,
+                        NewContentIdTaskConst.KIMI_FAIL_STATUS,
                         content_id
                     )
                 )
@@ -375,21 +405,21 @@ class NewContentIdTask(object):
             SELECT count(id) 
             FROM {self.article_crawler_video_table} 
             WHERE content_id = '{content_id}' 
-            AND download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
+            AND download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
         """
         count_tuple = await self.mysql_client.async_select(select_sql)
         counts = count_tuple[0][0]
-        if counts >= self.const.MIN_MATCH_VIDEO_NUM:
+        if counts >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
             await self.update_content_status(
-                new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
+                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
-                ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS
+                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
             )
             return True
         # 开始处理,将状态由 1 改成  101
         affected_rows = await self.update_content_status(
-            new_content_status=self.const.TASK_PROCESSING_STATUS,
-            ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
+            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
+            ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
             trace_id=trace_id
         )
         if affected_rows == 0:
@@ -418,7 +448,7 @@ class NewContentIdTask(object):
                 gh_id_map=self.account_map,
                 db_client=self.mysql_client
             )
-            if search_videos_count >= self.const.MIN_MATCH_VIDEO_NUM:
+            if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                 # 表示爬虫任务执行成功, 将状态从 101  改为 2
                 logging(
                     code="spider_1002",
@@ -427,9 +457,9 @@ class NewContentIdTask(object):
                     data=kimi_result
                 )
                 await self.update_content_status(
-                    new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
+                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                     trace_id=trace_id,
-                    ori_content_status=self.const.TASK_PROCESSING_STATUS
+                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                 )
                 return True
             else:
@@ -465,15 +495,15 @@ class NewContentIdTask(object):
         select_sql = f"""
             select count(id) 
             from {self.article_crawler_video_table} 
-            where content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
+            where content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
         """
         video_count_tuple = await self.mysql_client.async_select(select_sql)
         video_count = video_count_tuple[0][0]
-        if video_count >= self.const.MIN_MATCH_VIDEO_NUM:
+        if video_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
             affect_rows = await self.update_content_status(
-                ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
+                ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
-                new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
+                new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
             )
             if affect_rows == 0:
                 logging(
@@ -485,9 +515,9 @@ class NewContentIdTask(object):
         else:
             # 开始处理, 将文章状态修改为处理状态
             affected_rows = await self.update_content_status(
-                ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
+                ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                 trace_id=trace_id,
-                new_content_status=self.const.TASK_PROCESSING_STATUS
+                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
             )
             if affected_rows == 0:
                 logging(
@@ -498,7 +528,7 @@ class NewContentIdTask(object):
             select_sql = f"""
                 SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                 FROM {self.article_crawler_video_table}
-                WHERE content_id = '{content_id}' and download_status != {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                WHERE content_id = '{content_id}' and download_status != {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                 ORDER BY score DESC;
             """
             videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
@@ -531,7 +561,7 @@ class NewContentIdTask(object):
                         """
                         await self.mysql_client.async_insert(
                             sql=update_sql,
-                            params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
+                            params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                         )
                         logging(
                             code="etl_1001",
@@ -571,7 +601,7 @@ class NewContentIdTask(object):
                             params=(
                                 oss_video,
                                 oss_cover,
-                                self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
+                                NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                                 params['id']
                             )
                         )
@@ -583,11 +613,11 @@ class NewContentIdTask(object):
                             function="etl_task"
                         )
                     # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
-                    if downloaded_count > self.const.MIN_MATCH_VIDEO_NUM:
+                    if downloaded_count > NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                         await self.update_content_status(
-                            ori_content_status=self.const.TASK_PROCESSING_STATUS,
+                            ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                             trace_id=trace_id,
-                            new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
+                            new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
                         )
                         return True
                 except Exception as e:
@@ -598,7 +628,7 @@ class NewContentIdTask(object):
                     """
                     await self.mysql_client.async_insert(
                         sql=update_sql,
-                        params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
+                        params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                     )
                     logging(
                         code="etl_1001",
@@ -608,9 +638,9 @@ class NewContentIdTask(object):
                     )
             if downloaded_count >= 3:
                 await self.update_content_status(
-                    ori_content_status=self.const.TASK_PROCESSING_STATUS,
+                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                     trace_id=trace_id,
-                    new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
+                    new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
                 )
                 return True
             else:
@@ -634,9 +664,9 @@ class NewContentIdTask(object):
         process_times = params['process_times']
         # 开始处理,将状态修改为操作状态
         affected_rows = await self.update_content_status(
-            ori_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
+            ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
             trace_id=trace_id,
-            new_content_status=self.const.TASK_PROCESSING_STATUS
+            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
         )
         if affected_rows == 0:
             logging(
@@ -692,11 +722,11 @@ class NewContentIdTask(object):
             await self.mysql_client.async_insert(
                 sql=update_sql,
                 params=(
-                    self.const.TASK_PUBLISHED_STATUS,
+                    NewContentIdTaskConst.TASK_PUBLISHED_STATUS,
                     json.dumps(L, ensure_ascii=False),
                     process_times + 1,
                     trace_id,
-                    self.const.TASK_PROCESSING_STATUS
+                    NewContentIdTaskConst.TASK_PROCESSING_STATUS
                 )
             )
         except Exception as e:
@@ -751,7 +781,7 @@ class NewContentIdTask(object):
                     todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
                     目前先对这两种情况都做托管操作
                     """
-                    if publish_flag == self.const.DO_NOT_NEED_PUBLISH:
+                    if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
                         logging(
                             code="3013",
                             info="不需要发布,长文系统托管发布",
@@ -768,7 +798,7 @@ class NewContentIdTask(object):
                             )
                             await record_trace_id(
                                 trace_id=trace_id,
-                                status=self.const.RECORD_SUCCESS_TRACE_ID_CODE
+                                status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
                             )
                         except Exception as e:
                             logging(
@@ -794,7 +824,7 @@ class NewContentIdTask(object):
                 info="kimi 处理失败",
                 trace_id=trace_id
             )
-            if process_times >= self.const.TASK_MAX_PROCESS_TIMES:
+            if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
                 logging(
                     code="6011",
                     info="kimi处理次数达到上限, 放弃处理",
@@ -809,9 +839,9 @@ class NewContentIdTask(object):
                 affected_rows = await self.mysql_client.async_insert(
                     sql=update_sql,
                     params=(
-                        self.const.KIMI_ILLEGAL_STATUS,
+                        NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
                         content_id,
-                        self.const.TASK_INIT_STATUS
+                        NewContentIdTaskConst.TASK_INIT_STATUS
                     )
                 )
                 bot(