"""
Batch-processing script: read demo.json and process every video in it.

Steps:
1. Read demo.json (located next to this script).
2. Process each video the same way run_single.py does.
3. After each video finishes, immediately rewrite output_demo.json with all
   results collected so far (real-time saving), so progress survives an
   interruption.
"""
import contextlib
import json
import os
import sys
import traceback
from datetime import datetime
from pathlib import Path

# Add the project root to sys.path so the `src.*` imports below resolve.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))


def load_env_file(env_path):
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Parsing rules:
    - blank lines and lines starting with ``#`` are skipped;
    - an optional leading ``export `` is ignored;
    - lines without ``=`` or with an empty key are skipped;
    - one pair of matching surrounding quotes (``'`` or ``"``) is stripped
      from the value.
    Existing environment variables are overwritten.

    Args:
        env_path: path to the .env file (``Path`` or ``str``).

    Returns:
        True if the file existed and was read, False if it does not exist.
    """
    env_path = Path(env_path)
    if not env_path.exists():
        return False
    # utf-8-sig: decode as UTF-8 regardless of locale and drop a leading BOM,
    # which would otherwise be glued onto the first key.
    with open(env_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip comments and blank lines.
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            key, sep, value = line.partition("=")
            key = key.strip()
            if not sep or not key:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
                value = value[1:-1]
            os.environ[key] = value
    return True


# Load .env before importing project modules.
# NOTE(review): presumably those modules read the API key at import/init
# time — confirm; the ordering is kept from the original either way.
env_path = project_root / ".env"
if load_env_file(env_path):
    print(f"✅ 已加载环境变量从: {env_path}")
    # Check the API key is present (only a prefix is printed).
    api_key = os.environ.get("GEMINI_API_KEY", "")
    if api_key:
        print(f" GEMINI_API_KEY: {api_key[:10]}...")
    else:
        print("⚠️ .env已加载,但未找到GEMINI_API_KEY")
else:
    print(f"⚠️ 未找到.env文件: {env_path}")

from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow  # noqa: E402
from src.utils.logger import get_logger  # noqa: E402

logger = get_logger(__name__)


def convert_to_workflow_input(raw_data):
    """Convert a raw video record into the workflow's input dict.

    Video-analysis version: the video URL and text fields are passed through
    directly; missing keys default to "".

    Args:
        raw_data: raw post record (video format), a dict.

    Returns:
        dict with keys "video", "channel_content_id", "title", "body_text".
    """
    return {
        "video": raw_data.get("video", ""),
        "channel_content_id": raw_data.get("channel_content_id", ""),
        "title": raw_data.get("title", ""),
        "body_text": raw_data.get("body_text", ""),
    }


def load_existing_results(output_path):
    """Load a previously written results file, if any.

    Args:
        output_path: path to the results file (``Path`` or ``str``).

    Returns:
        The parsed JSON content, or None if the file does not exist or
        cannot be read/parsed (a warning is printed in the latter case).
    """
    output_path = Path(output_path)
    if not output_path.exists():
        return None
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON/Unicode decode errors
        print(f"⚠️ 读取已有结果文件失败(将重新创建): {e}")
        return None


def save_result(output_path, results, timestamp, total, success_count, fail_count):
    """Atomically write the current batch state to ``output_path``.

    The JSON is first written to a sibling ``.tmp`` file, flushed to disk and
    then moved over the target with ``os.replace``. A crash, interrupt or a
    serialization error mid-write therefore never truncates the previously
    saved results.

    Args:
        output_path: results file path (``Path`` or ``str``).
        results: list of per-video result dicts.
        timestamp: run timestamp string.
        total: number of videos in the batch.
        success_count: number of successful videos so far.
        fail_count: number of failed videos so far.

    Returns:
        True on success, False on failure (the error is printed; this is a
        best-effort save so the batch can continue).
    """
    output_path = Path(output_path)
    output_data = {
        "timestamp": timestamp,
        "total": total,
        "success_count": success_count,
        "fail_count": fail_count,
        "results": results,
    }
    tmp_path = output_path.with_name(output_path.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, output_path)
        return True
    except Exception as e:  # best-effort: never abort the batch on a save error
        print(f"❌ 保存结果失败: {e}")
        with contextlib.suppress(OSError):
            tmp_path.unlink()
        return False


def _failure(video_data, error):
    """Build the result dict for a video that could not be processed."""
    return {
        "video_data": video_data,
        "success": False,
        "error": error,
        "result": None,
    }


def process_single_video(workflow, video_data, index, total):
    """Run the workflow on one video record.

    Args:
        workflow: a WhatDeconstructionWorkflow instance (uses ``.invoke``).
        video_data: the raw video record (expected to be a dict).
        index: 1-based position in the batch (for progress output).
        total: batch size (for progress output).

    Returns:
        dict with keys "video_data", "success", "error", "result". Errors are
        captured in the dict rather than raised, so one bad video does not
        stop the batch.
    """
    if not isinstance(video_data, dict):
        # Guard: a non-dict item would otherwise raise AttributeError on
        # .get() below, outside any try block, and crash the whole batch.
        print(f"\n[{index}/{total}] ❌ 视频数据格式错误: {video_data!r}")
        return _failure(
            video_data,
            f"视频数据格式错误: 期望dict,实际为 {type(video_data).__name__}",
        )

    channel_content_id = video_data.get("channel_content_id", "unknown")
    title = video_data.get("title", "")
    print(f"\n{'=' * 80}")
    print(f"[{index}/{total}] 处理视频: {channel_content_id}")
    print(f"标题: {title}")
    print(f"{'=' * 80}")

    # Convert the record into the workflow input format.
    try:
        input_data = convert_to_workflow_input(video_data)
    except Exception as e:
        print(f"❌ 数据格式转换失败: {e}")
        return _failure(video_data, f"数据格式转换失败: {e}")
    print("✅ 数据格式转换成功")

    # Run the workflow (may take several minutes per video).
    print(" 开始执行工作流(这可能需要几分钟时间)...")
    try:
        result = workflow.invoke(input_data)
    except Exception as e:  # per-video boundary: record the failure, keep going
        print(f"❌ 工作流执行失败: {e}")
        traceback.print_exc()
        return _failure(video_data, f"工作流执行失败: {e}")
    print("✅ 工作流执行成功")
    return {
        "video_data": video_data,
        "success": True,
        "error": None,
        "result": result,
    }


def _load_video_list(demo_json_path):
    """Read demo.json and return its top-level list, or None on error (already reported)."""
    if not demo_json_path.exists():
        print(f"❌ 未找到demo.json文件: {demo_json_path}")
        return None
    try:
        with open(demo_json_path, "r", encoding="utf-8") as f:
            video_list = json.load(f)
    except (OSError, ValueError) as e:
        print(f"❌ 读取demo.json失败: {e}")
        return None
    if not isinstance(video_list, list):
        print(f"❌ demo.json格式错误: 顶层应为列表,实际为 {type(video_list).__name__}")
        return None
    print(f"✅ 成功读取demo.json,共 {len(video_list)} 个视频")
    return video_list


def _print_summary(total, success_count, fail_count, output_path, results):
    """Print the end-of-batch summary, listing every failed video."""
    print("\n" + "=" * 80)
    print("最终处理摘要")
    print("=" * 80)
    print(f"总计: {total} 个视频")
    print(f"成功: {success_count} 个")
    print(f"失败: {fail_count} 个")
    print(f"结果文件: {output_path}")
    if fail_count > 0:
        print("\n失败的视频:")
        for i, result in enumerate(results, 1):
            if result["success"]:
                continue
            video_data = result["video_data"]
            ident = (
                video_data.get("channel_content_id")
                if isinstance(video_data, dict)
                else video_data
            )
            print(f" [{i}] {ident}: {result.get('error')}")


def main():
    """Run the batch: load demo.json, build the workflow, process every video.

    After each video the full result set is rewritten to output_demo.json
    (next to this script).

    Returns:
        int exit status: 0 when the batch ran to completion (individual video
        failures are reported in the summary and the output file), 1 when
        setup failed (demo.json missing/invalid or workflow init failed).
    """
    print("=" * 80)
    print("批量处理视频 - What 解构工作流(视频分析版本)")
    print("=" * 80)

    # 1. Read demo.json.
    print("\n[1] 读取demo.json...")
    demo_json_path = Path(__file__).parent / "demo.json"
    video_list = _load_video_list(demo_json_path)
    if video_list is None:
        return 1

    # 2. Initialise the workflow.
    print("\n[2] 初始化工作流...")
    try:
        workflow = WhatDeconstructionWorkflow(
            model_provider="google_genai",
            max_depth=10,
        )
    except Exception as e:
        print(f"❌ 工作流初始化失败: {e}")
        traceback.print_exc()
        return 1
    print("✅ 工作流初始化成功")

    # 3. Prepare the output path and the run timestamp.
    print("\n[3] 准备结果文件...")
    output_path = Path(__file__).parent / "output_demo.json"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Only used to tell the user whether an old file will be overwritten.
    if load_existing_results(output_path) is not None:
        print(f"⚠️ 检测到已有结果文件: {output_path}")
        print(" 将覆盖已有结果")
    else:
        print(f"✅ 将创建新的结果文件: {output_path}")

    # 4. Process videos, saving after each one.
    print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
    results = []
    total = len(video_list)
    success_count = 0
    fail_count = 0
    for index, video_data in enumerate(video_list, 1):
        result = process_single_video(workflow, video_data, index, total)
        results.append(result)
        if result["success"]:
            success_count += 1
        else:
            fail_count += 1

        print(f" 保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]")
        if save_result(output_path, results, timestamp, total, success_count, fail_count):
            print(f"✅ 结果已实时保存到: {output_path}")
        else:
            print("❌ 保存结果失败,但将继续处理")

    if not results:
        # Empty batch: still write a file so output_demo.json reflects this
        # run instead of a stale previous one.
        save_result(output_path, results, timestamp, total, success_count, fail_count)

    # 5. Final summary.
    _print_summary(total, success_count, fail_count, output_path, results)
    return 0


if __name__ == "__main__":
    sys.exit(main())