#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Manual integration test for EvaluateWorkflow.

Seeds a DecodeVideo / EvaluateRecord pair in the database, runs the workflow
against a fixed list of candidate videos, then reads the stored evaluation
result back from the database.

Run directly (``python <this file>``); the process exits with status 1 when
setup or execution raises, or when the workflow does not report success.
"""

import json
import sys
from contextlib import contextmanager
from pathlib import Path

# Make the project root importable so ``src.*`` resolves when run as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.models import get_db, DecodeVideo, DecodeStatus, EvaluateRecord, EvaluateStatus
from src.workflows.evaluate_workflow import EvaluateWorkflow
from src.utils.logger import get_logger

logger = get_logger(__name__)


@contextmanager
def _db_session():
    """Yield a session obtained from ``get_db()`` and always release it.

    The session is closed on exit, and the ``get_db()`` iterator is closed too
    (when it supports ``close()``), so any cleanup written inside ``get_db``
    runs now rather than whenever the iterator is garbage-collected.
    """
    session_source = get_db()
    db = next(session_source)
    try:
        yield db
    finally:
        db.close()
        close_source = getattr(session_source, "close", None)
        if close_source is not None:
            close_source()


def create_test_decode_video(task_id: int, video_id: str = "test_video_001") -> DecodeVideo:
    """Build a DecodeVideo record in SUCCESS state with a canned decode result.

    Args:
        task_id: Task id the record belongs to.
        video_id: Video id stored on the record.

    Returns:
        The object returned by ``DecodeVideo.create``; the caller adds it to a
        session and commits.
    """
    # Mocked decode result, serialized as JSON into ``decode_result_v2``.
    decode_result_v2 = {
        "视频信息": {
            "标题": "🔴退伍军人二次入伍的感人画面!若有战,召必回",
            "视频URL": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
            "正文": "",
        },
        "三点解构": {
            "灵感点": [
                {
                    "候选编号": 1,
                    "分类": "亲情告别",
                    "灵感点": "父亲亲吻熟睡的婴儿",
                    "描述": "视频开头,一位身着便装、背着行囊的男子,在即将离家前,深情地俯身亲吻床上熟睡的小婴儿,并温柔地为其盖好被子,眼中充满不舍。",
                }
            ],
            "目的点": {
                "perspective": "创作者视角",
                "purposes": [
                    {
                        "维度": {
                            "一级分类": "个人",
                            "二级分类": "分享",
                        },
                        "目的点": "展现军人告别与家人不舍的感人瞬间",
                        "描述": "创作者通过剪辑多位军人在入伍或归队前与亲人进行告别的场景,着重刻画了军人坚毅与亲人依依不舍的复杂情感。",
                    }
                ],
            },
            "关键点": {
                "key_points": [
                    {
                        "候选编号": 1,
                        "维度大类": "实质",
                        "维度细分": "元素",
                        "关键点": "军人发型数字'2'",
                        "描述": "军人头部的理发造型中,清晰可见数字'2',象征着'二次入伍'的身份。",
                    }
                ]
            },
        },
        "选题理解": {
            "核心主题": "军人二次入伍与家人告别",
            "目标受众": "关注军人生活、家国情怀的观众",
            "情感基调": "感人、不舍、家国情怀",
        },
    }

    decode_video = DecodeVideo.create(
        task_id=task_id,
        video_id=video_id,
        status=DecodeStatus.SUCCESS,
        decode_result_v2=json.dumps(decode_result_v2, ensure_ascii=False),
    )
    return decode_video


def create_test_evaluate_record(evaluate_id: int, task_id: int) -> EvaluateRecord:
    """Build an EvaluateRecord in PENDING state for ``evaluate_id`` / ``task_id``."""
    evaluate_record = EvaluateRecord.create(
        evaluate_id=evaluate_id,
        task_id=task_id,
        status=EvaluateStatus.PENDING,
    )
    return evaluate_record


def create_test_search_result() -> list:
    """Return the candidate videos handed to the workflow for evaluation.

    The first three titles relate to the seeded decode result (soldiers
    re-enlisting); the last two are unrelated (cooking, travel).
    """
    search_result = [
        {
            "video_id": "61626151",
            "video_url": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
            "title": "🔴退伍军人二次入伍的感人画面!若有战,召必回",
        },
        {
            "video_id": "61626152",
            "video_url": "https://rescdn.yishihui.com/pipeline/video/example1.mp4",
            "title": "军人告别家人的感人瞬间",
        },
        {
            "video_id": "61626153",
            "video_url": "https://rescdn.yishihui.com/pipeline/video/example2.mp4",
            "title": "二次入伍军人的家国情怀",
        },
        {
            "video_id": "61626154",
            "video_url": "https://rescdn.yishihui.com/pipeline/video/example3.mp4",
            "title": "美食制作教程:如何做红烧肉",
        },
        {
            "video_id": "61626155",
            "video_url": "https://rescdn.yishihui.com/pipeline/video/example4.mp4",
            "title": "旅行vlog:探索美丽的风景",
        },
    ]
    return search_result


def setup_test_data(task_id: int, evaluate_id: int, video_id: str = "test_video_001"):
    """Replace any existing rows for these ids with fresh test records.

    Every DecodeVideo row with ``task_id`` and every EvaluateRecord row with
    ``evaluate_id`` is deleted first, then one new record of each is inserted.

    Raises:
        Exception: any database error is logged, the transaction rolled back,
            and the exception re-raised.
    """
    logger.info("=== 开始设置测试数据 ===")

    with _db_session() as db:
        try:
            # Remove *all* stale rows (not just the first), so repeated runs
            # always start from exactly one record per table. Deleting through
            # the session keeps ORM cascade behaviour intact.
            stale_decodes = db.query(DecodeVideo).filter_by(task_id=task_id).all()
            if stale_decodes:
                logger.info(f"DecodeVideo 记录已存在: task_id={task_id},将删除后重新创建")
                for stale in stale_decodes:
                    db.delete(stale)
                db.commit()

            stale_evaluates = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).all()
            if stale_evaluates:
                logger.info(f"EvaluateRecord 记录已存在: evaluate_id={evaluate_id},将删除后重新创建")
                for stale in stale_evaluates:
                    db.delete(stale)
                db.commit()

            decode_video = create_test_decode_video(task_id, video_id)
            db.add(decode_video)
            logger.info(f"✓ 创建 DecodeVideo 记录: task_id={task_id}, video_id={video_id}")

            evaluate_record = create_test_evaluate_record(evaluate_id, task_id)
            db.add(evaluate_record)
            logger.info(f"✓ 创建 EvaluateRecord 记录: evaluate_id={evaluate_id}, task_id={task_id}")

            db.commit()
            logger.info("✓ 测试数据设置完成")
        except Exception as e:
            logger.error(f"设置测试数据失败: {e}", exc_info=True)
            db.rollback()
            raise


def test_evaluate_workflow(task_id: int, evaluate_id: int):
    """Run EvaluateWorkflow on the canned candidate list and log the outcome.

    Despite its name this is not a pytest test: it needs explicit ids and
    seeded database rows (see ``setup_test_data``).

    Returns:
        The dict returned by ``EvaluateWorkflow.invoke``.

    Raises:
        Exception: any error from building or invoking the workflow is logged
            and re-raised.
    """
    logger.info("=== 开始测试评估工作流 ===")

    search_result = create_test_search_result()
    logger.info(f"待评估视频数量: {len(search_result)}")

    input_data = {
        "task_id": task_id,
        "evaluate_id": evaluate_id,
        "search_result": search_result,
    }
    logger.info(f"输入数据: task_id={task_id}, evaluate_id={evaluate_id}, search_result数量={len(search_result)}")

    try:
        workflow = EvaluateWorkflow(model_provider="google_genai")
        result = workflow.invoke(input_data)

        logger.info("=== 工作流执行结果 ===")
        logger.info(f"工作流状态: {result.get('workflow_status')}")

        if result.get("error"):
            logger.error(f"工作流执行错误: {result.get('error')}")
            return result

        evaluate_result = result.get("evaluate_result", [])
        logger.info(f"评估结果数量: {len(evaluate_result)}")

        if evaluate_result:
            logger.info("\n=== 评估结果详情 ===")
            for i, video in enumerate(evaluate_result, 1):
                logger.info(f"\n视频 {i}:")
                logger.info(f" video_id: {video.get('video_id')}")
                logger.info(f" title: {video.get('title')}")
                logger.info(f" relevance_score: {video.get('relevance_score', 'N/A')}")
                logger.info(f" is_selected: {video.get('is_selected', 'N/A')}")

            selected_count = sum(1 for v in evaluate_result if v.get("is_selected", False))
            logger.info(f"\n入选视频数量: {selected_count}/{len(evaluate_result)}")

        return result

    except Exception as e:
        logger.error(f"测试评估工作流失败: {e}", exc_info=True)
        raise


# pytest would otherwise collect this function (it matches ``test_*``) and fail
# with "fixture 'task_id' not found"; it is only meant to be called from main().
test_evaluate_workflow.__test__ = False


def verify_database_result(evaluate_id: int):
    """Log the status and stored result of the EvaluateRecord for ``evaluate_id``.

    Errors while querying are logged rather than raised, so this step never
    aborts the run.
    """
    logger.info("=== 验证数据库结果 ===")

    with _db_session() as db:
        try:
            evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
            if not evaluate_record:
                logger.error(f"未找到 evaluate_id={evaluate_id} 的评估记录")
                return

            logger.info(f"评估记录状态: {EvaluateStatus.get_description(evaluate_record.status)}")
            logger.info(f"评估记录状态值: {evaluate_record.status}")

            if evaluate_record.evaluate_result:
                try:
                    result_data = json.loads(evaluate_record.evaluate_result)
                    logger.info(f"评估结果数量: {len(result_data) if isinstance(result_data, list) else 'N/A'}")
                    logger.info("评估结果已保存到数据库")
                except json.JSONDecodeError as e:
                    logger.error(f"解析评估结果失败: {e}")
            else:
                logger.warning("评估结果为空")

        except Exception as e:
            logger.error(f"验证数据库结果失败: {e}", exc_info=True)


def main():
    """Seed data, run the workflow, check the DB, and exit 1 on any failure."""
    # Fixed ids reserved for this test; setup_test_data replaces existing rows.
    task_id = 999999
    evaluate_id = 888888
    video_id = "test_video_001"

    try:
        # 1. Seed the database.
        setup_test_data(task_id, evaluate_id, video_id)

        # 2. Run the evaluation workflow.
        result = test_evaluate_workflow(task_id, evaluate_id)

        # 3. Inspect what was persisted.
        verify_database_result(evaluate_id)

        # 4. Summary.
        logger.info("\n=== 测试总结 ===")
        succeeded = result.get("workflow_status") == "success"
        if succeeded:
            logger.info("✓ 评估工作流测试成功!")
            evaluate_result = result.get("evaluate_result", [])
            if evaluate_result:
                logger.info(f"✓ 成功评估 {len(evaluate_result)} 个视频")
                selected = [v for v in evaluate_result if v.get("is_selected", False)]
                logger.info(f"✓ 入选视频 {len(selected)} 个")
        else:
            logger.error("✗ 评估工作流测试失败")
            if result.get("error"):
                logger.error(f"错误信息: {result.get('error')}")

    except Exception as e:
        logger.error(f"测试执行失败: {e}", exc_info=True)
        sys.exit(1)

    if not succeeded:
        # Surface the workflow failure through the exit status so CI notices it.
        sys.exit(1)


if __name__ == "__main__":
    main()