| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 批量运行脚本解构工作流 V2 (ScriptWorkflowV2)。
- 读取 examples/demo.json 中的视频列表,
- 将每条里的 video / channel_content_id 传入 ScriptWorkflowV2,
- 并将解构结果逐条写入 examples/output_demo_script_v2.json(增量写入)。
- """
- import json
- import sys
- from datetime import datetime
- from pathlib import Path
- from typing import Dict, Any, List
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- from src.workflows.script_workflow_v2 import ScriptWorkflowV2
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
def load_json(path: Path):
    """Read and parse a UTF-8 JSON document.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value (a list, dict, or any other JSON type), or
        ``None`` when ``path`` does not exist.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return None
def save_json(path: Path, data) -> None:
    """Write ``data`` to ``path`` as JSON via a temporary sibling file.

    The document is first serialized to ``<path.name>.tmp`` in the same
    directory and then moved over ``path``, so an existing ``path`` is only
    replaced once serialization has fully succeeded.

    Args:
        path: Destination file.
        data: JSON-serializable value.

    Raises:
        TypeError: If ``data`` contains a value ``json`` cannot serialize.
        OSError: If the temporary file cannot be written or moved.
    """
    # Append ".tmp" to the full name instead of swapping the suffix, so
    # "a.json" and "a.yaml" never share a temp file and a target that already
    # ends in ".tmp" is not used as its own temp file.
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        tmp_path.replace(path)
    except BaseException:
        # Do not leave a half-written temp file behind; cleanup is best
        # effort and must not mask the original error.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
def _video_key(video_data: Dict[str, Any]) -> str:
    """Return the de-duplication key ``"<channel_content_id>|<video>"``."""
    return f"{video_data.get('channel_content_id', '')}|{video_data.get('video', '')}"


def main() -> None:
    """Run ScriptWorkflowV2 over every entry of ``demo.json``, incrementally.

    Reads the list in ``demo.json`` (next to this script), invokes the
    workflow once per entry, and appends one record per entry to
    ``output_demo_script_v2.json``. The output file is rewritten after every
    entry so that an interrupted run keeps the work done so far.

    Entries whose ``channel_content_id`` + ``video`` key is already present
    in the output file, or was already handled earlier in the same run, are
    skipped. Failed entries are recorded too, so they are also skipped on a
    later run.

    Raises:
        FileNotFoundError: If ``demo.json`` does not exist.
        ValueError: If ``demo.json`` is not a JSON list, or the existing
            output file is non-empty but not a JSON object.
    """
    base_dir = Path(__file__).parent
    input_path = base_dir / "demo.json"
    output_path = base_dir / "output_demo_script_v2.json"

    if not input_path.exists():
        raise FileNotFoundError(f"找不到输入文件: {input_path}")

    # demo.json must be a list of video entries.
    raw_list = load_json(input_path)
    if not isinstance(raw_list, list):
        raise ValueError(f"demo.json 格式错误,应为列表,实际类型: {type(raw_list)}")

    # Load any previous output so the run can resume where it left off.
    output_data = load_json(output_path)
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }
    elif not isinstance(output_data, dict):
        # Fail with a clear message instead of an AttributeError further down.
        raise ValueError(
            f"{output_path.name} 格式错误,应为字典,实际类型: {type(output_data)}"
        )

    existing_results: List[Dict[str, Any]] = output_data.get("results") or []
    # Re-attach the list: an existing file may lack "results" or hold null,
    # and the loop below appends through output_data["results"].
    output_data["results"] = existing_results

    # Keys of everything already recorded, used to avoid reprocessing.
    processed_keys = {
        _video_key(record.get("video_data") or {}) for record in existing_results
    }

    workflow = ScriptWorkflowV2()
    for item in raw_list:
        video_data = item or {}
        channel_content_id = video_data.get("channel_content_id", "")
        video_url = video_data.get("video", "")
        key = _video_key(video_data)

        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue

        logger.info(
            f"处理视频: channel_content_id={channel_content_id} title={video_data.get('title','')}"
        )

        try:
            # Only "video" and "channel_content_id" are passed to the workflow.
            script_result = workflow.invoke(
                {
                    "video": video_url,
                    "channel_content_id": channel_content_id,
                }
            )
        except Exception as e:
            # One failing video must not abort the batch; record it and go on.
            logger.error(f"脚本解构 V2 处理失败: {e}", exc_info=True)
            record = {
                "video_data": video_data,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
        else:
            record = {
                "video_data": video_data,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1

        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1
        # Remember the key so a duplicate later in demo.json is skipped too.
        processed_keys.add(key)

        # Persist after every entry so a crash mid-run loses at most one item.
        save_json(output_path, output_data)

    logger.info(
        f"批量脚本解构 V2 完成: total={output_data.get('total')}, "
        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
    )
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|