Browse Source

增加model模块

jihuaqiang 4 days ago
parent
commit
6ebef57aa4

+ 3 - 3
examples/demo.json

@@ -1,7 +1,7 @@
 [
     {
-        "channel_content_id": "49285161",
-        "video": "https://rescdn.yishihui.com/longvideo/transcode/video/vpc/20250311/66055634YSXA75e4qHx0H806l.mp4",
-        "title": "🔴太厉害了!这一家人是隐藏在民间的绝世高人吧!"
+        "channel_content_id": "58840748",
+        "video": "https://rescdn.yishihui.com/longvideo/transcode/video/vpc/20250929/ce8968e2f346103b83b75c0c8100028e.mp4",
+        "title": "🔴😂笑死了!让狗咬了,还要粘住嘴"
     }
 ]

+ 102 - 0
examples/test_db_connection.py

@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+测试数据库连接
+
+验证数据库配置是否正确,连接是否正常
+"""
+
+import sys
+from src.models import get_engine, init_db, DecodeVideo
+from src.models.database import get_database_url
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def test_connection():
+    """测试数据库连接"""
+    print("=" * 60)
+    print("数据库连接测试")
+    print("=" * 60)
+    
+    try:
+        # 1. 显示连接信息(隐藏密码)
+        database_url = get_database_url()
+        # 隐藏密码显示
+        safe_url = database_url.split('@')[0].split(':')[0] + ':***@' + '@'.join(database_url.split('@')[1:])
+        print(f"\n✓ 数据库连接URL: {safe_url}")
+        
+        # 2. 测试连接
+        print("\n正在测试数据库连接...")
+        engine = get_engine()
+        
+        # 尝试连接
+        with engine.connect() as conn:
+            result = conn.execute("SELECT 1 as test")
+            row = result.fetchone()
+            if row and row[0] == 1:
+                print("✓ 数据库连接成功!")
+            else:
+                print("✗ 数据库连接测试失败")
+                return False
+        
+        # 3. 测试表是否存在
+        print("\n检查 decode_videos 表...")
+        from sqlalchemy import inspect
+        inspector = inspect(engine)
+        tables = inspector.get_table_names()
+        
+        if 'decode_videos' in tables:
+            print("✓ decode_videos 表已存在")
+            
+            # 显示表结构
+            columns = inspector.get_columns('decode_videos')
+            print(f"\n表结构 ({len(columns)} 个字段):")
+            for col in columns:
+                nullable = "NULL" if col['nullable'] else "NOT NULL"
+                print(f"  - {col['name']}: {col['type']} ({nullable})")
+            
+            # 统计记录数
+            from src.models import get_db
+            db = next(get_db())
+            try:
+                count = db.query(DecodeVideo).count()
+                print(f"\n✓ 当前记录数: {count}")
+            except Exception as e:
+                print(f"⚠ 查询记录数时出错: {e}")
+            finally:
+                db.close()
+        else:
+            print("⚠ decode_videos 表不存在")
+            print("\n是否要创建表?(y/n): ", end="")
+            choice = input().strip().lower()
+            if choice == 'y':
+                print("\n正在创建表...")
+                init_db()
+                print("✓ 表创建完成")
+            else:
+                print("跳过表创建")
+        
+        print("\n" + "=" * 60)
+        print("✓ 所有测试通过!")
+        print("=" * 60)
+        return True
+        
+    except Exception as e:
+        print(f"\n✗ 数据库连接失败: {e}")
+        print("\n请检查以下配置:")
+        print("  1. 数据库主机地址是否正确")
+        print("  2. 数据库端口是否开放")
+        print("  3. 用户名和密码是否正确")
+        print("  4. 数据库名称是否正确")
+        print("  5. 网络连接是否正常(外网地址需要公网访问权限)")
+        import traceback
+        traceback.print_exc()
+        return False
+
+
+if __name__ == "__main__":
+    success = test_connection()
+    sys.exit(0 if success else 1)
+

+ 162 - 0
examples/use_decode_video_model.py

@@ -0,0 +1,162 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+DecodeVideo 模型使用示例
+
+演示如何使用 models 模块进行数据库操作
+"""
+
+import json
+from contextlib import contextmanager
+from src.models import get_db, DecodeVideo, DecodeStatus, init_db
+
+
+@contextmanager
+def db_session():
+    """数据库会话上下文管理器"""
+    db = next(get_db())
+    try:
+        yield db
+        db.commit()
+    except Exception as e:
+        db.rollback()
+        print(f"数据库操作失败: {e}")
+        raise
+    finally:
+        db.close()
+
+
+def example_create():
+    """示例:创建新记录"""
+    print("\n=== 示例1: 创建新记录 ===")
+    
+    with db_session() as db:
+        video = DecodeVideo.create(
+            task_id=10001,
+            video_id="58840748",
+            status=DecodeStatus.PENDING
+        )
+        db.add(video)
+        print(f"✓ 创建记录成功: {video}")
+
+
+def example_query():
+    """示例:查询记录"""
+    print("\n=== 示例2: 查询记录 ===")
+    
+    with db_session() as db:
+        # 根据 task_id 查询
+        video = db.query(DecodeVideo).filter_by(task_id=10001).first()
+        if video:
+            print(f"✓ 查询到记录: {video}")
+            print(f"  详细信息: {json.dumps(video.to_dict(), indent=2, ensure_ascii=False)}")
+        else:
+            print("✗ 未找到记录")
+
+
+def example_update_status():
+    """示例:更新状态"""
+    print("\n=== 示例3: 更新状态 ===")
+    
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=10001).first()
+        if video:
+            old_status = video.status
+            video.update_status(DecodeStatus.EXECUTING)
+            print(f"✓ 状态更新: {DecodeStatus.get_description(old_status)} -> {DecodeStatus.get_description(video.status)}")
+
+
+def example_update_result():
+    """示例:更新解码结果"""
+    print("\n=== 示例4: 更新解码结果 ===")
+    
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=10001).first()
+        if video:
+            # 模拟解码结果
+            result_data = {
+                "video_id": "58840748",
+                "title": "示例视频",
+                "status": "success"
+            }
+            result_json = json.dumps(result_data, ensure_ascii=False)
+            video.update_result(result_json)
+            print(f"✓ 结果更新成功")
+            print(f"  结果长度: {len(result_json)} 字符")
+
+
+def example_query_by_status():
+    """示例:根据状态查询"""
+    print("\n=== 示例5: 根据状态查询 ===")
+    
+    with db_session() as db:
+        # 查询所有待执行的记录
+        pending_videos = db.query(DecodeVideo).filter_by(
+            status=DecodeStatus.PENDING
+        ).all()
+        print(f"✓ 待执行任务数: {len(pending_videos)}")
+        
+        # 查询所有执行成功的记录
+        success_videos = db.query(DecodeVideo).filter_by(
+            status=DecodeStatus.SUCCESS
+        ).all()
+        print(f"✓ 执行成功任务数: {len(success_videos)}")
+
+
+def example_update_failed():
+    """示例:更新为失败状态"""
+    print("\n=== 示例6: 更新为失败状态 ===")
+    
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=10001).first()
+        if video:
+            video.update_status(DecodeStatus.FAILED, error_reason="处理超时")
+            print(f"✓ 更新为失败状态")
+            print(f"  失败原因: {video.error_reason}")
+
+
+def example_list_all():
+    """示例:列出所有记录"""
+    print("\n=== 示例7: 列出所有记录 ===")
+    
+    with db_session() as db:
+        all_videos = db.query(DecodeVideo).all()
+        print(f"✓ 总记录数: {len(all_videos)}")
+        for video in all_videos[:5]:  # 只显示前5条
+            print(f"  - {video}")
+
+
+def main():
+    """主函数"""
+    print("=" * 50)
+    print("DecodeVideo 模型使用示例")
+    print("=" * 50)
+    
+    try:
+        # 初始化数据库(如果表不存在则创建)
+        print("\n初始化数据库...")
+        init_db()
+        print("✓ 数据库初始化完成")
+        
+        # 运行示例
+        example_create()
+        example_query()
+        example_update_status()
+        example_update_result()
+        example_query_by_status()
+        example_update_failed()
+        example_list_all()
+        
+        print("\n" + "=" * 50)
+        print("所有示例执行完成!")
+        print("=" * 50)
+        
+    except Exception as e:
+        print(f"\n✗ 执行出错: {e}")
+        import traceback
+        traceback.print_exc()
+
+
+if __name__ == "__main__":
+    main()
+

+ 4 - 1
requirements.txt

@@ -12,4 +12,7 @@ requests>=2.32.0
 aiohttp>=3.9.0
 pytest>=8.0.0
 pytest-asyncio>=0.24.0
-aiofiles>=24.0.0
+aiofiles>=24.0.0
+sqlalchemy>=2.0.0
+pymysql>=1.1.0
+cryptography>=41.0.0

+ 315 - 0
src/models/README.md

@@ -0,0 +1,315 @@
+# Models 模块
+
+数据库模型模块,用于同步数据库表设计,提供 ORM 支持。
+
+## 目录结构
+
+```
+src/models/
+├── __init__.py          # 模块导出
+├── database.py          # 数据库配置和连接管理
+├── decode_video.py      # DecodeVideo 模型
+└── README.md            # 本文件
+```
+
+## 环境变量配置
+
+在使用模型之前,需要配置以下环境变量(如果使用默认配置,则无需设置):
+
+```bash
+# 数据库配置(使用外网地址)
+export DB_HOST=rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com  # 数据库主机(默认已配置)
+export DB_PORT=3306              # 数据库端口(默认: 3306)
+export DB_USER=content_rw        # 数据库用户名(默认已配置)
+export DB_PASSWORD=bC1aH4bA1lB0 # 数据库密码(默认已配置)
+export DB_NAME=content-deconstruction  # 数据库名称(默认已配置)
+```
+
+或者在 `.env` 文件中配置:
+
+```env
+# 使用外网地址访问数据库
+DB_HOST=rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com
+DB_PORT=3306
+DB_USER=content_rw
+DB_PASSWORD=bC1aH4bA1lB0
+DB_NAME=content-deconstruction
+
+# 如需使用内网地址,可切换为:
+# DB_HOST=rm-t4nh1xx6o2a6vj8qu.mysql.singapore.rds.aliyuncs.com
+```
+
+**注意**:代码中已配置默认值,可以直接使用。如需覆盖,可通过环境变量设置。
+
+## 快速开始
+
+### 1. 初始化数据库
+
+```python
+from src.models import init_db
+
+# 创建所有表(如果不存在)
+init_db()
+```
+
+### 2. 使用模型
+
+#### 创建记录
+
+```python
+from src.models import get_db, DecodeVideo, DecodeStatus
+
+# 获取数据库会话
+db = next(get_db())
+
+# 创建新记录
+video = DecodeVideo.create(
+    task_id=12345,
+    video_id="58840748",
+    status=DecodeStatus.PENDING
+)
+
+# 保存到数据库
+db.add(video)
+db.commit()
+db.close()
+```
+
+#### 查询记录
+
+```python
+from src.models import get_db, DecodeVideo, DecodeStatus
+
+db = next(get_db())
+
+# 根据 task_id 查询
+video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+
+# 根据 video_id 查询
+videos = db.query(DecodeVideo).filter_by(video_id="58840748").all()
+
+# 根据状态查询
+pending_videos = db.query(DecodeVideo).filter_by(status=DecodeStatus.PENDING).all()
+
+# 查询所有记录
+all_videos = db.query(DecodeVideo).all()
+
+db.close()
+```
+
+#### 更新记录
+
+```python
+from src.models import get_db, DecodeVideo, DecodeStatus
+
+db = next(get_db()
+
+# 查询记录
+video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+
+if video:
+    # 更新状态
+    video.update_status(DecodeStatus.EXECUTING)
+    
+    # 更新结果
+    video.update_result('{"result": "..."}')
+    
+    # 或者直接更新字段
+    video.status = DecodeStatus.SUCCESS
+    video.result = '{"result": "..."}'
+    
+    db.commit()
+
+db.close()
+```
+
+#### 删除记录
+
+```python
+from src.models import get_db, DecodeVideo
+
+db = next(get_db())
+
+video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+if video:
+    db.delete(video)
+    db.commit()
+
+db.close()
+```
+
+### 3. 使用上下文管理器(推荐)
+
+```python
+from contextlib import contextmanager
+from src.models import get_db, DecodeVideo, DecodeStatus
+
+@contextmanager
+def db_session():
+    """数据库会话上下文管理器"""
+    db = next(get_db())
+    try:
+        yield db
+        db.commit()
+    except Exception:
+        db.rollback()
+        raise
+    finally:
+        db.close()
+
+# 使用示例
+with db_session() as db:
+    video = DecodeVideo.create(
+        task_id=12345,
+        video_id="58840748",
+        status=DecodeStatus.PENDING
+    )
+    db.add(video)
+    # 自动提交和关闭
+```
+
+## 模型说明
+
+### DecodeVideo
+
+对应数据库表 `decode_videos`
+
+#### 字段
+
+- `task_id` (BigInteger, Primary Key): 任务ID,非空
+- `video_id` (String(100)): 视频ID,可为空,已建立索引
+- `result` (Text): 解码结果(JSON 格式),可为空
+- `status` (Integer): 状态,可为空,默认值为 0(待执行),已建立索引
+  - 0: 待执行 (PENDING)
+  - 1: 执行中 (EXECUTING)
+  - 2: 执行成功 (SUCCESS)
+  - 3: 执行失败 (FAILED)
+- `error_reason` (Text): 失败原因,可为空
+- `created_at` (DateTime): 创建时间,自动设置
+- `updated_at` (DateTime): 更新时间,自动更新
+
+#### 方法
+
+- `to_dict()`: 将模型转换为字典
+- `create()`: 类方法,创建新实例
+- `update_status()`: 更新状态
+- `update_result()`: 更新解码结果
+
+### DecodeStatus
+
+状态枚举类
+
+```python
+DecodeStatus.PENDING    # 0 - 待执行
+DecodeStatus.EXECUTING  # 1 - 执行中
+DecodeStatus.SUCCESS    # 2 - 执行成功
+DecodeStatus.FAILED     # 3 - 执行失败
+```
+
+## 完整示例
+
+```python
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+使用 DecodeVideo 模型的完整示例
+"""
+
+from contextlib import contextmanager
+from src.models import get_db, DecodeVideo, DecodeStatus, init_db
+
+@contextmanager
+def db_session():
+    """数据库会话上下文管理器"""
+    db = next(get_db())
+    try:
+        yield db
+        db.commit()
+    except Exception:
+        db.rollback()
+        raise
+    finally:
+        db.close()
+
+
+def main():
+    # 1. 初始化数据库(首次使用)
+    init_db()
+    
+    # 2. 创建新记录
+    with db_session() as db:
+        video = DecodeVideo.create(
+            task_id=12345,
+            video_id="58840748",
+            status=DecodeStatus.PENDING
+        )
+        db.add(video)
+        print(f"创建记录: {video}")
+    
+    # 3. 查询记录
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+        if video:
+            print(f"查询结果: {video.to_dict()}")
+    
+    # 4. 更新状态
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+        if video:
+            video.update_status(DecodeStatus.EXECUTING)
+            print(f"更新状态为: {DecodeStatus.get_description(video.status)}")
+    
+    # 5. 更新结果
+    with db_session() as db:
+        video = db.query(DecodeVideo).filter_by(task_id=12345).first()
+        if video:
+            result = '{"result": "解码成功"}'
+            video.update_result(result)
+            print(f"更新结果: {video.result}")
+    
+    # 6. 查询所有待执行的记录
+    with db_session() as db:
+        pending_videos = db.query(DecodeVideo).filter_by(
+            status=DecodeStatus.PENDING
+        ).all()
+        print(f"待执行任务数: {len(pending_videos)}")
+
+
+if __name__ == "__main__":
+    main()
+```
+
+## 注意事项
+
+1. **数据库连接**: 使用 `get_db()` 获取数据库会话后,务必在最后调用 `db.close()` 关闭连接
+2. **事务管理**: 修改数据后需要调用 `db.commit()` 提交事务,出错时调用 `db.rollback()` 回滚
+3. **环境变量**: 确保正确配置数据库连接相关的环境变量
+4. **表结构同步**: 如果数据库表结构发生变化,需要更新模型定义以保持同步
+5. **索引**: `video_id` 和 `status` 字段已建立索引,适合用于查询条件
+
+## 与工作流集成
+
+可以在工作流中使用模型来保存和查询解码结果:
+
+```python
+from src.models import get_db, DecodeVideo, DecodeStatus
+
+def save_decode_result(task_id: int, video_id: str, result: dict):
+    """保存解码结果到数据库"""
+    db = next(get_db())
+    try:
+        video = db.query(DecodeVideo).filter_by(task_id=task_id).first()
+        if not video:
+            video = DecodeVideo.create(task_id=task_id, video_id=video_id)
+            db.add(video)
+        
+        import json
+        video.update_result(json.dumps(result, ensure_ascii=False))
+        db.commit()
+    except Exception as e:
+        db.rollback()
+        raise
+    finally:
+        db.close()
+```
+

+ 18 - 0
src/models/__init__.py

@@ -0,0 +1,18 @@
+"""
+Models 模块
+
+提供数据库模型定义,用于同步数据库表设计
+"""
+
+from src.models.database import Base, get_db, get_engine, init_db
+from src.models.decode_video import DecodeVideo, DecodeStatus
+
+__all__ = [
+    "Base",
+    "get_db",
+    "get_engine",
+    "init_db",
+    "DecodeVideo",
+    "DecodeStatus",
+]
+

+ 126 - 0
src/models/database.py

@@ -0,0 +1,126 @@
+"""
+数据库配置模块
+
+提供数据库连接和会话管理
+"""
+
+import os
+from typing import Generator
+from sqlalchemy import create_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker, Session
+from sqlalchemy.pool import QueuePool
+
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+# 数据库基础类
+Base = declarative_base()
+
+# 全局变量
+_engine = None
+_SessionLocal = None
+
+
+def get_database_url() -> str:
+    """获取数据库连接URL
+    
+    Returns:
+        str: 数据库连接URL
+        
+    Environment Variables:
+        DB_HOST: 数据库主机地址 (默认: rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com)
+        DB_PORT: 数据库端口 (默认: 3306)
+        DB_USER: 数据库用户名 (默认: content_rw)
+        DB_PASSWORD: 数据库密码 (必需)
+        DB_NAME: 数据库名称 (默认: content-deconstruction)
+    """
+    host = os.getenv("DB_HOST", "rm-t4nh1xx6o2a6vj8qu3o.mysql.singapore.rds.aliyuncs.com")
+    port = os.getenv("DB_PORT", "3306")
+    user = os.getenv("DB_USER", "content_rw")
+    password = os.getenv("DB_PASSWORD", "bC1aH4bA1lB0")
+    database = os.getenv("DB_NAME", "content-deconstruction")
+    
+    if not password:
+        raise ValueError("DB_PASSWORD environment variable is required")
+    
+    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{database}?charset=utf8mb4"
+
+
+def get_engine():
+    """获取数据库引擎(单例模式)
+    
+    Returns:
+        Engine: SQLAlchemy 数据库引擎
+    """
+    global _engine
+    if _engine is None:
+        database_url = get_database_url()
+        _engine = create_engine(
+            database_url,
+            poolclass=QueuePool,
+            pool_size=10,
+            max_overflow=20,
+            pool_pre_ping=True,  # 连接前检查连接是否有效
+            echo=False,  # 设置为 True 可以打印 SQL 语句,用于调试
+        )
+        logger.info(f"Database engine created for database: {os.getenv('DB_NAME', 'content-deconstruction')}")
+    return _engine
+
+
+def get_session_local():
+    """获取会话工厂(单例模式)
+    
+    Returns:
+        sessionmaker: SQLAlchemy 会话工厂
+    """
+    global _SessionLocal
+    if _SessionLocal is None:
+        engine = get_engine()
+        _SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
+    return _SessionLocal
+
+
+def get_db() -> Generator[Session, None, None]:
+    """获取数据库会话(依赖注入)
+    
+    Yields:
+        Session: SQLAlchemy 数据库会话
+        
+    Example:
+        ```python
+        db = next(get_db())
+        video = db.query(DecodeVideo).filter_by(video_id="123").first()
+        db.close()
+        ```
+    """
+    SessionLocal = get_session_local()
+    db = SessionLocal()
+    try:
+        yield db
+    finally:
+        db.close()
+
+
+def init_db():
+    """初始化数据库(创建所有表)
+    
+    注意:此方法会创建所有在 Base.metadata 中注册的表
+    如果表已存在,不会重复创建
+    """
+    engine = get_engine()
+    Base.metadata.create_all(bind=engine)
+    logger.info("Database tables initialized")
+
+
+def drop_db():
+    """删除所有表(谨慎使用)
+    
+    警告:此方法会删除所有在 Base.metadata 中注册的表
+    仅用于开发和测试环境
+    """
+    engine = get_engine()
+    Base.metadata.drop_all(bind=engine)
+    logger.warning("All database tables dropped")
+

+ 157 - 0
src/models/decode_video.py

@@ -0,0 +1,157 @@
+"""
+DecodeVideo 模型
+
+对应数据库表 decode_videos 的 ORM 模型
+"""
+
+from enum import IntEnum
+from typing import Optional
+from sqlalchemy import Column, String, BigInteger, Integer, Text, DateTime
+from sqlalchemy.sql import func
+
+from src.models.database import Base
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class DecodeStatus(IntEnum):
+    """解码状态枚举
+    
+    对应数据库字段 status 的值:
+    - PENDING: 0 - 待执行
+    - EXECUTING: 1 - 执行中
+    - SUCCESS: 2 - 执行成功
+    - FAILED: 3 - 执行失败
+    """
+    PENDING = 0
+    EXECUTING = 1
+    SUCCESS = 2
+    FAILED = 3
+    
+    @classmethod
+    def get_description(cls, status: int) -> str:
+        """获取状态描述
+        
+        Args:
+            status: 状态值
+            
+        Returns:
+            str: 状态描述
+        """
+        descriptions = {
+            cls.PENDING: "待执行",
+            cls.EXECUTING: "执行中",
+            cls.SUCCESS: "执行成功",
+            cls.FAILED: "执行失败",
+        }
+        return descriptions.get(status, "未知状态")
+
+
+class DecodeVideo(Base):
+    """解码视频模型
+    
+    对应数据库表: decode_videos
+    
+    字段说明:
+    - video_id: 视频ID (varchar(100), nullable)
+    - task_id: 任务ID (bigint, not null)
+    - result: 解码结果 (mediumtext, nullable)
+    - status: 状态 (int, nullable) - 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败
+    - error_reason: 失败原因 (mediumtext, nullable)
+    """
+    
+    __tablename__ = "decode_videos"
+    
+    # 主键:使用 task_id 作为主键(根据业务需求,可能需要调整)
+    task_id = Column(BigInteger, primary_key=True, nullable=False, comment="任务ID")
+    
+    # 视频ID
+    video_id = Column(String(100), nullable=True, index=True, comment="视频ID")
+    
+    # 解码结果(JSON 格式)
+    result = Column(Text, nullable=True, comment="解码结果")
+    
+    # 状态
+    status = Column(Integer, nullable=True, default=DecodeStatus.PENDING, index=True, comment="状态: 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败")
+    
+    # 失败原因
+    error_reason = Column(Text, nullable=True, comment="失败原因")
+    
+    # 时间戳字段(可选,用于记录创建和更新时间)
+    created_at = Column(DateTime, nullable=True, server_default=func.now(), comment="创建时间")
+    updated_at = Column(DateTime, nullable=True, server_default=func.now(), onupdate=func.now(), comment="更新时间")
+    
+    def __repr__(self) -> str:
+        """对象字符串表示"""
+        return f"<DecodeVideo(task_id={self.task_id}, video_id={self.video_id}, status={self.status})>"
+    
+    def to_dict(self) -> dict:
+        """转换为字典
+        
+        Returns:
+            dict: 模型数据字典
+        """
+        return {
+            "task_id": self.task_id,
+            "video_id": self.video_id,
+            "result": self.result,
+            "status": self.status,
+            "status_description": DecodeStatus.get_description(self.status) if self.status is not None else None,
+            "error_reason": self.error_reason,
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
+        }
+    
+    @classmethod
+    def create(
+        cls,
+        task_id: int,
+        video_id: Optional[str] = None,
+        status: Optional[int] = None,
+        result: Optional[str] = None,
+        error_reason: Optional[str] = None
+    ) -> "DecodeVideo":
+        """创建新的解码视频记录
+        
+        Args:
+            task_id: 任务ID
+            video_id: 视频ID
+            status: 状态(默认: PENDING)
+            result: 解码结果
+            error_reason: 失败原因
+            
+        Returns:
+            DecodeVideo: 新创建的模型实例
+        """
+        return cls(
+            task_id=task_id,
+            video_id=video_id,
+            status=status if status is not None else DecodeStatus.PENDING,
+            result=result,
+            error_reason=error_reason
+        )
+    
+    def update_status(self, status: DecodeStatus, error_reason: Optional[str] = None):
+        """更新状态
+        
+        Args:
+            status: 新状态
+            error_reason: 失败原因(仅在失败时使用)
+        """
+        self.status = status.value
+        if status == DecodeStatus.FAILED and error_reason:
+            self.error_reason = error_reason
+        elif status != DecodeStatus.FAILED:
+            self.error_reason = None
+    
+    def update_result(self, result: str):
+        """更新解码结果
+        
+        Args:
+            result: 解码结果(JSON 字符串)
+        """
+        self.result = result
+        if self.status == DecodeStatus.EXECUTING:
+            self.update_status(DecodeStatus.SUCCESS)
+