123456789101112131415161718192021222324252627282930313233343536 |
- import time
- from applications.api import feishu_robot
- from applications.tasks.task_mapper import TaskMapper
class TaskProcessingMonitor(TaskMapper):
    """Alert when a task has stayed in the processing state for too long.

    Reads the rows of ``long_articles_task_manager`` whose ``task_status``
    equals ``self.TASK_PROCESSING_STATUS`` and compares each task's elapsed
    time with the ``expire_duration`` entry of ``self.get_task_config``.
    """

    def __init__(self, pool):
        """Keep the database pool used to query the task manager table.

        Args:
            pool: Object exposing an awaitable
                ``async_fetch(query=..., params=...)`` that yields a
                ``(rows, error)`` pair.
        """
        self.pool = pool

    async def get_processing_tasks(self):
        """Fetch ``task_name`` and ``start_timestamp`` of all processing tasks.

        Returns:
            The rows produced by ``async_fetch``; an empty list when the
            fetch produced ``None``.

        Raises:
            RuntimeError: If ``async_fetch`` reports an error.
        """
        # Nothing is interpolated into the text, so a plain string is used;
        # the status value is bound through ``params``.
        query = """
            select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
        """
        processing_task_list, error = await self.pool.async_fetch(
            query=query, params=(self.TASK_PROCESSING_STATUS,)
        )
        if error:
            # Previously the error was dropped and the caller failed later
            # with an unrelated-looking exception; report the real cause.
            raise RuntimeError(f"failed to fetch processing tasks: {error}")
        # NOTE(review): presumably rows is None only on failure - confirm
        # against the pool implementation.
        return [] if processing_task_list is None else processing_task_list

    async def deal(self):
        """Send one Feishu alert naming every task that exceeded its limit.

        A task is reported when ``int(time.time()) - start_timestamp`` is
        greater than or equal to its configured ``expire_duration``. No
        message is sent when no task qualifies.
        """
        tasks = await self.get_processing_tasks()
        # Read the clock once so every task is judged against the same instant.
        now = int(time.time())
        bad_tasks = []
        for task in tasks:
            task_name = task["task_name"]
            elapsed = now - task["start_timestamp"]
            task_timeout = self.get_task_config(task_name)["expire_duration"]
            if elapsed >= task_timeout:
                bad_tasks.append(task_name)
        if bad_tasks:
            await feishu_robot.bot(
                title="任务执行异常,超过正常时间,请注意!",
                detail=bad_tasks,
                mention=True,
            )
|