
Merge branch 'feature/20250725-luojunhui-task-config' of Server/LongArticleTaskServer into master

luojunhui 2 days ago
commit 0caff15b7f

+ 1 - 0
applications/ab_test/__init__.py

@@ -0,0 +1 @@
+from .get_cover import GetCoverService

+ 0 - 0
applications/service/get_cover.py → applications/ab_test/get_cover.py


+ 0 - 3
applications/service/__init__.py

@@ -1,5 +1,2 @@
 # logging service
 from .log_service import LogService
-
-# experiment
-from .get_cover import GetCoverService

+ 1 - 0
applications/tasks/__init__.py

@@ -0,0 +1 @@
+from .task_scheduler import TaskScheduler

+ 6 - 6
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -182,7 +182,7 @@ class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
         """check account data"""
         query = f"""
             select accountName, count(1) as publish_count 
-            from official_articles_v2 where ghId = %s and from_unixtime(createTime) > %s;
+            from official_articles_v2 where ghId = %s and from_unixtime(publish_timestamp) > %s;
         """
         response, error = await self.pool.async_fetch(
             query=query,
@@ -363,7 +363,9 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             set oav.publish_timestamp = vv.publish_timestamp
             where oav.publish_timestamp <= %s;
         """
-        affected_rows_1 = await self.pool.async_save(query=update_sql, params=(0, 0), db_name="piaoquan_crawler")
+        affected_rows_1 = await self.pool.async_save(
+            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
+        )
 
         # if there is still no publish_timestamp, use update_time as publish_timestamp
         update_sql_2 = f"""
@@ -377,9 +379,9 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
                 title="执行兜底修改发布时间戳",
                 detail={
                     "通过msgId修改": affected_rows_1,
-                    "通过update_timestamp修改": affected_rows_2
+                    "通过update_timestamp修改": affected_rows_2,
                 },
-                mention=False
+                mention=False,
             )
 
     async def deal(self):
@@ -438,5 +440,3 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             current_hour = datetime.datetime.now().hour
             if current_hour >= 21:
                 await self.fallback_mechanism()
-
-

+ 1 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -4,3 +4,4 @@ from .get_off_videos import CheckVideoAuditStatus
 from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
+from .task_processing_monitor import TaskProcessingMonitor

+ 36 - 0
applications/tasks/monitor_tasks/task_processing_monitor.py

@@ -0,0 +1,36 @@
+import time
+
+from applications.api import feishu_robot
+from applications.tasks.task_mapper import TaskMapper
+
+
+class TaskProcessingMonitor(TaskMapper):
+
+    def __init__(self, pool):
+        self.pool = pool
+
+    async def get_processing_tasks(self):
+        query = """
+            select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
+        """
+        processing_task_list, error = await self.pool.async_fetch(query=query, params=(self.TASK_PROCESSING_STATUS,))
+        return processing_task_list
+
+    async def deal(self):
+        tasks = await self.get_processing_tasks()
+        bad_tasks = []
+        for task in tasks:
+            task_name = task['task_name']
+            start_timestamp = task['start_timestamp']
+            task_timeout = self.get_task_config(task_name)['expire_duration']
+            if int(time.time()) - start_timestamp >= task_timeout:
+                bad_tasks.append(task_name)
+
+        if bad_tasks:
+            await feishu_robot.bot(
+                title="task execution abnormal: exceeded expected duration, please check!",
+                detail=bad_tasks,
+                mention=True
+            )
+
+

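For context: the monitor fetches every row still in TASK_PROCESSING_STATUS, compares its age against that task's expire_duration, and pages Feishu with any offenders. A minimal sketch of exercising it against a stub pool; StubPool below is illustrative, not part of the commit, and only mimics the (rows, error) tuple that async_fetch returns:

```python
import asyncio
import time

from applications.tasks.monitor_tasks import TaskProcessingMonitor


class StubPool:
    """Hypothetical stand-in for the project's async DB pool."""

    async def async_fetch(self, query, params):
        # one task stuck in processing for two hours
        rows = [
            {"task_name": "get_off_videos", "start_timestamp": int(time.time()) - 7200}
        ]
        return rows, None


async def main():
    monitor = TaskProcessingMonitor(StubPool())
    # get_off_videos expires after 1800s, so this run would collect
    # bad_tasks == ["get_off_videos"] and fire the feishu_robot alert
    await monitor.deal()


asyncio.run(main())
```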
+ 45 - 0
applications/tasks/task_mapper.py

@@ -0,0 +1,45 @@
+class Const:
+    # task status
+    TASK_INIT_STATUS = 0
+    TASK_PROCESSING_STATUS = 1
+    TASK_SUCCESS_STATUS = 2
+    TASK_FAILED_STATUS = 3
+
+    # DEFAULT
+    DEFAULT_TIMEOUT = 1800
+
+    # duration
+    CHECK_KIMI_BALANCE_TIMEOUT = 20
+    GET_OFF_VIDEO_TIMEOUT = 1800
+    CHECK_VIDEO_AUDIT_TIMEOUT = 1800
+    OUTSIDE_ARTICLE_MONITOR_TIMEOUT = 3 * 3600
+    INNER_ARTICLE_MONITOR_TIMEOUT = 3600
+    TITLE_REWRITE_TIMEOUT = 1800
+    RECYCLE_DAILY_ARTICLE_TIMEOUT = 3600
+    UPDATE_ROOT_SOURCE_ID_TIMEOUT = 3600
+
+
+class TaskMapper(Const):
+
+    def get_task_config(self, task_name) -> dict:
+        match task_name:
+            case "check_kimi_balance":
+                expire_duration = self.CHECK_KIMI_BALANCE_TIMEOUT
+            case "get_off_videos":
+                expire_duration = self.GET_OFF_VIDEO_TIMEOUT
+            case "check_publish_video_audit_status":
+                expire_duration = self.CHECK_VIDEO_AUDIT_TIMEOUT
+            case "outside_article_monitor":
+                expire_duration = self.OUTSIDE_ARTICLE_MONITOR_TIMEOUT
+            case "inner_article_monitor":
+                expire_duration = self.INNER_ARTICLE_MONITOR_TIMEOUT
+            case "title_rewrite":
+                expire_duration = self.TITLE_REWRITE_TIMEOUT
+            case "daily_publish_articles_recycle":
+                expire_duration = self.RECYCLE_DAILY_ARTICLE_TIMEOUT
+            case "update_root_source_id":
+                expire_duration = self.UPDATE_ROOT_SOURCE_ID_TIMEOUT
+            case _:
+                expire_duration = self.DEFAULT_TIMEOUT
+
+        return {"expire_duration": expire_duration}

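TaskMapper is a pure lookup over the constants above, so its contract is easy to pin down; the values below come straight from the diff:

```python
from applications.tasks.task_mapper import TaskMapper

mapper = TaskMapper()
assert mapper.get_task_config("check_kimi_balance") == {"expire_duration": 20}
assert mapper.get_task_config("outside_article_monitor") == {"expire_duration": 3 * 3600}
# any unmatched task name falls through to DEFAULT_TIMEOUT
assert mapper.get_task_config("some_unknown_task") == {"expire_duration": 1800}
```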
+ 20 - 4
applications/tasks/task_scheduler.py

@@ -15,9 +15,11 @@ from applications.tasks.monitor_tasks import CheckVideoAuditStatus
 from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.task_mapper import TaskMapper
 
 
-class TaskScheduler:
+class TaskScheduler(TaskMapper):
     def __init__(self, data, log_service, db_client):
         self.data = data
         self.log_client = log_service
@@ -37,10 +39,12 @@ class TaskScheduler:
             return False
         else:
             start_timestamp = response[0]["start_timestamp"]
-            # todo: every task should has a unique expire timestamp, remember to write that in a task config file
-            if int(time.time()) - start_timestamp >= 86400:
+
+            if int(time.time()) - start_timestamp >= self.get_task_config(
+                task_name
+            ).get("expire_duration", self.DEFAULT_TIMEOUT):
                 await feishu_robot.bot(
-                    title=f"{task_name} has been processing for more than one day",
+                    title=f"{task_name} has been processing longer than its timeout",
                     detail={"timestamp": start_timestamp},
                     env="long_articles_task",
                 )
@@ -257,6 +261,18 @@ class TaskScheduler:
                     data={"code": 0, "message": "task started background"},
                 )
 
+            case "task_processing_monitor":
+                async def background_task_processing_monitor():
+                    sub_task = TaskProcessingMonitor(self.db_client)
+                    await sub_task.deal()
+                    await self.release_task(task_name=task_name, date_string=date_string, final_status=self.TASK_SUCCESS_STATUS)
+
+                asyncio.create_task(background_task_processing_monitor())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
             case _:
                 await self.log_client.log(
                     contents={

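The scheduler-side check now mirrors the monitor: instead of the old blanket 86400s, a task is flagged once its age exceeds its own expire_duration. Reduced to its essence (a sketch of the comparison, not the full method):

```python
import time

from applications.tasks.task_mapper import TaskMapper


def is_overdue(task_name: str, start_timestamp: int) -> bool:
    mapper = TaskMapper()
    # per-task timeout, with DEFAULT_TIMEOUT as the fallback for unmapped names
    timeout = mapper.get_task_config(task_name).get(
        "expire_duration", mapper.DEFAULT_TIMEOUT
    )
    return int(time.time()) - start_timestamp >= timeout
```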
+ 2 - 2
routes/blueprint.py

@@ -1,7 +1,7 @@
 from quart import Blueprint, jsonify, request
-from applications.service import GetCoverService
+from applications.ab_test import GetCoverService
 
-from applications.tasks.task_scheduler import TaskScheduler
+from applications.tasks import TaskScheduler
 
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
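
Both blueprint import changes rely on the re-exports added in the package `__init__.py` files above, so callers now depend on the package rather than the defining module:

```python
# before: coupled to concrete module paths
# from applications.service import GetCoverService
# from applications.tasks.task_scheduler import TaskScheduler

# after: the packages re-export the public names (see the __init__.py hunks above)
from applications.ab_test import GetCoverService
from applications.tasks import TaskScheduler
```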