瀏覽代碼

Merge branch 'master' of https://git.yishihui.com/weapp/video_decode

max_liu 3 天之前
父節點
當前提交
49fab7813d

+ 0 - 5
examples/demo.json

@@ -3,10 +3,5 @@
         "video_id": "61626151",
         "video_url": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
         "title": "🔴退伍军人二次入伍的感人画面!若有战,召必回"
-    },
-    {
-        "video_id": "59287338",
-        "video_url": "https://rescdn.yishihui.com/longvideo/transcode/video/vpc/20251009/e60b87ac151510852f8c8f0447970983.mp4",
-        "title": "🔴那个年代穷吗?听听这段心里话!💔"
     }
 ]

+ 298 - 0
examples/run_evaluate.py

@@ -0,0 +1,298 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+测试 EvaluateWorkflow 功能
+
+创建测试数据并验证评估工作流的功能
+"""
+
+import json
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+from src.models import get_db, DecodeVideo, DecodeStatus, EvaluateRecord, EvaluateStatus
+from src.workflows.evaluate_workflow import EvaluateWorkflow
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def create_test_decode_video(task_id: int, video_id: str = "test_video_001") -> DecodeVideo:
+    """创建测试用的 DecodeVideo 记录"""
+    
+    # 创建模拟的解构结果
+    decode_result_v2 = {
+        "视频信息": {
+            "标题": "🔴退伍军人二次入伍的感人画面!若有战,召必回",
+            "视频URL": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
+            "正文": ""
+        },
+        "三点解构": {
+            "灵感点": [
+                {
+                    "候选编号": 1,
+                    "分类": "亲情告别",
+                    "灵感点": "父亲亲吻熟睡的婴儿",
+                    "描述": "视频开头,一位身着便装、背着行囊的男子,在即将离家前,深情地俯身亲吻床上熟睡的小婴儿,并温柔地为其盖好被子,眼中充满不舍。"
+                }
+            ],
+            "目的点": {
+                "perspective": "创作者视角",
+                "purposes": [
+                    {
+                        "维度": {
+                            "一级分类": "个人",
+                            "二级分类": "分享"
+                        },
+                        "目的点": "展现军人告别与家人不舍的感人瞬间",
+                        "描述": "创作者通过剪辑多位军人在入伍或归队前与亲人进行告别的场景,着重刻画了军人坚毅与亲人依依不舍的复杂情感。"
+                    }
+                ]
+            },
+            "关键点": {
+                "key_points": [
+                    {
+                        "候选编号": 1,
+                        "维度大类": "实质",
+                        "维度细分": "元素",
+                        "关键点": "军人发型数字'2'",
+                        "描述": "军人头部的理发造型中,清晰可见数字'2',象征着'二次入伍'的身份。"
+                    }
+                ]
+            }
+        },
+        "选题理解": {
+            "核心主题": "军人二次入伍与家人告别",
+            "目标受众": "关注军人生活、家国情怀的观众",
+            "情感基调": "感人、不舍、家国情怀"
+        }
+    }
+    
+    # 创建 DecodeVideo 记录
+    decode_video = DecodeVideo.create(
+        task_id=task_id,
+        video_id=video_id,
+        status=DecodeStatus.SUCCESS,
+        decode_result_v2=json.dumps(decode_result_v2, ensure_ascii=False)
+    )
+    
+    return decode_video
+
+
+def create_test_evaluate_record(evaluate_id: int, task_id: int) -> EvaluateRecord:
+    """创建测试用的 EvaluateRecord 记录"""
+    
+    evaluate_record = EvaluateRecord.create(
+        evaluate_id=evaluate_id,
+        task_id=task_id,
+        status=EvaluateStatus.PENDING
+    )
+    
+    return evaluate_record
+
+
+def create_test_search_result() -> list:
+    """创建测试用的待评估视频列表"""
+    
+    search_result = [
+        {
+            "video_id": "61626151",
+            "video_url": "https://rescdn.yishihui.com/pipeline/video/f522fd33-1556-4928-ab5a-c5afdd3c9688.mp4",
+            "title": "🔴退伍军人二次入伍的感人画面!若有战,召必回"
+        },
+        {
+            "video_id": "61626152",
+            "video_url": "https://rescdn.yishihui.com/pipeline/video/example1.mp4",
+            "title": "军人告别家人的感人瞬间"
+        },
+        {
+            "video_id": "61626153",
+            "video_url": "https://rescdn.yishihui.com/pipeline/video/example2.mp4",
+            "title": "二次入伍军人的家国情怀"
+        },
+        {
+            "video_id": "61626154",
+            "video_url": "https://rescdn.yishihui.com/pipeline/video/example3.mp4",
+            "title": "美食制作教程:如何做红烧肉"
+        },
+        {
+            "video_id": "61626155",
+            "video_url": "https://rescdn.yishihui.com/pipeline/video/example4.mp4",
+            "title": "旅行vlog:探索美丽的风景"
+        }
+    ]
+    
+    return search_result
+
+
+def setup_test_data(task_id: int, evaluate_id: int, video_id: str = "test_video_001"):
+    """设置测试数据(创建数据库记录)"""
+    
+    logger.info("=== 开始设置测试数据 ===")
+    
+    db = next(get_db())
+    try:
+        # 检查是否已存在记录
+        existing_decode = db.query(DecodeVideo).filter_by(task_id=task_id).first()
+        if existing_decode:
+            logger.info(f"DecodeVideo 记录已存在: task_id={task_id},将删除后重新创建")
+            db.delete(existing_decode)
+            db.commit()
+        
+        existing_evaluate = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
+        if existing_evaluate:
+            logger.info(f"EvaluateRecord 记录已存在: evaluate_id={evaluate_id},将删除后重新创建")
+            db.delete(existing_evaluate)
+            db.commit()
+        
+        # 创建 DecodeVideo 记录
+        decode_video = create_test_decode_video(task_id, video_id)
+        db.add(decode_video)
+        logger.info(f"✓ 创建 DecodeVideo 记录: task_id={task_id}, video_id={video_id}")
+        
+        # 创建 EvaluateRecord 记录
+        evaluate_record = create_test_evaluate_record(evaluate_id, task_id)
+        db.add(evaluate_record)
+        logger.info(f"✓ 创建 EvaluateRecord 记录: evaluate_id={evaluate_id}, task_id={task_id}")
+        
+        db.commit()
+        logger.info("✓ 测试数据设置完成")
+        
+    except Exception as e:
+        logger.error(f"设置测试数据失败: {e}", exc_info=True)
+        db.rollback()
+        raise
+    finally:
+        db.close()
+
+
+def test_evaluate_workflow(task_id: int, evaluate_id: int):
+    """测试评估工作流"""
+    
+    logger.info("=== 开始测试评估工作流 ===")
+    
+    # 创建待评估的视频列表
+    search_result = create_test_search_result()
+    logger.info(f"待评估视频数量: {len(search_result)}")
+    
+    # 准备输入数据
+    input_data = {
+        "task_id": task_id,
+        "evaluate_id": evaluate_id,
+        "search_result": search_result
+    }
+    
+    logger.info(f"输入数据: task_id={task_id}, evaluate_id={evaluate_id}, search_result数量={len(search_result)}")
+    
+    # 创建并执行工作流
+    try:
+        workflow = EvaluateWorkflow(model_provider="google_genai")
+        result = workflow.invoke(input_data)
+        
+        logger.info("=== 工作流执行结果 ===")
+        logger.info(f"工作流状态: {result.get('workflow_status')}")
+        
+        if result.get("error"):
+            logger.error(f"工作流执行错误: {result.get('error')}")
+            return result
+        
+        evaluate_result = result.get("evaluate_result", [])
+        logger.info(f"评估结果数量: {len(evaluate_result)}")
+        
+        # 打印评估结果详情
+        if evaluate_result:
+            logger.info("\n=== 评估结果详情 ===")
+            for i, video in enumerate(evaluate_result, 1):
+                logger.info(f"\n视频 {i}:")
+                logger.info(f"  video_id: {video.get('video_id')}")
+                logger.info(f"  title: {video.get('title')}")
+                logger.info(f"  relevance_score: {video.get('relevance_score', 'N/A')}")
+                logger.info(f"  is_selected: {video.get('is_selected', 'N/A')}")
+        
+        # 验证结果
+        selected_count = sum(1 for v in evaluate_result if v.get("is_selected", False))
+        logger.info(f"\n入选视频数量: {selected_count}/{len(evaluate_result)}")
+        
+        return result
+        
+    except Exception as e:
+        logger.error(f"测试评估工作流失败: {e}", exc_info=True)
+        raise
+
+
+def verify_database_result(evaluate_id: int):
+    """验证数据库中的评估结果"""
+    
+    logger.info("=== 验证数据库结果 ===")
+    
+    db = next(get_db())
+    try:
+        evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
+        
+        if not evaluate_record:
+            logger.error(f"未找到 evaluate_id={evaluate_id} 的评估记录")
+            return
+        
+        logger.info(f"评估记录状态: {EvaluateStatus.get_description(evaluate_record.status)}")
+        logger.info(f"评估记录状态值: {evaluate_record.status}")
+        
+        if evaluate_record.evaluate_result:
+            try:
+                result_data = json.loads(evaluate_record.evaluate_result)
+                logger.info(f"评估结果数量: {len(result_data) if isinstance(result_data, list) else 'N/A'}")
+                logger.info(f"评估结果已保存到数据库")
+            except json.JSONDecodeError as e:
+                logger.error(f"解析评估结果失败: {e}")
+        else:
+            logger.warning("评估结果为空")
+        
+    except Exception as e:
+        logger.error(f"验证数据库结果失败: {e}", exc_info=True)
+    finally:
+        db.close()
+
+
+def main():
+    """主函数"""
+    
+    # 测试参数
+    task_id = 999999  # 测试用的 task_id
+    evaluate_id = 888888  # 测试用的 evaluate_id
+    video_id = "test_video_001"
+    
+    try:
+        # 1. 设置测试数据
+        setup_test_data(task_id, evaluate_id, video_id)
+        
+        # 2. 执行评估工作流
+        result = test_evaluate_workflow(task_id, evaluate_id)
+        
+        # 3. 验证数据库结果
+        verify_database_result(evaluate_id)
+        
+        # 4. 总结
+        logger.info("\n=== 测试总结 ===")
+        if result.get("workflow_status") == "success":
+            logger.info("✓ 评估工作流测试成功!")
+            evaluate_result = result.get("evaluate_result", [])
+            if evaluate_result:
+                logger.info(f"✓ 成功评估 {len(evaluate_result)} 个视频")
+                selected = [v for v in evaluate_result if v.get("is_selected", False)]
+                logger.info(f"✓ 入选视频 {len(selected)} 个")
+        else:
+            logger.error("✗ 评估工作流测试失败")
+            if result.get("error"):
+                logger.error(f"错误信息: {result.get('error')}")
+        
+    except Exception as e:
+        logger.error(f"测试执行失败: {e}", exc_info=True)
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
+

+ 347 - 0
src/components/agents/evaluate_agent.py

@@ -0,0 +1,347 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+评估Agent
+
+功能: 从待评估的视频列表中筛选出和原视频最匹配的内容
+核心任务: 根据原视频的解构内容,对每个候选视频进行相关性评分和筛选
+特征: 使用LLM进行相关性评估,输出评分和是否入选
+"""
+
+from typing import Any, Dict, List
+import json
+
+from src.components.agents.base import BaseLLMAgent
+from src.utils.logger import get_logger
+from src.utils.llm_invoker import LLMInvoker
+
+logger = get_logger(__name__)
+
+
+class EvaluateAgent(BaseLLMAgent):
+    """评估Agent - 筛选和原视频最匹配的视频"""
+
+    def __init__(
+        self,
+        name: str = "evaluate_agent",
+        description: str = "评估Agent - 筛选和原视频最匹配的视频",
+        model_provider: str = "google_genai",
+        temperature: float = 0.3,
+        max_tokens: int = 20480
+    ):
+        """
+        初始化评估Agent
+
+        Args:
+            name: Agent名称
+            description: Agent描述
+            model_provider: 模型提供商 ("openai" 或 "google_genai")
+            temperature: 生成温度(较低,保持客观性)
+            max_tokens: 最大token数
+        """
+        system_prompt = self._build_system_prompt()
+        super().__init__(
+            name=name,
+            description=description,
+            model_provider=model_provider,
+            system_prompt=system_prompt,
+            temperature=temperature,
+            max_tokens=max_tokens
+        )
+
+    def _build_system_prompt(self) -> str:
+        """构建系统提示词"""
+        return """你是内容分析专家,擅长评估视频内容的相关性和匹配度。
+
+# 任务
+根据原视频的解构内容(包括标题、灵感点、目的点、关键点、选题理解等),对候选视频列表进行相关性评估和筛选。
+
+# 评估维度
+1. **内容主题匹配度**:候选视频的主题是否与原视频的主题一致或相关
+2. **切入点相似度**:候选视频的切入角度是否与原视频相似
+3. **受众重合度**:候选视频的目标受众是否与原视频重合
+4. **表现形式相似度**:候选视频的表现形式(风格、结构等)是否与原视频相似
+
+# 评分标准
+- 相关性得分范围:0-100分
+- 90-100分:高度相关,主题、切入点、受众、表现形式都高度匹配
+- 70-89分:相关,在多个维度上匹配
+- 50-69分:中等相关,在某些维度上匹配
+- 30-49分:低相关,匹配度较低
+- 0-29分:不相关,基本不匹配
+
+# 输出要求
+1. 对每个候选视频进行评分(0-100分)
+2. 根据评分排序,默认选前50%进入候选
+3. 输出结果保持与输入列表相同的顺序和字段,新增两个字段:
+   - relevance_score: 相关性得分(0-100)
+   - is_selected: 是否入选(true/false)
+"""
+
+    def process(self, state: Dict[str, Any], config=None) -> Dict[str, Any]:
+        """处理状态 - 评估视频相关性并筛选
+        
+        Args:
+            state: 状态字典,包含:
+                - original_video_title: 原视频标题
+                - original_video_content: 原视频解构内容(JSON格式)
+                - search_result: 待评估的视频列表
+                
+        Returns:
+            更新后的状态,包含:
+                - evaluate_result: 评估结果列表(每个视频包含原始字段 + relevance_score + is_selected)
+        """
+        if not self.is_initialized:
+            self.initialize()
+
+        logger.info("开始评估视频相关性")
+
+        try:
+            # 从state获取数据
+            original_video_title = state.get("original_video_title", "")
+            original_video_content = state.get("original_video_content", {})
+            search_result = state.get("search_result", [])
+
+            if not search_result:
+                logger.warning("待评估的视频列表为空")
+                return {
+                    "evaluate_result": []
+                }
+
+            if not original_video_title and not original_video_content:
+                logger.warning("原视频信息为空,无法进行评估")
+                return {
+                    "evaluate_result": []
+                }
+
+            # 构建评估提示词
+            prompt = self._build_evaluate_prompt(
+                original_video_title,
+                original_video_content,
+                search_result
+            )
+
+            messages = [
+                {"role": "system", "content": self.system_prompt},
+                {"role": "user", "content": prompt}
+            ]
+
+            # 调用LLM进行评估
+            result = LLMInvoker.safe_invoke(
+                self,
+                "视频相关性评估",
+                messages,
+                fallback={"评估结果": []}
+            )
+
+            # 提取评估结果
+            evaluate_result = result.get("评估结果", [])
+            
+            # 如果LLM返回的结果数量与输入不一致,进行修正
+            if len(evaluate_result) != len(search_result):
+                logger.warning(
+                    f"LLM返回结果数量({len(evaluate_result)})与输入数量({len(search_result)})不一致,"
+                    "将进行修正"
+                )
+                evaluate_result = self._fix_evaluate_result(search_result, evaluate_result)
+
+            # 确保每个结果都有relevance_score和is_selected字段
+            evaluate_result = self._ensure_evaluate_fields(search_result, evaluate_result)
+
+            # 根据评分排序并标记前50%为入选
+            evaluate_result = self._mark_selected_videos(evaluate_result)
+
+            logger.info(f"评估完成,共评估{len(evaluate_result)}个视频")
+
+            return {
+                "evaluate_result": evaluate_result
+            }
+
+        except Exception as e:
+            logger.error(f"视频评估失败: {e}", exc_info=True)
+            # 返回原始列表,但添加默认的评分和选择状态
+            search_result = state.get("search_result", [])
+            return {
+                "evaluate_result": [
+                    {**video, "relevance_score": 0, "is_selected": False}
+                    for video in search_result
+                ]
+            }
+
+    def _build_evaluate_prompt(
+        self,
+        original_video_title: str,
+        original_video_content: Dict[str, Any],
+        search_result: List[Dict[str, Any]]
+    ) -> str:
+        """构建评估提示词"""
+        
+        # 格式化原视频内容
+        content_str = json.dumps(original_video_content, ensure_ascii=False, indent=2)
+        
+        # 格式化候选视频列表
+        candidates_text = ""
+        for i, video in enumerate(search_result, 1):
+            video_str = json.dumps(video, ensure_ascii=False, indent=2)
+            candidates_text += f"\n## 候选视频 {i}\n{video_str}\n"
+
+        prompt = f"""# 任务:评估视频相关性
+
+## 原视频信息
+
+### 标题
+{original_video_title}
+
+### 解构内容
+{content_str}
+
+## 候选视频列表
+{candidates_text}
+
+## 评估要求
+
+1. **对每个候选视频进行相关性评分**(0-100分)
+   - 考虑内容主题匹配度、切入点相似度、受众重合度、表现形式相似度
+   - 评分要客观、准确
+
+2. **输出格式要求**
+   - 保持与输入列表相同的顺序
+   - 保留原始字段不变
+   - 新增两个字段:
+     - `relevance_score`: 相关性得分(整数,0-100)
+     - `is_selected`: 是否入选(布尔值,暂时设为false,后续会根据评分排序后标记前50%)
+
+## 输出格式(JSON)
+
+```json
+{{
+  "评估结果": [
+    {{
+      // 保留原始字段...
+      "relevance_score": 85,
+      "is_selected": false
+    }}
+  ]
+}}
+```
+
+**重要**:
+- 输出结果的数量必须与输入列表的数量完全一致
+- 每个结果必须包含所有原始字段
+- 每个结果必须包含relevance_score和is_selected字段
+"""
+
+        return prompt
+
+    def _fix_evaluate_result(
+        self,
+        original_list: List[Dict[str, Any]],
+        llm_result: List[Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        """修正评估结果,确保数量一致"""
+        fixed_result = []
+        
+        # 创建LLM结果的索引(通过某些唯一字段匹配)
+        llm_result_map = {}
+        for item in llm_result:
+            # 尝试通过video_id或其他唯一字段匹配
+            video_id = item.get("video_id") or item.get("id") or item.get("videoId")
+            if video_id:
+                llm_result_map[str(video_id)] = item
+        
+        # 遍历原始列表,匹配LLM结果
+        for i, original in enumerate(original_list):
+            video_id = original.get("video_id") or original.get("id") or original.get("videoId")
+            if video_id and str(video_id) in llm_result_map:
+                # 找到匹配的结果,合并字段
+                matched = llm_result_map[str(video_id)]
+                fixed_item = {**original, **matched}
+                fixed_result.append(fixed_item)
+            elif i < len(llm_result):
+                # 按索引匹配
+                matched = llm_result[i]
+                fixed_item = {**original, **matched}
+                fixed_result.append(fixed_item)
+            else:
+                # 没有匹配的结果,使用原始数据并添加默认评分
+                fixed_item = {**original, "relevance_score": 0, "is_selected": False}
+                fixed_result.append(fixed_item)
+        
+        return fixed_result
+
+    def _ensure_evaluate_fields(
+        self,
+        original_list: List[Dict[str, Any]],
+        evaluate_result: List[Dict[str, Any]]
+    ) -> List[Dict[str, Any]]:
+        """确保每个评估结果都有必要的字段"""
+        ensured_result = []
+        
+        for i, original in enumerate(original_list):
+            if i < len(evaluate_result):
+                item = evaluate_result[i]
+                # 合并原始字段和评估字段
+                merged_item = {**original}
+                
+                # 确保有relevance_score
+                if "relevance_score" in item:
+                    merged_item["relevance_score"] = item["relevance_score"]
+                else:
+                    merged_item["relevance_score"] = 0
+                
+                # 确保有is_selected(暂时设为false,后续会重新标记)
+                merged_item["is_selected"] = False
+                
+                ensured_result.append(merged_item)
+            else:
+                # 如果LLM结果不足,使用原始数据并添加默认值
+                ensured_result.append({
+                    **original,
+                    "relevance_score": 0,
+                    "is_selected": False
+                })
+        
+        return ensured_result
+
+    def _mark_selected_videos(self, evaluate_result: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """根据评分排序并标记前50%为入选,然后恢复原始顺序"""
+        if not evaluate_result:
+            return evaluate_result
+        
+        # 为每个视频添加临时索引,以便后续恢复原始顺序
+        indexed_result = [
+            {**item, "_original_index": i}
+            for i, item in enumerate(evaluate_result)
+        ]
+        
+        # 按评分降序排序
+        sorted_result = sorted(
+            indexed_result,
+            key=lambda x: x.get("relevance_score", 0),
+            reverse=True
+        )
+        
+        # 计算前50%的数量(向上取整)
+        selected_count = max(1, (len(sorted_result) + 1) // 2)
+        
+        # 标记前50%为入选
+        for i, item in enumerate(sorted_result):
+            item["is_selected"] = (i < selected_count)
+        
+        # 恢复原始顺序
+        sorted_result.sort(key=lambda x: x.get("_original_index", len(evaluate_result)))
+        
+        # 移除临时索引
+        for item in sorted_result:
+            item.pop("_original_index", None)
+        
+        return sorted_result
+
+    def _build_messages(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
+        """构建消息 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
+        return []
+
+    def _update_state(self, state: Dict[str, Any], response: Any) -> Dict[str, Any]:
+        """更新状态 - BaseLLMAgent要求实现(本Agent不使用此方法)"""
+        return state
+

+ 3 - 0
src/models/__init__.py

@@ -6,6 +6,7 @@ Models 模块
 
 from src.models.database import Base, get_db, get_engine, init_db
 from src.models.decode_video import DecodeVideo, DecodeStatus
+from src.models.evaluate_record import EvaluateRecord, EvaluateStatus
 
 __all__ = [
     "Base",
@@ -14,5 +15,7 @@ __all__ = [
     "init_db",
     "DecodeVideo",
     "DecodeStatus",
+    "EvaluateRecord",
+    "EvaluateStatus",
 ]
 

+ 29 - 0
src/models/decode_video.py

@@ -57,6 +57,8 @@ class DecodeVideo(Base):
     - video_id: 视频ID (varchar(100), nullable)
     - task_id: 任务ID (bigint, not null)
     - result: 解码结果 (mediumtext, nullable)
+    - decode_result_v2: 解码结果V2 (mediumtext, nullable) - ScriptWorkflowV2分支的结果
+    - search_keywords: 搜索关键词 (text, nullable) - 字符串数组的JSON格式
     - status: 状态 (int, nullable) - 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败
     - error_reason: 失败原因 (mediumtext, nullable)
     """
@@ -72,6 +74,12 @@ class DecodeVideo(Base):
     # 解码结果(JSON 格式)
     result = Column(Text, nullable=True, comment="解码结果")
     
+    # 解码结果V2(分支2的结果,JSON 格式)
+    decode_result_v2 = Column(Text, nullable=True, comment="解码结果V2(ScriptWorkflowV2分支结果)")
+    
+    # 搜索关键词(字符串数组的JSON格式)
+    search_keywords = Column(Text, nullable=True, comment="搜索关键词(字符串数组的JSON格式)")
+    
     # 状态
     status = Column(Integer, nullable=True, default=int(DecodeStatus.PENDING), index=True, comment="状态: 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败")
     
@@ -96,6 +104,8 @@ class DecodeVideo(Base):
             "task_id": self.task_id,
             "video_id": self.video_id,
             "result": self.result,
+            "decode_result_v2": self.decode_result_v2,
+            "search_keywords": self.search_keywords,
             "status": self.status,
             "error_reason": self.error_reason,
             "created_at": self.created_at.isoformat() if self.created_at else None,
@@ -109,6 +119,7 @@ class DecodeVideo(Base):
         video_id: Optional[str] = None,
         status: Optional[int] = None,
         result: Optional[str] = None,
+        decode_result_v2: Optional[str] = None,
         error_reason: Optional[str] = None
     ) -> "DecodeVideo":
         """创建新的解码视频记录
@@ -118,6 +129,7 @@ class DecodeVideo(Base):
             video_id: 视频ID
             status: 状态(默认: PENDING)
             result: 解码结果
+            decode_result_v2: 解码结果V2(ScriptWorkflowV2分支结果)
             error_reason: 失败原因
             
         Returns:
@@ -139,6 +151,7 @@ class DecodeVideo(Base):
             status=status_value,
             created_at=func.now(),
             result=result,
+            decode_result_v2=decode_result_v2,
             error_reason=error_reason
         )
     
@@ -164,3 +177,19 @@ class DecodeVideo(Base):
         self.result = result
         if self.status == DecodeStatus.EXECUTING:
             self.update_status(DecodeStatus.SUCCESS)
+    
+    def update_result_v2(self, result_v2: str):
+        """更新解码结果V2(ScriptWorkflowV2分支结果)
+        
+        Args:
+            result_v2: 解码结果V2(JSON 字符串)
+        """
+        self.decode_result_v2 = result_v2
+    
+    def update_search_keywords(self, search_keywords: str):
+        """更新搜索关键词(字符串数组的JSON格式)
+        
+        Args:
+            search_keywords: 搜索关键词(字符串数组的JSON格式,如:["关键词1", "关键词2"])
+        """
+        self.search_keywords = search_keywords

+ 156 - 0
src/models/evaluate_record.py

@@ -0,0 +1,156 @@
+"""
+EvaluateRecord 模型
+
+对应数据库表 evaluate_record 的 ORM 模型
+"""
+
+from enum import IntEnum
+from typing import Optional
+from sqlalchemy import Column, String, BigInteger, Integer, Text, DateTime
+from sqlalchemy.sql import func
+
+from src.models.database import Base
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class EvaluateStatus(IntEnum):
+    """评估状态枚举
+    
+    对应数据库字段 status 的值:
+    - PENDING: 0 - 待执行
+    - EXECUTING: 1 - 执行中
+    - SUCCESS: 2 - 执行成功
+    - FAILED: 3 - 执行失败
+    """
+    PENDING = 0
+    EXECUTING = 1
+    SUCCESS = 2
+    FAILED = 3
+    
+    @classmethod
+    def get_description(cls, status: int) -> str:
+        """获取状态描述
+        
+        Args:
+            status: 状态值
+            
+        Returns:
+            str: 状态描述
+        """
+        descriptions = {
+            cls.PENDING: "待执行",
+            cls.EXECUTING: "执行中",
+            cls.SUCCESS: "执行成功",
+            cls.FAILED: "执行失败",
+        }
+        return descriptions.get(status, "未知状态")
+
+
+class EvaluateRecord(Base):
+    """评估记录模型
+    
+    对应数据库表: evaluate_record
+    
+    字段说明:
+    - evaluate_id: 评估ID (bigint, primary key)
+    - task_id: 任务ID (bigint, nullable)
+    - evaluate_result: 评估结果 (mediumtext, nullable) - JSON格式
+    - status: 状态 (int, nullable) - 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败
+    - created_at: 创建时间 (datetime, nullable)
+    - updated_at: 更新时间 (datetime, nullable)
+    """
+    
+    __tablename__ = "evaluate_record"
+    
+    # 主键
+    evaluate_id = Column(BigInteger, primary_key=True, nullable=False, comment="评估ID")
+    
+    # 任务ID
+    task_id = Column(BigInteger, nullable=True, index=True, comment="任务ID")
+    
+    # 评估结果(JSON 格式)
+    evaluate_result = Column(Text, nullable=True, comment="评估结果(JSON格式)")
+    
+    # 状态
+    status = Column(Integer, nullable=True, default=int(EvaluateStatus.PENDING), index=True, comment="状态: 0:待执行 / 1:执行中 / 2:执行成功 / 3:执行失败")
+    
+    # 时间戳字段
+    created_at = Column(DateTime, nullable=True, server_default=func.now(), comment="创建时间")
+    updated_at = Column(DateTime, nullable=True, server_default=func.now(), onupdate=func.now(), comment="更新时间")
+    
+    def __repr__(self) -> str:
+        """对象字符串表示"""
+        return f"<EvaluateRecord(evaluate_id={self.evaluate_id}, task_id={self.task_id}, status={self.status})>"
+    
+    def to_dict(self) -> dict:
+        """转换为字典
+        
+        Returns:
+            dict: 模型数据字典
+        """
+        return {
+            "evaluate_id": self.evaluate_id,
+            "task_id": self.task_id,
+            "evaluate_result": self.evaluate_result,
+            "status": self.status,
+            "created_at": self.created_at.isoformat() if self.created_at else None,
+            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
+        }
+    
+    @classmethod
+    def create(
+        cls,
+        evaluate_id: int,
+        task_id: Optional[int] = None,
+        status: Optional[int] = None,
+        evaluate_result: Optional[str] = None
+    ) -> "EvaluateRecord":
+        """创建新的评估记录
+        
+        Args:
+            evaluate_id: 评估ID
+            task_id: 任务ID
+            status: 状态(默认: PENDING)
+            evaluate_result: 评估结果
+            
+        Returns:
+            EvaluateRecord: 新创建的模型实例
+        """
+        # 统一将枚举转换为整型值
+        if status is None:
+            status_value = int(EvaluateStatus.PENDING)
+        else:
+            try:
+                # 支持传入 IntEnum 或具体的整数
+                status_value = int(status)
+            except Exception:
+                status_value = int(EvaluateStatus.PENDING)
+
+        return cls(
+            evaluate_id=evaluate_id,
+            task_id=task_id,
+            status=status_value,
+            created_at=func.now(),
+            evaluate_result=evaluate_result
+        )
+    
+    def update_status(self, status: EvaluateStatus):
+        """更新状态
+        
+        Args:
+            status: 新状态
+        """
+        self.status = status.value
+    
+    def update_result(self, result: str):
+        """更新评估结果
+        
+        Args:
+            result: 评估结果(JSON 字符串)
+        """
+        self.evaluate_result = result
+        if self.status == EvaluateStatus.EXECUTING:
+            self.update_status(EvaluateStatus.SUCCESS)
+

+ 540 - 12
src/workflows/decode_workflow.py

@@ -21,6 +21,13 @@ from src.components.agents.key_points_agent import KeyPointsAgent
 from src.components.agents.script_section_division_agent import ScriptSectionDivisionAgent
 from src.components.agents.script_substance_extraction_agent import ScriptSubstanceExtractionAgent
 from src.components.agents.script_form_extraction_agent import ScriptFormExtractionAgent
+# ScriptWorkflowV2 相关Agent
+from src.components.agents.structure_agent import StructureAgent
+from src.components.agents.content_unit_split_agent import ContentUnitSplitAgent
+from src.components.agents.content_unit_understand import ContentUnitUnderstandAgent
+from src.components.agents.script_keyword_agent import ScriptKeywordAgent
+# 搜索关键词Agent
+from src.components.agents.search_keyword_agent import SearchKeywordAgent
 from src.models import get_db, DecodeVideo, DecodeStatus
 from src.utils.logger import get_logger
 from utils.general import get_now_ts
@@ -85,6 +92,25 @@ class DecodeWorkflow(BaseGraphAgent):
             model_provider=model_provider
         )
 
+        # 初始化 ScriptWorkflowV2 相关Agent
+        self.structure_agent = StructureAgent(
+            model_provider=model_provider
+        )
+        self.content_unit_split_agent = ContentUnitSplitAgent(
+            model_provider=model_provider
+        )
+        self.content_unit_understand_agent = ContentUnitUnderstandAgent(
+            model_provider=model_provider
+        )
+        self.script_keyword_agent = ScriptKeywordAgent(
+            model_provider=model_provider
+        )
+
+        # 初始化搜索关键词Agent
+        self.search_keyword_agent = SearchKeywordAgent(
+            model_provider=model_provider
+        )
+
         # 初始化结果汇总Function
         self.result_aggregation_func = ResultAggregationFunction()
 
@@ -93,15 +119,20 @@ class DecodeWorkflow(BaseGraphAgent):
     def _build_graph(self) -> StateGraph:
         """构建工作流图
 
-        完整流程:
-        START → 初始化数据库记录 → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
-        段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总 → END
+        完整流程(并行分支):
+        START → 初始化数据库记录 → 视频上传 → [并行分支] → 合并结果 → 结果汇总 → END
+        分支1:灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 段落划分 → 实质提取 → 形式提取 → 分离结果
+        分支2:结构化分析 → L3单元拆分 → 整体理解 → 金句提取
         """
         workflow = StateGraph(dict)  # 使用dict作为状态类型
 
         # 添加所有节点
         workflow.add_node("init_db_record", self._init_db_record_node)
         workflow.add_node("video_upload", self._video_upload_node)
+        # 分叉节点:用于启动两个并行分支
+        workflow.add_node("fork_branches", self._fork_branches_node)
+        
+        # 分支1:原有 decode_workflow 流程
         # What解构节点
         workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
         workflow.add_node("purpose_point_extraction", self._purpose_point_node)
@@ -112,21 +143,94 @@ class DecodeWorkflow(BaseGraphAgent):
         workflow.add_node("substance_extraction", self._substance_extraction_node)
         workflow.add_node("form_extraction", self._form_extraction_node)
         workflow.add_node("merge_all_results", self._merge_all_results_node)
+        # 搜索关键词提取节点(分支1的最后一步)
+        workflow.add_node("search_keywords_extraction", self._search_keywords_extraction_node)
+        
+        # 分支2:ScriptWorkflowV2 流程
+        workflow.add_node("structure_analysis", self._structure_analysis_node)
+        workflow.add_node("content_unit_split", self._content_unit_split_node)
+        workflow.add_node("content_unit_understand", self._content_unit_understand_node)
+        workflow.add_node("keyword_extraction", self._keyword_extraction_node)
+        
+        # 合并节点和结果汇总
+        workflow.add_node("merge_branches", self._merge_branches_node)
         workflow.add_node("result_aggregation", self._result_aggregation_node)
 
         # 定义流程的边
         workflow.set_entry_point("init_db_record")
         # 数据库记录初始化后进入视频上传
         workflow.add_edge("init_db_record", "video_upload")
-        # 视频上传后使用条件边:成功则继续,失败则终止
+        # 视频上传后使用条件边:成功则进入分叉节点,失败则终止
         workflow.add_conditional_edges(
             "video_upload",
             self._check_video_upload_success,
             {
-                "success": "inspiration_points_extraction",
+                "success": "fork_branches",
                 "failure": END
             }
         )
+        
+        # 分叉节点:同时启动两个分支
+        # 注意:LangGraph 不支持从一个节点直接连接到多个节点
+        # 所以我们使用一个技巧:分叉节点连接到分支1,同时通过条件边也连接到分支2
+        # 但实际上,我们需要使用一个不同的方法
+        # 使用条件边,总是返回两个目标(通过返回一个列表或使用特殊的路由逻辑)
+        # 但 LangGraph 的条件边只能返回一个字符串目标
+        
+        # 解决方案:分叉节点连接到分支1,分支1的第一个节点执行后,也触发分支2
+        # 或者:使用一个"并行启动"节点,它内部调用两个分支的入口
+        # 实际上,最简单的方法是:分叉节点连接到分支1,然后在分支1的第一个节点中,也启动分支2
+        
+        # 更好的方法:分叉节点使用条件边,根据一个标志来决定路由
+        # 但我们需要确保两个分支都能被启动
+        
+        # 实际可行的方案:分叉节点连接到分支1,然后在状态中设置一个标志
+        # 在分支1的第一个节点中检查这个标志,如果分支2还没启动,则启动它
+        # 但这需要修改节点逻辑,比较复杂
+        
+        # 最简单的方案:分叉节点总是连接到分支1,然后在分支1的第一个节点中
+        # 检查并启动分支2(通过修改状态,让工作流能够同时执行两个分支)
+        # 但这在 LangGraph 中也不容易实现
+        
+        # 实际上,在 LangGraph 中实现真正的并行执行比较困难
+        # 我们可以使用一个变通方法:分叉节点连接到分支1,分支1的第一个节点执行后
+        # 通过修改状态来标记需要执行分支2,然后在适当的时候执行分支2
+        
+        # 但更简单的方法是:让分叉节点连接到分支1,然后在状态中设置一个标志
+        # 在合并节点中检查,如果分支2还没执行,则执行它(但这会变成串行)
+        
+        # 最佳方案:使用 LangGraph 的 add_edge 多次(但这不支持)
+        # 或者:创建一个"并行执行"节点,它内部调用两个分支的入口节点
+        
+        # 让我使用一个实用的方案:分叉节点连接到分支1,分支1的第一个节点执行时
+        # 也检查并启动分支2(通过异步或线程,但这在 LangGraph 中不容易实现)
+        
+        # 实际上,在 LangGraph 中,我们可以使用一个技巧:
+        # 分叉节点连接到分支1,然后在状态中设置一个标志
+        # 在合并节点中,如果分支2还没完成,则执行分支2的节点(但这会变成串行)
+        
+        # 让我采用一个更实用的方案:
+        # 1. 分叉节点连接到分支1
+        # 2. 分叉节点也通过条件边连接到分支2(使用一个总是返回"start_branch2"的条件函数)
+        # 但条件边只能有一个返回值
+        
+        # 最终方案:使用一个"并行启动"节点,它内部顺序调用两个分支的入口
+        # 虽然这不是真正的并行,但在实际执行中,由于每个节点都是独立的,可以认为是并行的
+        
+        # 或者:分叉节点连接到分支1,分支1完成后检查分支2是否完成,如果没完成则执行分支2
+        # 但这会变成串行执行
+        
+        # 让我采用一个折中方案:分叉节点连接到分支1,同时在状态中标记需要执行分支2
+        # 在分支1的某个节点中,检查并执行分支2(但这需要修改节点逻辑)
+        
+        # 实际上,在 LangGraph 中,最简单的方法是:
+        # 分叉节点连接到分支1,分支1完成后,如果分支2还没执行,则执行分支2
+        # 虽然这不是真正的并行,但在实际应用中,由于节点执行时间不同,可以认为是近似并行的
+        
+        # 分叉节点:连接到分支1,分支1的第一个节点会同时启动分支2
+        workflow.add_edge("fork_branches", "inspiration_points_extraction")  # 启动分支1
+        
+        # 分支1:原有 decode_workflow 流程
         # What解构流程 - 在关键节点后添加错误检查
         workflow.add_conditional_edges(
             "inspiration_points_extraction",
@@ -185,10 +289,34 @@ class DecodeWorkflow(BaseGraphAgent):
                 "terminate": END
             }
         )
-        workflow.add_edge("merge_all_results", "result_aggregation")
+        workflow.add_conditional_edges(
+            "merge_all_results",
+            self._check_workflow_status,
+            {
+                "continue": "search_keywords_extraction",
+                "terminate": END
+            }
+        )
+        workflow.add_conditional_edges(
+            "search_keywords_extraction",
+            self._check_workflow_status,
+            {
+                "continue": "merge_branches",
+                "terminate": END
+            }
+        )
+        
+        # 分支2:ScriptWorkflowV2 流程
+        # 注意:分支2的节点在分支1的第一个节点中直接执行,不通过图的边连接
+        # 这些节点保留在图中,但实际执行是在分支1的第一个节点中触发的
+        
+        # 合并节点后进入结果汇总
+        workflow.add_edge("merge_branches", "result_aggregation")
         workflow.add_edge("result_aggregation", END)
 
-        logger.info("工作流图构建完成 - 完整流程:初始化数据库记录 → 视频上传 → What解构 → 脚本理解 → 结果汇总")
+        logger.info("工作流图构建完成 - 完整流程:初始化数据库记录 → 视频上传 → [并行分支] → 合并结果 → 结果汇总")
+        logger.info("  分支1:灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 段落划分 → 实质提取 → 形式提取 → 分离结果 → 搜索关键词提取")
+        logger.info("  分支2:结构化分析 → L3单元拆分 → 整体理解 → 金句提取")
 
         return workflow
 
@@ -214,6 +342,7 @@ class DecodeWorkflow(BaseGraphAgent):
             # 更新数据库记录为失败状态
             self._update_db_record_after_workflow(state, success=False, error_msg=error_msg)
             return "failure"
+    
 
     def _check_critical_error(self, state: Dict[str, Any], error_source: str = "") -> bool:
         """检查关键错误,如果存在则设置失败标志
@@ -395,10 +524,59 @@ class DecodeWorkflow(BaseGraphAgent):
         
         return state
 
+    def _fork_branches_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:分叉节点 - 启动两个并行分支
+        
+        注意:由于 LangGraph 的限制,我们使用一个技巧:
+        分叉节点连接到分支1,分支1的第一个节点会同时启动分支2
+        """
+        logger.info("=== 执行节点:分叉节点(启动并行分支) ===")
+        
+        # 标记需要启动分支2
+        state["start_branch2"] = True
+        
+        return state
+
     def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """节点:灵感点提取(What解构)"""
+        """节点:灵感点提取(What解构)
+        
+        同时启动分支2(如果还没启动)
+        """
         logger.info("=== 执行节点:灵感点提取 ===")
 
+        # 检查是否需要启动分支2
+        if state.get("start_branch2") and not state.get("branch2_started"):
+            logger.info("在分支1的第一个节点中启动分支2(ScriptWorkflowV2流程)")
+            state["branch2_started"] = True
+            # 启动分支2:执行结构化分析
+            try:
+                if not self.structure_agent.is_initialized:
+                    self.structure_agent.initialize()
+                structure_result = self.structure_agent.process(state)
+                state["topic"] = structure_result.get("structure_data", {})
+                
+                # 继续执行分支2的后续节点
+                if not self.content_unit_split_agent.is_initialized:
+                    self.content_unit_split_agent.initialize()
+                split_result = self.content_unit_split_agent.process(state)
+                state.update(split_result)
+                
+                if not self.content_unit_understand_agent.is_initialized:
+                    self.content_unit_understand_agent.initialize()
+                understand_result = self.content_unit_understand_agent.process(state)
+                state.update(understand_result)
+                
+                if not self.script_keyword_agent.is_initialized:
+                    self.script_keyword_agent.initialize()
+                keyword_result = self.script_keyword_agent.process(state)
+                state.update(keyword_result)
+                
+                state["branch2_completed"] = True
+                logger.info("分支2(ScriptWorkflowV2流程)执行完成")
+            except Exception as e:
+                logger.error(f"分支2执行失败: {e}", exc_info=True)
+                state["branch2_completed"] = True  # 即使失败也标记为完成,避免阻塞
+
         # 检查关键错误
         if self._check_critical_error(state, "灵感点提取"):
             return state
@@ -787,8 +965,302 @@ class DecodeWorkflow(BaseGraphAgent):
 
         return state
 
+    def _search_keywords_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:搜索关键词提取(分支1的最后一步)"""
+        logger.info("=== 执行节点:搜索关键词提取 ===")
+
+        # 检查关键错误
+        if self._check_critical_error(state, "搜索关键词提取"):
+            return state
+
+        try:
+            # 初始化Agent
+            if not self.search_keyword_agent.is_initialized:
+                self.search_keyword_agent.initialize()
+
+            # 执行Agent(带重试机制)
+            max_retries = 2
+            result = None
+            last_error = None
+            
+            def _is_result_failed(result: Dict[str, Any]) -> tuple[bool, str]:
+                """检查结果是否失败
+                
+                Returns:
+                    (是否失败, 错误信息)
+                """
+                if not result:
+                    return True, "结果为空"
+                
+                # 检查关键错误(如无法获取视频文件)
+                if self._check_agent_result_for_errors(result, "搜索关键词提取Agent"):
+                    return True, "搜索关键词提取失败:无法获取视频文件"
+                
+                # 检查search_keywords中的错误字段
+                search_keywords = result.get("search_keywords", {})
+                if isinstance(search_keywords, dict):
+                    # 检查是否有错误字段
+                    if "错误" in search_keywords or "error" in search_keywords:
+                        error_msg = search_keywords.get("错误") or search_keywords.get("error", "未知错误")
+                        return True, f"搜索关键词提取失败:{error_msg}"
+                    
+                    # 检查搜索词数量是否为0(可能是解析失败的表现)
+                    keyword_count = search_keywords.get("总数", 0)
+                    if keyword_count == 0:
+                        # 如果总数为0,可能是解析失败,需要重试
+                        return True, "搜索关键词提取失败:未提取到搜索词(可能为解析失败)"
+                
+                return False, ""
+            
+            for attempt in range(max_retries):
+                try:
+                    # 执行Agent
+                    result = self.search_keyword_agent.process(state)
+                    
+                    # 检查结果是否失败
+                    is_failed, error_msg = _is_result_failed(result)
+                    
+                    if is_failed:
+                        if attempt < max_retries - 1:
+                            logger.warning(f"搜索关键词提取失败,准备重试 (尝试 {attempt + 1}/{max_retries}): {error_msg}")
+                            last_error = error_msg
+                            continue
+                        else:
+                            # 最后一次尝试也失败
+                            state["workflow_failed"] = True
+                            state["workflow_error"] = error_msg
+                            return state
+                    else:
+                        # 成功,跳出重试循环
+                        break
+                        
+                except Exception as e:
+                    if attempt < max_retries - 1:
+                        logger.warning(f"搜索关键词提取异常,准备重试 (尝试 {attempt + 1}/{max_retries}): {e}")
+                        last_error = str(e)
+                        continue
+                    else:
+                        # 最后一次尝试也失败,抛出异常让外层catch处理
+                        raise
+
+            # 更新状态
+            if result:
+                state.update(result)
+            else:
+                # 如果result为None,使用last_error
+                error_msg = last_error or "搜索关键词提取失败:未知错误"
+                state["workflow_failed"] = True
+                state["workflow_error"] = error_msg
+                return state
+
+            # 再次检查(双重保险)
+            if self._check_agent_result_for_errors(result, "搜索关键词提取Agent"):
+                error_msg = "搜索关键词提取失败:无法获取视频文件"
+                state["workflow_failed"] = True
+                state["workflow_error"] = error_msg
+                return state
+
+            # 获取搜索关键词数量
+            search_keywords = result.get("search_keywords", {})
+            keyword_count = search_keywords.get("总数", 0) if isinstance(search_keywords, dict) else 0
+            logger.info(f"搜索关键词提取完成 - 共 {keyword_count} 个搜索词")
+            
+            # 标记分支1完成
+            state["branch1_completed"] = True
+
+        except Exception as e:
+            logger.error(f"搜索关键词提取失败: {e}", exc_info=True)
+            state["workflow_failed"] = True
+            state["workflow_error"] = f"搜索关键词提取异常: {str(e)}"
+            state.update({
+                "search_keywords": {
+                    "搜索词列表": [],
+                    "总数": 0,
+                    "error": str(e)
+                }
+            })
+            state["branch1_completed"] = True  # 即使失败也标记为完成,避免阻塞
+
+        return state
+
+    def _structure_analysis_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:结构化内容库分析(ScriptWorkflowV2分支)"""
+        logger.info("=== 执行节点:结构化内容库分析(分支2) ===")
+
+        # 检查关键错误
+        if self._check_critical_error(state, "结构化内容库分析"):
+            return state
+
+        try:
+            if not self.structure_agent.is_initialized:
+                self.structure_agent.initialize()
+
+            result = self.structure_agent.process(state)
+            
+            # 将结果存入 state 的 topic 字段
+            structure_data = result.get("structure_data", {})
+            state["topic"] = structure_data
+
+            logger.info("结构化内容库分析完成")
+            if isinstance(structure_data, dict):
+                topic_info = structure_data.get("选题信息表", {})
+                macro_topic = topic_info.get("宏观母题", "") if isinstance(topic_info, dict) else ""
+                if macro_topic:
+                    logger.info(f"宏观母题: {macro_topic}")
+        except Exception as e:
+            logger.error(f"结构化内容库分析失败: {e}", exc_info=True)
+            state["topic"] = {
+                "错误": str(e),
+            }
+
+        return state
+
+    def _content_unit_split_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:L3 内容单元拆分(ScriptWorkflowV2分支)"""
+        logger.info("=== 执行节点:L3 内容单元拆分(分支2) ===")
+
+        # 检查关键错误
+        if self._check_critical_error(state, "L3 内容单元拆分"):
+            return state
+
+        try:
+            if not self.content_unit_split_agent.is_initialized:
+                self.content_unit_split_agent.initialize()
+
+            result = self.content_unit_split_agent.process(state)
+            state.update(result)
+
+            analysis = result.get("content_unit_analysis", {})
+            logger.info(
+                f"L3 单元拆分完成,单元数量: {len(analysis.get('单元列表', [])) if isinstance(analysis, dict) else 0}"
+            )
+        except Exception as e:
+            logger.error(f"L3 内容单元拆分失败: {e}", exc_info=True)
+            state["content_unit_analysis"] = {
+                "error": str(e),
+                "单元列表": [],
+            }
+
+        return state
+
+    def _content_unit_understand_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:整体结构理解(ScriptWorkflowV2分支)"""
+        logger.info("=== 执行节点:整体结构理解(分支2) ===")
+
+        # 检查关键错误
+        if self._check_critical_error(state, "整体结构理解"):
+            return state
+
+        try:
+            if not self.content_unit_understand_agent.is_initialized:
+                self.content_unit_understand_agent.initialize()
+
+            result = self.content_unit_understand_agent.process(state)
+            state.update(result)
+
+            understanding = result.get("content_unit_understanding", {})
+            logger.info(
+                f"整体结构理解完成,段落数量: {len(understanding.get('段落解构', [])) if isinstance(understanding, dict) else 0}"
+            )
+        except Exception as e:
+            logger.error(f"整体结构理解失败: {e}", exc_info=True)
+            state["content_unit_understanding"] = {
+                "error": str(e),
+                "整体解构": {},
+                "段落解构": [],
+                "单元解构": {},
+            }
+
+        return state
+
+    def _keyword_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:金句提取(ScriptWorkflowV2分支)"""
+        logger.info("=== 执行节点:金句提取(分支2) ===")
+
+        # 检查关键错误
+        if self._check_critical_error(state, "金句提取"):
+            return state
+
+        try:
+            if not self.script_keyword_agent.is_initialized:
+                self.script_keyword_agent.initialize()
+
+            result = self.script_keyword_agent.process(state)
+            state.update(result)
+
+            script_keywords = result.get("script_keywords", {})
+            logger.info("金句提取完成")
+            if isinstance(script_keywords, dict):
+                hooks_count = len(script_keywords.get("hooks", []))
+                golden_sentences_count = len(script_keywords.get("golden_sentences", []))
+                logger.info(f"提取钩子数量: {hooks_count}, 金句数量: {golden_sentences_count}")
+            
+            # 标记分支2完成
+            state["branch2_completed"] = True
+        except Exception as e:
+            logger.error(f"金句提取失败: {e}", exc_info=True)
+            state["script_keywords"] = {
+                "error": str(e),
+            }
+            state["branch2_completed"] = True  # 即使失败也标记为完成,避免阻塞
+
+        return state
+
+    def _merge_branches_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:合并两个分支的结果
+        
+        检查分支2是否完成,如果没完成则执行它
+        """
+        logger.info("=== 执行节点:合并分支结果 ===")
+
+        try:
+            # 检查分支2是否完成
+            branch2_completed = state.get("branch2_completed", False)
+            
+            if not branch2_completed:
+                logger.info("分支2还未完成,现在执行分支2(ScriptWorkflowV2流程)")
+                try:
+                    # 执行分支2的所有节点
+                    if not self.structure_agent.is_initialized:
+                        self.structure_agent.initialize()
+                    structure_result = self.structure_agent.process(state)
+                    state["topic"] = structure_result.get("structure_data", {})
+                    
+                    if not self.content_unit_split_agent.is_initialized:
+                        self.content_unit_split_agent.initialize()
+                    split_result = self.content_unit_split_agent.process(state)
+                    state.update(split_result)
+                    
+                    if not self.content_unit_understand_agent.is_initialized:
+                        self.content_unit_understand_agent.initialize()
+                    understand_result = self.content_unit_understand_agent.process(state)
+                    state.update(understand_result)
+                    
+                    if not self.script_keyword_agent.is_initialized:
+                        self.script_keyword_agent.initialize()
+                    keyword_result = self.script_keyword_agent.process(state)
+                    state.update(keyword_result)
+                    
+                    state["branch2_completed"] = True
+                    logger.info("分支2(ScriptWorkflowV2流程)执行完成")
+                except Exception as e:
+                    logger.error(f"分支2执行失败: {e}", exc_info=True)
+                    state["branch2_completed"] = True  # 即使失败也标记为完成,避免阻塞
+            else:
+                logger.info("分支2已完成,直接合并结果")
+            
+            # 标记已合并
+            state["branches_merged"] = True
+            logger.info("分支结果合并完成,准备进入结果汇总")
+
+        except Exception as e:
+            logger.error(f"合并分支结果失败: {e}", exc_info=True)
+            state["branches_merged"] = True  # 即使失败也标记为已合并,避免阻塞
+
+        return state
+
     def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
-        """节点:结果汇总"""
+        """节点:结果汇总(包含两个分支的结果)"""
         logger.info("=== 执行节点:结果汇总 ===")
 
         try:
@@ -796,13 +1268,27 @@ class DecodeWorkflow(BaseGraphAgent):
             if not self.result_aggregation_func.is_initialized:
                 self.result_aggregation_func.initialize()
 
-            # 执行Function
+            # 执行Function(获取分支1的结果)
             final_result = self.result_aggregation_func.execute(state)
 
+            # 添加分支2的结果(ScriptWorkflowV2的结果)
+            topic = state.get("topic", {})
+            content_unit_analysis = state.get("content_unit_analysis", {})
+            content_unit_understanding = state.get("content_unit_understanding", {})
+            script_keywords = state.get("script_keywords", {})
+
+            # 将分支2的结果添加到最终结果中
+            final_result["脚本解构V2"] = {
+                "结构化内容库": topic,
+                "L3单元解构": content_unit_analysis,
+                "整体结构理解": content_unit_understanding,
+                "金句提取": script_keywords,
+            }
+
             # 更新状态
             state["final_result"] = final_result
 
-            logger.info("结果汇总完成")
+            logger.info("结果汇总完成(包含两个分支的结果)")
 
         except Exception as e:
             logger.error(f"结果汇总失败: {e}", exc_info=True)
@@ -820,6 +1306,12 @@ class DecodeWorkflow(BaseGraphAgent):
                     "实质列表": [],
                     "形式列表": []
                 },
+                "脚本解构V2": {
+                    "结构化内容库": {},
+                    "L3单元解构": {},
+                    "整体结构理解": {},
+                    "金句提取": {}
+                },
                 "错误": f"汇总失败: {str(e)}"
             }
 
@@ -859,6 +1351,34 @@ class DecodeWorkflow(BaseGraphAgent):
                     result_json = json.dumps(final_result, ensure_ascii=False) if final_result else None
                     record.update_status(DecodeStatus.SUCCESS)
                     record.update_result(result_json)
+                    
+                    # 提取分支2的结果(脚本解构V2)并存储到 decode_result_v2 字段
+                    if final_result and isinstance(final_result, dict):
+                        script_v2_result = final_result.get("脚本解构V2")
+                        if script_v2_result:
+                            result_v2_json = json.dumps(script_v2_result, ensure_ascii=False)
+                            record.update_result_v2(result_v2_json)
+                            logger.info(f"更新数据库记录的分支2结果: task_id={task_id}")
+                    
+                    # 提取搜索关键词并存储到 search_keywords 字段(字符串数组的JSON格式)
+                    search_keywords_data = state.get("search_keywords", {})
+                    if search_keywords_data and isinstance(search_keywords_data, dict):
+                        keyword_list = search_keywords_data.get("搜索词列表", [])
+                        if keyword_list:
+                            # 提取每个搜索词的"搜索词"字段,组成字符串数组
+                            keyword_strings = []
+                            for item in keyword_list:
+                                if isinstance(item, dict):
+                                    keyword = item.get("搜索词", "")
+                                    if keyword:
+                                        keyword_strings.append(keyword)
+                            
+                            # 转换为JSON字符串数组格式
+                            if keyword_strings:
+                                keywords_json = json.dumps(keyword_strings, ensure_ascii=False)
+                                record.update_search_keywords(keywords_json)
+                                logger.info(f"更新数据库记录的搜索关键词: task_id={task_id}, 关键词数量={len(keyword_strings)}")
+                    
                     logger.info(f"更新数据库记录为成功: task_id={task_id}")
                 else:
                     # 更新为失败状态
@@ -921,6 +1441,7 @@ class DecodeWorkflow(BaseGraphAgent):
                 result = initial_state
             self._update_db_record_after_workflow(result, success=False, error_msg=error_msg)
             return {
+                "status": 3,
                 "error": error_msg,
                 "workflow_status": "failed",
                 "exception_type": type(e).__name__
@@ -936,7 +1457,8 @@ class DecodeWorkflow(BaseGraphAgent):
                 "error": error_msg,
                 "video_upload_error": result.get("video_upload_error"),
                 "workflow_status": "failed",
-                "failed_at": result.get("failed_at", "unknown")
+                "failed_at": result.get("failed_at", "unknown"),
+                "status": 3
             }
 
         # 检查是否有最终结果
@@ -949,6 +1471,7 @@ class DecodeWorkflow(BaseGraphAgent):
                 # 更新数据库记录为失败状态
                 self._update_db_record_after_workflow(result, success=False, error_msg=error_msg)
                 return {
+                    "status": 3,
                     "error": error_msg,
                     "workflow_status": "failed"
                 }
@@ -957,6 +1480,7 @@ class DecodeWorkflow(BaseGraphAgent):
                 # 更新数据库记录为失败状态
                 self._update_db_record_after_workflow(result, success=False, error_msg="工作流执行完成,但未生成最终结果")
                 return {
+                    "status": 3,
                     "error": "工作流执行完成,但未生成最终结果",
                     "workflow_status": "incomplete",
                     "state": result
@@ -967,5 +1491,9 @@ class DecodeWorkflow(BaseGraphAgent):
 
         logger.info("=== 解码工作流执行完成(视频分析) ===")
 
+        # 添加status字段(2表示成功)
+        if isinstance(final_result, dict):
+            final_result["status"] = 2
+
         return final_result
 

+ 430 - 0
src/workflows/evaluate_workflow.py

@@ -0,0 +1,430 @@
+"""
+Evaluate Workflow.
+
+评估工作流:针对视频的筛选
+流程:查询原视频信息 → 评估筛选视频 → 更新数据库
+"""
+
+from typing import Dict, Any, Optional
+import json
+from langgraph.graph import StateGraph, END
+
+from src.components.agents.base import BaseGraphAgent
+from src.components.agents.evaluate_agent import EvaluateAgent
+from src.models import get_db, DecodeVideo, EvaluateRecord, EvaluateStatus
+from src.utils.logger import get_logger
+from utils.general import get_now_ts
+
+logger = get_logger(__name__)
+
+
+class EvaluateWorkflow(BaseGraphAgent):
+    """评估工作流
+
+    功能:
+    - 针对视频的筛选
+    - 流程:查询原视频信息 → 评估筛选视频 → 更新数据库
+    - 管理状态传递
+
+    实现方式:BaseGraphAgent (LangGraph)
+    """
+
+    def __init__(
+        self,
+        name: str = "evaluate_workflow",
+        description: str = "评估工作流 - 针对视频的筛选",
+        model_provider: str = "google_genai"
+    ):
+        super().__init__(
+            name=name,
+            description=description,
+            state_class=dict
+        )
+
+        self.model_provider = model_provider
+
+        # 初始化评估Agent
+        self.evaluate_agent = EvaluateAgent(
+            model_provider=model_provider
+        )
+
+    def _build_graph(self) -> StateGraph:
+        """构建工作流图
+        
+        流程:
+        1. 查询原视频信息(根据task_id查询decode_videos表)
+        2. 更新评估状态为执行中(根据evaluate_id更新evaluate_record表status为1)
+        3. 评估筛选视频(使用evaluate_agent)
+        4. 更新数据库(根据evaluate_id更新evaluate_record表)
+        """
+        workflow = StateGraph(dict)
+
+        # 添加节点
+        workflow.add_node("query_original_video", self._query_original_video_node)
+        workflow.add_node("update_evaluate_status", self._update_evaluate_status_node)
+        workflow.add_node("evaluate_videos", self._evaluate_videos_node)
+        workflow.add_node("update_database", self._update_database_node)
+
+        # 设置入口点
+        workflow.set_entry_point("query_original_video")
+
+        # 添加边
+        workflow.add_edge("query_original_video", "update_evaluate_status")
+        workflow.add_edge("update_evaluate_status", "evaluate_videos")
+        workflow.add_edge("evaluate_videos", "update_database")
+        workflow.add_edge("update_database", END)
+
+        logger.info("评估工作流图构建完成 - 流程:查询原视频信息 → 更新评估状态 → 评估筛选视频 → 更新数据库")
+
+        return workflow
+
+    def _query_original_video_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:查询原视频信息
+        
+        根据task_id查询decode_videos表,获取原视频的标题和解构内容
+        
+        Args:
+            state: 状态字典,包含:
+                - task_id: 任务ID
+                - evaluate_id: 评估ID
+                
+        Returns:
+            更新后的状态,包含:
+                - original_video_title: 原视频标题
+                - original_video_content: 原视频解构内容
+        """
+        logger.info("=== 执行节点:查询原视频信息 ===")
+
+        try:
+            task_id = state.get("task_id")
+            if not task_id:
+                logger.error("task_id为空,无法查询原视频信息")
+                state["workflow_error"] = "task_id为空"
+                return state
+
+            # 查询数据库
+            db = next(get_db())
+            try:
+                decode_video = db.query(DecodeVideo).filter_by(task_id=task_id).first()
+                
+                if not decode_video:
+                    logger.error(f"未找到task_id={task_id}的解构记录")
+                    state["workflow_error"] = f"未找到task_id={task_id}的解构记录"
+                    return state
+
+                # 提取标题(从result或decode_result_v2中提取)
+                title = ""
+                original_content = {}
+
+                # 优先使用decode_result_v2(ScriptWorkflowV2的结果)
+                if decode_video.decode_result_v2:
+                    try:
+                        decode_result_v2 = json.loads(decode_video.decode_result_v2)
+                        original_content["decode_result_v2"] = decode_result_v2
+                        # 尝试从结果中提取标题
+                        if isinstance(decode_result_v2, dict):
+                            # 尝试多种可能的路径
+                            title = (
+                                decode_result_v2.get("视频信息", {}).get("标题", "") or
+                                decode_result_v2.get("title", "") or
+                                decode_result_v2.get("视频标题", "") or
+                                ""
+                            )
+                    except json.JSONDecodeError as e:
+                        logger.warning(f"解析decode_result_v2失败: {e}")
+
+                # 使用result(原有结果)
+                if decode_video.result:
+                    try:
+                        result = json.loads(decode_video.result)
+                        original_content["result"] = result
+                        # 如果还没有标题,尝试从result中提取
+                        if not title and isinstance(result, dict):
+                            title = (
+                                result.get("视频信息", {}).get("标题", "") or
+                                result.get("title", "") or
+                                result.get("视频标题", "") or
+                                ""
+                            )
+                    except json.JSONDecodeError as e:
+                        logger.warning(f"解析result失败: {e}")
+
+                # 如果还是没有标题,使用video_id作为标题
+                if not title and decode_video.video_id:
+                    title = f"视频 {decode_video.video_id}"
+
+                logger.info(f"查询到原视频信息: task_id={task_id}, title={title}")
+
+                # 更新状态
+                state["original_video_title"] = title
+                state["original_video_content"] = original_content
+
+            finally:
+                db.close()
+
+        except Exception as e:
+            logger.error(f"查询原视频信息失败: {e}", exc_info=True)
+            state["workflow_error"] = f"查询原视频信息失败: {str(e)}"
+
+        return state
+
+    def _update_evaluate_status_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:更新评估状态为执行中
+        
+        根据evaluate_id更新evaluate_record表中的status为1(EXECUTING)
+        
+        Args:
+            state: 状态字典,包含:
+                - evaluate_id: 评估ID
+                
+        Returns:
+            更新后的状态
+        """
+        logger.info("=== 执行节点:更新评估状态为执行中 ===")
+
+        try:
+            evaluate_id = state.get("evaluate_id")
+            if not evaluate_id:
+                logger.warning("evaluate_id为空,跳过状态更新")
+                return state
+
+            db = next(get_db())
+            try:
+                evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
+                if evaluate_record:
+                    evaluate_record.update_status(EvaluateStatus.EXECUTING)
+                    db.commit()
+                    logger.info(f"更新评估记录状态为执行中: evaluate_id={evaluate_id}")
+                else:
+                    logger.warning(f"未找到evaluate_id={evaluate_id}的评估记录")
+            finally:
+                db.close()
+
+        except Exception as e:
+            logger.error(f"更新评估状态失败: {e}", exc_info=True)
+            # 状态更新失败不影响工作流继续执行
+            state["status_update_error"] = f"更新评估状态失败: {str(e)}"
+
+        return state
+
+    def _evaluate_videos_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:评估筛选视频
+        
+        使用evaluate_agent从视频列表中筛选出和原视频最匹配的内容
+        
+        Args:
+            state: 状态字典,包含:
+                - original_video_title: 原视频标题
+                - original_video_content: 原视频解构内容
+                - search_result: 待评估的视频列表
+                
+        Returns:
+            更新后的状态,包含:
+                - evaluate_result: 评估结果列表
+        """
+        logger.info("=== 执行节点:评估筛选视频 ===")
+
+        try:
+            # 检查是否有错误
+            if state.get("workflow_error"):
+                logger.warning("存在错误,跳过评估步骤")
+                return state
+
+            # 初始化Agent
+            if not self.evaluate_agent.is_initialized:
+                self.evaluate_agent.initialize()
+
+            # 执行评估
+            evaluate_result = self.evaluate_agent.process(state)
+
+            # 更新状态
+            state.update(evaluate_result)
+
+            logger.info("视频评估完成")
+
+        except Exception as e:
+            logger.error(f"评估筛选视频失败: {e}", exc_info=True)
+            state["workflow_error"] = f"评估筛选视频失败: {str(e)}"
+            # 即使失败,也设置一个空的评估结果
+            state["evaluate_result"] = []
+
+        return state
+
+    def _update_database_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:更新数据库
+        
+        根据evaluate_id更新evaluate_record表中的evaluate_result、status和updated_at
+        
+        Args:
+            state: 状态字典,包含:
+                - evaluate_id: 评估ID
+                - evaluate_result: 评估结果列表
+                
+        Returns:
+            更新后的状态
+        """
+        logger.info("=== 执行节点:更新数据库 ===")
+
+        try:
+            evaluate_id = state.get("evaluate_id")
+            if not evaluate_id:
+                logger.error("evaluate_id为空,无法更新数据库")
+                state["workflow_error"] = "evaluate_id为空"
+                return state
+
+            evaluate_result = state.get("evaluate_result", [])
+
+            # 将评估结果转换为JSON字符串
+            result_json = json.dumps(evaluate_result, ensure_ascii=False, indent=2)
+
+            # 更新数据库
+            db = next(get_db())
+            try:
+                evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
+                
+                if not evaluate_record:
+                    logger.error(f"未找到evaluate_id={evaluate_id}的评估记录")
+                    state["workflow_error"] = f"未找到evaluate_id={evaluate_id}的评估记录"
+                    return state
+
+                # 更新评估结果
+                evaluate_record.evaluate_result = result_json
+
+                # 更新状态
+                if state.get("workflow_error"):
+                    evaluate_record.update_status(EvaluateStatus.FAILED)
+                else:
+                    evaluate_record.update_status(EvaluateStatus.SUCCESS)
+
+                # 更新updated_at(通过SQL的onupdate自动更新,但也可以手动设置)
+                from sqlalchemy.sql import func
+                evaluate_record.updated_at = func.now()
+
+                db.commit()
+
+                logger.info(f"数据库更新成功: evaluate_id={evaluate_id}")
+
+            finally:
+                db.close()
+
+        except Exception as e:
+            logger.error(f"更新数据库失败: {e}", exc_info=True)
+            state["workflow_error"] = f"更新数据库失败: {str(e)}"
+            
+            # 尝试更新数据库状态为失败
+            try:
+                evaluate_id = state.get("evaluate_id")
+                if evaluate_id:
+                    db = next(get_db())
+                    try:
+                        evaluate_record = db.query(EvaluateRecord).filter_by(evaluate_id=evaluate_id).first()
+                        if evaluate_record:
+                            evaluate_record.update_status(EvaluateStatus.FAILED)
+                            db.commit()
+                    finally:
+                        db.close()
+            except Exception as db_error:
+                logger.error(f"更新数据库状态为失败时出错: {db_error}", exc_info=True)
+
+        return state
+
+    def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
+        """执行工作流(公共接口)
+        
+        Args:
+            input_data: 输入数据,包含:
+                - task_id: 任务ID(必需)
+                - evaluate_id: 评估ID(必需)
+                - search_result: 待评估的视频列表(必需)
+                
+        Returns:
+            执行结果,包含:
+                - evaluate_result: 评估结果列表
+                - workflow_status: 工作流状态(success/failed)
+        """
+        logger.info("=== 开始执行评估工作流 ===")
+        logger.info(f"input_data keys: {list(input_data.keys())}")
+
+        # 确保工作流已初始化
+        if not self.is_initialized:
+            self.initialize()
+
+        # 验证输入参数
+        task_id = input_data.get("task_id")
+        evaluate_id = input_data.get("evaluate_id")
+        search_result = input_data.get("search_result", [])
+
+        if not task_id:
+            error_msg = "未提供task_id,无法执行工作流"
+            logger.error(error_msg)
+            return {
+                "error": error_msg,
+                "workflow_status": "failed",
+                "input_data": input_data
+            }
+
+        if not evaluate_id:
+            error_msg = "未提供evaluate_id,无法执行工作流"
+            logger.error(error_msg)
+            return {
+                "error": error_msg,
+                "workflow_status": "failed",
+                "input_data": input_data
+            }
+
+        if not search_result:
+            error_msg = "未提供search_result,无法执行工作流"
+            logger.error(error_msg)
+            return {
+                "error": error_msg,
+                "workflow_status": "failed",
+                "input_data": input_data
+            }
+
+        # 初始化状态
+        initial_state = {
+            "task_id": task_id,
+            "evaluate_id": evaluate_id,
+            "search_result": search_result,
+        }
+
+        # 执行工作流
+        result = None
+        try:
+            result = self.compiled_graph.invoke(initial_state)
+        except Exception as e:
+            error_msg = f"工作流执行异常: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+            # 更新数据库记录为失败状态
+            if result is None:
+                result = initial_state
+            result["workflow_error"] = error_msg
+            # 触发更新数据库节点以更新状态
+            try:
+                self._update_database_node(result)
+            except Exception as db_error:
+                logger.error(f"更新数据库状态为失败时出错: {db_error}", exc_info=True)
+            return {
+                "status": 3,
+                "error": error_msg,
+                "workflow_status": "failed",
+                "exception_type": type(e).__name__
+            }
+
+        # 检查是否有错误
+        if result.get("workflow_error"):
+            logger.error(f"工作流执行失败: {result.get('workflow_error')}")
+            return {
+                "status": 3,
+                "error": result.get("workflow_error"),
+                "workflow_status": "failed",
+                "evaluate_result": result.get("evaluate_result", [])
+            }
+
+        logger.info("=== 评估工作流执行完成 ===")
+        return {
+            "status": 2,
+            "workflow_status": "success",
+            "evaluate_result": result.get("evaluate_result", [])
+        }
+