run_batch.py

  1. """
  2. 批量处理脚本:读取demo.json,批量处理视频
  3. 功能:
  4. 1. 读取demo.json文件
  5. 2. 使用run_single.py同样的方法处理每个视频
  6. 3. 每处理完一个视频立即写入结果到output_demo.json文件(实时保存)
  7. """
import json
import sys
import os
from pathlib import Path
from datetime import datetime

# Add the project root to the import path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
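# NOTE: this assumes the script sits in a subdirectory directly under the project root,
# so that the .env file and the `src` package can be resolved from project_root.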


# Load the .env file manually
def load_env_file(env_path):
    """Manually load environment variables from a .env file."""
    if not env_path.exists():
        return False
    with open(env_path, 'r') as f:
        for line in f:
            line = line.strip()
            # Skip comments and blank lines
            if not line or line.startswith('#'):
                continue
            # Parse KEY=VALUE pairs
            if '=' in line:
                key, value = line.split('=', 1)
                os.environ[key.strip()] = value.strip()
    return True
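
# Note on the loader above: values are stored in os.environ verbatim, so quotes around
# values are not stripped and `export KEY=VALUE` lines are not supported.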

env_path = project_root / ".env"
if load_env_file(env_path):
    print(f"✅ Loaded environment variables from: {env_path}")
    # Verify the API key
    api_key = os.environ.get("GEMINI_API_KEY", "")
    if api_key:
        print(f" GEMINI_API_KEY: {api_key[:10]}...")
else:
    print(f"⚠️ .env file not found: {env_path}")
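
# NOTE: these imports come after the .env loading above, presumably so that
# GEMINI_API_KEY is already present in os.environ when the workflow module is imported.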

from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
from src.utils.logger import get_logger

logger = get_logger(__name__)
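# Note: progress below is reported with print(); `logger` is available for structured
# logging but is not otherwise used in this script.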


def convert_to_workflow_input(raw_data):
    """
    Convert raw post data into the workflow input format (video-analysis version).

    Args:
        raw_data: Raw post data (video format).
    """
    # Video-analysis version: use the video URL and text fields directly
    input_data = {
        "video": raw_data.get("video", ""),
        "channel_content_id": raw_data.get("channel_content_id", ""),
        "title": raw_data.get("title", ""),
        "body_text": raw_data.get("body_text", ""),
    }
    return input_data
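
# For reference, each entry in demo.json is expected to carry the four fields read
# above. A minimal illustrative entry (hypothetical values):
# {
#     "video": "https://example.com/video.mp4",
#     "channel_content_id": "abc123",
#     "title": "Example title",
#     "body_text": "Example body text"
# }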


def load_existing_results(output_path):
    """
    Load the existing results file, if there is one.

    Args:
        output_path: Path to the results file.

    Returns:
        The existing result data, or None if the file does not exist.
    """
    if not output_path.exists():
        return None
    try:
        with open(output_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"⚠️ Failed to read the existing results file (it will be recreated): {e}")
        return None


def save_result(output_path, results, timestamp, total, success_count, fail_count):
    """
    Save the results to the output file.

    Args:
        output_path: Path to the results file.
        results: List of per-video results.
        timestamp: Run timestamp.
        total: Total number of videos.
        success_count: Number of successes.
        fail_count: Number of failures.
    """
    output_data = {
        "timestamp": timestamp,
        "total": total,
        "success_count": success_count,
        "fail_count": fail_count,
        "results": results
    }
    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)
        return True
    except Exception as e:
        print(f"❌ Failed to save results: {e}")
        return False
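
# Design note: save_result() rewrites the full results list on every call, so
# output_demo.json is always one valid JSON document; if the run is interrupted,
# the results saved so far remain readable.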


def process_single_video(workflow, video_data, index, total):
    """
    Process a single video.

    Args:
        workflow: WhatDeconstructionWorkflow instance.
        video_data: Dict with the video's data.
        index: Current index (1-based).
        total: Total number of videos.

    Returns:
        A result dict containing the original data and the workflow output.
    """
    channel_content_id = video_data.get("channel_content_id", "unknown")
    title = video_data.get("title", "")
    print(f"\n{'=' * 80}")
    print(f"[{index}/{total}] Processing video: {channel_content_id}")
    print(f"Title: {title}")
    print(f"{'=' * 80}")

    # Convert the data format
    try:
        input_data = convert_to_workflow_input(video_data)
        print("✅ Data format conversion succeeded")
    except Exception as e:
        print(f"❌ Data format conversion failed: {e}")
        return {
            "video_data": video_data,
            "success": False,
            "error": f"Data format conversion failed: {e}",
            "result": None
        }

    # Run the workflow
    print(" Starting workflow execution (this may take a few minutes)...")
    try:
        result = workflow.invoke(input_data)
        print("✅ Workflow execution succeeded")
        return {
            "video_data": video_data,
            "success": True,
            "error": None,
            "result": result
        }
    except Exception as e:
        print(f"❌ Workflow execution failed: {e}")
        import traceback
        traceback.print_exc()
        return {
            "video_data": video_data,
            "success": False,
            "error": f"Workflow execution failed: {e}",
            "result": None
        }
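
# Note: process_single_video() catches exceptions and returns them inside the result
# dict instead of raising, so one failing video does not abort the whole batch.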


def main():
    """Main entry point."""
    print("=" * 80)
    print("Batch video processing - What deconstruction workflow (video-analysis version)")
    print("=" * 80)

    # 1. Read demo.json
    print("\n[1] Reading demo.json...")
    demo_json_path = Path(__file__).parent / "demo.json"
    if not demo_json_path.exists():
        print(f"❌ demo.json not found: {demo_json_path}")
        return
    try:
        with open(demo_json_path, "r", encoding="utf-8") as f:
            video_list = json.load(f)
        print(f"✅ Read demo.json successfully, {len(video_list)} videos in total")
    except Exception as e:
        print(f"❌ Failed to read demo.json: {e}")
        return

    # 2. Initialize the workflow
    print("\n[2] Initializing the workflow...")
    try:
        workflow = WhatDeconstructionWorkflow(
            model_provider="google_genai",
            max_depth=10
        )
        print("✅ Workflow initialized successfully")
    except Exception as e:
        print(f"❌ Workflow initialization failed: {e}")
        import traceback
        traceback.print_exc()
        return

    # 3. Prepare the results file path and timestamp
    print("\n[3] Preparing the results file...")
    output_path = Path(__file__).parent / "output_demo.json"
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # Check whether a results file already exists (informational only)
    existing_results = load_existing_results(output_path)
    if existing_results:
        print(f"⚠️ Existing results file detected: {output_path}")
        print(" Existing results will be overwritten")
    else:
        print(f"✅ A new results file will be created: {output_path}")
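
    # Note: an existing output_demo.json is overwritten rather than resumed; rerunning
    # the script reprocesses every video in demo.json.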

    # 4. Process the videos in batch (save after each one completes)
    print("\n[4] Starting batch processing (results are saved after each video)...")
    results = []
    total = len(video_list)
    success_count = 0
    fail_count = 0
    for index, video_data in enumerate(video_list, 1):
        # Process a single video
        result = process_single_video(workflow, video_data, index, total)
        results.append(result)
        # Update the counters
        if result["success"]:
            success_count += 1
        else:
            fail_count += 1
        # Save the results to the file immediately
        print(f" Saving results to file... [{success_count} succeeded / {fail_count} failed / {total} total]")
        if save_result(output_path, results, timestamp, total, success_count, fail_count):
            print(f"✅ Results saved in real time to: {output_path}")
        else:
            print("❌ Failed to save results, continuing with the remaining videos")

    # 5. Print the final summary
    print("\n" + "=" * 80)
    print("Final summary")
    print("=" * 80)
    print(f"Total: {total} videos")
    print(f"Succeeded: {success_count}")
    print(f"Failed: {fail_count}")
    print(f"Results file: {output_path}")
    if fail_count > 0:
        print("\nFailed videos:")
        for i, result in enumerate(results, 1):
            if not result["success"]:
                video_data = result["video_data"]
                print(f" [{i}] {video_data.get('channel_content_id')}: {result.get('error')}")


if __name__ == "__main__":
    main()