workflow_pipeline.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #!/usr/bin/env python3
  2. """
  3. 知识代理评估工作流自动化脚本
  4. 功能:
  5. 1. 评估(test_evaluation_v3.py)
  6. 2. 清洗排序(extract_topn_multimodal.py)
  7. 3. 可视化生成(Node.js)
  8. 支持:
  9. - 连续执行全流程
  10. - 独立执行单个步骤
  11. - 灵活的参数配置
  12. """
  13. import argparse
  14. import asyncio
  15. import os
  16. import subprocess
  17. import sys
  18. from datetime import datetime
  19. from pathlib import Path
  20. class WorkflowPipeline:
  21. """工作流管道"""
  22. def __init__(self, run_context_path: str, output_dir: str = None):
  23. """
  24. 初始化工作流
  25. Args:
  26. run_context_path: run_context.json 文件路径
  27. output_dir: 输出目录(可选,默认与输入文件同目录)
  28. """
  29. self.run_context_path = run_context_path
  30. # 确定输出目录
  31. if output_dir:
  32. self.output_dir = Path(output_dir)
  33. else:
  34. self.output_dir = Path(run_context_path).parent
  35. self.output_dir.mkdir(parents=True, exist_ok=True)
  36. # 生成的文件路径
  37. self.run_context_v3_path = str(self.output_dir / f"{Path(run_context_path).stem}_v3.json")
  38. self.visualization_path = str(self.output_dir / f"visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
  39. print(f"\n{'='*80}")
  40. print(f"🚀 知识代理评估工作流")
  41. print(f"{'='*80}")
  42. print(f"📂 输入文件: {self.run_context_path}")
  43. print(f"📁 输出目录: {self.output_dir}")
  44. print(f"{'='*80}\n")
  45. def step_1_evaluate(self, max_posts: int = None) -> bool:
  46. """
  47. 步骤1: 评估帖子
  48. Args:
  49. max_posts: 最多评估的帖子数量(None表示全部)
  50. Returns:
  51. 是否成功
  52. """
  53. print(f"\n{'='*80}")
  54. print("📊 步骤1/3: 评估帖子")
  55. print(f"{'='*80}\n")
  56. cmd = ["python3", "test_evaluation_v3.py", self.run_context_path]
  57. if max_posts:
  58. cmd.append(str(max_posts))
  59. try:
  60. result = subprocess.run(cmd, check=True)
  61. print(f"\n✅ 步骤1完成: 评估成功")
  62. print(f" 生成文件: {self.run_context_v3_path}")
  63. return True
  64. except subprocess.CalledProcessError as e:
  65. print(f"\n❌ 步骤1失败: 评估出错")
  66. print(f" 错误信息: {e}")
  67. return False
  68. async def step_2_clean_and_sort(self, top_n: int = 10, max_concurrent: int = 5) -> bool:
  69. """
  70. 步骤2: 清洗排序(使用评估后的run_context_v3.json)
  71. Args:
  72. top_n: 提取前N个帖子
  73. max_concurrent: 最大并发数
  74. Returns:
  75. 是否成功
  76. """
  77. print(f"\n{'='*80}")
  78. print("🧹 步骤2/3: 清洗排序")
  79. print(f"{'='*80}\n")
  80. # 检查run_context_v3.json是否存在
  81. if not os.path.exists(self.run_context_v3_path):
  82. print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}")
  83. print(f" 请先运行步骤1(评估)")
  84. return False
  85. cmd = [
  86. "python3", "extract_topn_multimodal.py",
  87. "-i", self.run_context_v3_path,
  88. "-o", str(self.output_dir / "multimodal_extraction_topn_cleaned.json"),
  89. "--top-n", str(top_n),
  90. "--max-concurrent", str(max_concurrent)
  91. ]
  92. try:
  93. result = subprocess.run(cmd, check=True)
  94. print(f"\n✅ 步骤2完成: 清洗排序成功")
  95. print(f" 结果已写入: {self.run_context_v3_path}")
  96. return True
  97. except subprocess.CalledProcessError as e:
  98. print(f"\n❌ 步骤2失败: 清洗排序出错")
  99. print(f" 错误信息: {e}")
  100. return False
  101. def step_3_visualize(self, simplified: bool = False) -> bool:
  102. """
  103. 步骤3: 生成可视化
  104. Args:
  105. simplified: 是否使用简化视图
  106. Returns:
  107. 是否成功
  108. """
  109. print(f"\n{'='*80}")
  110. print("🎨 步骤3/3: 生成可视化")
  111. print(f"{'='*80}\n")
  112. # 检查run_context_v3.json是否存在
  113. if not os.path.exists(self.run_context_v3_path):
  114. print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}")
  115. print(f" 请先运行步骤1(评估)和步骤2(清洗)")
  116. return False
  117. cmd = [
  118. "node",
  119. "visualization/knowledge_search_traverse/index.js",
  120. self.run_context_v3_path,
  121. self.visualization_path
  122. ]
  123. if simplified:
  124. cmd.append("--simplified")
  125. try:
  126. result = subprocess.run(cmd, check=True, capture_output=True, text=True)
  127. print(result.stdout)
  128. print(f"\n✅ 步骤3完成: 可视化生成成功")
  129. print(f" 生成文件: {self.visualization_path}")
  130. print(f"\n 📱 使用浏览器打开查看:")
  131. print(f" open {self.visualization_path}")
  132. return True
  133. except subprocess.CalledProcessError as e:
  134. print(f"\n❌ 步骤3失败: 可视化生成出错")
  135. print(f" 错误信息: {e}")
  136. if e.stdout:
  137. print(f" 输出: {e.stdout}")
  138. if e.stderr:
  139. print(f" 错误: {e.stderr}")
  140. return False
  141. async def run_full_pipeline(
  142. self,
  143. max_posts: int = None,
  144. top_n: int = 10,
  145. max_concurrent: int = 5,
  146. simplified: bool = False
  147. ) -> bool:
  148. """
  149. 运行完整工作流:评估 → 清洗排序 → 可视化
  150. Args:
  151. max_posts: 最多评估的帖子数量(None表示全部)
  152. top_n: 清洗时提取前N个帖子
  153. max_concurrent: 清洗时的最大并发数
  154. simplified: 是否使用简化视图
  155. Returns:
  156. 是否全部成功
  157. """
  158. print(f"\n{'='*80}")
  159. print("🔄 运行完整工作流")
  160. print(f"{'='*80}\n")
  161. # 步骤1: 评估
  162. if not self.step_1_evaluate(max_posts):
  163. print(f"\n❌ 工作流中断: 步骤1失败")
  164. return False
  165. # 步骤2: 清洗排序
  166. if not await self.step_2_clean_and_sort(top_n, max_concurrent):
  167. print(f"\n❌ 工作流中断: 步骤2失败")
  168. return False
  169. # 步骤3: 可视化
  170. if not self.step_3_visualize(simplified):
  171. print(f"\n❌ 工作流中断: 步骤3失败")
  172. return False
  173. print(f"\n{'='*80}")
  174. print("🎉 完整工作流执行成功!")
  175. print(f"{'='*80}\n")
  176. print("📊 生成的文件:")
  177. print(f" 1. 评估结果: {self.run_context_v3_path}")
  178. print(f" 2. 可视化: {self.visualization_path}")
  179. print(f"\n💡 提示: 使用浏览器打开可视化文件查看结果")
  180. print(f" open {self.visualization_path}\n")
  181. return True
  182. async def main():
  183. parser = argparse.ArgumentParser(
  184. description='知识代理评估工作流自动化',
  185. formatter_class=argparse.RawDescriptionHelpFormatter,
  186. epilog='''
  187. 示例用法:
  188. # 运行完整工作流(评估 → 清洗 → 可视化)
  189. python3 workflow_pipeline.py input/test_case/output/.../run_context.json
  190. # 只运行评估(步骤1)
  191. python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step evaluate
  192. # 只运行清洗排序(步骤2,需要先有评估结果)
  193. python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step clean
  194. # 只运行可视化(步骤3,需要先有评估和清洗结果)
  195. python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step visualize
  196. # 自定义参数运行完整工作流
  197. python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\
  198. --max-posts 20 \\
  199. --top-n 5 \\
  200. --max-concurrent 3 \\
  201. --simplified
  202. # 指定输出目录
  203. python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\
  204. --output-dir ./custom_output
  205. '''
  206. )
  207. parser.add_argument(
  208. 'run_context_path',
  209. help='run_context.json 文件路径'
  210. )
  211. parser.add_argument(
  212. '--step',
  213. choices=['evaluate', 'clean', 'visualize', 'all'],
  214. default='all',
  215. help='执行的步骤 (默认: all - 全流程)'
  216. )
  217. parser.add_argument(
  218. '--output-dir',
  219. help='输出目录(默认与输入文件同目录)'
  220. )
  221. parser.add_argument(
  222. '--max-posts',
  223. type=int,
  224. help='评估时最多处理的帖子数量(用于快速测试)'
  225. )
  226. parser.add_argument(
  227. '--top-n',
  228. type=int,
  229. default=10,
  230. help='清洗时提取前N个帖子 (默认: 10)'
  231. )
  232. parser.add_argument(
  233. '--max-concurrent',
  234. type=int,
  235. default=5,
  236. help='清洗时的最大并发数 (默认: 5)'
  237. )
  238. parser.add_argument(
  239. '--simplified',
  240. action='store_true',
  241. help='可视化时使用简化视图'
  242. )
  243. args = parser.parse_args()
  244. # 检查输入文件是否存在
  245. if not os.path.exists(args.run_context_path):
  246. print(f"❌ 错误: 文件不存在 - {args.run_context_path}")
  247. sys.exit(1)
  248. # 创建工作流
  249. pipeline = WorkflowPipeline(args.run_context_path, args.output_dir)
  250. # 执行对应步骤
  251. success = False
  252. if args.step == 'evaluate':
  253. success = pipeline.step_1_evaluate(args.max_posts)
  254. elif args.step == 'clean':
  255. success = await pipeline.step_2_clean_and_sort(args.top_n, args.max_concurrent)
  256. elif args.step == 'visualize':
  257. success = pipeline.step_3_visualize(args.simplified)
  258. elif args.step == 'all':
  259. success = await pipeline.run_full_pipeline(
  260. args.max_posts,
  261. args.top_n,
  262. args.max_concurrent,
  263. args.simplified
  264. )
  265. sys.exit(0 if success else 1)
# Script entry point: only when executed directly (not imported), run the
# async CLI handler to completion; main() terminates the process via sys.exit.
if __name__ == "__main__":
    asyncio.run(main())