what_deconstruction_workflow.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
"""
What Deconstruction Workflow.

Main "What" deconstruction workflow: orchestrates the execution order and the
flow logic of the three-point deconstruction (inspiration points, purpose
point, key points) for a single video.

Default three-point flow (video analysis):
    video upload -> inspiration point extraction -> purpose point extraction
    -> key point extraction -> topic-selection understanding
    -> search keyword extraction -> result aggregation

Alternative branches, selected by flags in the state right after the upload:
    video upload -> topic_selection_v2 (TopicAgentV2) -> end
    video upload -> structure_agent (StructureAgent) -> end
"""
  6. from typing import Dict, Any
  7. from langgraph.graph import StateGraph, END
  8. from src.components.agents.base import BaseGraphAgent
  9. from src.states.what_deconstruction_state import WhatDeconstructionState
  10. from src.components.agents.topic_selection_understanding_agent import TopicSelectionUnderstandingAgent
  11. from src.components.agents.search_keyword_agent import SearchKeywordAgent
  12. from src.components.functions.result_aggregation_function import ResultAggregationFunction
  13. from src.components.functions.video_upload_function import VideoUploadFunction
  14. # 新增三点解构Agent
  15. from src.components.agents.inspiration_points_agent import InspirationPointsAgent
  16. from src.components.agents.purpose_point_agent import PurposePointAgent
  17. from src.components.agents.key_points_agent import KeyPointsAgent
  18. # 新增选题结构Agent V2
  19. from src.components.agents.topic_agent_v2 import TopicAgentV2
  20. # 新增结构化内容库Agent
  21. from src.components.agents.structure_agent import StructureAgent
  22. from src.utils.logger import get_logger
# Module-level logger used by every node of the workflow below.
logger = get_logger(__name__)
  24. class WhatDeconstructionWorkflow(BaseGraphAgent):
  25. """What解构主工作流(视频分析版本)
  26. 功能:
  27. - 编排整个What解构流程(针对视频输入)
  28. - 支持条件分支:
  29. * 视频上传 → topic_selection_v2(选题结构分析V2,直接基于视频)→ 结束
  30. * 视频上传 → structure_agent(结构化内容库解构,直接基于视频)→ 结束
  31. * 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → 结束
  32. - 通过 state 中的 use_topic_agent_v2 或 use_structure_agent 标志手动控制分支选择(二选一)
  33. - 管理状态传递
  34. - 仅支持单视频输入
  35. 实现方式:BaseGraphAgent (LangGraph)
  36. """
  37. def __init__(
  38. self,
  39. name: str = "what_deconstruction_workflow",
  40. description: str = "What解构主工作流(视频分析)",
  41. model_provider: str = "google_genai",
  42. max_depth: int = 10
  43. ):
  44. super().__init__(
  45. name=name,
  46. description=description,
  47. state_class=WhatDeconstructionState
  48. )
  49. self.max_depth = max_depth
  50. # 初始化视频上传Function
  51. self.video_upload_func = VideoUploadFunction()
  52. # 初始化选题理解Agent
  53. self.topic_selection_understanding_agent = TopicSelectionUnderstandingAgent(
  54. model_provider=model_provider
  55. )
  56. # 初始化搜索关键词Agent
  57. self.search_keyword_agent = SearchKeywordAgent(
  58. model_provider=model_provider
  59. )
  60. # 初始化结果汇总Function
  61. self.result_aggregation_func = ResultAggregationFunction()
  62. # 初始化新的三点解构Agent
  63. self.inspiration_points_agent = InspirationPointsAgent(
  64. model_provider=model_provider
  65. )
  66. self.purpose_point_agent = PurposePointAgent(
  67. model_provider=model_provider
  68. )
  69. self.key_points_agent = KeyPointsAgent(
  70. model_provider=model_provider
  71. )
  72. # 初始化选题结构Agent V2
  73. self.topic_agent_v2 = TopicAgentV2(
  74. model_provider=model_provider
  75. )
  76. # 初始化结构化内容库Agent
  77. self.structure_agent = StructureAgent(
  78. model_provider=model_provider
  79. )
  80. logger.info(f"WhatDeconstructionWorkflow(视频分析)初始化完成")
  81. def _build_graph(self) -> StateGraph:
  82. """构建工作流图
  83. 新流程(视频分析):
  84. START → 视频上传 → [条件分支]
  85. - 如果 use_topic_agent_v2=True: → topic_selection_v2 → END
  86. - 如果 use_structure_agent=True: → structure_agent → END
  87. - 否则: → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 搜索关键词 → 结果汇总 → END
  88. """
  89. workflow = StateGraph(dict) # 使用dict作为状态类型
  90. # 添加节点
  91. workflow.add_node("video_upload", self._video_upload_node)
  92. workflow.add_node("topic_selection_v2", self._topic_selection_v2_node)
  93. workflow.add_node("structure_agent", self._structure_agent_node)
  94. workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
  95. workflow.add_node("purpose_point_extraction", self._purpose_point_node)
  96. workflow.add_node("key_points_extraction", self._key_points_node)
  97. workflow.add_node("topic_selection_understanding", self._topic_selection_understanding_node)
  98. workflow.add_node("search_keyword_extraction", self._search_keyword_node)
  99. workflow.add_node("result_aggregation", self._result_aggregation_node)
  100. # 定义流程的边:视频上传 → 条件分支 → topic_selection_v2/structure_agent(结束) 或 原流程
  101. workflow.set_entry_point("video_upload")
  102. # 条件分支:根据 use_topic_agent_v2 或 use_structure_agent 标志决定走哪个分支
  103. workflow.add_conditional_edges(
  104. "video_upload",
  105. self._route_after_upload,
  106. {
  107. "topic_selection_v2": "topic_selection_v2",
  108. "structure_agent": "structure_agent",
  109. "normal_flow": "inspiration_points_extraction"
  110. }
  111. )
  112. # topic_selection_v2 分支直接结束
  113. workflow.add_edge("topic_selection_v2", END)
  114. # structure_agent 分支直接结束
  115. workflow.add_edge("structure_agent", END)
  116. # 原流程继续
  117. workflow.add_edge("inspiration_points_extraction", "purpose_point_extraction")
  118. workflow.add_edge("purpose_point_extraction", "key_points_extraction")
  119. workflow.add_edge("key_points_extraction", "topic_selection_understanding")
  120. workflow.add_edge("topic_selection_understanding", "search_keyword_extraction")
  121. workflow.add_edge("search_keyword_extraction", "result_aggregation")
  122. workflow.add_edge("result_aggregation", END)
  123. logger.info("工作流图构建完成 - 视频分析流程:视频上传 → [条件分支: topic_selection_v2 / structure_agent / 三点解构流程]")
  124. return workflow
  125. def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  126. """节点:视频上传(第一步)- 下载视频并上传至Gemini"""
  127. logger.info("=== 执行节点:视频上传 ===")
  128. try:
  129. # 初始化Function
  130. if not self.video_upload_func.is_initialized:
  131. self.video_upload_func.initialize()
  132. # 执行视频上传
  133. result = self.video_upload_func.execute(state)
  134. # 更新状态
  135. state.update(result)
  136. video_uri = result.get("video_uploaded_uri")
  137. if video_uri:
  138. logger.info(f"视频上传完成 - URI: {video_uri}")
  139. else:
  140. error = result.get("video_upload_error", "未知错误")
  141. logger.warning(f"视频上传失败: {error}")
  142. except Exception as e:
  143. logger.error(f"视频上传失败: {e}", exc_info=True)
  144. state.update({
  145. "video_uploaded_uri": None,
  146. "video_upload_error": str(e)
  147. })
  148. return state
  149. def _route_after_upload(self, state: Dict[str, Any]) -> str:
  150. """条件分支函数:路由到不同的处理分支
  151. 通过 state 中的标志来控制:
  152. - use_topic_agent_v2=True: 走 topic_selection_v2 分支,直接结束
  153. - use_structure_agent=True: 走 structure_agent 分支,直接结束
  154. - 否则: 走原来的正常流程
  155. Returns:
  156. "topic_selection_v2" / "structure_agent" / "normal_flow"
  157. """
  158. use_v2 = state.get("use_topic_agent_v2", False)
  159. use_structure = state.get("use_structure_agent", False)
  160. if use_v2:
  161. logger.info("检测到 use_topic_agent_v2=True,将使用 TopicAgentV2 分支")
  162. return "topic_selection_v2"
  163. elif use_structure:
  164. logger.info("检测到 use_structure_agent=True,将使用 StructureAgent 分支")
  165. return "structure_agent"
  166. else:
  167. logger.info("使用正常流程(三点解构 → 选题理解 → 搜索关键词)")
  168. return "normal_flow"
  169. def _topic_selection_v2_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  170. """节点:选题结构分析 V2(直接基于视频)"""
  171. logger.info("=== 执行节点:选题结构分析 V2 ===")
  172. try:
  173. # 初始化Agent
  174. if not self.topic_agent_v2.is_initialized:
  175. self.topic_agent_v2.initialize()
  176. # 执行Agent
  177. result = self.topic_agent_v2.process(state)
  178. # 更新状态
  179. state.update(result)
  180. topic_selection = result.get("topic_selection_v2", {})
  181. if "error" not in topic_selection:
  182. logger.info(f"选题结构分析 V2 完成 - topic_selection_v2: {topic_selection}")
  183. else:
  184. logger.warning(f"选题结构分析 V2 执行出错: {topic_selection.get('error')}")
  185. except Exception as e:
  186. logger.error(f"选题结构分析 V2 失败: {e}", exc_info=True)
  187. state.update({
  188. "video_script": "",
  189. "topic_selection_v2": {
  190. "error": str(e),
  191. }
  192. })
  193. return state
  194. def _structure_agent_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  195. """节点:结构化内容库解构(直接基于视频)"""
  196. logger.info("=== 执行节点:结构化内容库解构 ===")
  197. try:
  198. # 初始化Agent
  199. if not self.structure_agent.is_initialized:
  200. self.structure_agent.initialize()
  201. # 执行Agent
  202. result = self.structure_agent.process(state)
  203. # 更新状态
  204. state.update(result)
  205. structure_data = result.get("structure_data", {})
  206. if "错误" not in structure_data:
  207. logger.info(f"结构化内容库解构完成 - structure_data: {structure_data}")
  208. else:
  209. logger.warning(f"结构化内容库解构执行出错: {structure_data.get('错误')}")
  210. except Exception as e:
  211. logger.error(f"结构化内容库解构失败: {e}", exc_info=True)
  212. state.update({
  213. "structure_data": {
  214. "错误": str(e),
  215. }
  216. })
  217. return state
  218. def _topic_selection_understanding_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  219. """节点:选题理解"""
  220. logger.info("=== 执行节点:选题理解 ===")
  221. try:
  222. # 初始化Agent
  223. if not self.topic_selection_understanding_agent.is_initialized:
  224. self.topic_selection_understanding_agent.initialize()
  225. # 执行Agent
  226. result = self.topic_selection_understanding_agent.process(state)
  227. # 更新状态
  228. state.update(result)
  229. logger.info(f"选题理解完成 - result: {result}")
  230. except Exception as e:
  231. logger.error(f"选题理解失败: {e}", exc_info=True)
  232. state.update({
  233. "topic_selection_understanding": {}
  234. })
  235. return state
  236. def _search_keyword_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  237. """节点:搜索关键词提取"""
  238. logger.info("=== 执行节点:搜索关键词提取 ===")
  239. try:
  240. # 初始化Agent
  241. if not self.search_keyword_agent.is_initialized:
  242. self.search_keyword_agent.initialize()
  243. # 执行Agent
  244. result = self.search_keyword_agent.process(state)
  245. # 更新状态
  246. state.update(result)
  247. search_keywords_count = result.get("search_keywords", {}).get("总数", 0)
  248. logger.info(f"搜索关键词提取完成 - 共 {search_keywords_count} 个搜索词")
  249. except Exception as e:
  250. logger.error(f"搜索关键词提取失败: {e}", exc_info=True)
  251. state.update({
  252. "search_keywords": {
  253. "搜索词列表": [],
  254. "总数": 0,
  255. "错误": str(e)
  256. }
  257. })
  258. return state
  259. def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  260. """节点:灵感点提取(新三点解构)"""
  261. logger.info("=== 执行节点:灵感点提取 ===")
  262. try:
  263. # 初始化Agent
  264. if not self.inspiration_points_agent.is_initialized:
  265. self.inspiration_points_agent.initialize()
  266. # 执行Agent
  267. result = self.inspiration_points_agent.process(state)
  268. # 更新状态
  269. state.update(result)
  270. # 安全地获取灵感点数量:total_count 在 metadata 中
  271. if isinstance(result, dict):
  272. metadata = result.get("metadata", {})
  273. inspiration_count = metadata.get("total_count", 0) if isinstance(metadata, dict) else 0
  274. # 如果 metadata 中没有,尝试从 inspiration_points 列表长度获取
  275. if inspiration_count == 0:
  276. inspiration_points = result.get("inspiration_points", [])
  277. if isinstance(inspiration_points, list):
  278. inspiration_count = len(inspiration_points)
  279. else:
  280. # 如果 result 不是 dict(比如是列表),尝试获取长度
  281. inspiration_count = len(result) if isinstance(result, list) else 0
  282. logger.info(f"灵感点提取完成 - 共 {inspiration_count} 个灵感点")
  283. except Exception as e:
  284. logger.error(f"灵感点提取失败: {e}", exc_info=True)
  285. state.update({
  286. "inspiration_points": {
  287. "error": str(e),
  288. "points": [],
  289. "total_count": 0
  290. }
  291. })
  292. return state
  293. def _purpose_point_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  294. """节点:目的点提取(新三点解构)"""
  295. logger.info("=== 执行节点:目的点提取 ===")
  296. try:
  297. # 初始化Agent
  298. if not self.purpose_point_agent.is_initialized:
  299. self.purpose_point_agent.initialize()
  300. # 执行Agent
  301. result = self.purpose_point_agent.process(state)
  302. # 更新状态
  303. state.update(result)
  304. main_purpose = result.get("purpose_point", {}).get("main_purpose", "未知")
  305. logger.info(f"目的点提取完成 - 主要目的: {main_purpose}")
  306. except Exception as e:
  307. logger.error(f"目的点提取失败: {e}", exc_info=True)
  308. state.update({
  309. "purpose_point": {
  310. "error": str(e),
  311. "main_purpose": "未知"
  312. }
  313. })
  314. return state
  315. def _key_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  316. """节点:关键点提取(新三点解构)"""
  317. logger.info("=== 执行节点:关键点提取 ===")
  318. try:
  319. # 初始化Agent
  320. if not self.key_points_agent.is_initialized:
  321. self.key_points_agent.initialize()
  322. # 执行Agent
  323. result = self.key_points_agent.process(state)
  324. # 更新状态
  325. state.update(result)
  326. total_key_points = result.get("key_points", {}).get("total_count", 0)
  327. logger.info(f"关键点提取完成 - 共 {total_key_points} 个关键点")
  328. except Exception as e:
  329. logger.error(f"关键点提取失败: {e}", exc_info=True)
  330. state.update({
  331. "key_points": {
  332. "error": str(e),
  333. "creator_perspective": {"key_points": [], "summary": ""},
  334. "consumer_perspective": {"key_points": [], "summary": ""}
  335. }
  336. })
  337. return state
  338. def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  339. """节点:结果汇总"""
  340. logger.info("=== 执行节点:结果汇总 ===")
  341. try:
  342. # 初始化Function
  343. if not self.result_aggregation_func.is_initialized:
  344. self.result_aggregation_func.initialize()
  345. # 执行Function
  346. final_result = self.result_aggregation_func.execute(state)
  347. # 更新状态
  348. state["final_result"] = final_result
  349. logger.info("结果汇总完成")
  350. except Exception as e:
  351. logger.error(f"结果汇总失败: {e}", exc_info=True)
  352. state["final_result"] = {
  353. "帖子总结": {"错误": f"汇总失败: {str(e)}"},
  354. "帖子包含元素": []
  355. }
  356. return state
  357. def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
  358. """执行工作流(公共接口)- 视频分析版本
  359. Args:
  360. input_data: 输入数据,包含 video 字段(视频URL)
  361. 格式参考:examples/56898272/视频详情.json
  362. {
  363. "video": "http://...",
  364. "title": "...",
  365. "use_topic_agent_v2": False, # 可选,控制是否使用 topic_agent_v2 分支
  366. "use_structure_agent": True, # 可选,控制是否使用 structure_agent 分支(与 use_topic_agent_v2 二选一),默认 True
  367. ...
  368. }
  369. Returns:
  370. 最终解构结果
  371. """
  372. logger.info("=== 开始执行 What 解构工作流(视频分析) ===")
  373. # 确保工作流已初始化
  374. if not self.is_initialized:
  375. self.initialize()
  376. # 初始化状态(仅视频输入)
  377. initial_state = {
  378. "video": input_data.get("video", ""),
  379. "channel_content_id": input_data.get("channel_content_id", ""),
  380. "text": {
  381. "title": input_data.get("title", ""),
  382. "body": input_data.get("body_text", ""),
  383. "hashtags": []
  384. },
  385. "current_depth": 0,
  386. "max_depth": self.max_depth,
  387. "use_topic_agent_v2": input_data.get("use_topic_agent_v2", False),
  388. "use_structure_agent": input_data.get("use_structure_agent", True)
  389. }
  390. # 执行工作流
  391. result = self.compiled_graph.invoke(initial_state)
  392. logger.info("=== What 解构工作流执行完成(视频分析) ===")
  393. # 如果走的是 topic_agent_v2 分支,返回 topic_selection_v2 结果
  394. if result.get("use_topic_agent_v2") and "topic_selection_v2" in result:
  395. return result.get("topic_selection_v2", {})
  396. # 如果走的是 structure_agent 分支,返回 structure_data 结果
  397. if result.get("use_structure_agent") and "structure_data" in result:
  398. return result.get("structure_data", {})
  399. return result.get("final_result", {})