# script_workflow_v2.py
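"""
Script Workflow V2.

Zhaoheng's script-deconstruction workflow V2: L3 unit splitting and overall
understanding are performed directly on the video.

Video script deconstruction workflow (simplified):
- Steps: video upload -> L3 unit split -> L1/L2 overall understanding
  (the graph built in this module additionally runs a structure-analysis
  step before the L3 split and a keyword-extraction step after the overall
  understanding).
- Depends only on the video and the L3 deconstruction results; no extra
  input fields are introduced.
"""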
  8. from typing import Dict, Any
  9. from langgraph.graph import StateGraph, END
  10. from src.components.agents.base import BaseGraphAgent
  11. from src.components.functions.video_upload_function import VideoUploadFunction
  12. from src.components.agents.structure_agent import StructureAgent
  13. from src.components.agents.content_unit_split_agent import ContentUnitSplitAgent
  14. from src.components.agents.content_unit_understand import ContentUnitUnderstandAgent
  15. from src.components.agents.script_keyword_agent import ScriptKeywordAgent
  16. from src.utils.logger import get_logger
  17. logger = get_logger(__name__)
  18. class ScriptWorkflowV2(BaseGraphAgent):
  19. """脚本理解工作流 V2(仅视频 + L3 解构 + 整体理解)
  20. 流程:
  21. START → 视频上传 → L3 单元拆分 → 整体结构理解 → 结果汇总 → END
  22. """
  23. def __init__(
  24. self,
  25. name: str = "script_workflow_v2",
  26. description: str = "脚本理解工作流 V2(视频 → L3 单元 → L1/L2 整体解构)",
  27. model_provider: str = "google_genai",
  28. ):
  29. super().__init__(
  30. name=name,
  31. description=description,
  32. state_class=dict, # 直接使用 dict 作为状态类型
  33. )
  34. self.model_provider = model_provider
  35. # 初始化视频上传 Function
  36. self.video_upload_func = VideoUploadFunction()
  37. # 初始化结构化内容库 Agent
  38. self.structure_agent = StructureAgent(
  39. model_provider=model_provider
  40. )
  41. # 初始化 L3 单元拆分 Agent
  42. self.content_unit_split_agent = ContentUnitSplitAgent(
  43. model_provider=model_provider
  44. )
  45. # 初始化 L1/L2 整体理解 Agent
  46. self.content_unit_understand_agent = ContentUnitUnderstandAgent(
  47. model_provider=model_provider
  48. )
  49. # 初始化金句提取 Agent
  50. self.script_keyword_agent = ScriptKeywordAgent(
  51. model_provider=model_provider
  52. )
  53. logger.info(f"ScriptWorkflowV2 初始化完成,model_provider: {model_provider}")
  54. def _build_graph(self) -> StateGraph:
  55. """构建工作流图(简化版)"""
  56. workflow = StateGraph(dict)
  57. # 添加节点
  58. workflow.add_node("video_upload", self._video_upload_node)
  59. workflow.add_node("structure_analysis", self._structure_analysis_node)
  60. workflow.add_node("content_unit_split", self._content_unit_split_node)
  61. workflow.add_node("content_unit_understand", self._content_unit_understand_node)
  62. workflow.add_node("keyword_extraction", self._keyword_extraction_node)
  63. workflow.add_node("result_aggregation", self._result_aggregation_node)
  64. # 定义边
  65. workflow.set_entry_point("video_upload")
  66. workflow.add_edge("video_upload", "structure_analysis")
  67. workflow.add_edge("structure_analysis", "content_unit_split")
  68. workflow.add_edge("content_unit_split", "content_unit_understand")
  69. workflow.add_edge("content_unit_understand", "keyword_extraction")
  70. workflow.add_edge("keyword_extraction", "result_aggregation")
  71. workflow.add_edge("result_aggregation", END)
  72. logger.info("ScriptWorkflowV2 图构建完成 - 流程:视频上传 → 结构化分析 → L3 单元拆分 → 整体理解 → 金句提取 → 结果汇总")
  73. return workflow
  74. def _video_upload_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  75. """节点:视频上传 - 下载视频并上传至 Gemini"""
  76. logger.info("=== ScriptWorkflowV2:执行节点 video_upload ===")
  77. try:
  78. if not self.video_upload_func.is_initialized:
  79. self.video_upload_func.initialize()
  80. result = self.video_upload_func.execute(state)
  81. state.update(result)
  82. video_uri = result.get("video_uploaded_uri")
  83. if video_uri:
  84. logger.info(f"视频上传完成 - URI: {video_uri}")
  85. else:
  86. error = result.get("video_upload_error", "未知错误")
  87. logger.warning(f"视频上传失败: {error}")
  88. except Exception as e:
  89. logger.error(f"视频上传失败: {e}", exc_info=True)
  90. state.update(
  91. {
  92. "video_uploaded_uri": None,
  93. "video_upload_error": str(e),
  94. }
  95. )
  96. return state
  97. def _structure_analysis_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  98. """节点:结构化内容库分析 - 基于视频内容进行结构化解构"""
  99. logger.info("=== ScriptWorkflowV2:执行节点 structure_analysis ===")
  100. try:
  101. if not self.structure_agent.is_initialized:
  102. self.structure_agent.initialize()
  103. result = self.structure_agent.process(state)
  104. # 将结果存入 state 的 topic 字段
  105. structure_data = result.get("structure_data", {})
  106. state["topic"] = structure_data
  107. logger.info("结构化内容库分析完成")
  108. if isinstance(structure_data, dict):
  109. topic_info = structure_data.get("选题信息表", {})
  110. macro_topic = topic_info.get("宏观母题", "") if isinstance(topic_info, dict) else ""
  111. if macro_topic:
  112. logger.info(f"宏观母题: {macro_topic}")
  113. except Exception as e:
  114. logger.error(f"结构化内容库分析失败: {e}", exc_info=True)
  115. state["topic"] = {
  116. "错误": str(e),
  117. }
  118. return state
  119. def _content_unit_split_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  120. """节点:L3 内容单元拆分"""
  121. logger.info("=== ScriptWorkflowV2:执行节点 content_unit_split ===")
  122. try:
  123. if not self.content_unit_split_agent.is_initialized:
  124. self.content_unit_split_agent.initialize()
  125. result = self.content_unit_split_agent.process(state)
  126. state.update(result)
  127. analysis = result.get("content_unit_analysis", {})
  128. logger.info(
  129. f"L3 单元拆分完成,单元数量: {len(analysis.get('单元列表', [])) if isinstance(analysis, dict) else 0}"
  130. )
  131. except Exception as e:
  132. logger.error(f"L3 内容单元拆分失败: {e}", exc_info=True)
  133. state["content_unit_analysis"] = {
  134. "error": str(e),
  135. "单元列表": [],
  136. }
  137. return state
  138. def _content_unit_understand_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  139. """节点:整体结构理解(L1/L2 基于视频 + L3 解构)"""
  140. logger.info("=== ScriptWorkflowV2:执行节点 content_unit_understand ===")
  141. try:
  142. if not self.content_unit_understand_agent.is_initialized:
  143. self.content_unit_understand_agent.initialize()
  144. result = self.content_unit_understand_agent.process(state)
  145. state.update(result)
  146. understanding = result.get("content_unit_understanding", {})
  147. logger.info(
  148. f"整体结构理解完成,段落数量: {len(understanding.get('段落解构', [])) if isinstance(understanding, dict) else 0}"
  149. )
  150. except Exception as e:
  151. logger.error(f"整体结构理解失败: {e}", exc_info=True)
  152. state["content_unit_understanding"] = {
  153. "error": str(e),
  154. "整体解构": {},
  155. "段落解构": [],
  156. "单元解构": {},
  157. }
  158. return state
  159. def _keyword_extraction_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  160. """节点:金句提取 - 基于视频内容提取钩子和金句"""
  161. logger.info("=== ScriptWorkflowV2:执行节点 keyword_extraction ===")
  162. try:
  163. if not self.script_keyword_agent.is_initialized:
  164. self.script_keyword_agent.initialize()
  165. result = self.script_keyword_agent.process(state)
  166. state.update(result)
  167. script_keywords = result.get("script_keywords", {})
  168. logger.info("金句提取完成")
  169. if isinstance(script_keywords, dict):
  170. hooks_count = len(script_keywords.get("hooks", []))
  171. golden_sentences_count = len(script_keywords.get("golden_sentences", []))
  172. logger.info(f"提取钩子数量: {hooks_count}, 金句数量: {golden_sentences_count}")
  173. except Exception as e:
  174. logger.error(f"金句提取失败: {e}", exc_info=True)
  175. state["script_keywords"] = {
  176. "error": str(e),
  177. }
  178. return state
  179. def _result_aggregation_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
  180. """节点:结果汇总 - 组装最终结果"""
  181. logger.info("=== ScriptWorkflowV2:执行节点 result_aggregation ===")
  182. try:
  183. # 从 state 中提取结果
  184. topic = state.get("topic", {})
  185. content_unit_analysis = state.get("content_unit_analysis", {})
  186. content_unit_understanding = state.get("content_unit_understanding", {})
  187. script_keywords = state.get("script_keywords", {})
  188. # 组装最终结果
  189. final_result = {
  190. "结构化内容库": topic,
  191. "L3单元解构": content_unit_analysis,
  192. "整体结构理解": content_unit_understanding,
  193. "金句提取": script_keywords,
  194. }
  195. # 更新状态
  196. state["final_result"] = final_result
  197. logger.info("结果汇总完成")
  198. except Exception as e:
  199. logger.error(f"结果汇总失败: {e}", exc_info=True)
  200. state["final_result"] = {
  201. "error": f"汇总失败: {str(e)}",
  202. "结构化内容库": {},
  203. "L3单元解构": {},
  204. "整体结构理解": {},
  205. "金句提取": {},
  206. }
  207. return state
  208. def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
  209. """执行工作流(公共接口)- 简化版
  210. 只需要传入:
  211. - video: 原始视频地址或本地路径(由 VideoUploadFunction 处理)
  212. - video_id: 可选,外部业务 ID(兼容旧字段名 channel_content_id)
  213. """
  214. logger.info("=== 开始执行 ScriptWorkflowV2(视频 → L3 单元 → L1/L2 整体解构) ===")
  215. if not self.is_initialized:
  216. self.initialize()
  217. # 初始化最小状态(仅保留必须字段)
  218. initial_state = {
  219. "video": input_data.get("video", ""),
  220. "video_id": input_data.get("video_id", "") or input_data.get("channel_content_id", ""), # 兼容旧字段名
  221. }
  222. result_state = self.compiled_graph.invoke(initial_state)
  223. logger.info("=== ScriptWorkflowV2 执行完成 ===")
  224. return result_state.get("final_result", {})