task_processing_monitor.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536
  1. import time
  2. from applications.api import feishu_robot
  3. from applications.tasks.task_mapper import TaskMapper
  4. class TaskProcessingMonitor(TaskMapper):
  5. def __init__(self, pool):
  6. self.pool = pool
  7. async def get_processing_tasks(self):
  8. query = f"""
  9. select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
  10. """
  11. processing_task_list, error = await self.pool.async_fetch(query=query, params=(self.TASK_PROCESSING_STATUS,))
  12. return processing_task_list
  13. async def deal(self):
  14. tasks = await self.get_processing_tasks()
  15. bad_tasks = []
  16. for task in tasks:
  17. task_name = task['task_name']
  18. start_timestamp = task['start_timestamp']
  19. task_timeout = self.get_task_config(task_name)['expire_duration']
  20. if int(time.time()) - start_timestamp >= task_timeout:
  21. bad_tasks.append(task_name)
  22. if bad_tasks:
  23. await feishu_robot.bot(
  24. title="任务执行异常,超过正常时间,请注意!",
  25. detail=bad_tasks,
  26. mention=True
  27. )