#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' 知识获取工作流可视化 1. 读取知识获取工作流详细过程数据文件(参考 .cache/9f510b2a8348/execution_record.json) 2. 文件路径可设置,每个文件表示一个输入信息执行过程,可能有多个。多个文件路径硬编码在代码中,用list表示 3. 将知识获取工作流用html页面展示出来 4. HTML 文件输出路径在当前目录下,文件名称为 workflow_visualization_datetime.html ''' import json import os from datetime import datetime from pathlib import Path # 硬编码的文件路径列表(相对于项目根目录) DATA_FILE_PATHS = [ "../.cache/557e5ce27c1c/execution_record.json", "../.cache/d0608df43f77/execution_record.json", # 可以在这里添加更多文件路径 ] def load_data_files(file_paths): """读取并解析JSON文件,返回数据和文件夹名称的列表""" data_list = [] script_dir = Path(__file__).parent for file_path in file_paths: # 将相对路径转换为绝对路径 abs_path = (script_dir / file_path).resolve() if not abs_path.exists(): print(f"警告: 文件不存在: {abs_path}") continue try: with open(abs_path, 'r', encoding='utf-8') as f: data = json.load(f) # 从文件路径中提取文件夹名称(如 a588c7a380ee) # 路径格式:../.cache/a588c7a380ee/execution_record.json folder_name = abs_path.parent.name data_list.append((data, folder_name)) except (json.JSONDecodeError, IOError) as e: print(f"错误: 读取文件失败 {abs_path}: {e}") return data_list def parse_workflow_data(data, folder_name=''): """解析工作流数据,提取关键信息""" workflow = { 'input': {}, 'steps': [], 'output': {}, 'folder_name': folder_name } # 提取输入信息(字符串格式) if 'input' in data: input_data = data['input'] if isinstance(input_data, str): # 直接保存字符串 workflow['input'] = {'text': input_data} else: # 兼容旧格式:转换为字符串 question = input_data.get('question', '') post_info = input_data.get('post_info', '') persona_info = input_data.get('persona_info', '') input_text = f'问题:{question}\n' if question else '' if post_info: input_text += f'{post_info}\n' if persona_info: input_text += f'账号人设信息:{persona_info}\n' workflow['input'] = {'text': input_text.strip()} # 提取执行流程(新格式:execution 直接包含各步骤,不再有 modules.function_knowledge) if 'execution' in data: execution = data['execution'] # 步骤1: 生成query(暂时隐藏,不添加到步骤列表) # if 'generate_query' in execution: # generate_query = execution['generate_query'] # workflow['steps'].append({ # 'step': 'generate_query', # 'name': '生成查询', # 'query': generate_query.get('query', '') or generate_query.get('response', ''), # 'prompt': generate_query.get('prompt', '') # }) # 步骤1: 选择工具(原步骤2,现在变成步骤1) if 'select_tool' in execution: select_tool = execution['select_tool'] response = select_tool.get('response', {}) workflow['steps'].append({ 'step': 'select_tool', 'name': '选择工具', 'prompt': select_tool.get('prompt', ''), 'tool_name': response.get('工具名', '') if isinstance(response, dict) else '', 'tool_id': response.get('工具调用ID', '') if isinstance(response, dict) else '', 'tool_usage': response.get('使用方法', '') if isinstance(response, dict) else '' }) # 判断是否选择了工具(如果 response 为空字典或没有工具信息,则没有选择到工具) has_tool = False if 'select_tool' in execution: select_tool = execution['select_tool'] response = select_tool.get('response', {}) if isinstance(response, dict) and response.get('工具名'): has_tool = True # 如果选择了工具,执行工具调用流程 if has_tool: # 步骤3: 提取参数 if 'extract_params' in execution: extract_params = execution['extract_params'] workflow['steps'].append({ 'step': 'extract_params', 'name': '提取参数', 'prompt': extract_params.get('prompt', ''), 'params': extract_params.get('params', {}) }) # 步骤4: 执行工具(新格式:tool_call 替代 execute_tool) if 'tool_call' in execution: tool_call = execution['tool_call'] # 优先使用 result,如果没有则使用 response result = tool_call.get('result', '') if not result and tool_call.get('response'): # 如果 response 是字典,尝试提取其中的 result response = tool_call.get('response', {}) if isinstance(response, dict): result = response.get('result', response) else: result = response workflow['steps'].append({ 'step': 'execute_tool', 'name': '执行工具', 'response': result or tool_call.get('response', '') }) # 如果没有选择到工具,进行知识搜索流程 else: if 'knowledge_search' in execution: knowledge_search = execution['knowledge_search'] # 步骤3: LLM搜索(大模型+search 渠道的搜索过程) if 'llm_search' in knowledge_search: llm_search = knowledge_search['llm_search'] search_results = llm_search.get('search_results', []) workflow['steps'].append({ 'step': 'llm_search', 'name': 'LLM搜索', 'search_results': search_results }) # 步骤4: 多渠道搜索结果整合 if 'multi_search_merge' in knowledge_search: multi_search_merge = knowledge_search['multi_search_merge'] workflow['steps'].append({ 'step': 'multi_search_merge', 'name': '多渠道搜索结果整合', 'prompt': multi_search_merge.get('prompt', ''), 'response': multi_search_merge.get('response', ''), 'sources_count': multi_search_merge.get('sources_count', 0), 'valid_sources_count': multi_search_merge.get('valid_sources_count', 0) }) # 步骤5: 发现新工具 if 'extra_tools' in knowledge_search: extra_tools = knowledge_search['extra_tools'] match_tool_response = extra_tools.get('match_tool_response', {}) selected_tools = match_tool_response.get('selected_tools', []) workflow['steps'].append({ 'step': 'extra_tools', 'name': '发现新工具', 'prompt': extra_tools.get('match_tool_prompt', ''), 'selected_tools': selected_tools, 'analysis_summary': match_tool_response.get('analysis_summary', '') }) # 提取输出信息 if 'output' in data: workflow['output'] = { 'result': data['output'].get('result', '') } return workflow def escape_html(text): """转义HTML特殊字符""" if not isinstance(text, str): text = str(text) return (text.replace('&', '&') .replace('<', '<') .replace('>', '>') .replace('"', '"') .replace("'", ''')) def format_json_for_display(obj): """格式化JSON对象用于显示""" if isinstance(obj, dict): return json.dumps(obj, ensure_ascii=False, indent=2) elif isinstance(obj, str): try: parsed = json.loads(obj) return json.dumps(parsed, ensure_ascii=False, indent=2) except (json.JSONDecodeError, ValueError): return obj return str(obj) def generate_html(workflows): """生成HTML页面""" html = '''