#!/usr/bin/env python3 """ 知识代理评估工作流自动化脚本 功能: 1. 评估(test_evaluation_v3.py) 2. 清洗排序(extract_topn_multimodal.py) 3. 可视化生成(Node.js) 支持: - 连续执行全流程 - 独立执行单个步骤 - 灵活的参数配置 """ import argparse import asyncio import os import subprocess import sys from datetime import datetime from pathlib import Path class WorkflowPipeline: """工作流管道""" def __init__(self, run_context_path: str, output_dir: str = None): """ 初始化工作流 Args: run_context_path: run_context.json 文件路径 output_dir: 输出目录(可选,默认与输入文件同目录) """ self.run_context_path = run_context_path # 确定输出目录 if output_dir: self.output_dir = Path(output_dir) else: self.output_dir = Path(run_context_path).parent self.output_dir.mkdir(parents=True, exist_ok=True) # 生成的文件路径 self.run_context_v3_path = str(self.output_dir / f"{Path(run_context_path).stem}_v3.json") self.visualization_path = str(self.output_dir / f"visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html") print(f"\n{'='*80}") print(f"🚀 知识代理评估工作流") print(f"{'='*80}") print(f"📂 输入文件: {self.run_context_path}") print(f"📁 输出目录: {self.output_dir}") print(f"{'='*80}\n") def step_1_evaluate(self, max_posts: int = None) -> bool: """ 步骤1: 评估帖子 Args: max_posts: 最多评估的帖子数量(None表示全部) Returns: 是否成功 """ print(f"\n{'='*80}") print("📊 步骤1/3: 评估帖子") print(f"{'='*80}\n") cmd = ["python3", "test_evaluation_v3.py", self.run_context_path] if max_posts: cmd.append(str(max_posts)) try: result = subprocess.run(cmd, check=True) print(f"\n✅ 步骤1完成: 评估成功") print(f" 生成文件: {self.run_context_v3_path}") return True except subprocess.CalledProcessError as e: print(f"\n❌ 步骤1失败: 评估出错") print(f" 错误信息: {e}") return False async def step_2_clean_and_sort(self, top_n: int = 10, max_concurrent: int = 5) -> bool: """ 步骤2: 清洗排序(使用评估后的run_context_v3.json) Args: top_n: 提取前N个帖子 max_concurrent: 最大并发数 Returns: 是否成功 """ print(f"\n{'='*80}") print("🧹 步骤2/3: 清洗排序") print(f"{'='*80}\n") # 检查run_context_v3.json是否存在 if not os.path.exists(self.run_context_v3_path): print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}") print(f" 请先运行步骤1(评估)") return False cmd = [ "python3", "extract_topn_multimodal.py", "-i", self.run_context_v3_path, "-o", str(self.output_dir / "multimodal_extraction_topn_cleaned.json"), "--top-n", str(top_n), "--max-concurrent", str(max_concurrent) ] try: result = subprocess.run(cmd, check=True) print(f"\n✅ 步骤2完成: 清洗排序成功") print(f" 结果已写入: {self.run_context_v3_path}") return True except subprocess.CalledProcessError as e: print(f"\n❌ 步骤2失败: 清洗排序出错") print(f" 错误信息: {e}") return False def step_3_visualize(self, simplified: bool = False) -> bool: """ 步骤3: 生成可视化 Args: simplified: 是否使用简化视图 Returns: 是否成功 """ print(f"\n{'='*80}") print("🎨 步骤3/3: 生成可视化") print(f"{'='*80}\n") # 检查run_context_v3.json是否存在 if not os.path.exists(self.run_context_v3_path): print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}") print(f" 请先运行步骤1(评估)和步骤2(清洗)") return False cmd = [ "node", "visualization/knowledge_search_traverse/index.js", self.run_context_v3_path, self.visualization_path ] if simplified: cmd.append("--simplified") try: result = subprocess.run(cmd, check=True, capture_output=True, text=True) print(result.stdout) print(f"\n✅ 步骤3完成: 可视化生成成功") print(f" 生成文件: {self.visualization_path}") print(f"\n 📱 使用浏览器打开查看:") print(f" open {self.visualization_path}") return True except subprocess.CalledProcessError as e: print(f"\n❌ 步骤3失败: 可视化生成出错") print(f" 错误信息: {e}") if e.stdout: print(f" 输出: {e.stdout}") if e.stderr: print(f" 错误: {e.stderr}") return False async def run_full_pipeline( self, max_posts: int = None, top_n: int = 10, max_concurrent: int = 5, simplified: bool = False ) -> bool: """ 运行完整工作流:评估 → 清洗排序 → 可视化 Args: max_posts: 最多评估的帖子数量(None表示全部) top_n: 清洗时提取前N个帖子 max_concurrent: 清洗时的最大并发数 simplified: 是否使用简化视图 Returns: 是否全部成功 """ print(f"\n{'='*80}") print("🔄 运行完整工作流") print(f"{'='*80}\n") # 步骤1: 评估 if not self.step_1_evaluate(max_posts): print(f"\n❌ 工作流中断: 步骤1失败") return False # 步骤2: 清洗排序 if not await self.step_2_clean_and_sort(top_n, max_concurrent): print(f"\n❌ 工作流中断: 步骤2失败") return False # 步骤3: 可视化 if not self.step_3_visualize(simplified): print(f"\n❌ 工作流中断: 步骤3失败") return False print(f"\n{'='*80}") print("🎉 完整工作流执行成功!") print(f"{'='*80}\n") print("📊 生成的文件:") print(f" 1. 评估结果: {self.run_context_v3_path}") print(f" 2. 可视化: {self.visualization_path}") print(f"\n💡 提示: 使用浏览器打开可视化文件查看结果") print(f" open {self.visualization_path}\n") return True async def main(): parser = argparse.ArgumentParser( description='知识代理评估工作流自动化', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=''' 示例用法: # 运行完整工作流(评估 → 清洗 → 可视化) python3 workflow_pipeline.py input/test_case/output/.../run_context.json # 只运行评估(步骤1) python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step evaluate # 只运行清洗排序(步骤2,需要先有评估结果) python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step clean # 只运行可视化(步骤3,需要先有评估和清洗结果) python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step visualize # 自定义参数运行完整工作流 python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\ --max-posts 20 \\ --top-n 5 \\ --max-concurrent 3 \\ --simplified # 指定输出目录 python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\ --output-dir ./custom_output ''' ) parser.add_argument( 'run_context_path', help='run_context.json 文件路径' ) parser.add_argument( '--step', choices=['evaluate', 'clean', 'visualize', 'all'], default='all', help='执行的步骤 (默认: all - 全流程)' ) parser.add_argument( '--output-dir', help='输出目录(默认与输入文件同目录)' ) parser.add_argument( '--max-posts', type=int, help='评估时最多处理的帖子数量(用于快速测试)' ) parser.add_argument( '--top-n', type=int, default=10, help='清洗时提取前N个帖子 (默认: 10)' ) parser.add_argument( '--max-concurrent', type=int, default=5, help='清洗时的最大并发数 (默认: 5)' ) parser.add_argument( '--simplified', action='store_true', help='可视化时使用简化视图' ) args = parser.parse_args() # 检查输入文件是否存在 if not os.path.exists(args.run_context_path): print(f"❌ 错误: 文件不存在 - {args.run_context_path}") sys.exit(1) # 创建工作流 pipeline = WorkflowPipeline(args.run_context_path, args.output_dir) # 执行对应步骤 success = False if args.step == 'evaluate': success = pipeline.step_1_evaluate(args.max_posts) elif args.step == 'clean': success = await pipeline.step_2_clean_and_sort(args.top_n, args.max_concurrent) elif args.step == 'visualize': success = pipeline.step_3_visualize(args.simplified) elif args.step == 'all': success = await pipeline.run_full_pipeline( args.max_posts, args.top_n, args.max_concurrent, args.simplified ) sys.exit(0 if success else 1) if __name__ == "__main__": asyncio.run(main())