run_test.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. """
  2. 统一测试脚本:根据参数选择运行单个视频或批量处理
  3. 功能:
  4. 1. 支持 single 模式:处理单个视频
  5. 2. 支持 batch 模式:批量处理作者历史视频
  6. 3. 自动加载对应目录的数据(视频详情.json)
  7. """
  8. import sys
  9. import subprocess
  10. from pathlib import Path
  11. import argparse
  12. def main():
  13. """主函数"""
  14. # 解析命令行参数
  15. parser = argparse.ArgumentParser(
  16. description='运行What解构工作流测试脚本',
  17. formatter_class=argparse.RawDescriptionHelpFormatter,
  18. epilog="""
  19. 使用示例:
  20. # 单个视频模式
  21. python examples/run_test.py single 56898272 # 视频输入(使用视频详情.json)
  22. # 批量处理模式
  23. python examples/run_test.py batch <目录名>
  24. 注意:
  25. - 视频输入:目录下需要有"视频详情.json"文件
  26. - 视频详情.json格式:包含video、title、body_text等字段
  27. """
  28. )
  29. parser.add_argument(
  30. 'mode',
  31. type=str,
  32. choices=['single', 'batch'],
  33. help='运行模式:single(单个帖子)或 batch(批量处理)'
  34. )
  35. parser.add_argument(
  36. 'directory',
  37. type=str,
  38. help='视频目录名(如"56898272"),目录下需要有"视频详情.json"文件'
  39. )
  40. args = parser.parse_args()
  41. # 获取脚本目录
  42. script_dir = Path(__file__).parent
  43. # 根据模式选择对应的脚本
  44. if args.mode == 'single':
  45. script_path = script_dir / "run_single.py"
  46. mode_name = "单个视频处理"
  47. else: # batch
  48. script_path = script_dir / "run_batch.py"
  49. mode_name = "批量处理"
  50. # 检查脚本是否存在
  51. if not script_path.exists():
  52. print(f"❌ 错误:脚本不存在 {script_path}")
  53. return 1
  54. # 检查目录是否存在
  55. data_dir = script_dir / args.directory
  56. if not data_dir.exists():
  57. print(f"❌ 错误:目录不存在 {data_dir}")
  58. print(f" 请确保以下目录存在:")
  59. print(f" - {data_dir}")
  60. return 1
  61. # 显示运行信息
  62. print("=" * 80)
  63. print(f"运行模式: {mode_name}")
  64. print(f"目录: {args.directory}")
  65. print(f"脚本: {script_path.name}")
  66. print("=" * 80)
  67. print()
  68. # 构建命令
  69. cmd = [sys.executable, str(script_path), args.directory]
  70. # 执行命令
  71. try:
  72. result = subprocess.run(cmd, check=False)
  73. return result.returncode
  74. except KeyboardInterrupt:
  75. print("\n\n⚠️ 用户中断执行")
  76. return 130
  77. except Exception as e:
  78. print(f"\n❌ 执行失败: {e}")
  79. return 1
  80. if __name__ == "__main__":
  81. sys.exit(main())