| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- """
- What Deconstruction Workflow.
- What解构主工作流:编排三点解构流程(灵感点、目的点、关键点)的执行顺序和流程逻辑。
- 流程(视频分析):视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 结果汇总
- """
- from typing import Dict, Any
- from langgraph.graph import StateGraph, END
- from src.components.agents.base import BaseGraphAgent
- from src.states.what_deconstruction_state import WhatDeconstructionState
- from src.components.agents.topic_selection_understanding_agent import TopicSelectionUnderstandingAgent
- from src.components.agents.search_keyword_agent import SearchKeywordAgent
- from src.components.functions.result_aggregation_function import ResultAggregationFunction
- from src.components.functions.video_upload_function import VideoUploadFunction
- # 新增三点解构Agent
- from src.components.agents.inspiration_points_agent import InspirationPointsAgent
- from src.components.agents.purpose_point_agent import PurposePointAgent
- from src.components.agents.key_points_agent import KeyPointsAgent
- # 新增选题结构Agent V2
- from src.components.agents.topic_agent_v2 import TopicAgentV2
- # 新增结构化内容库Agent
- from src.components.agents.structure_agent import StructureAgent
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
- class WhatDeconstructionWorkflow(BaseGraphAgent):
- """What解构主工作流(视频分析版本)
- 功能:
- - 编排整个What解构流程(针对视频输入)
- - 支持条件分支:
- * 视频上传 → topic_selection_v2(选题结构分析V2,直接基于视频)→ 结束
- * 视频上传 → structure_agent(结构化内容库解构,直接基于视频)→ 结束
- * 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → 结束
- - 通过 state 中的 use_topic_agent_v2 或 use_structure_agent 标志手动控制分支选择(二选一)
- - 管理状态传递
- - 仅支持单视频输入
- 实现方式:BaseGraphAgent (LangGraph)
- """
- def __init__(
- self,
- name: str = "what_deconstruction_workflow",
- description: str = "What解构主工作流(视频分析)",
- model_provider: str = "google_genai",
- max_depth: int = 10
- ):
- super().__init__(
- name=name,
- description=description,
- state_class=WhatDeconstructionState
- )
- self.max_depth = max_depth
- # 初始化视频上传Function
- self.video_upload_func = VideoUploadFunction()
- # 初始化选题理解Agent
- self.topic_selection_understanding_agent = TopicSelectionUnderstandingAgent(
- model_provider=model_provider
- )
-
- # 初始化搜索关键词Agent
- self.search_keyword_agent = SearchKeywordAgent(
- model_provider=model_provider
- )
-
- # 初始化结果汇总Function
- self.result_aggregation_func = ResultAggregationFunction()
- # 初始化新的三点解构Agent
- self.inspiration_points_agent = InspirationPointsAgent(
- model_provider=model_provider
- )
- self.purpose_point_agent = PurposePointAgent(
- model_provider=model_provider
- )
- self.key_points_agent = KeyPointsAgent(
- model_provider=model_provider
- )
- # 初始化选题结构Agent V2
- self.topic_agent_v2 = TopicAgentV2(
- model_provider=model_provider
- )
- # 初始化结构化内容库Agent
- self.structure_agent = StructureAgent(
- model_provider=model_provider
- )
- logger.info(f"WhatDeconstructionWorkflow(视频分析)初始化完成")
- def _build_graph(self) -> StateGraph:
- """构建工作流图
- 新流程(视频分析):
- START → 视频上传 → [条件分支]
- - 如果 use_topic_agent_v2=True: → topic_selection_v2 → END
- - 如果 use_structure_agent=True: → structure_agent → END
- - 否则: → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → END
- """
- workflow = StateGraph(dict) # 使用dict作为状态类型
- # 添加节点
- workflow.add_node("video_upload", self._video_upload_node)
- workflow.add_node("topic_selection_v2", self._topic_selection_v2_node)
- workflow.add_node("structure_agent", self._structure_agent_node)
- workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
- workflow.add_node("purpose_point_extraction", self._purpose_point_node)
- workflow.add_node("key_points_extraction", self._key_points_node)
- workflow.add_node("topic_selection_understanding", self._topic_selection_understanding_node)
- workflow.add_node("search_keyword_extraction", self._search_keyword_node)
- workflow.add_node("result_aggregation", self._result_aggregation_node)
- # 定义流程的边:视频上传 → 条件分支 → topic_selection_v2/structure_agent(结束) 或 原流程
- workflow.set_entry_point("video_upload")
- # 条件分支:根据 use_topic_agent_v2 或 use_structure_agent 标志决定走哪个分支
- workflow.add_conditional_edges(
- "video_upload",
- self._route_after_upload,
- {
- "topic_selection_v2": "topic_selection_v2",
- "structure_agent": "structure_agent",
- "normal_flow": "inspiration_points_extraction"
- }
- )
- # topic_selection_v2 分支直接结束
- workflow.add_edge("topic_selection_v2", END)
- # structure_agent 分支直接结束
- workflow.add_edge("structure_agent", END)
- # 原流程继续
- workflow.add_edge("inspiration_points_extraction", "purpose_point_extraction")
- workflow.add_edge("purpose_point_extraction", "key_points_extraction")
- workflow.add_edge("key_points_extraction", "topic_selection_understanding")
- workflow.add_edge("topic_selection_understanding", "search_keyword_extraction")
- workflow.add_edge("search_keyword_extraction", "result_aggregation")
- workflow.add_edge("result_aggregation", END)
- logger.info("工作流图构建完成 - 视频分析流程:视频上传 → [条件分支: topic_selection_v2 / structure_agent / 三点解构流程]")
- return workflow
- def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:视频上传(第一步)- 下载视频并上传至Gemini"""
- logger.info("=== 执行节点:视频上传 ===")
- try:
- # 初始化Function
- if not self.video_upload_func.is_initialized:
- self.video_upload_func.initialize()
- # 执行视频上传
- result = self.video_upload_func.execute(state)
- # 更新状态
- state.update(result)
- video_uri = result.get("video_uploaded_uri")
- if video_uri:
- logger.info(f"视频上传完成 - URI: {video_uri}")
- else:
- error = result.get("video_upload_error", "未知错误")
- logger.warning(f"视频上传失败: {error}")
- except Exception as e:
- logger.error(f"视频上传失败: {e}", exc_info=True)
- state.update({
- "video_uploaded_uri": None,
- "video_upload_error": str(e)
- })
- return state
- def _route_after_upload(self, state: Dict[str, Any]) -> str:
- """条件分支函数:路由到不同的处理分支
-
- 通过 state 中的标志来控制:
- - use_topic_agent_v2=True: 走 topic_selection_v2 分支,直接结束
- - use_structure_agent=True: 走 structure_agent 分支,直接结束
- - 否则: 走原来的正常流程
-
- Returns:
- "topic_selection_v2" / "structure_agent" / "normal_flow"
- """
- use_v2 = state.get("use_topic_agent_v2", False)
- use_structure = state.get("use_structure_agent", False)
-
- if use_v2:
- logger.info("检测到 use_topic_agent_v2=True,将使用 TopicAgentV2 分支")
- return "topic_selection_v2"
- elif use_structure:
- logger.info("检测到 use_structure_agent=True,将使用 StructureAgent 分支")
- return "structure_agent"
- else:
- logger.info("使用正常流程(三点解构 → 选题理解 → 搜索关键词)")
- return "normal_flow"
- def _topic_selection_v2_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:选题结构分析 V2(直接基于视频)"""
- logger.info("=== 执行节点:选题结构分析 V2 ===")
- try:
- # 初始化Agent
- if not self.topic_agent_v2.is_initialized:
- self.topic_agent_v2.initialize()
- # 执行Agent
- result = self.topic_agent_v2.process(state)
- # 更新状态
- state.update(result)
- topic_selection = result.get("topic_selection_v2", {})
- if "error" not in topic_selection:
- logger.info(f"选题结构分析 V2 完成 - topic_selection_v2: {topic_selection}")
- else:
- logger.warning(f"选题结构分析 V2 执行出错: {topic_selection.get('error')}")
- except Exception as e:
- logger.error(f"选题结构分析 V2 失败: {e}", exc_info=True)
- state.update({
- "video_script": "",
- "topic_selection_v2": {
- "error": str(e),
- }
- })
- return state
- def _structure_agent_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:结构化内容库解构(直接基于视频)"""
- logger.info("=== 执行节点:结构化内容库解构 ===")
- try:
- # 初始化Agent
- if not self.structure_agent.is_initialized:
- self.structure_agent.initialize()
- # 执行Agent
- result = self.structure_agent.process(state)
- # 更新状态
- state.update(result)
- structure_data = result.get("structure_data", {})
- if "错误" not in structure_data:
- logger.info(f"结构化内容库解构完成 - structure_data: {structure_data}")
- else:
- logger.warning(f"结构化内容库解构执行出错: {structure_data.get('错误')}")
- except Exception as e:
- logger.error(f"结构化内容库解构失败: {e}", exc_info=True)
- state.update({
- "structure_data": {
- "错误": str(e),
- }
- })
- return state
- def _topic_selection_understanding_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:选题理解"""
- logger.info("=== 执行节点:选题理解 ===")
- try:
- # 初始化Agent
- if not self.topic_selection_understanding_agent.is_initialized:
- self.topic_selection_understanding_agent.initialize()
- # 执行Agent
- result = self.topic_selection_understanding_agent.process(state)
- # 更新状态
- state.update(result)
- logger.info(f"选题理解完成 - result: {result}")
- except Exception as e:
- logger.error(f"选题理解失败: {e}", exc_info=True)
- state.update({
- "topic_selection_understanding": {}
- })
- return state
- def _search_keyword_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:搜索关键词提取"""
- logger.info("=== 执行节点:搜索关键词提取 ===")
- try:
- # 初始化Agent
- if not self.search_keyword_agent.is_initialized:
- self.search_keyword_agent.initialize()
- # 执行Agent
- result = self.search_keyword_agent.process(state)
- # 更新状态
- state.update(result)
- search_keywords_count = result.get("search_keywords", {}).get("总数", 0)
- logger.info(f"搜索关键词提取完成 - 共 {search_keywords_count} 个搜索词")
- except Exception as e:
- logger.error(f"搜索关键词提取失败: {e}", exc_info=True)
- state.update({
- "search_keywords": {
- "搜索词列表": [],
- "总数": 0,
- "错误": str(e)
- }
- })
- return state
- def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:灵感点提取(新三点解构)"""
- logger.info("=== 执行节点:灵感点提取 ===")
- try:
- # 初始化Agent
- if not self.inspiration_points_agent.is_initialized:
- self.inspiration_points_agent.initialize()
- # 执行Agent
- result = self.inspiration_points_agent.process(state)
- # 更新状态
- state.update(result)
- # 安全地获取灵感点数量:total_count 在 metadata 中
- if isinstance(result, dict):
- metadata = result.get("metadata", {})
- inspiration_count = metadata.get("total_count", 0) if isinstance(metadata, dict) else 0
- # 如果 metadata 中没有,尝试从 inspiration_points 列表长度获取
- if inspiration_count == 0:
- inspiration_points = result.get("inspiration_points", [])
- if isinstance(inspiration_points, list):
- inspiration_count = len(inspiration_points)
- else:
- # 如果 result 不是 dict(比如是列表),尝试获取长度
- inspiration_count = len(result) if isinstance(result, list) else 0
-
- logger.info(f"灵感点提取完成 - 共 {inspiration_count} 个灵感点")
- except Exception as e:
- logger.error(f"灵感点提取失败: {e}", exc_info=True)
- state.update({
- "inspiration_points": {
- "error": str(e),
- "points": [],
- "total_count": 0
- }
- })
- return state
- def _purpose_point_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:目的点提取(新三点解构)"""
- logger.info("=== 执行节点:目的点提取 ===")
- try:
- # 初始化Agent
- if not self.purpose_point_agent.is_initialized:
- self.purpose_point_agent.initialize()
- # 执行Agent
- result = self.purpose_point_agent.process(state)
- # 更新状态
- state.update(result)
- main_purpose = result.get("purpose_point", {}).get("main_purpose", "未知")
- logger.info(f"目的点提取完成 - 主要目的: {main_purpose}")
- except Exception as e:
- logger.error(f"目的点提取失败: {e}", exc_info=True)
- state.update({
- "purpose_point": {
- "error": str(e),
- "main_purpose": "未知"
- }
- })
- return state
- def _key_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:关键点提取(新三点解构)"""
- logger.info("=== 执行节点:关键点提取 ===")
- try:
- # 初始化Agent
- if not self.key_points_agent.is_initialized:
- self.key_points_agent.initialize()
- # 执行Agent
- result = self.key_points_agent.process(state)
- # 更新状态
- state.update(result)
- total_key_points = result.get("key_points", {}).get("total_count", 0)
- logger.info(f"关键点提取完成 - 共 {total_key_points} 个关键点")
- except Exception as e:
- logger.error(f"关键点提取失败: {e}", exc_info=True)
- state.update({
- "key_points": {
- "error": str(e),
- "creator_perspective": {"key_points": [], "summary": ""},
- "consumer_perspective": {"key_points": [], "summary": ""}
- }
- })
- return state
- def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
- """节点:结果汇总"""
- logger.info("=== 执行节点:结果汇总 ===")
- try:
- # 初始化Function
- if not self.result_aggregation_func.is_initialized:
- self.result_aggregation_func.initialize()
- # 执行Function
- final_result = self.result_aggregation_func.execute(state)
- # 更新状态
- state["final_result"] = final_result
- logger.info("结果汇总完成")
- except Exception as e:
- logger.error(f"结果汇总失败: {e}", exc_info=True)
- state["final_result"] = {
- "帖子总结": {"错误": f"汇总失败: {str(e)}"},
- "帖子包含元素": []
- }
- return state
- def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
- """执行工作流(公共接口)- 视频分析版本
- Args:
- input_data: 输入数据,包含 video 字段(视频URL)
- 格式参考:examples/56898272/视频详情.json
- {
- "video": "http://...",
- "title": "...",
- "use_topic_agent_v2": False, # 可选,控制是否使用 topic_agent_v2 分支
- "use_structure_agent": True, # 可选,控制是否使用 structure_agent 分支(与 use_topic_agent_v2 二选一),默认 True
- ...
- }
- Returns:
- 最终解构结果
- """
- logger.info("=== 开始执行 What 解构工作流(视频分析) ===")
- # 确保工作流已初始化
- if not self.is_initialized:
- self.initialize()
- # 初始化状态(仅视频输入)
- initial_state = {
- "video": input_data.get("video", ""),
- "channel_content_id": input_data.get("channel_content_id", ""),
- "text": {
- "title": input_data.get("title", ""),
- "body": input_data.get("body_text", ""),
- "hashtags": []
- },
- "current_depth": 0,
- "max_depth": self.max_depth,
- "use_topic_agent_v2": input_data.get("use_topic_agent_v2", False),
- "use_structure_agent": input_data.get("use_structure_agent", True)
- }
- # 执行工作流
- result = self.compiled_graph.invoke(initial_state)
- logger.info("=== What 解构工作流执行完成(视频分析) ===")
- # 如果走的是 topic_agent_v2 分支,返回 topic_selection_v2 结果
- if result.get("use_topic_agent_v2") and "topic_selection_v2" in result:
- return result.get("topic_selection_v2", {})
-
- # 如果走的是 structure_agent 分支,返回 structure_data 结果
- if result.get("use_structure_agent") and "structure_data" in result:
- return result.get("structure_data", {})
- return result.get("final_result", {})
|