| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Batch-run the decode workflow (DecodeWorkflow).

Reads the video list from examples/demo.json, runs each entry through
DecodeWorkflow one at a time, and writes the results to
examples/output_decode_result.json.
"""
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List

# Add the project root to sys.path so the `src.*` packages resolve when
# this script is executed directly (rather than as an installed module).
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

from src.workflows.decode_workflow import DecodeWorkflow
from src.utils.logger import get_logger

logger = get_logger(__name__)
def load_json(path: Path) -> List[Dict[str, Any]]:
    """Load a JSON file and return its list of records.

    Accepts either a bare JSON list, or a dict that wraps the list under
    a "results" key. A missing file or any other JSON shape yields [].
    """
    if not path.exists():
        return []

    with path.open("r", encoding="utf-8") as f:
        loaded = json.load(f)

    if isinstance(loaded, list):
        return loaded
    if isinstance(loaded, dict) and "results" in loaded:
        return loaded["results"]
    return []
def save_json(path: Path, data: Dict[str, Any]) -> None:
    """Atomically write *data* to *path* as pretty-printed UTF-8 JSON.

    Writes to a sibling temp file first and then renames it over the
    target, so readers never observe a half-written file.
    """
    # BUGFIX: append ".tmp" to the full file name instead of replacing the
    # suffix (with_suffix(".tmp") would map both "foo.json" and "foo.yaml"
    # to the same "foo.tmp", letting unrelated saves clobber each other).
    tmp_path = path.with_name(path.name + ".tmp")
    with tmp_path.open("w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    # Path.replace is an atomic rename on POSIX (and overwrites on Windows).
    tmp_path.replace(path)
def build_decode_input(video_data: Dict[str, Any]) -> Dict[str, Any]:
    """Project a raw video record onto the fields DecodeWorkflow consumes.

    Missing fields default to the empty string; extra fields are dropped.
    """
    fields = ("video", "channel_content_id", "title", "body_text")
    return {name: video_data.get(name, "") for name in fields}
def _fail_record(video_data: Dict[str, Any], error_msg: str) -> Dict[str, Any]:
    """Result record for a failed video (same shape as a success record)."""
    return {
        "video_data": video_data,
        "what_deconstruction_result": None,
        "script_result": None,
        "success": False,
        "error": error_msg,
    }


def _load_existing_output(output_path: Path) -> Dict[str, Any]:
    """Load a previous output document for incremental appends.

    Returns a fresh skeleton (zeroed counters, empty results) when the
    file is absent, unreadable, or empty.
    """
    output_data: Dict[str, Any] = {}
    if output_path.exists():
        try:
            with output_path.open("r", encoding="utf-8") as f:
                output_data = json.load(f)
        except Exception as e:
            logger.warning(f"读取已有输出文件失败,将创建新文件: {e}")
            output_data = {}
    if not output_data:
        output_data = {
            "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
            "total": 0,
            "success_count": 0,
            "fail_count": 0,
            "results": [],
        }
    return output_data


def main() -> None:
    """Run DecodeWorkflow over every video in demo.json.

    Results are appended incrementally to output_decode_result.json;
    already-processed videos (matched by channel_content_id + video URL)
    are skipped, and the output file is re-saved after every item so a
    long run that dies mid-way loses no progress.

    Raises:
        FileNotFoundError: if examples/demo.json does not exist.
    """
    base_dir = Path(__file__).parent
    input_path = base_dir / "demo.json"
    output_path = base_dir / "output_decode_result.json"
    if not input_path.exists():
        raise FileNotFoundError(f"找不到输入文件: {input_path}")

    # Read the list of videos to process.
    video_list = load_json(input_path)
    if not video_list:
        logger.warning(f"输入文件 {input_path} 中没有视频数据")
        return
    logger.info(f"共读取到 {len(video_list)} 个视频")

    # Load any existing output so reruns append instead of restarting.
    output_data = _load_existing_output(output_path)
    existing_results: List[Dict[str, Any]] = output_data.get("results", []) or []

    # Dedup key: channel_content_id + video URL of previously saved items.
    processed_keys = {
        f"{item.get('video_data', {}).get('channel_content_id', '')}|"
        f"{item.get('video_data', {}).get('video', '')}"
        for item in existing_results
    }

    logger.info("初始化 DecodeWorkflow...")
    workflow = DecodeWorkflow()
    logger.info("DecodeWorkflow 初始化完成")

    for idx, video_data in enumerate(video_list, 1):
        video_url = video_data.get("video", "")
        channel_content_id = video_data.get("channel_content_id", "")
        title = video_data.get("title", "")

        key = f"{channel_content_id}|{video_url}"
        if key in processed_keys:
            logger.info(f"[{idx}/{len(video_list)}] 已处理过该视频,跳过: channel_content_id={channel_content_id}")
            continue
        # BUGFIX: register the key immediately so duplicate entries inside
        # the same input list are not processed twice in one run.
        processed_keys.add(key)

        logger.info(
            f"[{idx}/{len(video_list)}] 开始处理视频: "
            f"channel_content_id={channel_content_id}, title={title[:50]}..."
        )
        try:
            decode_input = build_decode_input(video_data)
            decode_result = workflow.invoke(decode_input)

            # A non-dict result falls through to the success-payload code
            # below, raises AttributeError there, and is recorded as a
            # failure by the except handler — same as the original flow.
            if isinstance(decode_result, dict):
                # Explicit error reported by the workflow; several field
                # names are supported.
                error_msg = (
                    decode_result.get("error")
                    or decode_result.get("错误")
                    or decode_result.get("workflow_error")
                )
                workflow_status = decode_result.get("workflow_status")
                if error_msg or workflow_status in ("failed", "incomplete"):
                    error_msg = error_msg or "工作流执行失败"
                    logger.error(
                        f"[{idx}/{len(video_list)}] 处理失败: channel_content_id={channel_content_id}, error={error_msg}"
                    )
                    output_data["fail_count"] = output_data.get("fail_count", 0) + 1
                    output_data["results"].append(_fail_record(video_data, error_msg))
                    output_data["total"] = output_data.get("total", 0) + 1
                    save_json(output_path, output_data)
                    continue

                # All key sections empty with no explicit success flag:
                # treat as a failure (business decision inherited as-is).
                section_keys = ("视频信息", "三点解构", "选题理解", "脚本理解")
                if not any(decode_result.get(k) for k in section_keys):
                    error_msg = "工作流执行完成,但所有结果都为空"
                    logger.warning(
                        f"[{idx}/{len(video_list)}] 处理结果为空: channel_content_id={channel_content_id}"
                    )
                    output_data["fail_count"] = output_data.get("fail_count", 0) + 1
                    output_data["results"].append(_fail_record(video_data, error_msg))
                    output_data["total"] = output_data.get("total", 0) + 1
                    save_json(output_path, output_data)
                    continue

            # Success payload, shaped like output_demo_script.json:
            # what_deconstruction_result holds video info, three-point
            # deconstruction and topic understanding.
            what_deconstruction_result = {
                "视频信息": decode_result.get("视频信息", {}),
                "三点解构": decode_result.get("三点解构", {}),
                "选题理解": decode_result.get("选题理解", {}),
            }

            # script_result: topic description extracted from the topic
            # understanding, plus the script understanding section.
            topic_understanding = decode_result.get("选题理解", {})
            selected_topic: Dict[str, Any] = {}
            if isinstance(topic_understanding, dict):
                if "选题" in topic_understanding:
                    selected_topic = topic_understanding.get("选题", {})
                else:
                    selected_topic = {
                        "主题": topic_understanding.get("主题", ""),
                        "描述": topic_understanding.get("描述", ""),
                    }
            script_result = {
                "选题描述": selected_topic,
                "脚本理解": decode_result.get("脚本理解", {}),
            }

            record = {
                "video_data": video_data,
                "what_deconstruction_result": what_deconstruction_result,
                "script_result": script_result,
                "success": True,
                "error": None,
            }
            output_data["success_count"] = output_data.get("success_count", 0) + 1
            logger.info(
                f"[{idx}/{len(video_list)}] 处理成功: channel_content_id={channel_content_id}"
            )
        except Exception as e:
            logger.error(
                f"[{idx}/{len(video_list)}] 处理失败: channel_content_id={channel_content_id}, error={e}",
                exc_info=True,
            )
            record = _fail_record(video_data, str(e))
            output_data["fail_count"] = output_data.get("fail_count", 0) + 1

        # Both the success and the exception path land here with `record`
        # set; the early-exit failure branches above `continue` past it.
        output_data["results"].append(record)
        output_data["total"] = output_data.get("total", 0) + 1
        # Save after every item so a crash mid-run loses no progress.
        save_json(output_path, output_data)

    logger.info(f"结果已保存到 {output_path}")
    logger.info(
        f"\n{'='*60}\n"
        f"批量解码完成:\n"
        f" 总计: {output_data.get('total', 0)}\n"
        f" 成功: {output_data.get('success_count', 0)}\n"
        f" 失败: {output_data.get('fail_count', 0)}\n"
        f" 输出文件: {output_path}\n"
        f"{'='*60}"
    )
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
|