task_utils.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
"""Exception definitions and utility helpers for the task system."""
  4. import re
  5. from typing import Optional
  6. class TaskError(Exception):
  7. """任务错误基类"""
  8. def __init__(
  9. self, message: str, retryable: bool = True, task_name: Optional[str] = None
  10. ):
  11. self.message = message
  12. self.retryable = retryable
  13. self.task_name = task_name
  14. super().__init__(message)
  15. class TaskValidationError(TaskError):
  16. """任务验证错误(不可重试)"""
  17. def __init__(self, message: str, task_name: Optional[str] = None):
  18. super().__init__(message, retryable=False, task_name=task_name)
  19. class TaskTimeoutError(TaskError):
  20. """任务超时错误(可重试)"""
  21. def __init__(self, message: str, task_name: Optional[str] = None):
  22. super().__init__(message, retryable=True, task_name=task_name)
  23. class TaskConcurrencyError(TaskError):
  24. """任务并发限制错误(不可重试)"""
  25. def __init__(self, message: str, task_name: Optional[str] = None):
  26. super().__init__(message, retryable=False, task_name=task_name)
  27. class TaskLockError(TaskError):
  28. """任务锁获取失败(不可重试)"""
  29. def __init__(self, message: str, task_name: Optional[str] = None):
  30. super().__init__(message, retryable=False, task_name=task_name)
  31. class TaskUtils:
  32. """任务工具类"""
  33. @staticmethod
  34. def validate_table_name(table: str) -> str:
  35. """验证表名只包含安全字符"""
  36. if not re.match(r"^[a-zA-Z0-9_]+$", table):
  37. raise ValueError(f"Invalid table name: {table}")
  38. return table
  39. @staticmethod
  40. def validate_task_name(task_name: str) -> str:
  41. """验证任务名"""
  42. if not task_name or not isinstance(task_name, str):
  43. raise TaskValidationError("task_name must be a non-empty string")
  44. if not re.match(r"^[a-zA-Z0-9_]+$", task_name):
  45. raise TaskValidationError(f"Invalid task_name format: {task_name}")
  46. return task_name
  47. @staticmethod
  48. def format_error_detail(error: Exception) -> dict:
  49. """格式化错误详情"""
  50. import traceback
  51. return {
  52. "error_type": type(error).__name__,
  53. "error_message": str(error),
  54. "traceback": traceback.format_exc(),
  55. "retryable": getattr(error, "retryable", False),
  56. }
  57. __all__ = [
  58. "TaskError",
  59. "TaskValidationError",
  60. "TaskTimeoutError",
  61. "TaskConcurrencyError",
  62. "TaskLockError",
  63. "TaskUtils",
  64. ]