
Add decode_agent

jihuaqiang 5 days ago
Parent
Commit
8b06c539c2

+ 208 - 0
examples/run_decode_script.py

@@ -0,0 +1,208 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""
+批量运行解码工作流 (DecodeWorkflow)。
+
+读取 examples/demo.json 中的视频列表,
+逐一调用 DecodeWorkflow 进行处理,
+并将结果输出到 examples/output_decode_result.json。
+"""
+
+import json
+import sys
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, Any, List
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent
+sys.path.insert(0, str(project_root))
+
+from src.workflows.decode_workflow import DecodeWorkflow
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+def load_json(path: Path) -> List[Dict[str, Any]]:
+    """加载JSON文件"""
+    if not path.exists():
+        return []
+    with path.open("r", encoding="utf-8") as f:
+        data = json.load(f)
+        # 如果是字典且有 results 字段,提取 results
+        if isinstance(data, dict) and "results" in data:
+            return data["results"]
+        # 如果是列表,直接返回
+        elif isinstance(data, list):
+            return data
+        else:
+            return []
+
+
+def save_json(path: Path, data: Dict[str, Any]) -> None:
+    """保存JSON文件(使用临时文件确保原子性)"""
+    tmp_path = path.with_suffix(".tmp")
+    with tmp_path.open("w", encoding="utf-8") as f:
+        json.dump(data, f, ensure_ascii=False, indent=2)
+    tmp_path.replace(path)
+
+
+def build_decode_input(video_data: Dict[str, Any]) -> Dict[str, Any]:
+    """根据视频数据构造 DecodeWorkflow 的输入结构"""
+    return {
+        "video": video_data.get("video", ""),
+        "channel_content_id": video_data.get("channel_content_id", ""),
+        "title": video_data.get("title", ""),
+        "body_text": video_data.get("body_text", ""),
+    }
+
+
+def main() -> None:
+    """主函数"""
+    base_dir = Path(__file__).parent
+    input_path = base_dir / "demo.json"
+    output_path = base_dir / "output_decode_result.json"
+
+    if not input_path.exists():
+        raise FileNotFoundError(f"找不到输入文件: {input_path}")
+
+    # 读取视频列表
+    video_list = load_json(input_path)
+    if not video_list:
+        logger.warning(f"输入文件 {input_path} 中没有视频数据")
+        return
+
+    logger.info(f"共读取到 {len(video_list)} 个视频")
+
+    # 读取已有的输出结果,支持增量追加
+    output_data = {}
+    if output_path.exists():
+        try:
+            with output_path.open("r", encoding="utf-8") as f:
+                output_data = json.load(f)
+        except Exception as e:
+            logger.warning(f"读取已有输出文件失败,将创建新文件: {e}")
+            output_data = {}
+
+    if not output_data:
+        output_data = {
+            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
+            "total": 0,
+            "success_count": 0,
+            "fail_count": 0,
+            "results": [],
+        }
+
+    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
+    # 用 channel_content_id + video URL 去重,避免重复处理
+    processed_keys = {
+        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
+        f"{item.get('video_data', {}).get('video', '')}"
+        for item in existing_results
+    }
+
+    # 初始化工作流
+    logger.info("初始化 DecodeWorkflow...")
+    workflow = DecodeWorkflow()
+    logger.info("DecodeWorkflow 初始化完成")
+
+    # 处理每个视频
+    for idx, video_data in enumerate(video_list, 1):
+        video_url = video_data.get("video", "")
+        channel_content_id = video_data.get("channel_content_id", "")
+        title = video_data.get("title", "")
+
+        # 生成唯一键用于去重
+        key = f"{channel_content_id}|{video_url}"
+        if key in processed_keys:
+            logger.info(f"[{idx}/{len(video_list)}] 已处理过该视频,跳过: channel_content_id={channel_content_id}")
+            continue
+
+        logger.info(
+            f"[{idx}/{len(video_list)}] 开始处理视频: "
+            f"channel_content_id={channel_content_id}, title={title[:50]}..."
+        )
+
+        try:
+            # 构建输入数据
+            decode_input = build_decode_input(video_data)
+
+            # 调用工作流
+            decode_result = workflow.invoke(decode_input)
+
+            # 按照 output_demo_script.json 的格式组织结果
+            # what_deconstruction_result: 包含视频信息、三点解构、选题理解
+            what_deconstruction_result = {
+                "视频信息": decode_result.get("视频信息", {}),
+                "三点解构": decode_result.get("三点解构", {}),
+                "选题理解": decode_result.get("选题理解", {}),
+            }
+            
+            # script_result: 包含选题描述和脚本理解
+            # 从选题理解中提取选题描述
+            topic_understanding = decode_result.get("选题理解", {})
+            selected_topic = {}
+            if isinstance(topic_understanding, dict):
+                if "选题" in topic_understanding:
+                    selected_topic = topic_understanding.get("选题", {})
+                else:
+                    selected_topic = {
+                        "主题": topic_understanding.get("主题", ""),
+                        "描述": topic_understanding.get("描述", ""),
+                    }
+            
+            script_result = {
+                "选题描述": selected_topic,
+                "脚本理解": decode_result.get("脚本理解", {}),
+            }
+
+            # 构造结果记录(参考 output_demo_script.json 格式)
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": what_deconstruction_result,
+                "script_result": script_result,
+                "success": True,
+                "error": None,
+            }
+
+            output_data["success_count"] = output_data.get("success_count", 0) + 1
+            logger.info(
+                f"[{idx}/{len(video_list)}] 处理成功: channel_content_id={channel_content_id}"
+            )
+
+        except Exception as e:
+            logger.error(
+                f"[{idx}/{len(video_list)}] 处理失败: channel_content_id={channel_content_id}, error={e}",
+                exc_info=True
+            )
+            record = {
+                "video_data": video_data,
+                "what_deconstruction_result": None,
+                "script_result": None,
+                "success": False,
+                "error": str(e),
+            }
+            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
+
+        output_data["results"].append(record)
+        output_data["total"] = output_data.get("total", 0) + 1
+
+        # 处理完一条就保存一次,避免长任务中途失败导致全部丢失
+        save_json(output_path, output_data)
+        logger.info(f"结果已保存到 {output_path}")
+
+    logger.info(
+        f"\n{'='*60}\n"
+        f"批量解码完成:\n"
+        f"  总计: {output_data.get('total', 0)}\n"
+        f"  成功: {output_data.get('success_count', 0)}\n"
+        f"  失败: {output_data.get('fail_count', 0)}\n"
+        f"  输出文件: {output_path}\n"
+        f"{'='*60}"
+    )
+
+
+if __name__ == "__main__":
+    main()
+
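
The script is run directly (python examples/run_decode_script.py) and expects examples/demo.json to hold either a plain list of records or an object with a "results" array. A minimal sketch of one record, inferred from the fields read by build_decode_input; the values below are placeholders, not taken from the repository:

    # Hypothetical demo.json entry; only these four fields are consumed by build_decode_input.
    [
        {
            "video": "http://example.com/video.mp4",
            "channel_content_id": "123456",
            "title": "Sample title",
            "body_text": "Sample body text"
        }
    ]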

Binary
src/components/agents/__pycache__/script_form_extraction_agent.cpython-313.pyc


Binary
src/components/agents/__pycache__/script_section_division_agent.cpython-313.pyc


Binary
src/components/agents/__pycache__/script_substance_extraction_agent.cpython-313.pyc


+ 28 - 9
src/components/agents/script_form_extraction_agent.py

@@ -122,18 +122,18 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
             concrete_elements = [
                 elem
                 for elem in substance_final_elements
-                if elem.get("维度", {}).get("二级") == "具体元素"
+                if isinstance(elem, dict) and elem.get("维度", {}).get("二级") == "具体元素"
             ]
             concrete_concepts = [
                 elem
                 for elem in substance_final_elements
-                if elem.get("维度", {}).get("二级") == "具象概念"
+                if isinstance(elem, dict) and elem.get("维度", {}).get("二级") == "具象概念"
             ]
             # 抽象概念
             abstract_concepts = [
                 elem
                 for elem in substance_final_elements
-                if elem.get("维度", {}).get("二级") == "抽象概念"
+                if isinstance(elem, dict) and elem.get("维度", {}).get("二级") == "抽象概念"
             ]
             # 隐含概念相关逻辑已移除
             implicit_concepts: List[dict] = []
@@ -147,7 +147,7 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         # 只保留形式类关键点
         form_key_points = [
             kp for kp in key_points
-            if kp.get("维度大类") == "形式"
+            if isinstance(kp, dict) and kp.get("维度大类") == "形式"
         ] if key_points else []
 
         logger.info(
@@ -232,13 +232,16 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
             # 统计频次分布
             freq_distribution = {}
             for item in analyzed_result:
+                if not isinstance(item, dict):
+                    continue
                 freq = item.get("出现频次", 0)
                 freq_distribution[freq] = freq_distribution.get(freq, 0) + 1
             logger.info(
                 f"   频次分布: {dict(sorted(freq_distribution.items(), reverse=True)[:5])}"
             )
             # 显示高频元素
-            high_freq_items = sorted(analyzed_result, key=lambda x: x.get("出现频次", 0), reverse=True)[:3]
+            dict_items = [item for item in analyzed_result if isinstance(item, dict)]
+            high_freq_items = sorted(dict_items, key=lambda x: x.get("出现频次", 0), reverse=True)[:3]
             if high_freq_items:
                 logger.info(
                     f"   高频元素(前3): {[(item.get('名称', 'N/A'), item.get('出现频次', 0)) for item in high_freq_items]}"
@@ -762,6 +765,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         }
 
         for elem in form_elements:
+            if not isinstance(elem, dict):
+                continue
             second_level = elem.get("维度", {}).get("二级", "")
             if second_level in dimension_groups:
                 dimension_groups[second_level].append(elem)
@@ -938,6 +943,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
 
         merged_map = {}
         for item in all_results:
+            if not isinstance(item, dict):
+                continue
             element_id = item.get("id")
             if element_id not in merged_map:
                 merged_map[element_id] = {
@@ -1207,7 +1214,7 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
             return {}
 
         # 创建映射
-        analyzed_map = {item.get("id"): item for item in analyzed_result}
+        analyzed_map = {item.get("id"): item for item in analyzed_result if isinstance(item, dict)}
 
         # 统计每个元素支撑的点,并收集所有被支撑的点
         element_support_count = {}
@@ -1235,6 +1242,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
 
                 # 记录该元素支撑的具体点
                 for support_item in support_results:
+                    if not isinstance(support_item, dict):
+                        continue
                     point_name = support_item.get("点", "")
                     if point_name:
                         element_support_count[element_id][point_type].append(point_name)
@@ -1431,7 +1440,7 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
 
         filtered_elements = [
             elem for elem in form_elements
-            if elem.get("id") in filtered_ids
+            if isinstance(elem, dict) and elem.get("id") in filtered_ids
         ]
 
         dimension_groups: Dict[str, List[dict]] = {
@@ -1441,6 +1450,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         }
 
         for elem in filtered_elements:
+            if not isinstance(elem, dict):
+                continue
             second_level = elem.get("维度", {}).get("二级", "")
             if second_level in dimension_groups:
                 dimension_groups[second_level].append(elem)
@@ -1487,7 +1498,7 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
 
         elements_text = json.dumps([
             {"id": elem.get("id"), "名称": elem.get("名称"), "描述": elem.get("描述")}
-            for elem in elements
+            for elem in elements if isinstance(elem, dict)
         ], ensure_ascii=False, indent=2)
 
         prompt = f"""# 任务
@@ -1549,7 +1560,7 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         if not filtered_ids:
             return []
 
-        extraction_map = {item.get("id"): item for item in form_elements}
+        extraction_map = {item.get("id"): item for item in form_elements if isinstance(item, dict)}
 
         scored_map: Dict[str, Dict[str, list]] = {}
         for dimension in ["灵感点", "目的点", "关键点"]:
@@ -1565,8 +1576,12 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
 
         category_map: Dict[str, Any] = {}
         for dimension_data in categorized_result.values():
+            if not isinstance(dimension_data, dict):
+                continue
             element_classifications = dimension_data.get("元素分类", [])
             for classification in element_classifications:
+                if not isinstance(classification, dict):
+                    continue
                 element_id = classification.get("元素id")
                 category_info = classification.get("分类", {})
                 if element_id:
@@ -1640,6 +1655,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         }
 
         for elem in elements:
+            if not isinstance(elem, dict):
+                continue
             element_type = elem.get("维度", {}).get("二级", "N/A")
             element_data = {
                 "id": elem.get("id", "N/A"),
@@ -1657,6 +1674,8 @@ class ScriptFormExtractionAgent(BaseLLMAgent):
         dimension_elements = []
 
         for elem in elements:
+            if not isinstance(elem, dict):
+                continue
             element_type = elem.get('维度', {}).get('二级', 'N/A')
             element_data = {
                 "id": elem.get('id', 'N/A'),

+ 22 - 3
src/components/agents/script_substance_extraction_agent.py

@@ -125,13 +125,32 @@ class ScriptSubstanceExtractionAgent(BaseLLMAgent):
         else:
             inspiration_points = []
         
-        purpose_points = state.get("purpose_points", [])
-        key_points = state.get("key_points", [])
+        # 兼容 purpose_point 的多种格式:
+        # 1. 字典格式:{"purpose_point": {"purposes": [...], "total_count": ...}}
+        # 2. 列表格式:[...](直接是目的点列表)
+        purpose_point_raw = state.get("purpose_point", {})
+        if isinstance(purpose_point_raw, dict):
+            purpose_points = purpose_point_raw.get("purposes", [])
+        elif isinstance(purpose_point_raw, list):
+            purpose_points = purpose_point_raw
+        else:
+            purpose_points = []
+        
+        # 兼容 key_points 的多种格式:
+        # 1. 字典格式:{"key_points": [...], "total_count": ...}
+        # 2. 列表格式:[...](直接是关键点列表)
+        key_points_raw = state.get("key_points", {})
+        if isinstance(key_points_raw, dict):
+            key_points = key_points_raw.get("key_points", [])
+        elif isinstance(key_points_raw, list):
+            key_points = key_points_raw
+        else:
+            key_points = []
 
         # 只保留实质类关键点
         substance_key_points = [
             kp for kp in key_points
-            if kp.get("维度大类") == "实质"
+            if isinstance(kp, dict) and kp.get("维度大类") == "实质"
         ] if key_points else []
         
         logger.info(
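
The new handling above amounts to one normalization rule: purpose_point and key_points may arrive either wrapped in a dict or as a bare list, and anything else degrades to an empty list. A standalone sketch of that rule; the helper name is illustrative:

    from typing import Any, List

    def as_point_list(raw: Any, list_key: str) -> List[Any]:
        """Unwrap {list_key: [...]} or pass a bare list through; anything else becomes []."""
        if isinstance(raw, dict):
            return raw.get(list_key, [])
        if isinstance(raw, list):
            return raw
        return []

    # Mirrors the handling above:
    #   purpose_points = as_point_list(state.get("purpose_point", {}), "purposes")
    #   key_points     = as_point_list(state.get("key_points", {}), "key_points")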

Binary
src/components/functions/__pycache__/result_aggregation_function.cpython-313.pyc


+ 117 - 58
src/components/functions/result_aggregation_function.py

@@ -1,8 +1,7 @@
 """
-Result Aggregation Function(视频分析版本)
+Result Aggregation Function(解码工作流版本)
 
-结果汇总函数:将所有解构结果汇总为最终的JSON结构,包括三点解构(灵感点、目的点、关键点)、选题理解和搜索关键词。
-视频分析版本:不包含元素树,只汇总三点解构、选题理解和搜索关键词结果。
+结果汇总函数:将所有解构结果汇总为最终的JSON结构,包括三点解构(灵感点、目的点、关键点)、选题理解、脚本理解(实质、形式)等所有数据。
 """
 
 from typing import Dict, Any, List, Optional
@@ -15,22 +14,20 @@ logger = get_logger(__name__)
 
 
 class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str, Any]]):
-    """结果汇总函数(视频分析版本)
+    """结果汇总函数(解码工作流版本)
 
     功能:
     - 将各Agent的解构结果组装为JSON结构
     - 汇总三点解构结果(灵感点、目的点、关键点)
     - 汇总选题理解结果
-    - 汇总搜索关键词结果
+    - 汇总脚本理解结果(实质、形式、段落划分等)
     - 验证JSON格式
-
-    注意:视频分析版本不包含元素树构建功能
     """
 
     def __init__(
         self,
         name: str = "result_aggregation_function",
-        description: str = "汇总所有解构结果为最终的树状JSON结构(包括三点解构、选题理解和搜索关键词)"
+        description: str = "汇总所有解构结果为最终的JSON结构(包括三点解构、选题理解、脚本理解等所有数据)"
     ):
         super().__init__(name, description)
 
@@ -49,7 +46,7 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
             最终的树状JSON结构
         """
         try:
-            # 视频分析版本:仅提取视频相关结果
+            # 提取视频相关结果
             video_url = input_data.get("video", "")
             
             # 从 state 中获取标题和正文(优先从直接字段获取,兼容text字典)
@@ -61,15 +58,41 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
                 body_text = body_text or text_data.get("body", "")
 
             # 提取三点解构结果
-            inspiration_points = input_data.get("inspiration_points", {})
+            # 灵感点:可能是数组或包含数组的对象
+            inspiration_points_raw = input_data.get("inspiration_points", {})
+            if isinstance(inspiration_points_raw, list):
+                inspiration_points = inspiration_points_raw
+            elif isinstance(inspiration_points_raw, dict):
+                # 可能是包含 points 或直接是数组的字典
+                inspiration_points = inspiration_points_raw.get("points", inspiration_points_raw.get("inspiration_points", []))
+            else:
+                inspiration_points = []
+            
+            # 目的点:保持原始结构(包含 perspective, purposes, total_count)
             purpose_point = input_data.get("purpose_point", {})
+            
+            # 关键点:保持原始结构(包含 key_points, total_count, root_count 等)
             key_points = input_data.get("key_points", {})
 
-            # 提取选题理解结果
+            # 提取选题理解结果 - 保持完整结构(包含主题、描述、覆盖情况等所有字段)
             topic_selection_understanding = input_data.get("topic_selection_understanding", {})
 
-            # 提取搜索关键词结果
-            search_keywords = input_data.get("search_keywords", {})
+            # 提取脚本理解结果
+            # 段落划分相关
+            content_category = input_data.get("内容品类", "未知")
+            section_list = input_data.get("段落列表", [])
+            
+            # 实质和形式列表
+            substance_list = input_data.get("实质列表", [])
+            form_list = input_data.get("形式列表", [])
+            
+            # 如果没有实质列表,尝试从 substance_final_elements 获取
+            if not substance_list:
+                substance_list = input_data.get("substance_final_elements", [])
+            
+            # 如果没有形式列表,尝试从 form_final_elements 获取
+            if not form_list:
+                form_list = input_data.get("form_final_elements", [])
 
             # 构建视频基本信息
             video_info = {
@@ -78,41 +101,52 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
                 "正文": body_text
             }
 
-            # 组装最终JSON(视频分析版本)
-            final_result: FinalOutput = {
+            # 组装最终JSON(包含所有数据,参考 output_demo_script.json 格式)
+            final_result = {
                 "视频信息": video_info,
                 "三点解构": {
-                    "灵感点": inspiration_points,
-                    "目的点": purpose_point,
-                    "关键点": key_points
+                    "灵感点": inspiration_points,  # 数组格式
+                    "目的点": purpose_point,  # 对象,包含 perspective, purposes, total_count
+                    "关键点": key_points  # 对象,包含 key_points, total_count, root_count
                 },
-                "选题理解": topic_selection_understanding,
-                "搜索关键词": search_keywords
+                "选题理解": topic_selection_understanding,  # 保持完整结构,包含 主题、描述、覆盖情况等
+                "脚本理解": {
+                    "内容品类": content_category,
+                    "段落列表": section_list,
+                    "实质列表": substance_list,  # 数组格式
+                    "形式列表": form_list  # 数组格式
+                }
             }
 
             # 验证JSON格式
             self._validate_result(final_result)
 
-            search_keywords_count = search_keywords.get("总数", 0)
-            logger.info(f"结果汇总完成(视频分析):三点解构 + 选题理解 + 搜索关键词({search_keywords_count}个)")
+            substance_count = len(substance_list) if isinstance(substance_list, list) else 0
+            form_count = len(form_list) if isinstance(form_list, list) else 0
+            logger.info(
+                f"结果汇总完成:选题 + 三点解构 + 脚本理解(实质:{substance_count}个, 形式:{form_count}个)"
+            )
 
             return final_result
 
         except Exception as e:
             logger.error(f"结果汇总失败: {e}", exc_info=True)
-            # 返回错误结果
+            # 返回错误结果(保持格式一致性)
             return {
                 "视频信息": {},
                 "三点解构": {
-                    "灵感点": {},
+                    "灵感点": [],
                     "目的点": {},
                     "关键点": {}
                 },
                 "选题理解": {},
-                "搜索关键词": {
-                    "搜索词列表": [],
-                    "总数": 0
-                }
+                "脚本理解": {
+                    "内容品类": "未知",
+                    "段落列表": [],
+                    "实质列表": [],
+                    "形式列表": []
+                },
+                "错误": f"汇总失败: {str(e)}"
             }
 
     def _build_element_tree(
@@ -163,7 +197,7 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
         return element
 
     def _validate_result(self, result: Dict[str, Any]) -> None:
-        """验证JSON格式(视频分析版本)
+        """验证JSON格式(解码工作流版本)
 
         Args:
             result: 最终结果
@@ -171,7 +205,7 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
         Raises:
             ValueError: 格式不正确时抛出异常
         """
-        # 检查必需字段(视频分析版本)
+        # 检查必需字段
         if "视频信息" not in result:
             logger.warning("缺少'视频信息'字段")
             result["视频信息"] = {}
@@ -179,7 +213,7 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
         if "三点解构" not in result:
             logger.warning("缺少'三点解构'字段")
             result["三点解构"] = {
-                "灵感点": {},
+                "灵感点": [],
                 "目的点": {},
                 "关键点": {}
             }
@@ -188,55 +222,80 @@ class ResultAggregationFunction(BaseFunction[WhatDeconstructionState, Dict[str,
             logger.warning("缺少'选题理解'字段")
             result["选题理解"] = {}
 
+        if "脚本理解" not in result:
+            logger.warning("缺少'脚本理解'字段")
+            result["脚本理解"] = {
+                "内容品类": "未知",
+                "段落列表": [],
+                "实质列表": [],
+                "形式列表": []
+            }
+
         # 验证三点解构结构
-        three_points = result["三点解构"]
+        three_points = result.get("三点解构", {})
         if not isinstance(three_points, dict):
             logger.warning("'三点解构'必须是字典")
             result["三点解构"] = {
-                "灵感点": {},
+                "灵感点": [],
                 "目的点": {},
                 "关键点": {}
             }
         else:
-            # 确保三点字段存在
+            # 确保三点字段存在,灵感点是数组
             if "灵感点" not in three_points:
                 logger.warning("三点解构缺少'灵感点'字段")
-                three_points["灵感点"] = {}
+                three_points["灵感点"] = []
+            elif not isinstance(three_points["灵感点"], list):
+                logger.warning("'灵感点'必须是列表")
+                three_points["灵感点"] = []
+            
             if "目的点" not in three_points:
                 logger.warning("三点解构缺少'目的点'字段")
                 three_points["目的点"] = {}
+            elif not isinstance(three_points["目的点"], dict):
+                logger.warning("'目的点'必须是字典")
+                three_points["目的点"] = {}
+            
             if "关键点" not in three_points:
                 logger.warning("三点解构缺少'关键点'字段")
                 three_points["关键点"] = {}
+            elif not isinstance(three_points["关键点"], dict):
+                logger.warning("'关键点'必须是字典")
+                three_points["关键点"] = {}
 
-        # 验证选题理解
-        if not isinstance(result["选题理解"], dict):
+        # 验证选题理解(保持完整结构,不做拆解)
+        if "选题理解" not in result:
+            result["选题理解"] = {}
+        elif not isinstance(result["选题理解"], dict):
             logger.warning("'选题理解'必须是字典")
             result["选题理解"] = {}
 
-        # 验证搜索关键词
-        if "搜索关键词" not in result:
-            logger.warning("缺少'搜索关键词'字段")
-            result["搜索关键词"] = {
-                "搜索词列表": [],
-                "总数": 0
-            }
-
-        if not isinstance(result["搜索关键词"], dict):
-            logger.warning("'搜索关键词'必须是字典")
-            result["搜索关键词"] = {
-                "搜索词列表": [],
-                "总数": 0
+        # 验证脚本理解结构
+        script_understanding = result.get("脚本理解", {})
+        if not isinstance(script_understanding, dict):
+            logger.warning("'脚本理解'必须是字典")
+            result["脚本理解"] = {
+                "内容品类": "未知",
+                "段落列表": [],
+                "实质列表": [],
+                "形式列表": []
             }
         else:
-            # 确保搜索关键词结构完整
-            if "搜索词列表" not in result["搜索关键词"]:
-                logger.warning("搜索关键词缺少'搜索词列表'字段")
-                result["搜索关键词"]["搜索词列表"] = []
-            if "总数" not in result["搜索关键词"]:
-                # 自动计算总数
-                search_list = result["搜索关键词"].get("搜索词列表", [])
-                result["搜索关键词"]["总数"] = len(search_list)
+            # 确保脚本理解字段存在
+            if "内容品类" not in script_understanding:
+                script_understanding["内容品类"] = "未知"
+            if "段落列表" not in script_understanding:
+                script_understanding["段落列表"] = []
+            if "实质列表" not in script_understanding:
+                script_understanding["实质列表"] = []
+            if "形式列表" not in script_understanding:
+                script_understanding["形式列表"] = []
+            
+            # 确保列表类型正确
+            for field in ["段落列表", "实质列表", "形式列表"]:
+                if not isinstance(script_understanding.get(field), list):
+                    logger.warning(f"脚本理解中的'{field}'必须是列表")
+                    script_understanding[field] = []
 
     def _validate_element(self, element: Dict[str, Any]) -> None:
         """验证元素节点格式(视频分析版本中不再使用,保留以保持代码完整性)

+ 496 - 0
src/workflows/decode_workflow.py

@@ -0,0 +1,496 @@
+"""
+Decode Workflow.
+
+解码工作流:合并 What 解构工作流和脚本理解工作流的完整流程。
+流程:视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+      段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总
+"""
+
+from typing import Dict, Any
+from langgraph.graph import StateGraph, END
+
+from src.components.agents.base import BaseGraphAgent
+from src.components.agents.topic_selection_understanding_agent import TopicSelectionUnderstandingAgent
+from src.components.functions.result_aggregation_function import ResultAggregationFunction
+from src.components.functions.video_upload_function import VideoUploadFunction
+# What解构相关Agent
+from src.components.agents.inspiration_points_agent import InspirationPointsAgent
+from src.components.agents.purpose_point_agent import PurposePointAgent
+from src.components.agents.key_points_agent import KeyPointsAgent
+# 脚本理解相关Agent
+from src.components.agents.script_section_division_agent import ScriptSectionDivisionAgent
+from src.components.agents.script_substance_extraction_agent import ScriptSubstanceExtractionAgent
+from src.components.agents.script_form_extraction_agent import ScriptFormExtractionAgent
+from src.utils.logger import get_logger
+
+logger = get_logger(__name__)
+
+
+class DecodeWorkflow(BaseGraphAgent):
+    """解码工作流(合并 What 解构和脚本理解)
+
+    功能:
+    - 编排完整的解码流程(视频分析)
+    - 流程:视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+           段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总
+    - 管理状态传递
+    - 仅支持单视频输入
+
+    实现方式:BaseGraphAgent (LangGraph)
+    """
+
+    def __init__(
+        self,
+        name: str = "decode_workflow",
+        description: str = "解码工作流(合并 What 解构和脚本理解)",
+        model_provider: str = "google_genai",
+        max_depth: int = 10
+    ):
+        super().__init__(
+            name=name,
+            description=description,
+            state_class=dict
+        )
+
+        self.max_depth = max_depth
+        self.model_provider = model_provider
+
+        # 初始化视频上传Function
+        self.video_upload_func = VideoUploadFunction()
+
+        # 初始化What解构相关Agent
+        self.inspiration_points_agent = InspirationPointsAgent(
+            model_provider=model_provider
+        )
+        self.purpose_point_agent = PurposePointAgent(
+            model_provider=model_provider
+        )
+        self.key_points_agent = KeyPointsAgent(
+            model_provider=model_provider
+        )
+        self.topic_selection_understanding_agent = TopicSelectionUnderstandingAgent(
+            model_provider=model_provider
+        )
+
+        # 初始化脚本理解相关Agent
+        self.section_agent = ScriptSectionDivisionAgent(
+            model_provider=model_provider
+        )
+        self.substance_agent = ScriptSubstanceExtractionAgent(
+            model_provider=model_provider
+        )
+        self.form_agent = ScriptFormExtractionAgent(
+            model_provider=model_provider
+        )
+
+        # 初始化结果汇总Function
+        self.result_aggregation_func = ResultAggregationFunction()
+
+        logger.info(f"DecodeWorkflow 初始化完成")
+
+    def _build_graph(self) -> StateGraph:
+        """构建工作流图
+
+        完整流程:
+        START → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+        段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总 → END
+        """
+        workflow = StateGraph(dict)  # 使用dict作为状态类型
+
+        # 添加所有节点
+        workflow.add_node("video_upload", self._video_upload_node)
+        # What解构节点
+        workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
+        workflow.add_node("purpose_point_extraction", self._purpose_point_node)
+        workflow.add_node("key_points_extraction", self._key_points_node)
+        workflow.add_node("topic_selection_understanding", self._topic_selection_understanding_node)
+        # 脚本理解节点
+        workflow.add_node("section_division", self._section_division_node)
+        workflow.add_node("substance_extraction", self._substance_extraction_node)
+        workflow.add_node("form_extraction", self._form_extraction_node)
+        workflow.add_node("merge_all_results", self._merge_all_results_node)
+        workflow.add_node("result_aggregation", self._result_aggregation_node)
+
+        # 定义流程的边
+        workflow.set_entry_point("video_upload")
+        # What解构流程
+        workflow.add_edge("video_upload", "inspiration_points_extraction")
+        workflow.add_edge("inspiration_points_extraction", "purpose_point_extraction")
+        workflow.add_edge("purpose_point_extraction", "key_points_extraction")
+        workflow.add_edge("key_points_extraction", "topic_selection_understanding")
+        # 脚本理解流程
+        workflow.add_edge("topic_selection_understanding", "section_division")
+        workflow.add_edge("section_division", "substance_extraction")
+        workflow.add_edge("substance_extraction", "form_extraction")
+        workflow.add_edge("form_extraction", "merge_all_results")
+        workflow.add_edge("merge_all_results", "result_aggregation")
+        workflow.add_edge("result_aggregation", END)
+
+        logger.info("工作流图构建完成 - 完整流程:视频上传 → What解构 → 脚本理解 → 结果汇总")
+
+        return workflow
+
+    def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:视频上传(第一步)- 下载视频并上传至Gemini"""
+        logger.info("=== 执行节点:视频上传 ===")
+
+        try:
+            # 初始化Function
+            if not self.video_upload_func.is_initialized:
+                self.video_upload_func.initialize()
+
+            # 执行视频上传
+            result = self.video_upload_func.execute(state)
+
+            # 更新状态
+            state.update(result)
+
+            video_uri = result.get("video_uploaded_uri")
+            if video_uri:
+                logger.info(f"视频上传完成 - URI: {video_uri}")
+            else:
+                error = result.get("video_upload_error", "未知错误")
+                logger.warning(f"视频上传失败: {error}")
+
+        except Exception as e:
+            logger.error(f"视频上传失败: {e}", exc_info=True)
+            state.update({
+                "video_uploaded_uri": None,
+                "video_upload_error": str(e)
+            })
+
+        return state
+
+    def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:灵感点提取(What解构)"""
+        logger.info("=== 执行节点:灵感点提取 ===")
+
+        try:
+            # 初始化Agent
+            if not self.inspiration_points_agent.is_initialized:
+                self.inspiration_points_agent.initialize()
+
+            # 执行Agent
+            result = self.inspiration_points_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            # 安全地获取灵感点数量:total_count 在 metadata 中
+            if isinstance(result, dict):
+                metadata = result.get("metadata", {})
+                inspiration_count = metadata.get("total_count", 0) if isinstance(metadata, dict) else 0
+                # 如果 metadata 中没有,尝试从 inspiration_points 列表长度获取
+                if inspiration_count == 0:
+                    inspiration_points = result.get("inspiration_points", [])
+                    if isinstance(inspiration_points, list):
+                        inspiration_count = len(inspiration_points)
+            else:
+                # 如果 result 不是 dict(比如是列表),尝试获取长度
+                inspiration_count = len(result) if isinstance(result, list) else 0
+            
+            logger.info(f"灵感点提取完成 - 共 {inspiration_count} 个灵感点")
+
+        except Exception as e:
+            logger.error(f"灵感点提取失败: {e}", exc_info=True)
+            state.update({
+                "inspiration_points": {
+                    "error": str(e),
+                    "points": [],
+                    "total_count": 0
+                }
+            })
+
+        return state
+
+    def _purpose_point_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:目的点提取(What解构)"""
+        logger.info("=== 执行节点:目的点提取 ===")
+
+        try:
+            # 初始化Agent
+            if not self.purpose_point_agent.is_initialized:
+                self.purpose_point_agent.initialize()
+
+            # 执行Agent
+            result = self.purpose_point_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            main_purpose = result.get("purpose_point", {}).get("main_purpose", "未知")
+            logger.info(f"目的点提取完成 - 主要目的: {main_purpose}")
+
+        except Exception as e:
+            logger.error(f"目的点提取失败: {e}", exc_info=True)
+            state.update({
+                "purpose_point": {
+                    "error": str(e),
+                    "main_purpose": "未知"
+                }
+            })
+
+        return state
+
+    def _key_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:关键点提取(What解构)"""
+        logger.info("=== 执行节点:关键点提取 ===")
+
+        try:
+            # 初始化Agent
+            if not self.key_points_agent.is_initialized:
+                self.key_points_agent.initialize()
+
+            # 执行Agent
+            result = self.key_points_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            total_key_points = result.get("key_points", {}).get("total_count", 0)
+            logger.info(f"关键点提取完成 - 共 {total_key_points} 个关键点")
+
+        except Exception as e:
+            logger.error(f"关键点提取失败: {e}", exc_info=True)
+            state.update({
+                "key_points": {
+                    "error": str(e),
+                    "creator_perspective": {"key_points": [], "summary": ""},
+                    "consumer_perspective": {"key_points": [], "summary": ""}
+                }
+            })
+
+        return state
+
+    def _topic_selection_understanding_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:选题理解(What解构)"""
+        logger.info("=== 执行节点:选题理解 ===")
+
+        try:
+            # 初始化Agent
+            if not self.topic_selection_understanding_agent.is_initialized:
+                self.topic_selection_understanding_agent.initialize()
+
+            # 执行Agent
+            result = self.topic_selection_understanding_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            logger.info(f"选题理解完成 - result: {result}")
+
+        except Exception as e:
+            logger.error(f"选题理解失败: {e}", exc_info=True)
+            state.update({
+                "topic_selection_understanding": {}
+            })
+
+        return state
+
+    def _section_division_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:脚本段落划分(脚本理解)"""
+        logger.info("=== 执行节点:脚本段落划分 ===")
+
+        try:
+            # 初始化Agent
+            if not self.section_agent.is_initialized:
+                self.section_agent.initialize()
+
+            # 执行Agent
+            result = self.section_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            sections = result.get("段落列表", [])
+            content_category = result.get("内容品类", "未知")
+            logger.info(f"脚本段落划分完成 - 内容品类: {content_category}, 段落数: {len(sections)}")
+
+        except Exception as e:
+            logger.error(f"脚本段落划分失败: {e}", exc_info=True)
+            state.update({
+                "内容品类": "未知品类",
+                "段落列表": []
+            })
+
+        return state
+
+    def _substance_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:实质元素提取(脚本理解)"""
+        logger.info("=== 执行节点:实质元素提取 ===")
+
+        try:
+            # 初始化Agent
+            if not self.substance_agent.is_initialized:
+                self.substance_agent.initialize()
+
+            # 准备状态:将段落列表包装到section_division字段中
+            sections = state.get("段落列表", [])
+            state["section_division"] = {"段落列表": sections}
+
+            # 执行Agent
+            result = self.substance_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            final_elements = result.get("substance_final_elements", [])
+            logger.info(f"实质元素提取完成 - 最终元素数: {len(final_elements)}")
+
+        except Exception as e:
+            logger.error(f"实质元素提取失败: {e}", exc_info=True)
+            state.update({
+                "concrete_elements": [],
+                "concrete_concepts": [],
+                "abstract_concepts": [],
+                "substance_elements": [],
+                "substance_analyzed_result": [],
+                "substance_scored_result": {},
+                "substance_filtered_ids": [],
+                "substance_categorized_result": {},
+                "substance_final_elements": []
+            })
+
+        return state
+
+    def _form_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:形式元素提取(脚本理解)"""
+        logger.info("=== 执行节点:形式元素提取 ===")
+
+        try:
+            # 初始化Agent
+            if not self.form_agent.is_initialized:
+                self.form_agent.initialize()
+
+            # 执行Agent(依赖实质元素)
+            result = self.form_agent.process(state)
+
+            # 更新状态
+            state.update(result)
+
+            final_elements = result.get("form_final_elements", [])
+            logger.info(f"形式元素提取完成 - 最终元素数: {len(final_elements)}")
+
+        except Exception as e:
+            logger.error(f"形式元素提取失败: {e}", exc_info=True)
+            state.update({
+                "concrete_element_forms": [],
+                "concrete_concept_forms": [],
+                "overall_forms": [],
+                "form_elements": [],
+                "form_analyzed_result": [],
+                "form_scored_result": {},
+                "form_weighted_result": {},
+                "form_filtered_ids": [],
+                "form_categorized_result": {},
+                "form_final_elements": []
+            })
+
+        return state
+
+    def _merge_all_results_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:分离实质和形式结果(脚本理解)"""
+        logger.info("=== 执行节点:分离实质和形式结果 ===")
+
+        try:
+            # 获取实质和形式的最终元素
+            substance_final_elements = state.get("substance_final_elements", [])
+            form_final_elements = state.get("form_final_elements", [])
+
+            # 分别存储实质列表和形式列表
+            state["实质列表"] = substance_final_elements
+            state["形式列表"] = form_final_elements
+
+            logger.info(f"分离完成 - 实质元素: {len(substance_final_elements)}, 形式元素: {len(form_final_elements)}")
+
+        except Exception as e:
+            logger.error(f"分离结果失败: {e}", exc_info=True)
+            state["实质列表"] = []
+            state["形式列表"] = []
+
+        return state
+
+    def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """节点:结果汇总"""
+        logger.info("=== 执行节点:结果汇总 ===")
+
+        try:
+            # 初始化Function
+            if not self.result_aggregation_func.is_initialized:
+                self.result_aggregation_func.initialize()
+
+            # 执行Function
+            final_result = self.result_aggregation_func.execute(state)
+
+            # 更新状态
+            state["final_result"] = final_result
+
+            logger.info("结果汇总完成")
+
+        except Exception as e:
+            logger.error(f"结果汇总失败: {e}", exc_info=True)
+            state["final_result"] = {
+                "视频信息": {},
+                "三点解构": {
+                    "灵感点": [],
+                    "目的点": {},
+                    "关键点": {}
+                },
+                "选题理解": {},
+                "脚本理解": {
+                    "内容品类": "未知",
+                    "段落列表": [],
+                    "实质列表": [],
+                    "形式列表": []
+                },
+                "错误": f"汇总失败: {str(e)}"
+            }
+
+        return state
+
+    def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
+        """执行工作流(公共接口)- 视频分析版本
+
+        Args:
+            input_data: 输入数据,包含 video 字段(视频URL)
+                格式参考:examples/56898272/视频详情.json
+                {
+                    "video": "http://...",
+                    "title": "...",
+                    ...
+                }
+
+        Returns:
+            最终解码结果
+        """
+        logger.info("=== 开始执行解码工作流(视频分析) ===")
+
+        # 确保工作流已初始化
+        if not self.is_initialized:
+            self.initialize()
+
+        # 构建 text(兼容两种输入方式)
+        if "text" in input_data and isinstance(input_data.get("text"), dict):
+            text = input_data.get("text", {})
+        else:
+            text = {
+                "title": input_data.get("title", ""),
+                "body": input_data.get("body_text", ""),
+            }
+
+        # 初始化状态(包含视频信息,供视频上传和后续Agent使用)
+        initial_state = {
+            "video": input_data.get("video", ""),
+            "channel_content_id": input_data.get("channel_content_id", ""),
+            "text": text,
+            "current_depth": 0,
+            "max_depth": self.max_depth,
+        }
+
+        # 执行工作流
+        result = self.compiled_graph.invoke(initial_state)
+
+        logger.info("=== 解码工作流执行完成(视频分析) ===")
+
+        return result.get("final_result", {})
+
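
Programmatic use matches examples/run_decode_script.py above; a minimal sketch, with placeholder URL and text values:

    from src.workflows.decode_workflow import DecodeWorkflow

    workflow = DecodeWorkflow()  # defaults: model_provider="google_genai", max_depth=10
    result = workflow.invoke({
        "video": "http://example.com/video.mp4",
        "channel_content_id": "123456",
        "title": "Sample title",
        "body_text": "Sample body text",
    })
    # `result` is the aggregated final_result: 视频信息 / 三点解构 / 选题理解 / 脚本理解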