jihuaqiang пре 4 дана
родитељ
комит
05271441e5

+ 7 - 7
examples/run_batch_script_v2.py

@@ -65,9 +65,9 @@ def main() -> None:
         }
 
     existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
-    # 用 channel_content_id + video URL 去重,避免重复处理
+    # 用 video_id + video URL 去重,避免重复处理(兼容旧字段名 channel_content_id)
     processed_keys = {
-        f"{item.get('video_data', {}).get('channel_content_id','')}|"
+        f"{item.get('video_data', {}).get('video_id', '') or item.get('video_data', {}).get('channel_content_id','')}|"
         f"{item.get('video_data', {}).get('video','')}"
         for item in existing_results
     }
@@ -76,23 +76,23 @@ def main() -> None:
 
     for item in raw_list:
         video_data = item or {}
-        channel_content_id = video_data.get("channel_content_id", "")
+        video_id = video_data.get("video_id", "") or video_data.get("channel_content_id", "")  # 兼容旧字段名
         video_url = video_data.get("video", "")
 
-        key = f"{channel_content_id}|{video_url}"
+        key = f"{video_id}|{video_url}"
         if key in processed_keys:
             logger.info(f"已处理过该视频,跳过: {key}")
             continue
 
         logger.info(
-            f"处理视频: channel_content_id={channel_content_id} title={video_data.get('title','')}"
+            f"处理视频: video_id={video_id} title={video_data.get('title','')}"
         )
 
         try:
-            # ScriptWorkflowV2 只需要 video 和 channel_content_id
+            # ScriptWorkflowV2 只需要 video 和 video_id
             script_input = {
                 "video": video_url,
-                "channel_content_id": channel_content_id,
+                "video_id": video_id,
             }
             script_result = workflow.invoke(script_input)
 

+ 11 - 11
examples/run_decode_script.py

@@ -52,7 +52,7 @@ def build_decode_input(video_data: Dict[str, Any]) -> Dict[str, Any]:
     """根据视频数据构造 DecodeWorkflow 的输入结构"""
     return {
         "video": video_data.get("video", ""),
-        "channel_content_id": video_data.get("channel_content_id", ""),
+        "video_id": video_data.get("video_id", "") or video_data.get("channel_content_id", ""),  # 兼容旧字段名
         "title": video_data.get("title", ""),
         "body_text": video_data.get("body_text", ""),
     }
@@ -95,9 +95,9 @@ def main() -> None:
         }
 
     existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
-    # 用 channel_content_id + video URL 去重,避免重复处理
+    # 用 video_id + video URL 去重,避免重复处理(兼容旧字段名 channel_content_id)
     processed_keys = {
-        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
+        f"{item.get('video_data', {}).get('video_id', '') or item.get('video_data', {}).get('channel_content_id', '')}|"
         f"{item.get('video_data', {}).get('video', '')}"
         for item in existing_results
     }
@@ -110,18 +110,18 @@ def main() -> None:
     # 处理每个视频
     for idx, video_data in enumerate(video_list, 1):
         video_url = video_data.get("video", "")
-        channel_content_id = video_data.get("channel_content_id", "")
+        video_id = video_data.get("video_id", "") or video_data.get("channel_content_id", "")  # 兼容旧字段名
         title = video_data.get("title", "")
 
         # 生成唯一键用于去重
-        key = f"{channel_content_id}|{video_url}"
+        key = f"{video_id}|{video_url}"
         if key in processed_keys:
-            logger.info(f"[{idx}/{len(video_list)}] 已处理过该视频,跳过: channel_content_id={channel_content_id}")
+            logger.info(f"[{idx}/{len(video_list)}] 已处理过该视频,跳过: video_id={video_id}")
             continue
 
         logger.info(
             f"[{idx}/{len(video_list)}] 开始处理视频: "
-            f"channel_content_id={channel_content_id}, title={title[:50]}..."
+            f"video_id={video_id}, title={title[:50]}..."
         )
 
         try:
@@ -145,7 +145,7 @@ def main() -> None:
                 if error_msg or workflow_status == "failed" or workflow_status == "incomplete":
                     error_msg = error_msg or "工作流执行失败"
                     logger.error(
-                        f"[{idx}/{len(video_list)}] 处理失败: channel_content_id={channel_content_id}, error={error_msg}"
+                        f"[{idx}/{len(video_list)}] 处理失败: video_id={video_id}, error={error_msg}"
                     )
                     record = {
                         "video_data": video_data,
@@ -172,7 +172,7 @@ def main() -> None:
                     not topic_understanding and not script_understanding):
                     error_msg = "工作流执行完成,但所有结果都为空"
                     logger.warning(
-                        f"[{idx}/{len(video_list)}] 处理结果为空: channel_content_id={channel_content_id}"
+                        f"[{idx}/{len(video_list)}] 处理结果为空: video_id={video_id}"
                     )
                     # 这里可以选择记录为失败或警告,根据业务需求决定
                     # 暂时记录为失败
@@ -226,12 +226,12 @@ def main() -> None:
 
             output_data["success_count"] = output_data.get("success_count", 0) + 1
             logger.info(
-                f"[{idx}/{len(video_list)}] 处理成功: channel_content_id={channel_content_id}"
+                f"[{idx}/{len(video_list)}] 处理成功: video_id={video_id}"
             )
 
         except Exception as e:
             logger.error(
-                f"[{idx}/{len(video_list)}] 处理失败: channel_content_id={channel_content_id}, error={e}",
+                f"[{idx}/{len(video_list)}] 处理失败: video_id={video_id}, error={e}",
                 exc_info=True
             )
             record = {

+ 4 - 4
examples/visualize_script_results.py

@@ -339,14 +339,14 @@ class ScriptResultVisualizer:
                     script_data["关键点"] = deconstruction["关键点"]
 
             video_data = item.get("video_data") or {}
-            channel_content_id = video_data.get("channel_content_id")
+            video_id = video_data.get("video_id") or video_data.get("channel_content_id")  # 兼容旧字段名
 
             # 用于 HTML 内部展示的"文件名"标签
             json_label = f"{self.json_file.name}#{idx}"
 
-            # 生成输出文件名(优先使用 channel_content_id,回退到序号)
-            if channel_content_id:
-                output_filename = f"script_result_{channel_content_id}.html"
+            # 生成输出文件名(优先使用 video_id,回退到序号)
+            if video_id:
+                output_filename = f"script_result_{video_id}.html"
             else:
                 output_filename = f"{self.json_file.stem}_{idx}.html"
 

+ 18 - 18
src/components/functions/video_upload_function.py

@@ -69,9 +69,9 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
             logger.info(f"开始下载视频: {video_url}")
 
             # 1. 下载视频到本地(或使用examples/videos目录下的现有文件)
-            # 从input_data中获取channel_content_id,用于查找examples/videos目录下的文件
-            channel_content_id = input_data.get("channel_content_id", "")
-            local_video_path, is_temp_file = self._download_video(video_url, channel_content_id)
+            # 从input_data中获取video_id,用于查找examples/videos目录下的文件
+            video_id = input_data.get("video_id", "")
+            local_video_path, is_temp_file = self._download_video(video_url, video_id)
             
             if not local_video_path:
                 return {
@@ -135,12 +135,12 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
                 "video_upload_error": str(e)
             }
 
-    def _download_video(self, video_url: str, channel_content_id: str = "") -> Tuple[Optional[str], bool]:
+    def _download_video(self, video_url: str, video_id: str = "") -> Tuple[Optional[str], bool]:
         """下载视频到本地,或使用examples/videos目录下的现有文件
         
         Args:
             video_url: 视频URL
-            channel_content_id: 频道内容ID,用于查找examples/videos目录下的文件
+            video_id: 视频ID,用于查找examples/videos目录下的文件
             
         Returns:
             (本地文件路径, 是否为临时文件) 的元组,失败返回 (None, True)
@@ -149,14 +149,14 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
         """
         try:
             # 1. 首先检查examples/videos目录下是否有对应的mp4文件
-            existing_file = self._check_examples_directory(channel_content_id)
+            existing_file = self._check_examples_directory(video_id)
             if existing_file:
                 logger.info(f"在examples/videos目录下找到现有文件,直接使用: {existing_file}")
                 return existing_file, False
             
             # 2. 如果没有找到,则下载到examples/videos目录
-            if not channel_content_id:
-                logger.warning("未提供channel_content_id,无法保存到examples/videos目录")
+            if not video_id:
+                logger.warning("未提供video_id,无法保存到examples/videos目录")
                 return None, True
             
             logger.info("未在examples/videos目录下找到同名文件,开始下载...")
@@ -168,8 +168,8 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
             # 确保目录存在
             videos_dir.mkdir(parents=True, exist_ok=True)
             
-            # 构建文件路径:examples/videos/{channel_content_id}.mp4
-            target_path = videos_dir / f"{channel_content_id}.mp4"
+            # 构建文件路径:examples/videos/{video_id}.mp4
+            target_path = videos_dir / f"{video_id}.mp4"
             
             # 如果文件已存在(并发情况),直接返回
             if target_path.exists():
@@ -253,21 +253,21 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
             logger.error(f"下载视频失败: {e}", exc_info=True)
             return None, True
     
-    def _check_examples_directory(self, channel_content_id: str) -> Optional[str]:
+    def _check_examples_directory(self, video_id: str) -> Optional[str]:
         """检查examples/videos目录下是否有对应的mp4文件
         
-        文件路径格式:examples/videos/{channel_content_id}.mp4
+        文件路径格式:examples/videos/{video_id}.mp4
         
         Args:
-            channel_content_id: 频道内容ID
+            video_id: 视频ID
             
         Returns:
             如果找到文件,返回文件路径;否则返回None
         """
         try:
-            # 如果没有提供channel_content_id,无法查找
-            if not channel_content_id:
-                logger.info("未提供channel_content_id,跳过examples/videos目录检查")
+            # 如果没有提供video_id,无法查找
+            if not video_id:
+                logger.info("未提供video_id,跳过examples/videos目录检查")
                 return None
             
             # 获取项目根目录
@@ -280,8 +280,8 @@ class VideoUploadFunction(BaseFunction[Dict[str, Any], Dict[str, Any]]):
                 logger.info(f"examples/videos目录不存在: {videos_dir}")
                 return None
             
-            # 构建文件路径:examples/videos/{channel_content_id}.mp4
-            mp4_file = videos_dir / f"{channel_content_id}.mp4"
+            # 构建文件路径:examples/videos/{video_id}.mp4
+            mp4_file = videos_dir / f"{video_id}.mp4"
             logger.info(f"构建文件路径: {mp4_file}")
             
             # 检查文件是否存在

+ 1 - 1
src/states/what_deconstruction_state.py

@@ -12,7 +12,7 @@ class WhatDeconstructionState(TypedDict, total=False):
 
     # ========== 输入数据 ==========
     video: Optional[str]  # 视频URL
-    channel_content_id: Optional[str]  # 视频内容ID
+    video_id: Optional[str]  # 视频ID
     title: Optional[str]  # 视频标题
     body_text: Optional[str]  # 视频正文
     text: Optional[Dict[str, Any]]  # {title, body, hashtags}(兼容格式)

+ 140 - 7
src/workflows/decode_workflow.py

@@ -2,7 +2,7 @@
 Decode Workflow.
 
 解码工作流:合并 What 解构工作流和脚本理解工作流的完整流程。
-流程:视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+流程:初始化数据库记录 → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
       段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总
 """
 
@@ -21,6 +21,7 @@ from src.components.agents.key_points_agent import KeyPointsAgent
 from src.components.agents.script_section_division_agent import ScriptSectionDivisionAgent
 from src.components.agents.script_substance_extraction_agent import ScriptSubstanceExtractionAgent
 from src.components.agents.script_form_extraction_agent import ScriptFormExtractionAgent
+from src.models import get_db, DecodeVideo, DecodeStatus
 from src.utils.logger import get_logger
 
 logger = get_logger(__name__)
@@ -31,7 +32,7 @@ class DecodeWorkflow(BaseGraphAgent):
 
     功能:
     - 编排完整的解码流程(视频分析)
-    - 流程:视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+    - 流程:初始化数据库记录 → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
            段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总
     - 管理状态传递
     - 仅支持单视频输入
@@ -92,12 +93,13 @@ class DecodeWorkflow(BaseGraphAgent):
         """构建工作流图
 
         完整流程:
-        START → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
+        START → 初始化数据库记录 → 视频上传 → 灵感点提取 → 目的点提取 → 关键点提取 → 选题理解 → 
         段落划分 → 实质提取 → 形式提取 → 分离结果 → 结果汇总 → END
         """
         workflow = StateGraph(dict)  # 使用dict作为状态类型
 
         # 添加所有节点
+        workflow.add_node("init_db_record", self._init_db_record_node)
         workflow.add_node("video_upload", self._video_upload_node)
         # What解构节点
         workflow.add_node("inspiration_points_extraction", self._inspiration_points_node)
@@ -112,7 +114,9 @@ class DecodeWorkflow(BaseGraphAgent):
         workflow.add_node("result_aggregation", self._result_aggregation_node)
 
         # 定义流程的边
-        workflow.set_entry_point("video_upload")
+        workflow.set_entry_point("init_db_record")
+        # 数据库记录初始化后进入视频上传
+        workflow.add_edge("init_db_record", "video_upload")
         # 视频上传后使用条件边:成功则继续,失败则终止
         workflow.add_conditional_edges(
             "video_upload",
@@ -183,7 +187,7 @@ class DecodeWorkflow(BaseGraphAgent):
         workflow.add_edge("merge_all_results", "result_aggregation")
         workflow.add_edge("result_aggregation", END)
 
-        logger.info("工作流图构建完成 - 完整流程:视频上传 → What解构 → 脚本理解 → 结果汇总")
+        logger.info("工作流图构建完成 - 完整流程:初始化数据库记录 → 视频上传 → What解构 → 脚本理解 → 结果汇总")
 
         return workflow
 
@@ -206,6 +210,8 @@ class DecodeWorkflow(BaseGraphAgent):
             # 设置失败信息到状态中
             state["workflow_failed"] = True
             state["workflow_error"] = error_msg
+            # 更新数据库记录为失败状态
+            self._update_db_record_after_workflow(state, success=False, error_msg=error_msg)
             return "failure"
 
     def _check_critical_error(self, state: Dict[str, Any], error_source: str = "") -> bool:
@@ -326,6 +332,68 @@ class DecodeWorkflow(BaseGraphAgent):
 
         return state
 
+    def _init_db_record_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
+        """Node: make sure a DecodeVideo record exists for state["video_id"] before the pipeline runs.
+        
+        Queries DecodeVideo by video_id (the decode_videos table, per the original note):
+        - record found: reuse it, set its status to EXECUTING, state["db_record_exists"] = True
+        - no record: create one with status EXECUTING, state["db_record_exists"] = False
+        """
+        logger.info("=== 执行节点:初始化数据库记录 ===")
+        # Every exception below is logged and swallowed; the state is returned in all cases.
+        try:
+            video_id = state.get("video_id", "")
+            task_id = state.get("task_id")
+            
+            if not video_id:
+                logger.warning("未提供 video_id,跳过数据库记录初始化")
+                return state
+            
+            db = next(get_db())
+            try:
+                # Look for an existing record with this video_id
+                existing_record = db.query(DecodeVideo).filter_by(video_id=video_id).first()
+                
+                if existing_record:
+                    # Record exists: reuse its task_id; any task_id supplied in state is ignored in this branch
+                    logger.info(f"找到已存在的数据库记录: task_id={existing_record.task_id}, video_id={video_id}")
+                    state["db_task_id"] = existing_record.task_id
+                    state["db_record_exists"] = True
+                    # Mark the existing record as executing again
+                    existing_record.update_status(DecodeStatus.EXECUTING)
+                    db.commit()
+                else:
+                    # No record yet: create one (db_task_id is written to state only after the commit succeeds)
+                    # Without a caller-supplied task_id, derive one from the MD5 of video_id (first 15 hex digits, mod 10**15)
+                    if not task_id:
+                        import hashlib
+                        task_id = int(hashlib.md5(video_id.encode()).hexdigest()[:15], 16) % (10 ** 15)
+                        logger.info(f"未提供 task_id,自动生成: {task_id}")
+                    
+                    new_record = DecodeVideo.create(
+                        task_id=task_id,
+                        video_id=video_id,
+                        status=DecodeStatus.EXECUTING
+                    )
+                    db.add(new_record)
+                    db.commit()
+                    logger.info(f"创建新的数据库记录: task_id={task_id}, video_id={video_id}")
+                    state["db_task_id"] = task_id
+                    state["db_record_exists"] = False
+                    
+            except Exception as e:
+                logger.error(f"数据库操作失败: {e}", exc_info=True)
+                db.rollback()
+                # A database failure must not stop the workflow from continuing
+            finally:
+                db.close()
+                
+        except Exception as e:
+            logger.error(f"初始化数据库记录节点执行失败: {e}", exc_info=True)
+            # A database failure must not stop the workflow from continuing
+        
+        return state
+
     def _inspiration_points_node(self, state: Dict[str, Any]) -> Dict[str, Any]:
         """节点:灵感点提取(What解构)"""
         logger.info("=== 执行节点:灵感点提取 ===")
@@ -756,6 +824,57 @@ class DecodeWorkflow(BaseGraphAgent):
 
         return state
 
+    def _update_db_record_after_workflow(
+        self, 
+        state: Dict[str, Any], 
+        success: bool, 
+        final_result: Dict[str, Any] = None,
+        error_msg: str = None
+    ):
+        """Persist the workflow outcome to the DecodeVideo record named by state["db_task_id"].
+        
+        Args:
+            state: workflow state; only "db_task_id" is read, and the update is skipped when it is missing or falsy
+            success: True marks the record SUCCESS and stores final_result; False marks it FAILED
+            final_result: final result, serialised to JSON on success (None is stored when it is falsy)
+            error_msg: failure reason, passed to update_status as error_reason when success is False
+        """
+        try:
+            task_id = state.get("db_task_id")
+            if not task_id:
+                logger.warning("未找到 db_task_id,跳过数据库记录更新")
+                return
+            
+            db = next(get_db())
+            try:
+                record = db.query(DecodeVideo).filter_by(task_id=task_id).first()
+                if not record:
+                    logger.warning(f"未找到 task_id={task_id} 的数据库记录,跳过更新")
+                    return
+                
+                if success:
+                    # Success: set the SUCCESS status and store the JSON-serialised result
+                    import json
+                    result_json = json.dumps(final_result, ensure_ascii=False) if final_result else None
+                    record.update_status(DecodeStatus.SUCCESS)
+                    record.update_result(result_json)
+                    logger.info(f"更新数据库记录为成功: task_id={task_id}")
+                else:
+                    # Failure: set the FAILED status together with the error reason
+                    record.update_status(DecodeStatus.FAILED, error_reason=error_msg)
+                    logger.info(f"更新数据库记录为失败: task_id={task_id}, error={error_msg}")
+                
+                db.commit()
+                
+            except Exception as e:
+                logger.error(f"更新数据库记录失败: {e}", exc_info=True)
+                db.rollback()
+            finally:
+                db.close()
+        # Any exception that reaches this point is logged and not re-raised
+        except Exception as e:
+            logger.error(f"更新数据库记录节点执行失败: {e}", exc_info=True)
+
     def invoke(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
         """执行工作流(公共接口)- 视频分析版本
 
@@ -769,7 +888,7 @@ class DecodeWorkflow(BaseGraphAgent):
             self.initialize()
 
         # 验证输入参数
-        video_url = input_data.get("video", "")
+        video_url = input_data.get("video", "")  # callers pass the URL under "video"; any other key makes every run fail validation
         if not video_url:
             error_msg = "未提供视频URL,无法执行工作流"
             logger.error(error_msg)
@@ -782,7 +901,7 @@ class DecodeWorkflow(BaseGraphAgent):
         # 初始化状态(包含视频信息,供视频上传和后续Agent使用)
         initial_state = {
             "video": video_url,
-            "channel_content_id": input_data.get("channel_content_id", ""),
+            "video_id": input_data.get("video_id", ""),
             "title": input_data.get("title", ""),
             "current_depth": 0,
             "max_depth": self.max_depth,
@@ -790,11 +909,16 @@ class DecodeWorkflow(BaseGraphAgent):
         }
 
         # 执行工作流
+        result = None
         try:
             result = self.compiled_graph.invoke(initial_state)
         except Exception as e:
             error_msg = f"工作流执行异常: {str(e)}"
             logger.error(error_msg, exc_info=True)
+            # 更新数据库记录为失败状态(使用 initial_state 作为 fallback)
+            if result is None:
+                result = initial_state
+            self._update_db_record_after_workflow(result, success=False, error_msg=error_msg)
             return {
                 "error": error_msg,
                 "workflow_status": "failed",
@@ -805,6 +929,8 @@ class DecodeWorkflow(BaseGraphAgent):
         if result.get("workflow_failed"):
             error_msg = result.get("workflow_error", "工作流执行失败")
             logger.error(f"工作流因错误而终止: {error_msg}")
+            # 更新数据库记录为失败状态
+            self._update_db_record_after_workflow(result, success=False, error_msg=error_msg)
             return {
                 "error": error_msg,
                 "video_upload_error": result.get("video_upload_error"),
@@ -819,18 +945,25 @@ class DecodeWorkflow(BaseGraphAgent):
             if result.get("workflow_error"):
                 error_msg = result.get("workflow_error", "工作流执行失败,未生成结果")
                 logger.error(f"工作流执行失败: {error_msg}")
+                # 更新数据库记录为失败状态
+                self._update_db_record_after_workflow(result, success=False, error_msg=error_msg)
                 return {
                     "error": error_msg,
                     "workflow_status": "failed"
                 }
             else:
                 logger.warning("工作流执行完成,但未生成最终结果")
+                # 更新数据库记录为失败状态
+                self._update_db_record_after_workflow(result, success=False, error_msg="工作流执行完成,但未生成最终结果")
                 return {
                     "error": "工作流执行完成,但未生成最终结果",
                     "workflow_status": "incomplete",
                     "state": result
                 }
 
+        # 工作流执行成功,更新数据库记录
+        self._update_db_record_after_workflow(result, success=True, final_result=final_result)
+
         logger.info("=== 解码工作流执行完成(视频分析) ===")
 
         return final_result

+ 2 - 2
src/workflows/script_workflow_v2.py

@@ -263,7 +263,7 @@ class ScriptWorkflowV2(BaseGraphAgent):
 
         只需要传入:
         - video: 原始视频地址或本地路径(由 VideoUploadFunction 处理)
-        - channel_content_id: 可选,外部业务 ID
+        - video_id: 可选,外部业务 ID(兼容旧字段名 channel_content_id)
         """
         logger.info("=== 开始执行 ScriptWorkflowV2(视频 → L3 单元 → L1/L2 整体解构) ===")
 
@@ -273,7 +273,7 @@ class ScriptWorkflowV2(BaseGraphAgent):
         # 初始化最小状态(仅保留必须字段)
         initial_state = {
             "video": input_data.get("video", ""),
-            "channel_content_id": input_data.get("channel_content_id", ""),
+            "video_id": input_data.get("video_id", "") or input_data.get("channel_content_id", ""),  # 兼容旧字段名
         }
 
         result_state = self.compiled_graph.invoke(initial_state)