#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-run the script deconstruction workflow V2 (ScriptWorkflowV2).

Reads the video list from examples/demo.json, passes each entry's
video / channel_content_id to ScriptWorkflowV2, and appends the
deconstruction results one by one to examples/output_demo_script_v2.json
(incremental writes).
"""

import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List

# Add the project root to sys.path so the src.* imports resolve
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.workflows.script_workflow_v2 import ScriptWorkflowV2
from src.utils.logger import get_logger

logger = get_logger(__name__)


def load_json(path: Path):
    """Generic JSON reader; supports a list or a dict. Returns None if the file is missing."""
    if not path.exists():
        return None
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: Path, data) -> None:
    """Write JSON safely (write to a temporary file first, then replace the target)."""
    tmp_path = path.with_suffix(".tmp")
    with tmp_path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    tmp_path.replace(path)


def main() -> None:
    base_dir = Path(__file__).parent
    input_path = base_dir / "demo.json"
    output_path = base_dir / "output_demo_script_v2.json"

    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_path}")

    # Read demo.json (expected to be a list)
    raw_list = load_json(input_path)
    if not isinstance(raw_list, list):
        raise ValueError(f"demo.json is malformed: expected a list, got {type(raw_list)}")

    # Load any existing output so the run can resume incrementally
    output_data = load_json(output_path)
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }

    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []
    # Normalize so appending below cannot fail if the loaded file had no "results" list
    output_data["results"] = existing_results
    # Deduplicate by channel_content_id + video URL to avoid reprocessing
    processed_keys = {
        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
        f"{item.get('video_data', {}).get('video', '')}"
        for item in existing_results
    }

    workflow = ScriptWorkflowV2()

    for item in raw_list:
        video_data = item or {}
        channel_content_id = video_data.get("channel_content_id", "")
        video_url = video_data.get("video", "")
        key = f"{channel_content_id}|{video_url}"
        if key in processed_keys:
            logger.info(f"Video already processed, skipping: {key}")
            continue

        logger.info(
            f"Processing video: channel_content_id={channel_content_id} "
            f"title={video_data.get('title', '')}"
        )

        try:
            # ScriptWorkflowV2 only needs video and channel_content_id
            script_input = {
                "video": video_url,
                "channel_content_id": channel_content_id,
            }
            script_result = workflow.invoke(script_input)
            record = {
                "video_data": video_data,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1
        except Exception as e:
            logger.error(f"Script deconstruction V2 failed: {e}", exc_info=True)
            record = {
                "video_data": video_data,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1
        # Also remember the key so duplicates within demo.json itself are skipped
        processed_keys.add(key)
        # Save after every item so a mid-run failure does not lose all progress
        save_json(output_path, output_data)

    logger.info(
        f"Batch script deconstruction V2 finished: total={output_data.get('total')}, "
        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
    )


if __name__ == "__main__":
    main()
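
# A minimal sketch of the expected demo.json entry shape, inferred from the fields
# this script reads (channel_content_id, video, title). The values below are
# hypothetical placeholders, and real entries may carry additional fields that are
# simply passed through in "video_data".
#
# [
#   {
#     "channel_content_id": "abc123",
#     "video": "https://example.com/videos/abc123.mp4",
#     "title": "Example video title"
#   }
# ]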