#!/usr/bin/env python # -*- coding: utf-8 -*- """ DecodeVideo 模型使用示例 演示如何使用 models 模块进行数据库操作 """ import json from contextlib import contextmanager from src.models import get_db, DecodeVideo, DecodeStatus, init_db @contextmanager def db_session(): """数据库会话上下文管理器""" db = next(get_db()) try: yield db db.commit() except Exception as e: db.rollback() print(f"数据库操作失败: {e}") raise finally: db.close() def example_create(): """示例:创建新记录""" print("\n=== 示例1: 创建新记录 ===") with db_session() as db: video = DecodeVideo.create( task_id=10001, video_id="58840748", status=DecodeStatus.PENDING ) db.add(video) print(f"✓ 创建记录成功: {video}") def example_query(): """示例:查询记录""" print("\n=== 示例2: 查询记录 ===") with db_session() as db: # 根据 task_id 查询 video = db.query(DecodeVideo).filter_by(task_id=10001).first() if video: print(f"✓ 查询到记录: {video}") print(f" 详细信息: {json.dumps(video.to_dict(), indent=2, ensure_ascii=False)}") else: print("✗ 未找到记录") def example_update_status(): """示例:更新状态""" print("\n=== 示例3: 更新状态 ===") with db_session() as db: video = db.query(DecodeVideo).filter_by(task_id=10001).first() if video: old_status = video.status video.update_status(DecodeStatus.EXECUTING) print(f"✓ 状态更新: {DecodeStatus.get_description(old_status)} -> {DecodeStatus.get_description(video.status)}") def example_update_result(): """示例:更新解码结果""" print("\n=== 示例4: 更新解码结果 ===") with db_session() as db: video = db.query(DecodeVideo).filter_by(task_id=10001).first() if video: # 模拟解码结果 result_data = { "video_id": "58840748", "title": "示例视频", "status": "success" } result_json = json.dumps(result_data, ensure_ascii=False) video.update_result(result_json) print(f"✓ 结果更新成功") print(f" 结果长度: {len(result_json)} 字符") def example_query_by_status(): """示例:根据状态查询""" print("\n=== 示例5: 根据状态查询 ===") with db_session() as db: # 查询所有待执行的记录 pending_videos = db.query(DecodeVideo).filter_by( status=DecodeStatus.PENDING ).all() print(f"✓ 待执行任务数: {len(pending_videos)}") # 查询所有执行成功的记录 success_videos = db.query(DecodeVideo).filter_by( status=DecodeStatus.SUCCESS ).all() print(f"✓ 执行成功任务数: {len(success_videos)}") def example_update_failed(): """示例:更新为失败状态""" print("\n=== 示例6: 更新为失败状态 ===") with db_session() as db: video = db.query(DecodeVideo).filter_by(task_id=10001).first() if video: video.update_status(DecodeStatus.FAILED, error_reason="处理超时") print(f"✓ 更新为失败状态") print(f" 失败原因: {video.error_reason}") def example_list_all(): """示例:列出所有记录""" print("\n=== 示例7: 列出所有记录 ===") with db_session() as db: all_videos = db.query(DecodeVideo).all() print(f"✓ 总记录数: {len(all_videos)}") for video in all_videos[:5]: # 只显示前5条 print(f" - {video}") def main(): """主函数""" print("=" * 50) print("DecodeVideo 模型使用示例") print("=" * 50) try: # 初始化数据库(如果表不存在则创建) print("\n初始化数据库...") init_db() print("✓ 数据库初始化完成") # 运行示例 example_create() example_query() example_update_status() example_update_result() example_query_by_status() example_update_failed() example_list_all() print("\n" + "=" * 50) print("所有示例执行完成!") print("=" * 50) except Exception as e: print(f"\n✗ 执行出错: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()