""" What Deconstruction Workflow. What解构主工作流:编排三点解构流程(灵感点、目的点、关键点)的执行顺序和流程逻辑。 流程(视频分析):视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 结果汇总 """ from typing import Dict, Any from langgraph.graph import StateGraph, END from src.components.agents.base import BaseGraphAgent from src.states.what_deconstruction_state import WhatDeconstructionState from src.components.agents.topic_selection_understanding_agent import TopicSelectionUnderstandingAgent from src.components.agents.search_keyword_agent import SearchKeywordAgent from src.components.functions.result_aggregation_function import ResultAggregationFunction from src.components.functions.video_upload_function import VideoUploadFunction # 新增三点解构Agent from src.components.agents.inspiration_points_agent import InspirationPointsAgent from src.components.agents.purpose_point_agent import PurposePointAgent from src.components.agents.key_points_agent import KeyPointsAgent # 新增选题结构Agent V2 from src.components.agents.topic_agent_v2 import TopicAgentV2 # 新增结构化内容库Agent from src.components.agents.structure_agent import StructureAgent from src.utils.logger import get_logger logger = get_logger(__name__) class WhatDeconstructionWorkflow(BaseGraphAgent): """What解构主工作流(视频分析版本) 功能: - 编排整个What解构流程(针对视频输入) - 支持条件分支: * 视频上传 → topic_selection_v2(选题结构分析V2,直接基于视频)→ 结束 * 视频上传 → structure_agent(结构化内容库解构,直接基于视频)→ 结束 * 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → 结束 - 通过 state 中的 use_topic_agent_v2 或 use_structure_agent 标志手动控制分支选择(二选一) - 管理状态传递 - 仅支持单视频输入 实现方式:BaseGraphAgent (LangGraph) """ def __init__( self, name: str = "what_deconstruction_workflow", description: str = "What解构主工作流(视频分析)", model_provider: str = "google_genai", max_depth: int = 10 ): super().__init__( name=name, description=description, state_class=WhatDeconstructionState ) self.max_depth = max_depth # 初始化视频上传Function self.video_upload_func = VideoUploadFunction() # 初始化选题理解Agent self.topic_selection_understanding_agent = TopicSelectionUnderstandingAgent( model_provider=model_provider ) # 初始化搜索关键词Agent self.search_keyword_agent = SearchKeywordAgent( model_provider=model_provider ) # 初始化结果汇总Function self.result_aggregation_func = ResultAggregationFunction() # 初始化新的三点解构Agent self.inspiration_points_agent = InspirationPointsAgent( model_provider=model_provider ) self.purpose_point_agent = PurposePointAgent( model_provider=model_provider ) self.key_points_agent = KeyPointsAgent( model_provider=model_provider ) # 初始化选题结构Agent V2 self.topic_agent_v2 = TopicAgentV2( model_provider=model_provider ) # 初始化结构化内容库Agent self.structure_agent = StructureAgent( model_provider=model_provider ) logger.info(f"WhatDeconstructionWorkflow(视频分析)初始化完成") def _build_graph(self) -> StateGraph: """构建工作流图 新流程(视频分析): START → 视频上传 → [条件分支] - 如果 use_topic_agent_v2=True: → topic_selection_v2 → END - 如果 use_structure_agent=True: → structure_agent → END - 否则: → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → END """ workflow = StateGraph(dict) # 使用dict作为状态类型 # 添加节点 workflow.add_node("video_upload", self._video_upload_node) workflow.add_node("topic_selection_v2", self._topic_selection_v2_node) workflow.add_node("structure_agent", self._structure_agent_node) workflow.add_node("inspiration_points_extraction", self._inspiration_points_node) workflow.add_node("purpose_point_extraction", self._purpose_point_node) workflow.add_node("key_points_extraction", self._key_points_node) workflow.add_node("topic_selection_understanding", self._topic_selection_understanding_node) workflow.add_node("search_keyword_extraction", self._search_keyword_node) workflow.add_node("result_aggregation", self._result_aggregation_node) # 定义流程的边:视频上传 → 条件分支 → topic_selection_v2/structure_agent(结束) 或 原流程 workflow.set_entry_point("video_upload") # 条件分支:根据 use_topic_agent_v2 或 use_structure_agent 标志决定走哪个分支 workflow.add_conditional_edges( "video_upload", self._route_after_upload, { "topic_selection_v2": "topic_selection_v2", "structure_agent": "structure_agent", "normal_flow": "inspiration_points_extraction" } ) # topic_selection_v2 分支直接结束 workflow.add_edge("topic_selection_v2", END) # structure_agent 分支直接结束 workflow.add_edge("structure_agent", END) # 原流程继续 workflow.add_edge("inspiration_points_extraction", "purpose_point_extraction") workflow.add_edge("purpose_point_extraction", "key_points_extraction") workflow.add_edge("key_points_extraction", "topic_selection_understanding") workflow.add_edge("topic_selection_understanding", "search_keyword_extraction") workflow.add_edge("search_keyword_extraction", "result_aggregation") workflow.add_edge("result_aggregation", END) logger.info("工作流图构建完成 - 视频分析流程:视频上传 → [条件分支: topic_selection_v2 / structure_agent / 三点解构流程]") return workflow def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:视频上传(第一步)- 下载视频并上传至Gemini""" logger.info("=== 执行节点:视频上传 ===") try: # 初始化Function if not self.video_upload_func.is_initialized: self.video_upload_func.initialize() # 执行视频上传 result = self.video_upload_func.execute(state) # 更新状态 state.update(result) video_uri = result.get("video_uploaded_uri") if video_uri: logger.info(f"视频上传完成 - URI: {video_uri}") else: error = result.get("video_upload_error", "未知错误") logger.warning(f"视频上传失败: {error}") except Exception as e: logger.error(f"视频上传失败: {e}", exc_info=True) state.update({ "video_uploaded_uri": None, "video_upload_error": str(e) }) return state def _route_after_upload(self, state: Dict[str, Any]) -> str: """条件分支函数:路由到不同的处理分支 通过 state 中的标志来控制: - use_topic_agent_v2=True: 走 topic_selection_v2 分支,直接结束 - use_structure_agent=True: 走 structure_agent 分支,直接结束 - 否则: 走原来的正常流程 Returns: "topic_selection_v2" / "structure_agent" / "normal_flow" """ use_v2 = state.get("use_topic_agent_v2", False) use_structure = state.get("use_structure_agent", False) if use_v2: logger.info("检测到 use_topic_agent_v2=True,将使用 TopicAgentV2 分支") return "topic_selection_v2" elif use_structure: logger.info("检测到 use_structure_agent=True,将使用 StructureAgent 分支") return "structure_agent" else: logger.info("使用正常流程(三点解构 → 选题理解 → 搜索关键词)") return "normal_flow" def _topic_selection_v2_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:选题结构分析 V2(直接基于视频)""" logger.info("=== 执行节点:选题结构分析 V2 ===") try: # 初始化Agent if not self.topic_agent_v2.is_initialized: self.topic_agent_v2.initialize() # 执行Agent result = self.topic_agent_v2.process(state) # 更新状态 state.update(result) topic_selection = result.get("topic_selection_v2", {}) if "error" not in topic_selection: logger.info(f"选题结构分析 V2 完成 - topic_selection_v2: {topic_selection}") else: logger.warning(f"选题结构分析 V2 执行出错: {topic_selection.get('error')}") except Exception as e: logger.error(f"选题结构分析 V2 失败: {e}", exc_info=True) state.update({ "video_script": "", "topic_selection_v2": { "error": str(e), } }) return state def _structure_agent_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:结构化内容库解构(直接基于视频)""" logger.info("=== 执行节点:结构化内容库解构 ===") try: # 初始化Agent if not self.structure_agent.is_initialized: self.structure_agent.initialize() # 执行Agent result = self.structure_agent.process(state) # 更新状态 state.update(result) structure_data = result.get("structure_data", {}) if "错误" not in structure_data: logger.info(f"结构化内容库解构完成 - structure_data: {structure_data}") else: logger.warning(f"结构化内容库解构执行出错: {structure_data.get('错误')}") except Exception as e: logger.error(f"结构化内容库解构失败: {e}", exc_info=True) state.update({ "structure_data": { "错误": str(e), } }) return state def _topic_selection_understanding_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:选题理解""" logger.info("=== 执行节点:选题理解 ===") try: # 初始化Agent if not self.topic_selection_understanding_agent.is_initialized: self.topic_selection_understanding_agent.initialize() # 执行Agent result = self.topic_selection_understanding_agent.process(state) # 更新状态 state.update(result) logger.info(f"选题理解完成 - result: {result}") except Exception as e: logger.error(f"选题理解失败: {e}", exc_info=True) state.update({ "topic_selection_understanding": {} }) return state def _search_keyword_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:搜索关键词提取""" logger.info("=== 执行节点:搜索关键词提取 ===") try: # 初始化Agent if not self.search_keyword_agent.is_initialized: self.search_keyword_agent.initialize() # 执行Agent result = self.search_keyword_agent.process(state) # 更新状态 state.update(result) search_keywords_count = result.get("search_keywords", {}).get("总数", 0) logger.info(f"搜索关键词提取完成 - 共 {search_keywords_count} 个搜索词") except Exception as e: logger.error(f"搜索关键词提取失败: {e}", exc_info=True) state.update({ "search_keywords": { "搜索词列表": [], "总数": 0, "错误": str(e) } }) return state def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:灵感点提取(新三点解构)""" logger.info("=== 执行节点:灵感点提取 ===") try: # 初始化Agent if not self.inspiration_points_agent.is_initialized: self.inspiration_points_agent.initialize() # 执行Agent result = self.inspiration_points_agent.process(state) # 更新状态 state.update(result) # 安全地获取灵感点数量:total_count 在 metadata 中 if isinstance(result, dict): metadata = result.get("metadata", {}) inspiration_count = metadata.get("total_count", 0) if isinstance(metadata, dict) else 0 # 如果 metadata 中没有,尝试从 inspiration_points 列表长度获取 if inspiration_count == 0: inspiration_points = result.get("inspiration_points", []) if isinstance(inspiration_points, list): inspiration_count = len(inspiration_points) else: # 如果 result 不是 dict(比如是列表),尝试获取长度 inspiration_count = len(result) if isinstance(result, list) else 0 logger.info(f"灵感点提取完成 - 共 {inspiration_count} 个灵感点") except Exception as e: logger.error(f"灵感点提取失败: {e}", exc_info=True) state.update({ "inspiration_points": { "error": str(e), "points": [], "total_count": 0 } }) return state def _purpose_point_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:目的点提取(新三点解构)""" logger.info("=== 执行节点:目的点提取 ===") try: # 初始化Agent if not self.purpose_point_agent.is_initialized: self.purpose_point_agent.initialize() # 执行Agent result = self.purpose_point_agent.process(state) # 更新状态 state.update(result) main_purpose = result.get("purpose_point", {}).get("main_purpose", "未知") logger.info(f"目的点提取完成 - 主要目的: {main_purpose}") except Exception as e: logger.error(f"目的点提取失败: {e}", exc_info=True) state.update({ "purpose_point": { "error": str(e), "main_purpose": "未知" } }) return state def _key_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:关键点提取(新三点解构)""" logger.info("=== 执行节点:关键点提取 ===") try: # 初始化Agent if not self.key_points_agent.is_initialized: self.key_points_agent.initialize() # 执行Agent result = self.key_points_agent.process(state) # 更新状态 state.update(result) total_key_points = result.get("key_points", {}).get("total_count", 0) logger.info(f"关键点提取完成 - 共 {total_key_points} 个关键点") except Exception as e: logger.error(f"关键点提取失败: {e}", exc_info=True) state.update({ "key_points": { "error": str(e), "creator_perspective": {"key_points": [], "summary": ""}, "consumer_perspective": {"key_points": [], "summary": ""} } }) return state def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]: """节点:结果汇总""" logger.info("=== 执行节点:结果汇总 ===") try: # 初始化Function if not self.result_aggregation_func.is_initialized: self.result_aggregation_func.initialize() # 执行Function final_result = self.result_aggregation_func.execute(state) # 更新状态 state["final_result"] = final_result logger.info("结果汇总完成") except Exception as e: logger.error(f"结果汇总失败: {e}", exc_info=True) state["final_result"] = { "帖子总结": {"错误": f"汇总失败: {str(e)}"}, "帖子包含元素": [] } return state def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]: """执行工作流(公共接口)- 视频分析版本 Args: input_data: 输入数据,包含 video 字段(视频URL) 格式参考:examples/56898272/视频详情.json { "video": "http://...", "title": "...", "use_topic_agent_v2": False, # 可选,控制是否使用 topic_agent_v2 分支 "use_structure_agent": True, # 可选,控制是否使用 structure_agent 分支(与 use_topic_agent_v2 二选一),默认 True ... } Returns: 最终解构结果 """ logger.info("=== 开始执行 What 解构工作流(视频分析) ===") # 确保工作流已初始化 if not self.is_initialized: self.initialize() # 初始化状态(仅视频输入) initial_state = { "video": input_data.get("video", ""), "channel_content_id": input_data.get("channel_content_id", ""), "text": { "title": input_data.get("title", ""), "body": input_data.get("body_text", ""), "hashtags": [] }, "current_depth": 0, "max_depth": self.max_depth, "use_topic_agent_v2": input_data.get("use_topic_agent_v2", False), "use_structure_agent": input_data.get("use_structure_agent", True) } # 执行工作流 result = self.compiled_graph.invoke(initial_state) logger.info("=== What 解构工作流执行完成(视频分析) ===") # 如果走的是 topic_agent_v2 分支,返回 topic_selection_v2 结果 if result.get("use_topic_agent_v2") and "topic_selection_v2" in result: return result.get("topic_selection_v2", {}) # 如果走的是 structure_agent 分支,返回 structure_data 结果 if result.get("use_structure_agent") and "structure_data" in result: return result.get("structure_data", {}) return result.get("final_result", {})