#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' 知识获取工作流可视化 1. 读取知识获取工作流详细过程数据文件(参考 .cache/9f510b2a8348/execution_record.json) 2. 文件路径可设置,每个文件表示一个输入信息执行过程,可能有多个。多个文件路径硬编码在代码中,用list表示 3. 将知识获取工作流用html页面展示出来 4. HTML 文件输出路径在当前目录下,文件名称为 workflow_visualization_datetime.html ''' import json import os from datetime import datetime from pathlib import Path # 硬编码的文件路径列表(相对于项目根目录) DATA_FILE_PATHS = [ "../.cache/dd8dd68ebf0c/execution_record.json", "../.cache/70a208f70a7e/execution_record.json", "../.cache/996e626b9c85/execution_record.json", # 可以在这里添加更多文件路径 ] def load_data_files(file_paths): """读取并解析JSON文件""" data_list = [] script_dir = Path(__file__).parent for file_path in file_paths: # 将相对路径转换为绝对路径 abs_path = (script_dir / file_path).resolve() if not abs_path.exists(): print(f"警告: 文件不存在: {abs_path}") continue try: with open(abs_path, 'r', encoding='utf-8') as f: data = json.load(f) data_list.append(data) except (json.JSONDecodeError, IOError) as e: print(f"错误: 读取文件失败 {abs_path}: {e}") return data_list def parse_workflow_data(data): """解析工作流数据,提取关键信息""" workflow = { 'input': {}, 'steps': [], 'output': {} } # 提取输入信息 if 'input' in data: workflow['input'] = { 'question': data['input'].get('question', ''), 'post_info': data['input'].get('post_info', ''), 'persona_info': data['input'].get('persona_info', '') } # 提取执行流程(新格式:execution 直接包含各步骤,不再有 modules.function_knowledge) if 'execution' in data: execution = data['execution'] # 步骤1: 生成query if 'generate_query' in execution: generate_query = execution['generate_query'] workflow['steps'].append({ 'step': 'generate_query', 'name': '生成查询', 'query': generate_query.get('query', '') or generate_query.get('response', ''), 'prompt': generate_query.get('prompt', '') }) # 步骤2: 选择工具 if 'select_tool' in execution: select_tool = execution['select_tool'] response = select_tool.get('response', {}) workflow['steps'].append({ 'step': 'select_tool', 'name': '选择工具', 'prompt': select_tool.get('prompt', ''), 'tool_name': response.get('工具名', '') if isinstance(response, dict) else '', 'tool_id': response.get('工具调用ID', '') if isinstance(response, dict) else '', 'tool_usage': response.get('使用方法', '') if isinstance(response, dict) else '' }) # 判断是否选择了工具(如果 response 为空字典或没有工具信息,则没有选择到工具) has_tool = False if 'select_tool' in execution: select_tool = execution['select_tool'] response = select_tool.get('response', {}) if isinstance(response, dict) and response.get('工具名'): has_tool = True # 如果选择了工具,执行工具调用流程 if has_tool: # 步骤3: 提取参数 if 'extract_params' in execution: extract_params = execution['extract_params'] workflow['steps'].append({ 'step': 'extract_params', 'name': '提取参数', 'prompt': extract_params.get('prompt', ''), 'params': extract_params.get('params', {}) }) # 步骤4: 执行工具(新格式:tool_call 替代 execute_tool) if 'tool_call' in execution: tool_call = execution['tool_call'] # 优先使用 result,如果没有则使用 response result = tool_call.get('result', '') if not result and tool_call.get('response'): # 如果 response 是字典,尝试提取其中的 result response = tool_call.get('response', {}) if isinstance(response, dict): result = response.get('result', response) else: result = response workflow['steps'].append({ 'step': 'execute_tool', 'name': '执行工具', 'response': result or tool_call.get('response', '') }) # 如果没有选择到工具,进行知识搜索流程 else: if 'knowledge_search' in execution: knowledge_search = execution['knowledge_search'] # 步骤3: LLM搜索(大模型+search 渠道的搜索过程) if 'llm_search' in knowledge_search: llm_search = knowledge_search['llm_search'] search_results = llm_search.get('search_results', []) workflow['steps'].append({ 'step': 'llm_search', 'name': 'LLM搜索', 'search_results': search_results }) # 步骤4: 多渠道搜索结果整合 if 'multi_search_merge' in knowledge_search: multi_search_merge = knowledge_search['multi_search_merge'] workflow['steps'].append({ 'step': 'multi_search_merge', 'name': '多渠道搜索结果整合', 'prompt': multi_search_merge.get('prompt', ''), 'response': multi_search_merge.get('response', ''), 'sources_count': multi_search_merge.get('sources_count', 0), 'valid_sources_count': multi_search_merge.get('valid_sources_count', 0) }) # 提取输出信息 if 'output' in data: workflow['output'] = { 'result': data['output'].get('result', '') } return workflow def escape_html(text): """转义HTML特殊字符""" if not isinstance(text, str): text = str(text) return (text.replace('&', '&') .replace('<', '<') .replace('>', '>') .replace('"', '"') .replace("'", ''')) def format_json_for_display(obj): """格式化JSON对象用于显示""" if isinstance(obj, dict): return json.dumps(obj, ensure_ascii=False, indent=2) elif isinstance(obj, str): try: parsed = json.loads(obj) return json.dumps(parsed, ensure_ascii=False, indent=2) except (json.JSONDecodeError, ValueError): return obj return str(obj) def generate_html(workflows): """生成HTML页面""" html = '''