#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Render knowledge-acquisition workflow runs as a static HTML page.

1. Read the detailed execution records of the knowledge-acquisition workflow
   (see ``.cache/9f510b2a8348/execution_record.json`` for an example).
2. The record paths are hard-coded in ``DATA_FILE_PATHS``; each file holds the
   execution trace of one input, and several files may be listed.
3. Render the workflows as one HTML page, one tab per record.
4. Write the page next to this script as
   ``workflow_visualization_<datetime>.html``.
"""

import json
import os
from datetime import datetime
from pathlib import Path

# Hard-coded record paths. load_data_files() resolves relative entries against
# the directory that contains this script.
DATA_FILE_PATHS = [
    "../.cache/9f510b2a8348/execution_record.json",
    # Add more record paths here.
]

# Single-pass escape table: using str.translate avoids the ordering pitfalls
# of chained replace() calls (e.g. escaping the '&' of an entity twice).
_HTML_ESCAPES = str.maketrans({
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    "'": '&#39;',
})

# Per step type: (key in the parsed step, visible label, pretty-print as JSON).
_STEP_FIELDS = {
    'generate_query': (
        ('query', '生成的Query:', False),
    ),
    'select_tool': (
        ('tool_name', '工具名称:', False),
        ('tool_id', '工具调用ID:', False),
        ('tool_usage', '使用方法:', False),
    ),
    'extract_params': (
        ('params', '提取的参数:', True),
    ),
    'execute_tool': (
        ('response', '执行结果:', True),
    ),
}

# NOTE(review): the page skeleton, stylesheet and script below were rebuilt;
# only the visible text labels are known to match the intended design.
_PAGE_HEAD = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>知识获取工作流可视化</title>
<style>
body { margin: 0; padding: 24px; background: #f4f6fb; color: #222;
       font-family: -apple-system, "Segoe UI", "PingFang SC", sans-serif; }
.container { max-width: 1100px; margin: 0 auto; }
h1 { text-align: center; }
.tabs { display: flex; flex-wrap: wrap; gap: 8px; margin-bottom: 16px; }
.tab-button { max-width: 320px; padding: 8px 14px; border: 1px solid #c8cfe0;
              border-radius: 6px; background: #fff; cursor: pointer;
              overflow: hidden; text-overflow: ellipsis; white-space: nowrap; }
.tab-button.active { background: #3b6fe0; border-color: #3b6fe0; color: #fff; }
.tab-content { display: none; }
.tab-content.active { display: block; }
.section, .step-card { margin-bottom: 16px; padding: 16px; border-radius: 8px;
                       background: #fff; box-shadow: 0 1px 3px rgba(0, 0, 0, .12); }
.info-item { margin: 6px 0; white-space: pre-wrap; word-break: break-word; }
.step-header { display: flex; justify-content: space-between;
               align-items: center; cursor: pointer; }
.step-number { display: inline-block; width: 26px; height: 26px;
               margin-right: 8px; border-radius: 50%; background: #3b6fe0;
               color: #fff; line-height: 26px; text-align: center; }
.step-name { font-weight: 600; }
.step-card.collapsed .step-content { display: none; }
.step-content { margin-top: 12px; }
.field { margin-bottom: 10px; }
.field-label { font-weight: 600; }
.field-value { margin-top: 4px; white-space: pre-wrap; word-break: break-word; }
.code-block, .prompt-content { margin: 4px 0 0; padding: 10px; border-radius: 6px;
                               background: #f0f2f7; overflow-x: auto;
                               white-space: pre-wrap; word-break: break-word; }
.prompt-toggle { padding: 4px 10px; border: 1px solid #c8cfe0;
                 border-radius: 4px; background: #fff; cursor: pointer; }
.prompt-content { display: none; }
.prompt-content.show { display: block; }
.arrow { margin: -8px 0 8px; color: #3b6fe0; font-size: 24px; text-align: center; }
</style>
</head>
<body>
<div class="container">
<h1>知识获取工作流可视化</h1>
<div class="tabs">
"""

_PAGE_TAIL = """</div>
<script>
function switchTab(index) {
  document.querySelectorAll('.tab-button').forEach(function (el, i) {
    el.classList.toggle('active', i === index);
  });
  document.querySelectorAll('.tab-content').forEach(function (el, i) {
    el.classList.toggle('active', i === index);
  });
}
function toggleStep(stepId) {
  document.getElementById(stepId).classList.toggle('collapsed');
}
function togglePrompt(promptId, button) {
  var shown = document.getElementById(promptId).classList.toggle('show');
  button.textContent = shown ? '隐藏 Prompt' : '显示 Prompt';
}
</script>
</body>
</html>
"""


def load_data_files(file_paths):
    """Read and parse the JSON execution records.

    Args:
        file_paths: Iterable of paths; relative ones are resolved against the
            directory containing this script.

    Returns:
        A list with the parsed content of every file that could be read.
        Missing or unreadable files are reported on stdout and skipped.
    """
    data_list = []
    script_dir = Path(__file__).parent

    for file_path in file_paths:
        abs_path = (script_dir / file_path).resolve()
        if not abs_path.exists():
            print(f"警告: 文件不存在: {abs_path}")
            continue

        try:
            with open(abs_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, OSError) as e:
            # Best effort: one bad record must not block the others.
            print(f"错误: 读取文件失败 {abs_path}: {e}")
        else:
            data_list.append(data)

    return data_list


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_workflow_data(data):
    """Extract the displayable parts of one execution record.

    Args:
        data: Parsed execution record (a dict).

    Returns:
        A dict with three keys: ``input`` (question / post_info /
        persona_info, or empty when the record has no input), ``steps`` (the
        workflow steps found under ``execution.modules.function_knowledge``,
        in execution order) and ``output`` (``result``, or empty when the
        record has no output). Sections that are not dicts are treated as
        empty instead of raising.
    """
    workflow = {
        'input': {},
        'steps': [],
        'output': {},
    }

    if 'input' in data:
        raw_input = _as_dict(data['input'])
        workflow['input'] = {
            'question': raw_input.get('question', ''),
            'post_info': raw_input.get('post_info', ''),
            'persona_info': raw_input.get('persona_info', ''),
        }

    modules = _as_dict(_as_dict(data.get('execution')).get('modules'))
    function_knowledge = _as_dict(modules.get('function_knowledge'))
    steps = workflow['steps']

    # Step 1: generate the query.
    if 'generate_query' in function_knowledge:
        generate_query = _as_dict(function_knowledge['generate_query'])
        steps.append({
            'step': 'generate_query',
            'name': '生成查询',
            'query': generate_query.get('query', ''),
            'prompt': generate_query.get('prompt', ''),
        })

    # Step 2: select the tool.
    if 'select_tool' in function_knowledge:
        select_tool = _as_dict(function_knowledge['select_tool'])
        response = _as_dict(select_tool.get('response'))
        steps.append({
            'step': 'select_tool',
            'name': '选择工具',
            'prompt': select_tool.get('prompt', ''),
            'tool_name': response.get('工具名', ''),
            'tool_id': response.get('工具调用ID', ''),
            'tool_usage': response.get('使用方法', ''),
        })

    # Step 3: extract the parameters.
    if 'extract_params' in function_knowledge:
        extract_params = _as_dict(function_knowledge['extract_params'])
        steps.append({
            'step': 'extract_params',
            'name': '提取参数',
            'prompt': extract_params.get('prompt', ''),
            'params': extract_params.get('params', {}),
        })

    # Step 4: execute the tool.
    if 'execute_tool' in function_knowledge:
        execute_tool = _as_dict(function_knowledge['execute_tool'])
        steps.append({
            'step': 'execute_tool',
            'name': '执行工具',
            'response': execute_tool.get('response', ''),
        })

    if 'output' in data:
        workflow['output'] = {
            'result': _as_dict(data['output']).get('result', ''),
        }

    return workflow


def escape_html(text):
    """Escape the HTML special characters ``& < > " '`` in *text*.

    Non-string values are converted with ``str()`` first.
    """
    if not isinstance(text, str):
        text = str(text)
    return text.translate(_HTML_ESCAPES)


def format_json_for_display(obj):
    """Return a human-readable string for *obj*.

    Dicts and lists are pretty-printed as JSON. Strings that contain valid
    JSON are re-indented; other strings are returned unchanged. Anything else
    is converted with ``str()``.
    """
    if isinstance(obj, (dict, list)):
        return json.dumps(obj, ensure_ascii=False, indent=2)
    if isinstance(obj, str):
        try:
            parsed = json.loads(obj)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return obj
        return json.dumps(parsed, ensure_ascii=False, indent=2)
    return str(obj)


def _render_field(label, value, as_json=False):
    """Render one labelled value of a step; *label* is trusted markup text."""
    if as_json:
        pretty = escape_html(format_json_for_display(value))
        body = f'<pre class="code-block">{pretty}</pre>'
    else:
        body = f'<div class="field-value">{escape_html(value)}</div>'
    return (
        '<div class="field">\n'
        f'<span class="field-label">{label}</span>\n'
        f'{body}\n'
        '</div>\n'
    )


def _render_input(input_info):
    """Render the input section (question, post info, persona info)."""
    question = escape_html(input_info.get('question', ''))
    post_info = input_info.get('post_info', '')
    persona_info = input_info.get('persona_info', '')
    post_display = escape_html(post_info) if post_info else '(无)'
    persona_display = escape_html(persona_info) if persona_info else '(无)'
    return (
        '<div class="section input-section">\n'
        '<h2>输入信息</h2>\n'
        f'<div class="info-item"><strong>问题:</strong> {question}</div>\n'
        f'<div class="info-item"><strong>帖子信息:</strong> {post_display}</div>\n'
        f'<div class="info-item"><strong>人设信息:</strong> {persona_display}</div>\n'
        '</div>\n'
    )


def _render_step(step, number, step_id):
    """Render one step card; its prompt goes last and starts hidden."""
    parts = [
        f'<div class="step-card" id="{step_id}">\n',
        f'<div class="step-header" onclick="toggleStep(\'{step_id}\')">\n',
        '<div class="step-title">\n',
        f'<span class="step-number">{number}</span>\n',
        f'<span class="step-name">{escape_html(step["name"])}</span>\n',
        '</div>\n',
        '<span class="toggle-icon">&#9660;</span>\n',
        '</div>\n',
        '<div class="step-content">\n',
    ]

    # Only non-empty values are shown; unknown step types show no fields.
    for key, label, as_json in _STEP_FIELDS.get(step['step'], ()):
        if step.get(key):
            parts.append(_render_field(label, step[key], as_json))

    if step.get('prompt'):
        prompt_id = f'prompt-{step_id}'
        parts.append(
            '<button class="prompt-toggle" '
            f'onclick="togglePrompt(\'{prompt_id}\', this)">显示 Prompt</button>\n'
        )
        parts.append(
            f'<pre class="prompt-content" id="{prompt_id}">'
            f'{escape_html(step["prompt"])}</pre>\n'
        )

    parts.append('</div>\n</div>\n')
    return ''.join(parts)


def generate_html(workflows):
    """Build the complete HTML page for the given workflows.

    Args:
        workflows: List of dicts as returned by ``parse_workflow_data``.

    Returns:
        The page as a string: one tab per workflow (the first one active),
        each showing the input, the step cards joined by arrows and, when
        present, the final output. All record data is HTML-escaped.
    """
    parts = [_PAGE_HEAD]

    # Tab buttons. The parser stores an empty question rather than omitting
    # the key, so fall back on emptiness, not on absence.
    for i, workflow in enumerate(workflows):
        label = workflow['input'].get('question') or f'问题 {i+1}'
        active = ' active' if i == 0 else ''
        parts.append(
            f'<button class="tab-button{active}" onclick="switchTab({i})">'
            f'{escape_html(label)}</button>\n'
        )
    parts.append('</div>\n')

    # Tab panels.
    for i, workflow in enumerate(workflows):
        active = ' active' if i == 0 else ''
        parts.append(f'<div class="tab-content{active}" id="tab-{i}">\n')
        parts.append(_render_input(workflow['input']))

        parts.append('<div class="workflow">\n')
        steps = workflow['steps']
        last_index = len(steps) - 1
        for j, step in enumerate(steps):
            parts.append(_render_step(step, j + 1, f'step-{i}-{j}'))
            if j < last_index:  # arrow between steps, none after the last
                parts.append('<div class="arrow">&#8595;</div>\n')
        parts.append('</div>\n')

        result = workflow['output'].get('result')
        if result:
            result_str = escape_html(format_json_for_display(result))
            parts.append(
                '<div class="section output-section">\n'
                '<h2>最终输出</h2>\n'
                f'<pre class="code-block">{result_str}</pre>\n'
                '</div>\n'
            )

        parts.append('</div>\n')

    parts.append(_PAGE_TAIL)
    return ''.join(parts)


def main():
    """Load the records, render the page and write it next to this script."""
    # Work from the script's directory so the output lands beside it.
    script_dir = Path(__file__).parent
    os.chdir(script_dir)

    print("正在读取数据文件...")
    data_list = load_data_files(DATA_FILE_PATHS)

    if not data_list:
        print("错误: 没有成功读取任何数据文件")
        return

    print(f"成功读取 {len(data_list)} 个数据文件")

    print("正在解析工作流数据...")
    workflows = [parse_workflow_data(data) for data in data_list]

    print("正在生成HTML页面...")
    html = generate_html(workflows)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"workflow_visualization_{timestamp}.html"

    with open(output_filename, 'w', encoding='utf-8') as f:
        f.write(html)

    print(f"HTML页面已生成: {output_filename}")
    print(f"文件路径: {os.path.abspath(output_filename)}")


if __name__ == '__main__':
    main()