# run_single.py
"""
Test script: run the video deconstruction workflow.

Features:
1. Accepts video input (视频详情.json, the video-details file).
2. Runs WhatDeconstructionWorkflow (video-analysis version).
"""
  7. import json
  8. import sys
  9. import os
  10. import argparse
  11. from pathlib import Path
  12. from datetime import datetime
  13. # 添加项目根目录到路径
  14. project_root = Path(__file__).parent.parent
  15. sys.path.insert(0, str(project_root))
  16. # 手动加载.env文件
  17. def load_env_file(env_path):
  18. """手动加载.env文件"""
  19. if not env_path.exists():
  20. return False
  21. with open(env_path, 'r') as f:
  22. for line in f:
  23. line = line.strip()
  24. # 跳过注释和空行
  25. if not line or line.startswith('#'):
  26. continue
  27. # 解析KEY=VALUE
  28. if '=' in line:
  29. key, value = line.split('=', 1)
  30. os.environ[key.strip()] = value.strip()
  31. return True
  32. env_path = project_root / ".env"
  33. if load_env_file(env_path):
  34. print(f"✅ 已加载环境变量从: {env_path}")
  35. # 验证API密钥
  36. api_key = os.environ.get("GEMINI_API_KEY", "")
  37. if api_key:
  38. print(f" GEMINI_API_KEY: {api_key[:10]}...")
  39. else:
  40. print(f"⚠️ 未找到.env文件: {env_path}")
  41. from src.workflows.what_deconstruction_workflow import WhatDeconstructionWorkflow
  42. from src.utils.logger import get_logger
  43. logger = get_logger(__name__)
  44. def load_test_data(directory):
  45. """
  46. 加载测试数据(视频格式)
  47. Args:
  48. directory: 视频目录名(如"56898272")
  49. """
  50. # 加载视频详情文件
  51. video_data_path = Path(__file__).parent / directory / "视频详情.json"
  52. if not video_data_path.exists():
  53. raise FileNotFoundError(f"未找到视频详情文件:{video_data_path}\n请确保目录下存在'视频详情.json'文件")
  54. with open(video_data_path, "r", encoding="utf-8") as f:
  55. data = json.load(f)
  56. return data
  57. def convert_to_workflow_input(raw_data):
  58. """
  59. 将原始数据转换为工作流输入格式(视频分析版本)
  60. Args:
  61. raw_data: 原始帖子数据(视频格式)
  62. """
  63. # 视频分析版本:直接使用视频URL和文本信息
  64. input_data = {
  65. "video": raw_data.get("video", ""),
  66. "channel_content_id": raw_data.get("channel_content_id", ""),
  67. "title": raw_data.get("title", ""),
  68. "body_text": raw_data.get("body_text", ""),
  69. }
  70. return input_data
  71. def main():
  72. """主函数"""
  73. # 解析命令行参数
  74. parser = argparse.ArgumentParser(description='运行单个视频的What解构工作流(视频分析版本)')
  75. parser.add_argument('directory', type=str, help='视频目录名(如"56898272"),目录下需要有"视频详情.json"文件')
  76. args = parser.parse_args()
  77. directory = args.directory
  78. print("=" * 80)
  79. print(f"开始测试 What 解构工作流(视频分析版本)- 目录: {directory}")
  80. print("=" * 80)
  81. # 1. 加载测试数据(目标视频)
  82. print("\n[1] 加载测试数据(目标视频)...")
  83. try:
  84. raw_data = load_test_data(directory)
  85. print(f"✅ 成功加载测试数据")
  86. print(f" - 标题: {raw_data.get('title')}")
  87. print(f" - 内容类型: {raw_data.get('content_type', 'unknown')}")
  88. video_url = raw_data.get('video', '')
  89. if video_url:
  90. print(f" - 视频URL: {video_url[:50]}...")
  91. else:
  92. print(f" - 视频URL: 未提供")
  93. except Exception as e:
  94. print(f"❌ 加载测试数据失败: {e}")
  95. return
  96. # 2. 转换数据格式
  97. print("\n[2] 转换数据格式...")
  98. try:
  99. input_data = convert_to_workflow_input(raw_data)
  100. print(f"✅ 数据格式转换成功")
  101. print(f" - 视频URL: {input_data.get('video', '')[:50]}...")
  102. print(f" - 标题: {input_data.get('title', '')}")
  103. except Exception as e:
  104. print(f"❌ 数据格式转换失败: {e}")
  105. return
  106. # 3. 初始化工作流
  107. print("\n[3] 初始化工作流...")
  108. try:
  109. workflow = WhatDeconstructionWorkflow(
  110. model_provider="google_genai",
  111. max_depth=10
  112. )
  113. print(f"✅ 工作流初始化成功")
  114. except Exception as e:
  115. print(f"❌ 工作流初始化失败: {e}")
  116. import traceback
  117. traceback.print_exc()
  118. return
  119. # 4. 执行工作流
  120. print("\n[4] 执行工作流...")
  121. print(" 注意:这可能需要几分钟时间...")
  122. try:
  123. result = workflow.invoke(input_data)
  124. print(f"✅ 工作流执行成功")
  125. except Exception as e:
  126. print(f"❌ 工作流执行失败: {e}")
  127. import traceback
  128. traceback.print_exc()
  129. return
  130. # 5. 保存结果
  131. print("\n[5] 保存结果...")
  132. try:
  133. # 生成带时间戳的文件名
  134. timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
  135. output_filename = f"result_{timestamp}.json"
  136. output_path = Path(__file__).parent / directory / "output" / output_filename
  137. output_path.parent.mkdir(parents=True, exist_ok=True)
  138. with open(output_path, "w", encoding="utf-8") as f:
  139. json.dump(result, f, ensure_ascii=False, indent=2)
  140. print(f"✅ 结果已保存到: {output_path}")
  141. print(f" 文件名: {output_filename}")
  142. except Exception as e:
  143. print(f"❌ 保存结果失败: {e}")
  144. return
  145. # 6. 显示结果摘要
  146. print("\n" + "=" * 80)
  147. print("结果摘要")
  148. print("=" * 80)
  149. if result:
  150. # 1. 视频信息
  151. topic_selection_v2 = result.get("topic_selection_v2", {})
  152. if topic_selection_v2:
  153. print(f"✅ 选题结构分析 V2 完成")
  154. print(f" - 选题结构: {topic_selection_v2}")
  155. else:
  156. print(f"❌ 选题结构分析 V2 失败")
  157. return
  158. if __name__ == "__main__":
  159. main()