"""Script Workflow V2.

Zhaoheng's script deconstruction workflow V2: split a video directly into L3
content units and derive an overall (L1/L2) understanding from it.

Pipeline (as wired in ``ScriptWorkflowV2._build_graph``):
    video upload -> structure analysis -> L3 unit split
    -> L1/L2 overall understanding -> hook / golden-sentence extraction
    -> result aggregation

Only the video (plus an optional business ID) is taken as input; no other
input fields are introduced.
"""

from typing import Dict, Any

from langgraph.graph import StateGraph, END

from src.components.agents.base import BaseGraphAgent
from src.components.functions.video_upload_function import VideoUploadFunction
from src.components.agents.structure_agent import StructureAgent
from src.components.agents.content_unit_split_agent import ContentUnitSplitAgent
from src.components.agents.content_unit_understand import ContentUnitUnderstandAgent
from src.components.agents.script_keyword_agent import ScriptKeywordAgent
from src.utils.logger import get_logger

logger = get_logger(__name__)


class ScriptWorkflowV2(BaseGraphAgent):
    """Script understanding workflow V2 (video-only input).

    Flow:
        START -> video_upload -> structure_analysis -> content_unit_split
        -> content_unit_understand -> keyword_extraction
        -> result_aggregation -> END

    Every node is best-effort: when a step raises, it writes an error payload
    under its own output key in the state and the pipeline keeps going, so the
    aggregated result always carries all four sections.
    """

    def __init__(
        self,
        name: str = "script_workflow_v2",
        description: str = "脚本理解工作流 V2(视频 → L3 单元 → L1/L2 整体解构)",
        model_provider: str = "google_genai",
    ):
        """Create the workflow and all of its step components.

        Args:
            name: Agent name forwarded to ``BaseGraphAgent``.
            description: Human-readable description forwarded to ``BaseGraphAgent``.
            model_provider: Model provider passed to every LLM-backed agent.
        """
        super().__init__(
            name=name,
            description=description,
            state_class=dict,  # a plain dict is used as the graph state type
        )

        self.model_provider = model_provider

        # Video upload function (downloads the video and uploads it to Gemini).
        self.video_upload_func = VideoUploadFunction()

        # Structured content-library agent.
        self.structure_agent = StructureAgent(
            model_provider=model_provider
        )

        # L3 content-unit split agent.
        self.content_unit_split_agent = ContentUnitSplitAgent(
            model_provider=model_provider
        )

        # L1/L2 overall-understanding agent.
        self.content_unit_understand_agent = ContentUnitUnderstandAgent(
            model_provider=model_provider
        )

        # Hook / golden-sentence extraction agent.
        self.script_keyword_agent = ScriptKeywordAgent(
            model_provider=model_provider
        )

        logger.info(f"ScriptWorkflowV2 初始化完成,model_provider: {model_provider}")

    @staticmethod
    def _count_items(container: Any, key: str) -> int:
        """Return ``len(container[key])`` for progress logging, or 0.

        Tolerates a non-dict container, a missing key, and a value that has no
        length (e.g. ``None``), so a malformed agent payload can never make a
        logging statement raise and clobber an otherwise successful result.
        """
        if not isinstance(container, dict):
            return 0
        try:
            return len(container.get(key, []))
        except TypeError:
            return 0

    def _build_graph(self) -> StateGraph:
        """Build the linear workflow graph.

        Returns:
            The uncompiled ``StateGraph``. Compilation into
            ``self.compiled_graph`` presumably happens in
            ``BaseGraphAgent.initialize`` (not visible here; verify).
        """
        workflow = StateGraph(dict)

        # Nodes
        workflow.add_node("video_upload", self._video_upload_node)
        workflow.add_node("structure_analysis", self._structure_analysis_node)
        workflow.add_node("content_unit_split", self._content_unit_split_node)
        workflow.add_node("content_unit_understand", self._content_unit_understand_node)
        workflow.add_node("keyword_extraction", self._keyword_extraction_node)
        workflow.add_node("result_aggregation", self._result_aggregation_node)

        # Edges: a strictly sequential pipeline
        workflow.set_entry_point("video_upload")
        workflow.add_edge("video_upload", "structure_analysis")
        workflow.add_edge("structure_analysis", "content_unit_split")
        workflow.add_edge("content_unit_split", "content_unit_understand")
        workflow.add_edge("content_unit_understand", "keyword_extraction")
        workflow.add_edge("keyword_extraction", "result_aggregation")
        workflow.add_edge("result_aggregation", END)

        logger.info("ScriptWorkflowV2 图构建完成 - 流程:视频上传 → 结构化分析 → L3 单元拆分 → 整体理解 → 金句提取 → 结果汇总")

        return workflow

    def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: download the video and upload it to Gemini.

        Merges the upload function's result into ``state``. On an exception,
        sets ``video_uploaded_uri`` to None and records ``video_upload_error``.
        """
        logger.info("=== ScriptWorkflowV2:执行节点 video_upload ===")

        try:
            if not self.video_upload_func.is_initialized:
                self.video_upload_func.initialize()

            result = self.video_upload_func.execute(state)
            state.update(result)

            video_uri = result.get("video_uploaded_uri")
            if video_uri:
                logger.info(f"视频上传完成 - URI: {video_uri}")
            else:
                error = result.get("video_upload_error", "未知错误")
                logger.warning(f"视频上传失败: {error}")

        except Exception as e:
            logger.error(f"视频上传失败: {e}", exc_info=True)
            state.update(
                {
                    "video_uploaded_uri": None,
                    "video_upload_error": str(e),
                }
            )

        return state

    def _structure_analysis_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: structured content-library analysis of the video.

        Stores the agent's ``structure_data`` (default ``{}``) under
        ``state["topic"]``. On an exception, ``state["topic"]`` becomes an
        error dict instead.
        """
        logger.info("=== ScriptWorkflowV2:执行节点 structure_analysis ===")

        try:
            if not self.structure_agent.is_initialized:
                self.structure_agent.initialize()

            result = self.structure_agent.process(state)

            # The structured analysis is exposed to later nodes as "topic".
            structure_data = result.get("structure_data", {})
            state["topic"] = structure_data

        except Exception as e:
            logger.error(f"结构化内容库分析失败: {e}", exc_info=True)
            # NOTE(review): this node uses the key "错误" while the other nodes
            # use "error"; kept as-is because it is part of the output schema.
            state["topic"] = {
                "错误": str(e),
            }
        else:
            # Logging runs only after a successful analysis and cannot raise.
            logger.info("结构化内容库分析完成")
            if isinstance(structure_data, dict):
                topic_info = structure_data.get("选题信息表", {})
                macro_topic = topic_info.get("宏观母题", "") if isinstance(topic_info, dict) else ""
                if macro_topic:
                    logger.info(f"宏观母题: {macro_topic}")

        return state

    def _content_unit_split_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: split the video into L3 content units.

        Merges the agent result into ``state``. On an exception,
        ``content_unit_analysis`` becomes an error dict with an empty unit list.
        """
        logger.info("=== ScriptWorkflowV2:执行节点 content_unit_split ===")

        try:
            if not self.content_unit_split_agent.is_initialized:
                self.content_unit_split_agent.initialize()

            result = self.content_unit_split_agent.process(state)
            state.update(result)

        except Exception as e:
            logger.error(f"L3 内容单元拆分失败: {e}", exc_info=True)
            state["content_unit_analysis"] = {
                "error": str(e),
                "单元列表": [],
            }
        else:
            # Logging is kept outside the try: a malformed "单元列表" value must
            # not replace a successful result with an error payload.
            unit_count = self._count_items(state.get("content_unit_analysis"), "单元列表")
            logger.info(
                f"L3 单元拆分完成,单元数量: {unit_count}"
            )

        return state

    def _content_unit_understand_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: L1/L2 overall structure understanding (video + L3 split).

        Merges the agent result into ``state``. On an exception,
        ``content_unit_understanding`` becomes an error dict with empty sections.
        """
        logger.info("=== ScriptWorkflowV2:执行节点 content_unit_understand ===")

        try:
            if not self.content_unit_understand_agent.is_initialized:
                self.content_unit_understand_agent.initialize()

            result = self.content_unit_understand_agent.process(state)
            state.update(result)

        except Exception as e:
            logger.error(f"整体结构理解失败: {e}", exc_info=True)
            state["content_unit_understanding"] = {
                "error": str(e),
                "整体解构": {},
                "段落解构": [],
                "单元解构": {},
            }
        else:
            # Logging outside the try so it cannot clobber a successful result.
            paragraph_count = self._count_items(state.get("content_unit_understanding"), "段落解构")
            logger.info(
                f"整体结构理解完成,段落数量: {paragraph_count}"
            )

        return state

    def _keyword_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: extract hooks and golden sentences from the video.

        Merges the agent result into ``state``. On an exception,
        ``script_keywords`` becomes an error dict.
        """
        logger.info("=== ScriptWorkflowV2:执行节点 keyword_extraction ===")

        try:
            if not self.script_keyword_agent.is_initialized:
                self.script_keyword_agent.initialize()

            result = self.script_keyword_agent.process(state)
            state.update(result)

        except Exception as e:
            logger.error(f"金句提取失败: {e}", exc_info=True)
            state["script_keywords"] = {
                "error": str(e),
            }
        else:
            # Logging outside the try so it cannot clobber a successful result.
            script_keywords = state.get("script_keywords")
            logger.info("金句提取完成")
            if isinstance(script_keywords, dict):
                hooks_count = self._count_items(script_keywords, "hooks")
                golden_sentences_count = self._count_items(script_keywords, "golden_sentences")
                logger.info(f"提取钩子数量: {hooks_count}, 金句数量: {golden_sentences_count}")

        return state

    def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """Node: assemble the per-step outputs into ``state["final_result"]``.

        ``final_result`` maps the four section names to the values of the
        ``topic``, ``content_unit_analysis``, ``content_unit_understanding``
        and ``script_keywords`` state keys (each defaulting to ``{}``).
        """
        logger.info("=== ScriptWorkflowV2:执行节点 result_aggregation ===")

        try:
            topic = state.get("topic", {})
            content_unit_analysis = state.get("content_unit_analysis", {})
            content_unit_understanding = state.get("content_unit_understanding", {})
            script_keywords = state.get("script_keywords", {})

            final_result = {
                "结构化内容库": topic,
                "L3单元解构": content_unit_analysis,
                "整体结构理解": content_unit_understanding,
                "金句提取": script_keywords,
            }

            state["final_result"] = final_result

            logger.info("结果汇总完成")

        except Exception as e:
            logger.error(f"结果汇总失败: {e}", exc_info=True)
            state["final_result"] = {
                "error": f"汇总失败: {str(e)}",
                "结构化内容库": {},
                "L3单元解构": {},
                "整体结构理解": {},
                "金句提取": {},
            }

        return state

    def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run the workflow (public entry point).

        Args:
            input_data: Mapping with
                - ``video``: source video URL or local path (handled by
                  ``VideoUploadFunction``).
                - ``video_id``: optional external business ID; the legacy key
                  ``channel_content_id`` is accepted as a fallback.

        Returns:
            The ``final_result`` dict assembled by the aggregation node, or
            ``{}`` if the final state does not contain one.
        """
        logger.info("=== 开始执行 ScriptWorkflowV2(视频 → L3 单元 → L1/L2 整体解构) ===")

        if not self.is_initialized:
            self.initialize()

        # Minimal initial state: only the required fields.
        initial_state = {
            "video": input_data.get("video", ""),
            # Fall back to the legacy field name when video_id is absent/empty.
            "video_id": input_data.get("video_id", "") or input_data.get("channel_content_id", ""),
        }

        result_state = self.compiled_graph.invoke(initial_state)

        logger.info("=== ScriptWorkflowV2 执行完成 ===")
        return result_state.get("final_result", {})