| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
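"""
Batch-processing script: read demo.json and process every video it lists.

Steps:
1. Read the demo.json file.
2. Process each video the same way run_single.py does.
3. Write the results to output_demo.json immediately after each video
   finishes (incremental save).
"""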
- import json
- import sys
- import os
- from pathlib import Path
- from datetime import datetime
# Put the project root (the parent of this script's directory) at the front
# of sys.path. resolve() makes the path absolute first: with a relative
# __file__ such as "batch.py", Path(__file__).parent.parent collapses to "."
# and the wrong directory would be added.
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
# Minimal hand-rolled .env loader (no third-party dependency).
def load_env_file(env_path):
    """Load KEY=VALUE pairs from a .env file into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored, as are lines whose key is empty. An optional leading
    ``export `` is dropped, and a value wrapped in one pair of matching
    single or double quotes is unquoted. Variables that already exist in the
    environment are overwritten.

    Args:
        env_path: Location of the .env file (``str`` or ``pathlib.Path``).

    Returns:
        True if the file exists and was read, False if it does not exist.
    """
    env_path = Path(env_path)
    if not env_path.exists():
        return False

    # Explicit encoding so parsing does not depend on the platform locale;
    # the "-sig" variant also drops a UTF-8 BOM that would otherwise end up
    # glued to the first key.
    with open(env_path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            # Accept shell-style "export KEY=VALUE" lines.
            if line.startswith("export "):
                line = line[len("export "):].lstrip()

            key, separator, value = line.partition("=")
            key = key.strip()
            # No "=" at all, or nothing before it: not an assignment.
            # (An empty name would make os.environ raise.)
            if not separator or not key:
                continue

            value = value.strip()
            # KEY="value" / KEY='value' -> value, so the quotes do not
            # become part of the secret handed to the API client.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]

            os.environ[key] = value
    return True
# Load the project's .env file at import time and report what happened.
env_path = project_root / ".env"
if load_env_file(env_path):
    print(f"✅ 已加载环境变量从: {env_path}")
    # Confirm the API key is present without echoing the secret: only a
    # 4-character prefix is printed (previously the first 10 characters
    # of the key were written to stdout).
    api_key = os.environ.get("GEMINI_API_KEY", "")
    if api_key:
        print(f" GEMINI_API_KEY: {api_key[:4]}...")
else:
    print(f"⚠️ 未找到.env文件: {env_path}")
- from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
- from src.utils.logger import get_logger
- logger = get_logger(__name__)
def convert_to_workflow_input(raw_data):
    """Build the workflow input dict from a raw post record (video version).

    Args:
        raw_data: Raw post data for one video; fields are read with ``.get``.

    Returns:
        A dict holding exactly the keys ``video``, ``channel_content_id``,
        ``title`` and ``body_text`` (in that order). A key missing from
        ``raw_data`` maps to the empty string.
    """
    # Only these fields are forwarded; the video URL and text fields are
    # passed through unchanged.
    forwarded_fields = ("video", "channel_content_id", "title", "body_text")
    return {field: raw_data.get(field, "") for field in forwarded_fields}
def load_existing_results(output_path):
    """Return the parsed contents of an existing results file, if any.

    Args:
        output_path: ``pathlib.Path`` of the results file.

    Returns:
        The decoded JSON data, or None when the file does not exist or when
        reading/parsing it fails (a warning is printed in the latter case).
    """
    if output_path.exists():
        try:
            raw_text = output_path.read_text(encoding="utf-8")
            return json.loads(raw_text)
        except Exception as e:
            # Best effort: an unreadable file is treated as absent.
            print(f"⚠️ 读取已有结果文件失败(将重新创建): {e}")
    return None
def save_result(output_path, results, timestamp, total, success_count, fail_count):
    """Write the batch summary and per-video results to a JSON file.

    The payload is serialized in memory, written to a temporary sibling file
    (``<output_path>.tmp``) and then moved over ``output_path`` with
    ``os.replace``. A failed save therefore leaves the previously saved file
    untouched instead of truncating it.

    Args:
        output_path: Destination of the results file (``str`` or ``Path``).
        results: List of per-video result records.
        timestamp: Timestamp string identifying the run.
        total: Total number of videos in the batch.
        success_count: Number of videos processed successfully so far.
        fail_count: Number of videos that failed so far.

    Returns:
        True if the file was written, False otherwise (the error is printed).
    """
    output_data = {
        "timestamp": timestamp,
        "total": total,
        "success_count": success_count,
        "fail_count": fail_count,
        "results": results,
    }
    tmp_path = f"{output_path}.tmp"
    try:
        # Serialize before touching any file: a value that is not
        # JSON-serializable fails here, while the last good save is intact.
        serialized = json.dumps(output_data, ensure_ascii=False, indent=2)
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(serialized)
        # Swap the finished file into place in a single step.
        os.replace(tmp_path, output_path)
        return True
    except Exception as e:
        print(f"❌ 保存结果失败: {e}")
        # Best-effort cleanup of a half-written temp file; it may not exist.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
def process_single_video(workflow, video_data, index, total):
    """Run the workflow on one video and report the outcome.

    Args:
        workflow: Object whose ``invoke(input_data)`` method is called (a
            WhatDeconstructionWorkflow instance in this script).
        video_data: Dict describing the video.
        index: 1-based position of this video in the batch (display only).
        total: Number of videos in the batch (display only).

    Returns:
        A dict with the keys ``video_data`` (the input record), ``success``
        (bool), ``error`` (message string, or None on success) and ``result``
        (the workflow output, or None on failure).
    """

    def _record(ok, error=None, result=None):
        # Every exit path returns the same four-key record.
        return {
            "video_data": video_data,
            "success": ok,
            "error": error,
            "result": result,
        }

    rule = "=" * 80
    content_id = video_data.get("channel_content_id", "unknown")
    heading = video_data.get("title", "")

    print("\n" + rule)
    print(f"[{index}/{total}] 处理视频: {content_id}")
    print(f"标题: {heading}")
    print(rule)

    # Stage 1: reshape the raw record into the workflow's input format.
    try:
        workflow_input = convert_to_workflow_input(video_data)
        print("✅ 数据格式转换成功")
    except Exception as exc:
        print(f"❌ 数据格式转换失败: {exc}")
        return _record(False, error=f"数据格式转换失败: {exc}")

    # Stage 2: run the workflow; any exception is reported, not propagated,
    # so one bad video does not abort the batch.
    print(" 开始执行工作流(这可能需要几分钟时间)...")
    try:
        workflow_output = workflow.invoke(workflow_input)
        print("✅ 工作流执行成功")
        return _record(True, result=workflow_output)
    except Exception as exc:
        print(f"❌ 工作流执行失败: {exc}")
        import traceback

        traceback.print_exc()
        return _record(False, error=f"工作流执行失败: {exc}")
def main():
    """Run the whole batch from demo.json to output_demo.json.

    Reads the video list from ``demo.json`` next to this script, creates the
    workflow, processes the videos one by one and rewrites
    ``output_demo.json`` after every video, then prints a summary. Returns
    None early (after printing the reason) if the input file is missing or
    unreadable, or if the workflow cannot be created.
    """
    banner = "=" * 80
    script_dir = Path(__file__).parent

    print(banner)
    print("批量处理视频 - What 解构工作流(视频分析版本)")
    print(banner)

    # Step 1: load the list of videos.
    print("\n[1] 读取demo.json...")
    demo_json_path = script_dir / "demo.json"
    if not demo_json_path.exists():
        print(f"❌ 未找到demo.json文件: {demo_json_path}")
        return

    try:
        with open(demo_json_path, "r", encoding="utf-8") as f:
            video_list = json.load(f)
        print(f"✅ 成功读取demo.json,共 {len(video_list)} 个视频")
    except Exception as e:
        print(f"❌ 读取demo.json失败: {e}")
        return

    # Step 2: create the workflow once and reuse it for every video.
    print("\n[2] 初始化工作流...")
    try:
        workflow = WhatDeconstructionWorkflow(
            model_provider="google_genai",
            max_depth=10,
        )
        print("✅ 工作流初始化成功")
    except Exception as e:
        print(f"❌ 工作流初始化失败: {e}")
        import traceback

        traceback.print_exc()
        return

    # Step 3: decide where results go and stamp the run.
    print("\n[3] 准备结果文件...")
    output_path = script_dir / "output_demo.json"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # The previous file is loaded only to tell the user it will be replaced.
    if load_existing_results(output_path):
        print(f"⚠️ 检测到已有结果文件: {output_path}")
        print(" 将覆盖已有结果")
    else:
        print(f"✅ 将创建新的结果文件: {output_path}")

    # Step 4: process the videos, saving the accumulated results each time.
    print("\n[4] 开始批量处理视频(每处理完一个立即保存结果)...")
    total = len(video_list)
    results = []
    success_count = 0
    fail_count = 0

    for position, video_data in enumerate(video_list, 1):
        outcome = process_single_video(workflow, video_data, position, total)
        results.append(outcome)

        if outcome["success"]:
            success_count += 1
        else:
            fail_count += 1

        print(f" 保存结果到文件... [{success_count}成功/{fail_count}失败/{total}总计]")
        saved = save_result(
            output_path, results, timestamp, total, success_count, fail_count
        )
        if saved:
            print(f"✅ 结果已实时保存到: {output_path}")
        else:
            # A failed save does not stop the batch.
            print("❌ 保存结果失败,但将继续处理")

    # Step 5: final summary.
    print("\n" + banner)
    print("最终处理摘要")
    print(banner)
    print(f"总计: {total} 个视频")
    print(f"成功: {success_count} 个")
    print(f"失败: {fail_count} 个")
    print(f"结果文件: {output_path}")

    if fail_count > 0:
        print("\n失败的视频:")
        for position, outcome in enumerate(results, 1):
            if outcome["success"]:
                continue
            content_id = outcome["video_data"].get("channel_content_id")
            error_text = outcome.get("error")
            print(f" [{position}] {content_id}: {error_text}")


# Run the batch only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|