| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- """
- Script Workflow V2.
- 兆恒的脚本解构工作流V2:基于视频直接进行L3单元拆分和整体理解
- 视频脚本解构工作流(简化版):
- - 步骤:视频上传 → L3 单元拆分 → L1/L2 整体理解
- - 只依赖视频和 L3 解构结果,不再引入多余的输入字段。
- """
- from typing import Dict, Any
- from langgraph.graph import StateGraph, END
- from src.components.agents.base import BaseGraphAgent
- from src.components.functions.video_upload_function import VideoUploadFunction
- from src.components.agents.structure_agent import StructureAgent
- from src.components.agents.content_unit_split_agent import ContentUnitSplitAgent
- from src.components.agents.content_unit_understand import ContentUnitUnderstandAgent
- from src.components.agents.script_keyword_agent import ScriptKeywordAgent
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
- class ScriptWorkflowV2(BaseGraphAgent):
- """脚本理解工作流 V2(仅视频 + L3 解构 + 整体理解)
- 流程:
- START → 视频上传 → L3 单元拆分 → 整体结构理解 → 结果汇总 → END
- """
- def __init__(
- self,
- name: str = "script_workflow_v2",
- description: str = "脚本理解工作流 V2(视频 → L3 单元 → L1/L2 整体解构)",
- model_provider: str = "google_genai",
- ):
- super().__init__(
- name=name,
- description=description,
- state_class=dict, # 直接使用 dict 作为状态类型
- )
- self.model_provider = model_provider
- # 初始化视频上传 Function
- self.video_upload_func = VideoUploadFunction()
- # 初始化结构化内容库 Agent
- self.structure_agent = StructureAgent(
- model_provider=model_provider
- )
- # 初始化 L3 单元拆分 Agent
- self.content_unit_split_agent = ContentUnitSplitAgent(
- model_provider=model_provider
- )
- # 初始化 L1/L2 整体理解 Agent
- self.content_unit_understand_agent = ContentUnitUnderstandAgent(
- model_provider=model_provider
- )
- # 初始化金句提取 Agent
- self.script_keyword_agent = ScriptKeywordAgent(
- model_provider=model_provider
- )
- logger.info(f"ScriptWorkflowV2 初始化完成,model_provider: {model_provider}")
- def _build_graph(self) -> StateGraph:
- """构建工作流图(简化版)"""
- workflow = StateGraph(dict)
- # 添加节点
- workflow.add_node("video_upload", self._video_upload_node)
- workflow.add_node("structure_analysis", self._structure_analysis_node)
- workflow.add_node("content_unit_split", self._content_unit_split_node)
- workflow.add_node("content_unit_understand", self._content_unit_understand_node)
- workflow.add_node("keyword_extraction", self._keyword_extraction_node)
- workflow.add_node("result_aggregation", self._result_aggregation_node)
- # 定义边
- workflow.set_entry_point("video_upload")
- workflow.add_edge("video_upload", "structure_analysis")
- workflow.add_edge("structure_analysis", "content_unit_split")
- workflow.add_edge("content_unit_split", "content_unit_understand")
- workflow.add_edge("content_unit_understand", "keyword_extraction")
- workflow.add_edge("keyword_extraction", "result_aggregation")
- workflow.add_edge("result_aggregation", END)
- logger.info("ScriptWorkflowV2 图构建完成 - 流程:视频上传 → 结构化分析 → L3 单元拆分 → 整体理解 → 金句提取 → 结果汇总")
- return workflow
- def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:视频上传 - 下载视频并上传至 Gemini"""
- logger.info("=== ScriptWorkflowV2:执行节点 video_upload ===")
- try:
- if not self.video_upload_func.is_initialized:
- self.video_upload_func.initialize()
- result = self.video_upload_func.execute(state)
- state.update(result)
- video_uri = result.get("video_uploaded_uri")
- if video_uri:
- logger.info(f"视频上传完成 - URI: {video_uri}")
- else:
- error = result.get("video_upload_error", "未知错误")
- logger.warning(f"视频上传失败: {error}")
- except Exception as e:
- logger.error(f"视频上传失败: {e}", exc_info=True)
- state.update(
- {
- "video_uploaded_uri": None,
- "video_upload_error": str(e),
- }
- )
- return state
- def _structure_analysis_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:结构化内容库分析 - 基于视频内容进行结构化解构"""
- logger.info("=== ScriptWorkflowV2:执行节点 structure_analysis ===")
- try:
- if not self.structure_agent.is_initialized:
- self.structure_agent.initialize()
- result = self.structure_agent.process(state)
-
- # 将结果存入 state 的 topic 字段
- structure_data = result.get("structure_data", {})
- state["topic"] = structure_data
- logger.info("结构化内容库分析完成")
- if isinstance(structure_data, dict):
- topic_info = structure_data.get("选题信息表", {})
- macro_topic = topic_info.get("宏观母题", "") if isinstance(topic_info, dict) else ""
- if macro_topic:
- logger.info(f"宏观母题: {macro_topic}")
- except Exception as e:
- logger.error(f"结构化内容库分析失败: {e}", exc_info=True)
- state["topic"] = {
- "错误": str(e),
- }
- return state
- def _content_unit_split_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:L3 内容单元拆分"""
- logger.info("=== ScriptWorkflowV2:执行节点 content_unit_split ===")
- try:
- if not self.content_unit_split_agent.is_initialized:
- self.content_unit_split_agent.initialize()
- result = self.content_unit_split_agent.process(state)
- state.update(result)
- analysis = result.get("content_unit_analysis", {})
- logger.info(
- f"L3 单元拆分完成,单元数量: {len(analysis.get('单元列表', [])) if isinstance(analysis, dict) else 0}"
- )
- except Exception as e:
- logger.error(f"L3 内容单元拆分失败: {e}", exc_info=True)
- state["content_unit_analysis"] = {
- "error": str(e),
- "单元列表": [],
- }
- return state
- def _content_unit_understand_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:整体结构理解(L1/L2 基于视频 + L3 解构)"""
- logger.info("=== ScriptWorkflowV2:执行节点 content_unit_understand ===")
- try:
- if not self.content_unit_understand_agent.is_initialized:
- self.content_unit_understand_agent.initialize()
- result = self.content_unit_understand_agent.process(state)
- state.update(result)
- understanding = result.get("content_unit_understanding", {})
- logger.info(
- f"整体结构理解完成,段落数量: {len(understanding.get('段落解构', [])) if isinstance(understanding, dict) else 0}"
- )
- except Exception as e:
- logger.error(f"整体结构理解失败: {e}", exc_info=True)
- state["content_unit_understanding"] = {
- "error": str(e),
- "整体解构": {},
- "段落解构": [],
- "单元解构": {},
- }
- return state
- def _keyword_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:金句提取 - 基于视频内容提取钩子和金句"""
- logger.info("=== ScriptWorkflowV2:执行节点 keyword_extraction ===")
- try:
- if not self.script_keyword_agent.is_initialized:
- self.script_keyword_agent.initialize()
- result = self.script_keyword_agent.process(state)
- state.update(result)
- script_keywords = result.get("script_keywords", {})
- logger.info("金句提取完成")
- if isinstance(script_keywords, dict):
- hooks_count = len(script_keywords.get("hooks", []))
- golden_sentences_count = len(script_keywords.get("golden_sentences", []))
- logger.info(f"提取钩子数量: {hooks_count}, 金句数量: {golden_sentences_count}")
- except Exception as e:
- logger.error(f"金句提取失败: {e}", exc_info=True)
- state["script_keywords"] = {
- "error": str(e),
- }
- return state
- def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:结果汇总 - 组装最终结果"""
- logger.info("=== ScriptWorkflowV2:执行节点 result_aggregation ===")
- try:
- # 从 state 中提取结果
- topic = state.get("topic", {})
- content_unit_analysis = state.get("content_unit_analysis", {})
- content_unit_understanding = state.get("content_unit_understanding", {})
- script_keywords = state.get("script_keywords", {})
- # 组装最终结果
- final_result = {
- "结构化内容库": topic,
- "L3单元解构": content_unit_analysis,
- "整体结构理解": content_unit_understanding,
- "金句提取": script_keywords,
- }
- # 更新状态
- state["final_result"] = final_result
- logger.info("结果汇总完成")
- except Exception as e:
- logger.error(f"结果汇总失败: {e}", exc_info=True)
- state["final_result"] = {
- "error": f"汇总失败: {str(e)}",
- "结构化内容库": {},
- "L3单元解构": {},
- "整体结构理解": {},
- "金句提取": {},
- }
- return state
- def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
- """执行工作流(公共接口)- 简化版
- 只需要传入:
- - video: 原始视频地址或本地路径(由 VideoUploadFunction 处理)
- - video_id: 可选,外部业务 ID(兼容旧字段名 channel_content_id)
- """
- logger.info("=== 开始执行 ScriptWorkflowV2(视频 → L3 单元 → L1/L2 整体解构) ===")
- if not self.is_initialized:
- self.initialize()
- # 初始化最小状态(仅保留必须字段)
- initial_state = {
- "video": input_data.get("video", ""),
- "video_id": input_data.get("video_id", "") or input_data.get("channel_content_id", ""), # 兼容旧字段名
- }
- result_state = self.compiled_graph.invoke(initial_state)
- logger.info("=== ScriptWorkflowV2 执行完成 ===")
- return result_state.get("final_result", {})
|