import time from applications.api import feishu_robot from applications.tasks.task_mapper import TaskMapper class TaskProcessingMonitor(TaskMapper): def __init__(self, pool): self.pool = pool async def get_processing_tasks(self): query = f""" select task_name, start_timestamp from long_articles_task_manager where task_status = %s; """ processing_task_list, error = await self.pool.async_fetch(query=query, params=(self.TASK_PROCESSING_STATUS,)) return processing_task_list async def deal(self): tasks = await self.get_processing_tasks() bad_tasks = [] for task in tasks: task_name = task['task_name'] start_timestamp = task['start_timestamp'] task_timeout = self.get_task_config(task_name)['expire_duration'] if int(time.time()) - start_timestamp >= task_timeout: bad_tasks.append(task_name) if bad_tasks: await feishu_robot.bot( title="任务执行异常,超过正常时间,请注意!", detail=bad_tasks, mention=True )