task_processing_monitor.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536
  1. import time
  2. from applications.api import feishu_robot
  3. from applications.tasks.task_mapper import TaskMapper
  4. class TaskProcessingMonitor(TaskMapper):
  5. def __init__(self, pool):
  6. self.pool = pool
  7. async def get_processing_tasks(self):
  8. query = f"""
  9. select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
  10. """
  11. processing_task_list = await self.pool.async_fetch(
  12. query=query, params=(self.TASK_PROCESSING_STATUS,)
  13. )
  14. return processing_task_list
  15. async def deal(self):
  16. tasks = await self.get_processing_tasks()
  17. bad_tasks = []
  18. for task in tasks:
  19. task_name = task["task_name"]
  20. start_timestamp = task["start_timestamp"]
  21. task_timeout = self.get_task_config(task_name)["expire_duration"]
  22. if int(time.time()) - start_timestamp >= task_timeout:
  23. bad_tasks.append(task_name)
  24. if bad_tasks:
  25. await feishu_robot.bot(
  26. title="任务执行异常,超过正常时间,请注意!",
  27. detail=bad_tasks,
  28. mention=True,
  29. )