task_utils.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
"""
Exception definitions and utility helpers for the task system.
"""
  4. import re
  5. from typing import Optional
  6. class TaskError(Exception):
  7. """任务错误基类"""
  8. def __init__(
  9. self, message: str, retryable: bool = True, task_name: Optional[str] = None
  10. ):
  11. self.message = message
  12. self.retryable = retryable
  13. self.task_name = task_name
  14. super().__init__(message)
  15. class TaskValidationError(TaskError):
  16. """任务验证错误(不可重试)"""
  17. def __init__(self, message: str, task_name: Optional[str] = None):
  18. super().__init__(message, retryable=False, task_name=task_name)
  19. class TaskTimeoutError(TaskError):
  20. """任务超时错误(可重试)"""
  21. def __init__(self, message: str, task_name: Optional[str] = None):
  22. super().__init__(message, retryable=True, task_name=task_name)
  23. class TaskConcurrencyError(TaskError):
  24. """任务并发限制错误(不可重试)"""
  25. def __init__(self, message: str, task_name: Optional[str] = None):
  26. super().__init__(message, retryable=False, task_name=task_name)
  27. class TaskLockError(TaskError):
  28. """任务锁获取失败(不可重试)"""
  29. def __init__(self, message: str, task_name: Optional[str] = None):
  30. super().__init__(message, retryable=False, task_name=task_name)
  31. class TaskCancelledError(TaskError):
  32. """任务被取消(不可重试)"""
  33. def __init__(self, message: str, task_name: Optional[str] = None):
  34. super().__init__(message, retryable=False, task_name=task_name)
  35. class TaskUtils:
  36. """任务工具类"""
  37. @staticmethod
  38. def validate_table_name(table: str) -> str:
  39. """验证表名只包含安全字符"""
  40. if not re.match(r"^[a-zA-Z0-9_]+$", table):
  41. raise ValueError(f"Invalid table name: {table}")
  42. return table
  43. @staticmethod
  44. def validate_task_name(task_name: str) -> str:
  45. """验证任务名"""
  46. if not task_name or not isinstance(task_name, str):
  47. raise TaskValidationError("task_name must be a non-empty string")
  48. if not re.match(r"^[a-zA-Z0-9_]+$", task_name):
  49. raise TaskValidationError(f"Invalid task_name format: {task_name}")
  50. return task_name
  51. @staticmethod
  52. def format_error_detail(error: Exception) -> dict:
  53. """格式化错误详情"""
  54. import traceback
  55. return {
  56. "error_type": type(error).__name__,
  57. "error_message": str(error),
  58. "traceback": traceback.format_exc(),
  59. "retryable": getattr(error, "retryable", False),
  60. }
  61. __all__ = [
  62. "TaskError",
  63. "TaskValidationError",
  64. "TaskTimeoutError",
  65. "TaskConcurrencyError",
  66. "TaskLockError",
  67. "TaskCancelledError",
  68. "TaskUtils",
  69. ]