瀏覽代碼

Revert "history_task.py"

This reverts commit 3bd202efd5beb88a5ba363563281a3fc23dc0e8c.
luojunhui 11 月之前
父節點
當前提交
050ce21a96
共有 4 個文件被更改,包括 82 次插入和 124 次删除
  1. +55 -2
      applications/config/__init__.py
  2. +0 -58
      applications/config/apollo_config.py
  3. +0 -29
      applications/config/const.py
  4. +27 -35
      tasks/history_task.py

+ 55 - 2
applications/config/__init__.py

@@ -1,5 +1,58 @@
 """
 @author: luojunhui
 """
-from .const import *
-from .apollo_config import Config
+import pyapollos
+
+
+class Config(object):
+    """
+    apolloConfig
+    """
+
+    def __init__(self, env="pre"):
+        """
+        :param env:
+        """
+        match env:
+            case "prod":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_videos"
+            case "dev":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos_copy1"
+                self.article_text_table = "long_articles_text_copy1"
+                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
+                self.root_source_id_table = "long_articles_root_source_id_copy1"
+                self.get_off_video_table = "get_off_videos_copy1"
+            case "pre":
+                self.apollo_connection = pyapollos.ApolloClient(
+                    app_id="LongArticlesMatchServer",
+                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
+                    timeout=10
+                )
+                self.article_match_video_table = "long_articles_match_videos"
+                self.article_text_table = "long_articles_text"
+                self.article_crawler_video_table = "long_articles_crawler_videos"
+                self.root_source_id_table = "long_articles_root_source_id"
+                self.get_off_video_table = "get_off_videos"
+
+    def get_config_value(self, key):
+        """
+        通过 key 获取配置的 Config
+        :param key:
+        :return:
+        """
+        response = self.apollo_connection.get_value(key)
+        return response

+ 0 - 58
applications/config/apollo_config.py

@@ -1,58 +0,0 @@
-"""
-@author: luojunhui
-"""
-import pyapollos
-
-
-class Config(object):
-    """
-    apolloConfig
-    """
-
-    def __init__(self, env="pre"):
-        """
-        :param env:
-        """
-        match env:
-            case "prod":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="https://apolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos"
-                self.article_text_table = "long_articles_text"
-                self.article_crawler_video_table = "long_articles_crawler_videos"
-                self.root_source_id_table = "long_articles_root_source_id"
-                self.get_off_video_table = "get_off_videos"
-            case "dev":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="https://devapolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos_copy1"
-                self.article_text_table = "long_articles_text_copy1"
-                self.article_crawler_video_table = "long_articles_crawler_videos_copy1"
-                self.root_source_id_table = "long_articles_root_source_id_copy1"
-                self.get_off_video_table = "get_off_videos_copy1"
-            case "pre":
-                self.apollo_connection = pyapollos.ApolloClient(
-                    app_id="LongArticlesMatchServer",
-                    config_server_url="http://preapolloconfig-internal.piaoquantv.com/",
-                    timeout=10
-                )
-                self.article_match_video_table = "long_articles_match_videos"
-                self.article_text_table = "long_articles_text"
-                self.article_crawler_video_table = "long_articles_crawler_videos"
-                self.root_source_id_table = "long_articles_root_source_id"
-                self.get_off_video_table = "get_off_videos"
-
-    def get_config_value(self, key):
-        """
-        通过 key 获取配置的 Config
-        :param key:
-        :return:
-        """
-        response = self.apollo_connection.get_value(key)
-        return response

+ 0 - 29
applications/config/const.py

@@ -1,29 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-
-class HistoryContentIdTaskConst:
-    """
-    历史文章id任务常量
-    """
-    # 任务处理中
-    TASK_PROCESSING_STATUS = 101
-    # 任务初始化状态
-    TASK_INIT_STATUS = 0
-    # 任务视频ETL 完成状态
-    TASK_ETL_COMPLETE_STATUS = 3
-    # 任务发布完成状态
-    TASK_PUBLISHED_STATUS = 4
-    # 文章已经退场 or 晋级
-    EXIT_STATUS = 97
-    # 文章品类不匹配
-    MISMATCH_STATUS = 96
-    # 视频已经下载成功状态
-    VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
-    # 任务最多处理次数
-    TASK_MAX_PROCESS_TIMES = 3
-
-    # 与AIGC交互,发送处理完成的trace_id至AIGC系统
-    RECORD_SUCCESS_TRACE_ID_CODE = 2
-    RECORD_FAIL_TRACE_ID_CODE = 3

+ 27 - 35
tasks/history_task.py

@@ -7,7 +7,7 @@ import asyncio
 import traceback
 
 from applications.feishu import bot
-from applications.config import Config, HistoryContentIdTaskConst
+from applications.config import Config
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
 from applications.functions.common import shuffle_list
@@ -18,6 +18,13 @@ class historyContentIdTask(object):
     """
     处理已经匹配过小程序的文章
     """
+    TASK_PROCESSING_STATUS = 101
+    EXIT_STATUS = 97
+    MISMATCH_STATUS = 96
+    TASK_INIT_STATUS = 0
+    TASK_PUBLISHED_STATUS = 4
+    RECORD_SUCCESS_TRACE_ID_CODE = 2
+    RECORD_FAIL_TRACE_ID_CODE = 3
 
     def __init__(self, mysql_client):
         """
@@ -25,14 +32,12 @@ class historyContentIdTask(object):
         """
         self.mysql_client = mysql_client
         self.config = Config()
-        self.const = HistoryContentIdTaskConst()
         self.article_match_video_table = self.config.article_match_video_table
         self.article_text_table = self.config.article_text_table
         self.article_crawler_video_table = self.config.article_crawler_video_table
         self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
         self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
         self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
-        self.new_method_gh_id = json.loads(self.config.get_config_value("newMethodGhId"))
 
     async def get_tasks(self):
         """
@@ -50,10 +55,10 @@ class historyContentIdTask(object):
             JOIN (
                 select content_id, count(1) as cnt 
                 from {self.article_crawler_video_table}
-                where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
+                where download_status = 2
                 group by content_id
             ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
-            WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
+            WHERE ART.content_status = 0 and ART.process_times <= 3
             ORDER BY ART.flow_pool_level, ART.request_timestamp
             LIMIT {self.history_coroutines};
         """
@@ -203,11 +208,11 @@ class historyContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_sql,
             params=(
-                self.const.TASK_PUBLISHED_STATUS,
+                self.TASK_PUBLISHED_STATUS,
                 json.dumps(L, ensure_ascii=False),
                 process_times + 1,
                 trace_id,
-                self.const.TASK_PROCESSING_STATUS
+                self.TASK_PROCESSING_STATUS
             )
         )
         logging(
@@ -216,7 +221,7 @@ class historyContentIdTask(object):
             trace_id=trace_id,
             data=L
         )
-        await record_trace_id(trace_id=trace_id, status=self.const.RECORD_SUCCESS_TRACE_ID_CODE)
+        await record_trace_id(trace_id=trace_id, status=self.RECORD_SUCCESS_TRACE_ID_CODE)
 
     async def roll_back_content_status_when_fails(self, process_times, trace_id):
         """
@@ -236,11 +241,11 @@ class historyContentIdTask(object):
         await self.mysql_client.async_insert(
             sql=update_article_sql,
             params=(
-                self.const.TASK_INIT_STATUS,
+                self.TASK_INIT_STATUS,
                 int(time.time()),
                 process_times + 1,
                 trace_id,
-                self.const.TASK_PROCESSING_STATUS
+                self.TASK_PROCESSING_STATUS
             )
         )
 
@@ -310,12 +315,9 @@ class historyContentIdTask(object):
         trace_id = params['trace_id']
         flow_pool_level = params['flow_pool_level']
         gh_id = params['gh_id']
-        process_times = params['process_times']
-
         if flow_pool_level == "autoArticlePoolLevel4":
             # 校验文章是否属于该账号的negative 类型
-            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id,
-                                                                       trace_id=trace_id)
+            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id)
             if negative_category_status:
                 # 修改状态为品类不匹配状态
                 logging(
@@ -325,8 +327,8 @@ class historyContentIdTask(object):
                 )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
-                    new_content_status=self.const.MISMATCH_STATUS,
-                    ori_content_status=self.const.TASK_INIT_STATUS
+                    new_content_status=self.MISMATCH_STATUS,
+                    ori_content_status=self.TASK_INIT_STATUS
                 )
                 logging(
                     code="history1003",
@@ -336,7 +338,7 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
-                await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
+                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
                 return
             # 校验文章是否晋升 or 退场
             exit_status = await self.check_title_whether_exit(content_id)
@@ -349,8 +351,8 @@ class historyContentIdTask(object):
                 )
                 affected_rows = await self.update_content_status(
                     trace_id=trace_id,
-                    new_content_status=self.const.EXIT_STATUS,
-                    ori_content_status=self.const.TASK_INIT_STATUS
+                    new_content_status=self.EXIT_STATUS,
+                    ori_content_status=self.TASK_INIT_STATUS
                 )
                 logging(
                     code="history1005",
@@ -360,33 +362,23 @@ class historyContentIdTask(object):
                 if affected_rows == 0:
                     print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                     return
-                await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
+                await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
                 return
 
+        gh_id = params['gh_id']
+        process_times = params['process_times']
         download_videos = await self.get_video_list(content_id=content_id)
+        # time.sleep(3)
         if download_videos:
             # 修改状态为执行状态,获取该任务的锁
             affected_rows = await self.update_content_status(
                 trace_id=trace_id,
-                new_content_status=self.const.TASK_PROCESSING_STATUS,
-                ori_content_status=self.const.TASK_INIT_STATUS
+                new_content_status=self.TASK_PROCESSING_STATUS,
+                ori_content_status=self.TASK_INIT_STATUS
             )
             if affected_rows == 0:
                 print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
                 return
-            if gh_id in self.new_method_gh_id:
-                logging(
-                    code="3013",
-                    info="new_method_gh_id",
-                    trace_id=trace_id
-                )
-                # 把状态改为3
-                await self.update_content_status(
-                    trace_id=trace_id,
-                    new_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
-                    ori_content_status=self.const.TASK_PROCESSING_STATUS
-                )
-                return
             try:
                 kimi_title = await self.get_kimi_title(content_id)
                 await self.publish_videos_to_pq(