Browse Source

history_task.py

对于特定账号不处理发布
luojunhui 5 months ago
parent
commit
3bd202efd5

+ 2 - 55
applications/config/__init__.py

@@ -1,58 +1,5 @@
 """
 @author: luojunhui
 """
-import pyapollos
-
-
-class Config(object):
-    """
-    apolloConfig
-    """
-
-    def __init__(self, env="pre"):
-        """
-        :param env:
-        """
-        match env:
-            case "prod":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos"
-                self.article_text_table = "long_articles_text"
-                self.article_crawler_video_table = "long_articles_crawler_videos"
-                self.root_source_id_table = "long_articles_root_source_id"
-                self.get_off_video_table = "get_off_videos"
-            case "dev":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos_copy1"
-                self.article_text_table = "long_articles_text_copy1"
-                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
-                self.root_source_id_table = "long_articles_root_source_id_copy1"
-                self.get_off_video_table = "get_off_videos_copy1"
-            case "pre":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos"
-                self.article_text_table = "long_articles_text"
-                self.article_crawler_video_table = "long_articles_crawler_videos"
-                self.root_source_id_table = "long_articles_root_source_id"
-                self.get_off_video_table = "get_off_videos"
-
-    def get_config_value(self, key):
-        """
-        通过 key 获取配置的 Config
-        :param key:
-        :return:
-        """
-        response = self.apollo_connection.get_value(key)
-        return response
+from .const import *
+from .apollo_config import Config

+ 58 - 0
applications/config/apollo_config.py

@@ -0,0 +1,58 @@
+"""
+@author: luojunhui
+"""
+import pyapollos
+
+
+class Config(object):
+    """
+    apolloConfig
+    """
+
+    def __init__(self, env="pre"):
+        """
+        :param env:
+        """
+        match env:
+            case "prod":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_videos"
+            case "dev":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos_copy1"
+                self.article_text_table = "long_articles_text_copy1"
+                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
+                self.root_source_id_table = "long_articles_root_source_id_copy1"
+                self.get_off_video_table = "get_off_videos_copy1"
+            case "pre":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_videos"
+
+    def get_config_value(self, key):
+        """
+        通过 key 获取配置的 Config
+        :param key:
+        :return:
+        """
+        response = self.apollo_connection.get_value(key)
+        return response

+ 29 - 0
applications/config/const.py

@@ -0,0 +1,29 @@
+"""
+@author: luojunhui
+"""
+
+
+class HistoryContentIdTaskConst:
+    """
+    历史文章id任务常量
+    """
+    # 任务处理中
+    TASK_PROCESSING_STATUS = 101
+    # 任务初始化状态
+    TASK_INIT_STATUS = 0
+    # 任务视频ETL 完成状态
+    TASK_ETL_COMPLETE_STATUS = 3
+    # 任务发布完成状态
+    TASK_PUBLISHED_STATUS = 4
+    # 文章已经退场 or 晋级
+    EXIT_STATUS = 97
+    # 文章品类不匹配
+    MISMATCH_STATUS = 96
+    # 视频已经下载成功状态
+    VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
+    # 任务最多处理次数
+    TASK_MAX_PROCESS_TIMES = 3
+
+    # 与AIGC交互,发送处理完成的trace_id至AIGC系统
+    RECORD_SUCCESS_TRACE_ID_CODE = 2
+    RECORD_FAIL_TRACE_ID_CODE = 3

+ 35 - 27
tasks/history_task.py

@@ -7,7 +7,7 @@ import asyncio
 import traceback
 
 from applications.feishu import bot
-from applications.config import Config
+from applications.config import Config, HistoryContentIdTaskConst
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
@@ -18,13 +18,6 @@ class historyContentIdTask(object):
     """
     处理已经匹配过小程序的文章
     """
-    TASK_PROCESSING_STATUS = 101
-    EXIT_STATUS = 97
-    MISMATCH_STATUS = 96
-    TASK_INIT_STATUS = 0
-    TASK_PUBLISHED_STATUS = 4
-    RECORD_SUCCESS_TRACE_ID_CODE = 2
-    RECORD_FAIL_TRACE_ID_CODE = 3
 
     def __init__(self, mysql_client):
         """
@@ -32,12 +25,14 @@ class historyContentIdTask(object):
         """
         self.mysql_client = mysql_client
         self.config = Config()
+        self.const = HistoryContentIdTaskConst()
         self.article_match_video_table = self.config.article_match_video_table
         self.article_text_table = self.config.article_text_table
         self.article_crawler_video_table = self.config.article_crawler_video_table
         self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
         self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
         self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
+        self.new_method_gh_id = json.loads(self.config.get_config_value("newMethodGhId"))
 
     async def get_tasks(self):
         """
@@ -55,10 +50,10 @@ class historyContentIdTask(object):
             JOIN (
                 select content_id, count(1) as cnt 
                 from {self.article_crawler_video_table}
-                where download_status = 2
+                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
-            WHERE ART.content_status = 0 and ART.process_times <= 3
+            WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
             ORDER BY ART.flow_pool_level, ART.request_timestamp
             LIMIT {self.history_coroutines};
         """
@@ -208,11 +203,11 @@ class historyContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_sql,
             params=(
-                self.TASK_PUBLISHED_STATUS,
+                self.const.TASK_PUBLISHED_STATUS,
                 json.dumps(L, ensure_ascii=False),
                 process_times + 1,
                 trace_id,
-                self.TASK_PROCESSING_STATUS
+                self.const.TASK_PROCESSING_STATUS
             )
         )
         logging(
@@ -221,7 +216,7 @@ class historyContentIdTask(object):
             trace_id=trace_id,
             data=L
         )
-        await record_trace_id(trace_id=trace_id, status=self.RECORD_SUCCESS_TRACE_ID_CODE)
+        await record_trace_id(trace_id=trace_id, status=self.const.RECORD_SUCCESS_TRACE_ID_CODE)
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
         """
@@ -241,11 +236,11 @@ class historyContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_article_sql,
             params=(
-                self.TASK_INIT_STATUS,
+                self.const.TASK_INIT_STATUS,
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.TASK_PROCESSING_STATUS
+                self.const.TASK_PROCESSING_STATUS
             )
         )
 
@@ -315,9 +310,12 @@ class historyContentIdTask(object):
         trace_id = params['trace_id']
         flow_pool_level = params['flow_pool_level']
         gh_id = params['gh_id']
+        process_times = params['process_times']
+
         if flow_pool_level == "autoArticlePoolLevel4":
             # 校验文章是否属于该账号的negative 类型
-            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id)
+            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id,
+                                                                       trace_id=trace_id)
             if negative_category_status:
                 # 修改状态为品类不匹配状态
                 logging(
@@ -327,8 +325,8 @@ class historyContentIdTask(object):
                 )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
-                    new_content_status=self.MISMATCH_STATUS,
-                    ori_content_status=self.TASK_INIT_STATUS
+                    new_content_status=self.const.MISMATCH_STATUS,
+                    ori_content_status=self.const.TASK_INIT_STATUS
                 )
                 logging(
                     code="history1003",
@@ -338,7 +336,7 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
-                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
+                await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
                 return
             # 校验文章是否晋升 or 退场
             exit_status = await self.check_title_whether_exit(content_id)
@@ -351,8 +349,8 @@ class historyContentIdTask(object):
                 )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
-                    new_content_status=self.EXIT_STATUS,
-                    ori_content_status=self.TASK_INIT_STATUS
+                    new_content_status=self.const.EXIT_STATUS,
+                    ori_content_status=self.const.TASK_INIT_STATUS
                 )
                 logging(
                     code="history1005",
@@ -362,23 +360,33 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
-                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
+                await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
                 return
 
-        gh_id = params['gh_id']
-        process_times = params['process_times']
         download_videos = await self.get_video_list(content_id=content_id)
-        # time.sleep(3)
         if download_videos:
             # 修改状态为执行状态,获取该任务的锁
             affected_rows = await self.update_content_status(
                 trace_id=trace_id,
-                new_content_status=self.TASK_PROCESSING_STATUS,
-                ori_content_status=self.TASK_INIT_STATUS
+                new_content_status=self.const.TASK_PROCESSING_STATUS,
+                ori_content_status=self.const.TASK_INIT_STATUS
             )
             if affected_rows == 0:
                 print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                 return
+            if gh_id in self.new_method_gh_id:
+                logging(
+                    code="3013",
+                    info="new_method_gh_id",
+                    trace_id=trace_id
+                )
+                # 把状态改为3
+                await self.update_content_status(
+                    trace_id=trace_id,
+                    new_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
+                    ori_content_status=self.const.TASK_PROCESSING_STATUS
+                )
+                return
             try:
                 kimi_title = await self.get_kimi_title(content_id)
                 await self.publish_videos_to_pq(