#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-run the script-understanding workflow (ScriptWorkflow).

Reads the three-point deconstruction results from examples/output_demo.json,
combines each entry's ``video_data`` and ``result`` into a ScriptWorkflow
input, and incrementally writes the script-understanding results to
examples/output_demo_script.json.
"""

import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List

# Add the project root to sys.path so the ``src`` package is importable when
# this file is executed directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.workflows.script_workflow import ScriptWorkflow
from src.utils.logger import get_logger

logger = get_logger(__name__)


def load_json(path: Path) -> Dict[str, Any]:
    """Load a UTF-8 JSON file.

    Args:
        path: File to read.

    Returns:
        The parsed JSON content, or ``{}`` when *path* does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if not path.exists():
        return {}
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* to *path* as pretty-printed UTF-8 JSON via a temp file.

    The JSON is first written to a sibling temporary file and then moved over
    *path* with ``Path.replace``, so an interrupted write does not leave a
    half-written output file in place.

    Args:
        path: Destination file.
        data: JSON-serializable data to write.
    """
    # Append ".tmp" to the full file name (instead of swapping the suffix) so
    # files that differ only by extension never share a temp file.
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        tmp_path.replace(path)
    finally:
        # On success the temp file has been renamed away; on failure remove
        # the partial file so it does not linger next to the output.
        if tmp_path.exists():
            tmp_path.unlink()


def build_script_input(video_data: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
    """Build the ScriptWorkflow input from one entry of output_demo.json.

    Args:
        video_data: The entry's ``video_data`` dict (video URL, id, title...).
        result: The entry's three-point deconstruction ``result`` dict.

    Returns:
        A dict with the keys ``video``, ``channel_content_id``, ``text``,
        ``topic_selection_understanding``, ``content_weight``,
        ``inspiration_points``, ``purpose_points`` and ``key_points``.
    """
    video_info = result.get("视频信息", {}) or {}
    three_points = result.get("三点解构", {}) or {}
    topic_understanding = result.get("选题理解", {}) or {}

    # Basic video information; prefer video_data, fall back to the result's
    # own video-info section.
    video_url = video_data.get("video") or video_info.get("视频URL", "")
    channel_content_id = video_data.get("channel_content_id", "")
    title = video_data.get("title") or video_info.get("标题", "")
    body_text = video_info.get("正文", "")

    # Map the three-point deconstruction onto the fields script understanding needs.
    inspiration_points: List[Dict[str, Any]] = three_points.get("灵感点", []) or []
    purpose_block = three_points.get("目的点", {}) or {}
    purpose_points: List[Dict[str, Any]] = purpose_block.get("purposes", []) or []
    key_points_block = three_points.get("关键点", {}) or {}
    key_points: List[Dict[str, Any]] = key_points_block.get("key_points", []) or []

    input_data: Dict[str, Any] = {
        "video": video_url,
        "channel_content_id": channel_content_id,
        "text": {
            "title": title,
            "body": body_text,
        },
        "topic_selection_understanding": topic_understanding,
        "content_weight": {},  # no corresponding data available yet; left empty
        "inspiration_points": inspiration_points,
        "purpose_points": purpose_points,
        "key_points": key_points,
    }
    return input_data


def _dedup_key(video_data: Dict[str, Any]) -> str:
    """Return the ``channel_content_id|video`` key used to skip processed videos."""
    return f"{video_data.get('channel_content_id','')}|{video_data.get('video','')}"


def main() -> None:
    """Run ScriptWorkflow over every not-yet-processed entry of output_demo.json.

    Each record (success or failure) is appended to output_demo_script.json,
    and the file is re-saved after every entry so a long run that dies midway
    keeps its progress. Entries are deduplicated on ``channel_content_id|video``
    across both the existing output and the current run. Failed entries are
    recorded too, so they are not retried on a later run.

    Raises:
        FileNotFoundError: If output_demo.json does not exist.
    """
    base_dir = Path(__file__).parent
    input_path = base_dir / "output_demo.json"
    output_path = base_dir / "output_demo_script.json"

    if not input_path.exists():
        raise FileNotFoundError(f"找不到输入文件: {input_path}")

    # Read the original three-point deconstruction results.
    raw = load_json(input_path)
    raw_results: List[Dict[str, Any]] = raw.get("results", []) or []

    # Load any existing script-understanding output to support incremental appends.
    output_data = load_json(output_path)
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }
    # An existing file may lack "results" or store null there; make sure we
    # have a real list to append to (previously this raised on first append).
    if output_data.get("results") is None:
        output_data["results"] = []
    existing_results: List[Dict[str, Any]] = output_data["results"]

    # Deduplicate by channel_content_id + video URL to avoid reprocessing.
    # `or {}` mirrors the loop below so a null video_data does not crash here.
    processed_keys = {
        _dedup_key(record.get("video_data") or {}) for record in existing_results
    }

    workflow = ScriptWorkflow()

    for item in raw_results:
        video_data = item.get("video_data", {}) or {}
        result = item.get("result", {}) or {}

        key = _dedup_key(video_data)
        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue

        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")

        try:
            script_input = build_script_input(video_data, result)
            script_result = workflow.invoke(script_input)
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1
        except Exception as e:
            # Per-item boundary: log and record the failure, then keep going
            # with the remaining videos.
            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1
        # Mark as processed so duplicate entries within this same input file
        # are skipped, matching what a rerun would do.
        processed_keys.add(key)

        # Save after every entry so a failure midway through a long run does
        # not lose everything processed so far.
        save_json(output_path, output_data)

    logger.info(
        f"批量脚本理解完成: total={output_data.get('total')}, "
        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
    )


if __name__ == "__main__":
    main()

# Script deconstruction