#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-run the decode workflow (DecodeWorkflow).

Reads the video list from examples/demo.json, runs DecodeWorkflow on each
entry one by one, and writes the results to
examples/output_decode_result.json.

The output file is re-saved after every video. On a re-run, entries that
already appear in the output (matched by ``channel_content_id`` + video URL)
are skipped, so an interrupted batch can be resumed.
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List

# Add the project root to sys.path so the `src` package is importable when
# this script is executed directly.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.workflows.decode_workflow import DecodeWorkflow  # noqa: E402
from src.utils.logger import get_logger  # noqa: E402

logger = get_logger(__name__)

# Sections of the workflow result that carry actual decode output; when all
# of them are empty the run is recorded as a failure.
_RESULT_SECTIONS = ("视频信息", "三点解构", "选题理解", "脚本理解")

# Counter fields maintained in the output document.
_COUNTER_FIELDS = ("total", "success_count", "fail_count")


def load_json(path: Path) -> List[Dict[str, Any]]:
    """Load a list of records from a JSON file.

    Accepts either a top-level list, or a dict whose ``"results"`` key holds
    the list. Returns ``[]`` when the file does not exist or has any other
    shape.
    """
    if not path.exists():
        return []

    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    # A dict with a "results" field: use that list.
    if isinstance(data, dict) and "results" in data:
        return data["results"]
    # A bare list is returned as-is.
    if isinstance(data, list):
        return data
    return []


def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* to *path* as indented UTF-8 JSON via a temporary file.

    The JSON is first written to a sibling ``.tmp`` file which is then moved
    over *path* with ``Path.replace``, so an interrupted write never leaves a
    truncated output file. If writing fails, the temporary file is removed
    and the original error is re-raised.
    """
    tmp_path = path.with_suffix(".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        tmp_path.replace(path)
    except BaseException:
        # Don't leave a half-written .tmp file behind; never mask the real error.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise


def build_decode_input(video_data: Dict[str, Any]) -> Dict[str, Any]:
    """Build the DecodeWorkflow input dict from one video entry.

    Copies ``video``, ``channel_content_id``, ``title`` and ``body_text``,
    defaulting each missing field to ``""``.
    """
    return {
        "video": video_data.get("video", ""),
        "channel_content_id": video_data.get("channel_content_id", ""),
        "title": video_data.get("title", ""),
        "body_text": video_data.get("body_text", ""),
    }


def _new_output_data() -> Dict[str, Any]:
    """Return an empty output document with zeroed counters."""
    return {
        "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
        "total": 0,
        "success_count": 0,
        "fail_count": 0,
        "results": [],
    }


def _load_existing_output(output_path: Path) -> Dict[str, Any]:
    """Load a previous output document for incremental runs.

    Returns a fresh document when the file is missing, unreadable, empty or
    not a JSON object. Otherwise guarantees that ``"results"`` is a list that
    belongs to the document (so appends are persisted) and that every
    counter is an int.
    """
    if not output_path.exists():
        return _new_output_data()

    try:
        with output_path.open("r", encoding="utf-8") as f:
            output_data = json.load(f)
    except Exception as e:
        logger.warning(f"读取已有输出文件失败,将创建新文件: {e}")
        return _new_output_data()

    if not output_data:
        return _new_output_data()
    if not isinstance(output_data, dict):
        logger.warning(f"已有输出文件格式不正确(应为 JSON 对象),将创建新文件: {output_path}")
        return _new_output_data()

    # A missing/null "results" would otherwise make later appends fail.
    if not isinstance(output_data.get("results"), list):
        output_data["results"] = []
    # A missing/null counter would otherwise make "+ 1" fail.
    for counter in _COUNTER_FIELDS:
        if not isinstance(output_data.get(counter), int):
            output_data[counter] = 0
    return output_data


def _video_key(video_data: Any) -> str:
    """Return the dedup key ``"<channel_content_id>|<video URL>"`` for a video entry.

    Non-dict input is treated like an empty entry (key ``"|"``).
    """
    if not isinstance(video_data, dict):
        video_data = {}
    return f"{video_data.get('channel_content_id', '')}|{video_data.get('video', '')}"


def _failure_record(video_data: Dict[str, Any], error_msg: Any) -> Dict[str, Any]:
    """Build a result record for a video that failed to decode."""
    return {
        "video_data": video_data,
        "what_deconstruction_result": None,
        "script_result": None,
        "success": False,
        "error": error_msg,
    }


def _workflow_error(decode_result: Any) -> Any:
    """Return an error message if the workflow reported a failure, else ``None``.

    A failure is a non-dict result, a truthy ``error`` / ``错误`` /
    ``workflow_error`` field, or a ``workflow_status`` of ``"failed"`` or
    ``"incomplete"``.
    """
    if not isinstance(decode_result, dict):
        return f"工作流返回结果类型异常: {type(decode_result).__name__}"

    error_msg = (
        decode_result.get("error")
        or decode_result.get("错误")
        or decode_result.get("workflow_error")
    )
    if error_msg or decode_result.get("workflow_status") in ("failed", "incomplete"):
        return error_msg or "工作流执行失败"
    return None


def _all_sections_empty(decode_result: Dict[str, Any]) -> bool:
    """Return True when every section in ``_RESULT_SECTIONS`` is missing or falsy."""
    return not any(decode_result.get(section) for section in _RESULT_SECTIONS)


def _extract_selected_topic(topic_understanding: Any) -> Any:
    """Derive the "选题描述" value from the "选题理解" section.

    Uses the nested "选题" entry when present; otherwise builds one from the
    "主题" and "描述" fields. Non-dict input yields ``{}``.
    """
    if not isinstance(topic_understanding, dict):
        return {}
    if "选题" in topic_understanding:
        return topic_understanding.get("选题", {})
    return {
        "主题": topic_understanding.get("主题", ""),
        "描述": topic_understanding.get("描述", ""),
    }


def _success_record(video_data: Dict[str, Any], decode_result: Dict[str, Any]) -> Dict[str, Any]:
    """Build a successful result record, laid out like output_demo_script.json."""
    # what_deconstruction_result: video info, three-point deconstruction, topic understanding.
    what_deconstruction_result = {
        "视频信息": decode_result.get("视频信息", {}),
        "三点解构": decode_result.get("三点解构", {}),
        "选题理解": decode_result.get("选题理解", {}),
    }
    # script_result: topic description (derived from 选题理解) and script understanding.
    script_result = {
        "选题描述": _extract_selected_topic(decode_result.get("选题理解", {})),
        "脚本理解": decode_result.get("脚本理解", {}),
    }
    return {
        "video_data": video_data,
        "what_deconstruction_result": what_deconstruction_result,
        "script_result": script_result,
        "success": True,
        "error": None,
    }


def _process_video(workflow: Any, video_data: Dict[str, Any], progress: str) -> Dict[str, Any]:
    """Run the workflow on one video and return the record to store.

    Ordinary errors never propagate. Each of the following produces a
    failure record (``success=False``):

    - the workflow raises an exception;
    - the workflow reports an error (see ``_workflow_error``);
    - every result section is empty.

    The output file is not touched here, so a later save error cannot cause
    the video to be recorded twice.
    """
    channel_content_id = video_data.get("channel_content_id", "")
    try:
        decode_result = workflow.invoke(build_decode_input(video_data))

        error_msg = _workflow_error(decode_result)
        if error_msg:
            logger.error(
                f"{progress} 处理失败: channel_content_id={channel_content_id}, error={error_msg}"
            )
            return _failure_record(video_data, error_msg)

        if _all_sections_empty(decode_result):
            logger.warning(
                f"{progress} 处理结果为空: channel_content_id={channel_content_id}"
            )
            return _failure_record(video_data, "工作流执行完成,但所有结果都为空")

        record = _success_record(video_data, decode_result)
        logger.info(f"{progress} 处理成功: channel_content_id={channel_content_id}")
        return record
    except Exception as e:
        logger.error(
            f"{progress} 处理失败: channel_content_id={channel_content_id}, error={e}",
            exc_info=True,
        )
        return _failure_record(video_data, str(e))


def _append_record(output_data: Dict[str, Any], record: Dict[str, Any], output_path: Path) -> None:
    """Append *record*, update the counters, and persist the document immediately."""
    counter = "success_count" if record["success"] else "fail_count"
    output_data[counter] = output_data.get(counter, 0) + 1
    output_data["results"].append(record)
    output_data["total"] = output_data.get("total", 0) + 1
    # Save after every video so a crash mid-batch loses at most one result.
    save_json(output_path, output_data)


def main() -> None:
    """Decode every video in demo.json and append results to output_decode_result.json.

    Raises:
        FileNotFoundError: if demo.json does not exist next to this script.
    """
    base_dir = Path(__file__).parent
    input_path = base_dir / "demo.json"
    output_path = base_dir / "output_decode_result.json"

    if not input_path.exists():
        raise FileNotFoundError(f"找不到输入文件: {input_path}")

    video_list = load_json(input_path)
    if not video_list:
        logger.warning(f"输入文件 {input_path} 中没有视频数据")
        return

    total_videos = len(video_list)
    logger.info(f"共读取到 {total_videos} 个视频")

    # Existing output enables incremental runs; dedup by channel_content_id + video URL.
    output_data = _load_existing_output(output_path)
    processed_keys = {
        _video_key(item.get("video_data"))
        for item in output_data["results"]
        if isinstance(item, dict)
    }

    logger.info("初始化 DecodeWorkflow...")
    workflow = DecodeWorkflow()
    logger.info("DecodeWorkflow 初始化完成")

    for idx, video_data in enumerate(video_list, 1):
        progress = f"[{idx}/{total_videos}]"

        if not isinstance(video_data, dict):
            logger.warning(f"{progress} 视频数据格式不正确(应为 JSON 对象),跳过: {video_data!r}")
            continue

        channel_content_id = video_data.get("channel_content_id", "")
        key = _video_key(video_data)
        if key in processed_keys:
            logger.info(f"{progress} 已处理过该视频,跳过: channel_content_id={channel_content_id}")
            continue
        # Mark immediately so duplicates later in the same input are skipped too.
        processed_keys.add(key)

        # A null title must not abort the whole batch.
        title = str(video_data.get("title") or "")
        logger.info(
            f"{progress} 开始处理视频: "
            f"channel_content_id={channel_content_id}, title={title[:50]}..."
        )

        record = _process_video(workflow, video_data, progress)
        _append_record(output_data, record, output_path)
        logger.info(f"结果已保存到 {output_path}")

    logger.info(
        f"\n{'='*60}\n"
        f"批量解码完成:\n"
        f"  总计: {output_data.get('total', 0)}\n"
        f"  成功: {output_data.get('success_count', 0)}\n"
        f"  失败: {output_data.get('fail_count', 0)}\n"
        f"  输出文件: {output_path}\n"
        f"{'='*60}"
    )


if __name__ == "__main__":
    main()