| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- #!/usr/bin/env python3
- """
- 知识代理评估工作流自动化脚本
- 功能:
- 1. 评估(test_evaluation_v3.py)
- 2. 清洗排序(extract_topn_multimodal.py)
- 3. 可视化生成(Node.js)
- 支持:
- - 连续执行全流程
- - 独立执行单个步骤
- - 灵活的参数配置
- """
- import argparse
- import asyncio
- import os
- import subprocess
- import sys
- from datetime import datetime
- from pathlib import Path
- class WorkflowPipeline:
- """工作流管道"""
- def __init__(self, run_context_path: str, output_dir: str = None):
- """
- 初始化工作流
- Args:
- run_context_path: run_context.json 文件路径
- output_dir: 输出目录(可选,默认与输入文件同目录)
- """
- self.run_context_path = run_context_path
-
- # 确定输出目录
- if output_dir:
- self.output_dir = Path(output_dir)
- else:
- self.output_dir = Path(run_context_path).parent
-
- self.output_dir.mkdir(parents=True, exist_ok=True)
-
- # 生成的文件路径
- self.run_context_v3_path = str(self.output_dir / f"{Path(run_context_path).stem}_v3.json")
- self.visualization_path = str(self.output_dir / f"visualization_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html")
-
- print(f"\n{'='*80}")
- print(f"🚀 知识代理评估工作流")
- print(f"{'='*80}")
- print(f"📂 输入文件: {self.run_context_path}")
- print(f"📁 输出目录: {self.output_dir}")
- print(f"{'='*80}\n")
- def step_1_evaluate(self, max_posts: int = None) -> bool:
- """
- 步骤1: 评估帖子
- Args:
- max_posts: 最多评估的帖子数量(None表示全部)
- Returns:
- 是否成功
- """
- print(f"\n{'='*80}")
- print("📊 步骤1/3: 评估帖子")
- print(f"{'='*80}\n")
- cmd = ["python3", "test_evaluation_v3.py", self.run_context_path]
- if max_posts:
- cmd.append(str(max_posts))
- try:
- result = subprocess.run(cmd, check=True)
- print(f"\n✅ 步骤1完成: 评估成功")
- print(f" 生成文件: {self.run_context_v3_path}")
- return True
- except subprocess.CalledProcessError as e:
- print(f"\n❌ 步骤1失败: 评估出错")
- print(f" 错误信息: {e}")
- return False
- async def step_2_clean_and_sort(self, top_n: int = 10, max_concurrent: int = 5) -> bool:
- """
- 步骤2: 清洗排序(使用评估后的run_context_v3.json)
- Args:
- top_n: 提取前N个帖子
- max_concurrent: 最大并发数
- Returns:
- 是否成功
- """
- print(f"\n{'='*80}")
- print("🧹 步骤2/3: 清洗排序")
- print(f"{'='*80}\n")
- # 检查run_context_v3.json是否存在
- if not os.path.exists(self.run_context_v3_path):
- print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}")
- print(f" 请先运行步骤1(评估)")
- return False
- cmd = [
- "python3", "extract_topn_multimodal.py",
- "-i", self.run_context_v3_path,
- "-o", str(self.output_dir / "multimodal_extraction_topn_cleaned.json"),
- "--top-n", str(top_n),
- "--max-concurrent", str(max_concurrent)
- ]
- try:
- result = subprocess.run(cmd, check=True)
- print(f"\n✅ 步骤2完成: 清洗排序成功")
- print(f" 结果已写入: {self.run_context_v3_path}")
- return True
- except subprocess.CalledProcessError as e:
- print(f"\n❌ 步骤2失败: 清洗排序出错")
- print(f" 错误信息: {e}")
- return False
- def step_3_visualize(self, simplified: bool = False) -> bool:
- """
- 步骤3: 生成可视化
- Args:
- simplified: 是否使用简化视图
- Returns:
- 是否成功
- """
- print(f"\n{'='*80}")
- print("🎨 步骤3/3: 生成可视化")
- print(f"{'='*80}\n")
- # 检查run_context_v3.json是否存在
- if not os.path.exists(self.run_context_v3_path):
- print(f"❌ 错误: 未找到评估后的文件 {self.run_context_v3_path}")
- print(f" 请先运行步骤1(评估)和步骤2(清洗)")
- return False
- cmd = [
- "node",
- "visualization/knowledge_search_traverse/index.js",
- self.run_context_v3_path,
- self.visualization_path
- ]
-
- if simplified:
- cmd.append("--simplified")
- try:
- result = subprocess.run(cmd, check=True, capture_output=True, text=True)
- print(result.stdout)
- print(f"\n✅ 步骤3完成: 可视化生成成功")
- print(f" 生成文件: {self.visualization_path}")
- print(f"\n 📱 使用浏览器打开查看:")
- print(f" open {self.visualization_path}")
- return True
- except subprocess.CalledProcessError as e:
- print(f"\n❌ 步骤3失败: 可视化生成出错")
- print(f" 错误信息: {e}")
- if e.stdout:
- print(f" 输出: {e.stdout}")
- if e.stderr:
- print(f" 错误: {e.stderr}")
- return False
- async def run_full_pipeline(
- self,
- max_posts: int = None,
- top_n: int = 10,
- max_concurrent: int = 5,
- simplified: bool = False
- ) -> bool:
- """
- 运行完整工作流:评估 → 清洗排序 → 可视化
- Args:
- max_posts: 最多评估的帖子数量(None表示全部)
- top_n: 清洗时提取前N个帖子
- max_concurrent: 清洗时的最大并发数
- simplified: 是否使用简化视图
- Returns:
- 是否全部成功
- """
- print(f"\n{'='*80}")
- print("🔄 运行完整工作流")
- print(f"{'='*80}\n")
- # 步骤1: 评估
- if not self.step_1_evaluate(max_posts):
- print(f"\n❌ 工作流中断: 步骤1失败")
- return False
- # 步骤2: 清洗排序
- if not await self.step_2_clean_and_sort(top_n, max_concurrent):
- print(f"\n❌ 工作流中断: 步骤2失败")
- return False
- # 步骤3: 可视化
- if not self.step_3_visualize(simplified):
- print(f"\n❌ 工作流中断: 步骤3失败")
- return False
- print(f"\n{'='*80}")
- print("🎉 完整工作流执行成功!")
- print(f"{'='*80}\n")
- print("📊 生成的文件:")
- print(f" 1. 评估结果: {self.run_context_v3_path}")
- print(f" 2. 可视化: {self.visualization_path}")
- print(f"\n💡 提示: 使用浏览器打开可视化文件查看结果")
- print(f" open {self.visualization_path}\n")
- return True
- async def main():
- parser = argparse.ArgumentParser(
- description='知识代理评估工作流自动化',
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog='''
- 示例用法:
- # 运行完整工作流(评估 → 清洗 → 可视化)
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json
- # 只运行评估(步骤1)
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step evaluate
- # 只运行清洗排序(步骤2,需要先有评估结果)
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step clean
- # 只运行可视化(步骤3,需要先有评估和清洗结果)
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json --step visualize
- # 自定义参数运行完整工作流
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\
- --max-posts 20 \\
- --top-n 5 \\
- --max-concurrent 3 \\
- --simplified
- # 指定输出目录
- python3 workflow_pipeline.py input/test_case/output/.../run_context.json \\
- --output-dir ./custom_output
- '''
- )
- parser.add_argument(
- 'run_context_path',
- help='run_context.json 文件路径'
- )
- parser.add_argument(
- '--step',
- choices=['evaluate', 'clean', 'visualize', 'all'],
- default='all',
- help='执行的步骤 (默认: all - 全流程)'
- )
- parser.add_argument(
- '--output-dir',
- help='输出目录(默认与输入文件同目录)'
- )
- parser.add_argument(
- '--max-posts',
- type=int,
- help='评估时最多处理的帖子数量(用于快速测试)'
- )
- parser.add_argument(
- '--top-n',
- type=int,
- default=10,
- help='清洗时提取前N个帖子 (默认: 10)'
- )
- parser.add_argument(
- '--max-concurrent',
- type=int,
- default=5,
- help='清洗时的最大并发数 (默认: 5)'
- )
- parser.add_argument(
- '--simplified',
- action='store_true',
- help='可视化时使用简化视图'
- )
- args = parser.parse_args()
- # 检查输入文件是否存在
- if not os.path.exists(args.run_context_path):
- print(f"❌ 错误: 文件不存在 - {args.run_context_path}")
- sys.exit(1)
- # 创建工作流
- pipeline = WorkflowPipeline(args.run_context_path, args.output_dir)
- # 执行对应步骤
- success = False
- if args.step == 'evaluate':
- success = pipeline.step_1_evaluate(args.max_posts)
-
- elif args.step == 'clean':
- success = await pipeline.step_2_clean_and_sort(args.top_n, args.max_concurrent)
-
- elif args.step == 'visualize':
- success = pipeline.step_3_visualize(args.simplified)
-
- elif args.step == 'all':
- success = await pipeline.run_full_pipeline(
- args.max_posts,
- args.top_n,
- args.max_concurrent,
- args.simplified
- )
- sys.exit(0 if success else 1)
- if __name__ == "__main__":
- asyncio.run(main())
|