| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 批量运行脚本理解工作流 (ScriptWorkflow)。
- 读取 examples/output_demo.json 中三点解构的结果,
- 将每条里的 video_data 和 result 组合后传入 ScriptWorkflow,
- 并将脚本理解结果增量写入 examples/output_demo_script.json。
- """
- import json
- import os
- import sys
- from datetime import datetime
- from pathlib import Path
- from typing import Dict, Any, List
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent
- sys.path.insert(0, str(project_root))
- from src.workflows.script_workflow import ScriptWorkflow
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
def load_json(path: Path) -> Dict[str, Any]:
    """Return the JSON document stored at *path*.

    Args:
        path: Location of a UTF-8 encoded JSON file.

    Returns:
        The decoded JSON value, or an empty dict when *path* does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    # A missing file is treated as "no data yet" rather than an error.
    return {}
def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* to *path* as pretty-printed UTF-8 JSON via a temp file.

    The document is first written to a sibling ``<name>.tmp`` file and then
    moved over *path*, so a failure while serializing never truncates an
    existing output file.

    Args:
        path: Destination file.
        data: JSON-serializable mapping to store.

    Raises:
        TypeError: If *data* contains a value ``json`` cannot serialize.
        OSError: If the temp file cannot be written or moved into place.
    """
    # Append ".tmp" to the full name instead of replacing the suffix, so that
    # "a.json" and "a.yaml" do not share one temp file and a target that
    # already ends in ".tmp" is not written in place.
    tmp_path = path.with_name(path.name + ".tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        tmp_path.replace(path)
    except BaseException:
        # Do not leave a partially written temp file behind; the original
        # error is re-raised unchanged.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise
def build_script_input(video_data: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
    """Assemble the ScriptWorkflow input from one output_demo.json entry.

    Args:
        video_data: The entry's ``video_data`` mapping (URL, id, title).
        result: The entry's ``result`` mapping holding the deconstruction output.

    Returns:
        A dict with the keys ``video``, ``channel_content_id``, ``text``,
        ``topic_selection_understanding``, ``content_weight``,
        ``inspiration_points``, ``purpose_points`` and ``key_points``.
    """
    # Missing or null sections collapse to empty containers.
    info = result.get("视频信息") or {}
    deconstruction = result.get("三点解构") or {}
    topic = result.get("选题理解") or {}

    # Purposes and key points are nested one level deeper than inspirations.
    purposes_section = deconstruction.get("目的点") or {}
    key_points_section = deconstruction.get("关键点") or {}

    # Values from video_data take precedence; the video-info section is the fallback.
    url = video_data.get("video")
    if not url:
        url = info.get("视频URL", "")
    heading = video_data.get("title")
    if not heading:
        heading = info.get("标题", "")

    return {
        "video": url,
        "channel_content_id": video_data.get("channel_content_id", ""),
        "text": {
            "title": heading,
            "body": info.get("正文", ""),
        },
        "topic_selection_understanding": topic,
        "content_weight": {},  # no source data available yet, left empty
        "inspiration_points": deconstruction.get("灵感点") or [],
        "purpose_points": purposes_section.get("purposes") or [],
        "key_points": key_points_section.get("key_points") or [],
    }
def _video_key(video_data: Dict[str, Any]) -> str:
    """Build the dedup key ``"<channel_content_id>|<video>"`` for one video."""
    return f"{video_data.get('channel_content_id', '')}|{video_data.get('video', '')}"


def main() -> None:
    """Run ScriptWorkflow over every entry of output_demo.json, incrementally.

    Reads ``output_demo.json`` next to this script, skips entries whose
    ``channel_content_id``/``video`` pair already appears in
    ``output_demo_script.json`` (or earlier in the same run), invokes the
    workflow on the rest and rewrites the output file after each entry.

    Entries that failed are stored as well, so they count as processed and
    are not retried on a later run.

    Raises:
        FileNotFoundError: If ``output_demo.json`` does not exist.
    """
    base_dir = Path(__file__).parent
    input_path = base_dir / "output_demo.json"
    output_path = base_dir / "output_demo_script.json"

    if not input_path.exists():
        raise FileNotFoundError(f"找不到输入文件: {input_path}")

    # Source records produced by the earlier deconstruction step.
    raw = load_json(input_path)
    raw_results: List[Dict[str, Any]] = raw.get("results", []) or []

    # Existing output is reused so that reruns append instead of starting over.
    output_data = load_json(output_path)
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }

    # Bind the normalised list back: an output file whose "results" is missing
    # or null would otherwise break the append further down.
    existing_results: List[Dict[str, Any]] = output_data.get("results") or []
    output_data["results"] = existing_results

    # `or {}` tolerates a stored record with "video_data": null, matching the
    # handling of input items in the loop below.
    processed_keys = {
        _video_key(item.get("video_data") or {}) for item in existing_results
    }

    workflow = ScriptWorkflow()

    for item in raw_results:
        video_data = item.get("video_data", {}) or {}
        result = item.get("result", {}) or {}

        key = _video_key(video_data)
        if key in processed_keys:
            logger.info(f"已处理过该视频,跳过: {key}")
            continue

        logger.info(f"处理视频: channel_content_id={video_data.get('channel_content_id')} title={video_data.get('title')}")

        try:
            script_input = build_script_input(video_data, result)
            script_result = workflow.invoke(script_input)
        except Exception as e:
            # Per-item boundary: one failing video must not abort the batch.
            logger.error(f"脚本理解处理失败: {e}", exc_info=True)
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": None,
                "success": False,
                "error": str(e),
            }
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1
        else:
            record = {
                "video_data": video_data,
                "what_deconstruction_result": result,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1

        existing_results.append(record)
        # Remember the key so a duplicate later in the same input is skipped.
        processed_keys.add(key)
        output_data["total"] = output_data.get("total", 0) + 1

        # Persist after every entry so an interrupted run keeps finished work.
        save_json(output_path, output_data)

    logger.info(
        f"批量脚本理解完成: total={output_data.get('total')}, "
        f"success={output_data.get('success_count')}, fail={output_data.get('fail_count')}"
    )
# Run the batch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
- # 脚本解构
|