| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- DecodeVideo 模型使用示例
- 演示如何使用 models 模块进行数据库操作
- """
- import json
- from contextlib import contextmanager
- from src.models import get_db, DecodeVideo, DecodeStatus, init_db
- @contextmanager
- def db_session():
- """数据库会话上下文管理器"""
- db = next(get_db())
- try:
- yield db
- db.commit()
- except Exception as e:
- db.rollback()
- print(f"数据库操作失败: {e}")
- raise
- finally:
- db.close()
- def example_create():
- """示例:创建新记录"""
- print("\n=== 示例1: 创建新记录 ===")
-
- with db_session() as db:
- video = DecodeVideo.create(
- task_id=10001,
- video_id="58840748",
- status=DecodeStatus.PENDING
- )
- db.add(video)
- print(f"✓ 创建记录成功: {video}")
- def example_query():
- """示例:查询记录"""
- print("\n=== 示例2: 查询记录 ===")
-
- with db_session() as db:
- # 根据 task_id 查询
- video = db.query(DecodeVideo).filter_by(task_id=10001).first()
- if video:
- print(f"✓ 查询到记录: {video}")
- print(f" 详细信息: {json.dumps(video.to_dict(), indent=2, ensure_ascii=False)}")
- else:
- print("✗ 未找到记录")
- def example_update_status():
- """示例:更新状态"""
- print("\n=== 示例3: 更新状态 ===")
-
- with db_session() as db:
- video = db.query(DecodeVideo).filter_by(task_id=10001).first()
- if video:
- old_status = video.status
- video.update_status(DecodeStatus.EXECUTING)
- print(f"✓ 状态更新: {DecodeStatus.get_description(old_status)} -> {DecodeStatus.get_description(video.status)}")
- def example_update_result():
- """示例:更新解码结果"""
- print("\n=== 示例4: 更新解码结果 ===")
-
- with db_session() as db:
- video = db.query(DecodeVideo).filter_by(task_id=10001).first()
- if video:
- # 模拟解码结果
- result_data = {
- "video_id": "58840748",
- "title": "示例视频",
- "status": "success"
- }
- result_json = json.dumps(result_data, ensure_ascii=False)
- video.update_result(result_json)
- print(f"✓ 结果更新成功")
- print(f" 结果长度: {len(result_json)} 字符")
- def example_query_by_status():
- """示例:根据状态查询"""
- print("\n=== 示例5: 根据状态查询 ===")
-
- with db_session() as db:
- # 查询所有待执行的记录
- pending_videos = db.query(DecodeVideo).filter_by(
- status=DecodeStatus.PENDING
- ).all()
- print(f"✓ 待执行任务数: {len(pending_videos)}")
-
- # 查询所有执行成功的记录
- success_videos = db.query(DecodeVideo).filter_by(
- status=DecodeStatus.SUCCESS
- ).all()
- print(f"✓ 执行成功任务数: {len(success_videos)}")
- def example_update_failed():
- """示例:更新为失败状态"""
- print("\n=== 示例6: 更新为失败状态 ===")
-
- with db_session() as db:
- video = db.query(DecodeVideo).filter_by(task_id=10001).first()
- if video:
- video.update_status(DecodeStatus.FAILED, error_reason="处理超时")
- print(f"✓ 更新为失败状态")
- print(f" 失败原因: {video.error_reason}")
- def example_list_all():
- """示例:列出所有记录"""
- print("\n=== 示例7: 列出所有记录 ===")
-
- with db_session() as db:
- all_videos = db.query(DecodeVideo).all()
- print(f"✓ 总记录数: {len(all_videos)}")
- for video in all_videos[:5]: # 只显示前5条
- print(f" - {video}")
- def main():
- """主函数"""
- print("=" * 50)
- print("DecodeVideo 模型使用示例")
- print("=" * 50)
-
- try:
- # 初始化数据库(如果表不存在则创建)
- print("\n初始化数据库...")
- init_db()
- print("✓ 数据库初始化完成")
-
- # 运行示例
- example_create()
- example_query()
- example_update_status()
- example_update_result()
- example_query_by_status()
- example_update_failed()
- example_list_all()
-
- print("\n" + "=" * 50)
- print("所有示例执行完成!")
- print("=" * 50)
-
- except Exception as e:
- print(f"\n✗ 执行出错: {e}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- main()
|