decode_video.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. """
  2. DecodeVideo 模型
  3. 对应数据库表 decode_videos 的 ORM 模型
  4. """
  5. from enum import IntEnum
  6. from typing import Optional
  7. from sqlalchemy import Column, String, BigInteger, Integer, Text, DateTime
  8. from sqlalchemy.sql import func
  9. from src.models.database import Base
  10. from src.utils.logger import get_logger
  11. logger = get_logger(__name__)
  12. class DecodeStatus(IntEnum):
  13. """解码状态枚举
  14. 对应数据库字段 status 的值:
  15. - PENDING: 0 - 待执行
  16. - EXECUTING: 1 - 执行中
  17. - SUCCESS: 2 - 执行成功
  18. - FAILED: 3 - 执行失败
  19. """
  20. PENDING = 0
  21. EXECUTING = 1
  22. SUCCESS = 2
  23. FAILED = 3
  24. @classmethod
  25. def get_description(cls, status: int) -> str:
  26. """获取状态描述
  27. Args:
  28. status: 状态值
  29. Returns:
  30. str: 状态描述
  31. """
  32. descriptions = {
  33. cls.PENDING: "待执行",
  34. cls.EXECUTING: "执行中",
  35. cls.SUCCESS: "执行成功",
  36. cls.FAILED: "执行失败",
  37. }
  38. return descriptions.get(status, "未知状态")
  39. class DecodeVideo(Base):
  40. """解码视频模型
  41. 对应数据库表: decode_videos
  42. 字段说明:
  43. - video_id: 视频ID (varchar(100), nullable)
  44. - task_id: 任务ID (bigint, not null)
  45. - result: 解码结果 (mediumtext, nullable)
  46. - decode_result_v2: 解码结果V2 (mediumtext, nullable) - ScriptWorkflowV2分支的结果
  47. - search_keywords: 搜索关键词 (text, nullable) - 字符串数组的JSON格式
  48. - status: 状态 (int, nullable) - 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败
  49. - error_reason: 失败原因 (mediumtext, nullable)
  50. """
  51. __tablename__ = "decode_videos"
  52. # 主键:使用 task_id 作为主键(根据业务需求,可能需要调整)
  53. task_id = Column(BigInteger, primary_key=True, nullable=False, comment="任务ID")
  54. # 视频ID
  55. video_id = Column(String(100), nullable=True, index=True, comment="视频ID")
  56. # 解码结果(JSON 格式)
  57. result = Column(Text, nullable=True, comment="解码结果")
  58. # 解码结果V2(分支2的结果,JSON 格式)
  59. decode_result_v2 = Column(Text, nullable=True, comment="解码结果V2(ScriptWorkflowV2分支结果)")
  60. # 搜索关键词(字符串数组的JSON格式)
  61. search_keywords = Column(Text, nullable=True, comment="搜索关键词(字符串数组的JSON格式)")
  62. # 状态
  63. status = Column(Integer, nullable=True, default=int(DecodeStatus.PENDING), index=True, comment="状态: 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败")
  64. # 失败原因
  65. error_reason = Column(Text, nullable=True, comment="失败原因")
  66. # 时间戳字段(可选,用于记录创建和更新时间)
  67. created_at = Column(DateTime, nullable=True, server_default=func.now(), comment="创建时间")
  68. updated_at = Column(DateTime, nullable=True, server_default=func.now(), onupdate=func.now(), comment="更新时间")
  69. def __repr__(self) -> str:
  70. """对象字符串表示"""
  71. return f"<DecodeVideo(task_id={self.task_id}, video_id={self.video_id}, status={self.status})>"
  72. def to_dict(self) -> dict:
  73. """转换为字典
  74. Returns:
  75. dict: 模型数据字典
  76. """
  77. return {
  78. "task_id": self.task_id,
  79. "video_id": self.video_id,
  80. "result": self.result,
  81. "decode_result_v2": self.decode_result_v2,
  82. "search_keywords": self.search_keywords,
  83. "status": self.status,
  84. "error_reason": self.error_reason,
  85. "created_at": self.created_at.isoformat() if self.created_at else None,
  86. "updated_at": self.updated_at.isoformat() if self.updated_at else None,
  87. }
  88. @classmethod
  89. def create(
  90. cls,
  91. task_id: int,
  92. video_id: Optional[str] = None,
  93. status: Optional[int] = None,
  94. result: Optional[str] = None,
  95. decode_result_v2: Optional[str] = None,
  96. error_reason: Optional[str] = None
  97. ) -> "DecodeVideo":
  98. """创建新的解码视频记录
  99. Args:
  100. task_id: 任务ID
  101. video_id: 视频ID
  102. status: 状态(默认: PENDING)
  103. result: 解码结果
  104. decode_result_v2: 解码结果V2(ScriptWorkflowV2分支结果)
  105. error_reason: 失败原因
  106. Returns:
  107. DecodeVideo: 新创建的模型实例
  108. """
  109. # 统一将枚举转换为整型值
  110. if status is None:
  111. status_value = int(DecodeStatus.PENDING)
  112. else:
  113. try:
  114. # 支持传入 IntEnum 或具体的整数
  115. status_value = int(status)
  116. except Exception:
  117. status_value = int(DecodeStatus.PENDING)
  118. return cls(
  119. task_id=task_id,
  120. video_id=video_id,
  121. status=status_value,
  122. created_at=func.now(),
  123. result=result,
  124. decode_result_v2=decode_result_v2,
  125. error_reason=error_reason
  126. )
  127. def update_status(self, status: DecodeStatus, error_reason: Optional[str] = None):
  128. """更新状态
  129. Args:
  130. status: 新状态
  131. error_reason: 失败原因(仅在失败时使用)
  132. """
  133. self.status = status.value
  134. if status == DecodeStatus.FAILED and error_reason:
  135. self.error_reason = error_reason
  136. elif status != DecodeStatus.FAILED:
  137. self.error_reason = None
  138. def update_result(self, result: str):
  139. """更新解码结果
  140. Args:
  141. result: 解码结果(JSON 字符串)
  142. """
  143. self.result = result
  144. if self.status == DecodeStatus.EXECUTING:
  145. self.update_status(DecodeStatus.SUCCESS)
  146. def update_result_v2(self, result_v2: str):
  147. """更新解码结果V2(ScriptWorkflowV2分支结果)
  148. Args:
  149. result_v2: 解码结果V2(JSON 字符串)
  150. """
  151. self.decode_result_v2 = result_v2
  152. def update_search_keywords(self, search_keywords: str):
  153. """更新搜索关键词(字符串数组的JSON格式)
  154. Args:
  155. search_keywords: 搜索关键词(字符串数组的JSON格式,如:["关键词1", "关键词2"])
  156. """
  157. self.search_keywords = search_keywords