||
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 测试 EvaluateWorkflow 功能
- 创建测试数据并验证评估工作流的功能
- """
- import json
- import sys
- from pathlib import Path
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- from src.models import get_db, DecodeVideo, DecodeStatus, EvaluateRecord, EvaluateStatus
- from src.workflows.evaluate_workflow import EvaluateWorkflow
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
- def create_test_decode_video(task_id: int, video_id: str = "test_video_001") -> DecodeVideo:
- """创建测试用的 DecodeVideo 记录"""
-
- # 创建模拟的解构结果
- decode_result_v2 = {
- "视频信息": {
- "标题": "🔴退伍军人二次入伍的感人画面!若有战,召必回",
- "视频URL": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
- "正文": ""
- },
- "三点解构": {
- "灵感点": [
- {
- "候选编号": 1,
- "分类": "亲情告别",
- "灵感点": "父亲亲吻熟睡的婴儿",
- "描述": "视频开头,一位身着便装、背着行囊的男子,在即将离家前,深情地俯身亲吻床上熟睡的小婴儿,并温柔地为其盖好被子,眼中充满不舍。"
- }
- ],
- "目的点": {
- "perspective": "创作者视角",
- "purposes": [
- {
- "维度": {
- "一级分类": "个人",
- "二级分类": "分享"
- },
- "目的点": "展现军人告别与家人不舍的感人瞬间",
- "描述": "创作者通过剪辑多位军人在入伍或归队前与亲人进行告别的场景,着重刻画了军人坚毅与亲人依依不舍的复杂情感。"
- }
- ]
- },
- "关键点": {
- "key_points": [
- {
- "候选编号": 1,
- "维度大类": "实质",
- "维度细分": "元素",
- "关键点": "军人发型数字'2'",
- "描述": "军人头部的理发造型中,清晰可见数字'2',象征着'二次入伍'的身份。"
- }
- ]
- }
- },
- "选题理解": {
- "核心主题": "军人二次入伍与家人告别",
- "目标受众": "关注军人生活、家国情怀的观众",
- "情感基调": "感人、不舍、家国情怀"
- }
- }
-
- # 创建 DecodeVideo 记录
- decode_video = DecodeVideo.create(
- task_id=task_id,
- video_id=video_id,
- status=DecodeStatus.SUCCESS,
- decode_result_v2=json.dumps(decode_result_v2, ensure_ascii=False)
- )
-
- return decode_video
- def create_test_evaluate_record(evaluate_id: int, task_id: int) -> EvaluateRecord:
- """创建测试用的 EvaluateRecord 记录"""
-
- evaluate_record = EvaluateRecord.create(
- evaluate_id=evaluate_id,
- task_id=task_id,
- status=EvaluateStatus.PENDING
- )
-
- return evaluate_record
- def create_test_search_result() -> list:
- """创建测试用的待评估视频列表"""
-
- search_result = [
- {
- "video_id": "61626151",
- "video_url": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
- "title": "🔴退伍军人二次入伍的感人画面!若有战,召必回"
- },
- {
- "video_id": "61626152",
- "video_url": "https://rescdn.yishihui.com/pipeline/video/example1.mp4",
- "title": "军人告别家人的感人瞬间"
- },
- {
- "video_id": "61626153",
- "video_url": "https://rescdn.yishihui.com/pipeline/video/example2.mp4",
- "title": "二次入伍军人的家国情怀"
- },
- {
- "video_id": "61626154",
- "video_url": "https://rescdn.yishihui.com/pipeline/video/example3.mp4",
- "title": "美食制作教程:如何做红烧肉"
- },
- {
- "video_id": "61626155",
- "video_url": "https://rescdn.yishihui.com/pipeline/video/example4.mp4",
- "title": "旅行vlog:探索美丽的风景"
- }
- ]
-
- return search_result
- def setup_test_data(task_id: int, evaluate_id: int, video_id: str = "test_video_001"):
- """设置测试数据(创建数据库记录)"""
-
- logger.info("=== 开始设置测试数据 ===")
-
- db = next(get_db())
- try:
- # 检查是否已存在记录
- existing_decode = db.query(DecodeVideo).filter_by(task_id=task_id).first()
- if existing_decode:
- logger.info(f"DecodeVideo 记录已存在: task_id={task_id},将删除后重新创建")
- db.delete(existing_decode)
- db.commit()
-
- existing_evaluate = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
- if existing_evaluate:
- logger.info(f"EvaluateRecord 记录已存在: evaluate_id={evaluate_id},将删除后重新创建")
- db.delete(existing_evaluate)
- db.commit()
-
- # 创建 DecodeVideo 记录
- decode_video = create_test_decode_video(task_id, video_id)
- db.add(decode_video)
- logger.info(f"✓ 创建 DecodeVideo 记录: task_id={task_id}, video_id={video_id}")
-
- # 创建 EvaluateRecord 记录
- evaluate_record = create_test_evaluate_record(evaluate_id, task_id)
- db.add(evaluate_record)
- logger.info(f"✓ 创建 EvaluateRecord 记录: evaluate_id={evaluate_id}, task_id={task_id}")
-
- db.commit()
- logger.info("✓ 测试数据设置完成")
-
- except Exception as e:
- logger.error(f"设置测试数据失败: {e}", exc_info=True)
- db.rollback()
- raise
- finally:
- db.close()
- def test_evaluate_workflow(task_id: int, evaluate_id: int):
- """测试评估工作流"""
-
- logger.info("=== 开始测试评估工作流 ===")
-
- # 创建待评估的视频列表
- search_result = create_test_search_result()
- logger.info(f"待评估视频数量: {len(search_result)}")
-
- # 准备输入数据
- input_data = {
- "task_id": task_id,
- "evaluate_id": evaluate_id,
- "search_result": search_result
- }
-
- logger.info(f"输入数据: task_id={task_id}, evaluate_id={evaluate_id}, search_result数量={len(search_result)}")
-
- # 创建并执行工作流
- try:
- workflow = EvaluateWorkflow(model_provider="google_genai")
- result = workflow.invoke(input_data)
-
- logger.info("=== 工作流执行结果 ===")
- logger.info(f"工作流状态: {result.get('workflow_status')}")
-
- if result.get("error"):
- logger.error(f"工作流执行错误: {result.get('error')}")
- return result
-
- evaluate_result = result.get("evaluate_result", [])
- logger.info(f"评估结果数量: {len(evaluate_result)}")
-
- # 打印评估结果详情
- if evaluate_result:
- logger.info("\n=== 评估结果详情 ===")
- for i, video in enumerate(evaluate_result, 1):
- logger.info(f"\n视频 {i}:")
- logger.info(f" video_id: {video.get('video_id')}")
- logger.info(f" title: {video.get('title')}")
- logger.info(f" relevance_score: {video.get('relevance_score', 'N/A')}")
- logger.info(f" is_selected: {video.get('is_selected', 'N/A')}")
-
- # 验证结果
- selected_count = sum(1 for v in evaluate_result if v.get("is_selected", False))
- logger.info(f"\n入选视频数量: {selected_count}/{len(evaluate_result)}")
-
- return result
-
- except Exception as e:
- logger.error(f"测试评估工作流失败: {e}", exc_info=True)
- raise
- def verify_database_result(evaluate_id: int):
- """验证数据库中的评估结果"""
-
- logger.info("=== 验证数据库结果 ===")
-
- db = next(get_db())
- try:
- evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
-
- if not evaluate_record:
- logger.error(f"未找到 evaluate_id={evaluate_id} 的评估记录")
- return
-
- logger.info(f"评估记录状态: {EvaluateStatus.get_description(evaluate_record.status)}")
- logger.info(f"评估记录状态值: {evaluate_record.status}")
-
- if evaluate_record.evaluate_result:
- try:
- result_data = json.loads(evaluate_record.evaluate_result)
- logger.info(f"评估结果数量: {len(result_data) if isinstance(result_data, list) else 'N/A'}")
- logger.info(f"评估结果已保存到数据库")
- except json.JSONDecodeError as e:
- logger.error(f"解析评估结果失败: {e}")
- else:
- logger.warning("评估结果为空")
-
- except Exception as e:
- logger.error(f"验证数据库结果失败: {e}", exc_info=True)
- finally:
- db.close()
- def main():
- """主函数"""
-
- # 测试参数
- task_id = 999999 # 测试用的 task_id
- evaluate_id = 888888 # 测试用的 evaluate_id
- video_id = "test_video_001"
-
- try:
- # 1. 设置测试数据
- setup_test_data(task_id, evaluate_id, video_id)
-
- # 2. 执行评估工作流
- result = test_evaluate_workflow(task_id, evaluate_id)
-
- # 3. 验证数据库结果
- verify_database_result(evaluate_id)
-
- # 4. 总结
- logger.info("\n=== 测试总结 ===")
- if result.get("workflow_status") == "success":
- logger.info("✓ 评估工作流测试成功!")
- evaluate_result = result.get("evaluate_result", [])
- if evaluate_result:
- logger.info(f"✓ 成功评估 {len(evaluate_result)} 个视频")
- selected = [v for v in evaluate_result if v.get("is_selected", False)]
- logger.info(f"✓ 入选视频 {len(selected)} 个")
- else:
- logger.error("✗ 评估工作流测试失败")
- if result.get("error"):
- logger.error(f"错误信息: {result.get('error')}")
-
- except Exception as e:
- logger.error(f"测试执行失败: {e}", exc_info=True)
- sys.exit(1)
- if __name__ == "__main__":
- main()
|