123456789101112131415161718192021222324252627282930313233343536 |
- import time
- from applications.api import feishu_robot
- from applications.tasks.task_mapper import TaskMapper
class TaskProcessingMonitor(TaskMapper):
    """Watchdog that reports tasks which have been processing for too long.

    Reads every row of ``long_articles_task_manager`` whose ``task_status``
    equals ``self.TASK_PROCESSING_STATUS``, compares each task's running time
    against the ``expire_duration`` from its task config, and sends a single
    Feishu alert listing the names of all tasks that exceeded it.

    NOTE(review): ``TASK_PROCESSING_STATUS`` and ``get_task_config`` are not
    defined in this class; presumably they are inherited from ``TaskMapper``
    -- confirm.
    """

    def __init__(self, pool):
        """Store the database pool used for all queries.

        Args:
            pool: Object exposing an awaitable ``async_fetch(query=, params=)``.
        """
        self.pool = pool

    async def get_processing_tasks(self):
        """Fetch the tasks currently marked as processing.

        Returns:
            Whatever ``self.pool.async_fetch`` yields for the query; ``deal``
            iterates it and reads the ``task_name`` and ``start_timestamp``
            keys of each row.
        """
        sql = """
            select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
        """
        return await self.pool.async_fetch(
            query=sql, params=(self.TASK_PROCESSING_STATUS,)
        )

    async def deal(self):
        """Check all processing tasks and alert on those past their deadline.

        A task counts as overdue when the whole-second wall-clock time elapsed
        since its ``start_timestamp`` is at least its ``expire_duration``.
        Nothing is sent when no task is overdue.
        """
        overdue_names = []
        for row in await self.get_processing_tasks():
            name = row["task_name"]
            started_at = row["start_timestamp"]
            allowed_duration = self.get_task_config(name)["expire_duration"]
            # assumes start_timestamp is epoch seconds, matching time.time()
            # -- TODO confirm against the table schema
            running_for = int(time.time()) - started_at
            if running_for >= allowed_duration:
                overdue_names.append(name)

        if not overdue_names:
            return

        # One aggregated alert for the whole batch rather than one per task.
        await feishu_robot.bot(
            title="任务执行异常,超过正常时间,请注意!",
            detail=overdue_names,
            mention=True,
        )
|