#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Visualize knowledge-acquisition workflow runs as a single HTML page.

1. Read the detailed execution records of the knowledge-acquisition workflow
   (see ``.cache/9f510b2a8348/execution_record.json`` for an example).
2. Each record file describes the run for one input; there may be several.
   The file paths are hard-coded in ``DATA_FILE_PATHS``.
3. Render every workflow as one tab of an HTML page.
4. Write the page to ``workflow_visualization_<datetime>.html``. ``main()``
   changes the working directory to this script's directory first, so the
   file is created next to the script.
"""

import json
import os
from datetime import datetime
from html import escape as _escape_markup
from pathlib import Path

# Hard-coded record paths. load_data_files() resolves relative entries
# against the directory that contains this script.
DATA_FILE_PATHS = [
    "../.cache/dd8dd68ebf0c/execution_record.json",
    "../.cache/70a208f70a7e/execution_record.json",
    "../.cache/996e626b9c85/execution_record.json",
    # Add more record paths here.
]

# NOTE(review): the page skeleton, CSS class names and the toggle script below
# were reconstructed during review; only the visible Chinese labels are taken
# from the previous version. Confirm the look-and-feel against the intended
# design before relying on specific class names.
_PAGE_HEAD = '''<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>知识获取工作流可视化</title>
<style>
body { font-family: -apple-system, "Segoe UI", "PingFang SC", "Microsoft YaHei", sans-serif;
       margin: 0; padding: 20px; background: #f5f6fa; color: #2d3436; }
.container { max-width: 1100px; margin: 0 auto; background: #fff; border-radius: 8px;
             padding: 24px; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08); }
h1 { margin-top: 0; }
.tabs { display: flex; flex-wrap: wrap; gap: 6px; border-bottom: 2px solid #dfe6e9;
        margin-bottom: 20px; }
.tab { border: none; background: #dfe6e9; padding: 10px 16px; cursor: pointer;
       border-radius: 6px 6px 0 0; max-width: 320px; overflow: hidden;
       text-overflow: ellipsis; white-space: nowrap; }
.tab.active { background: #0984e3; color: #fff; }
.tab-content { display: none; }
.tab-content.active { display: block; }
.input-section, .output-section { background: #f1f8ff; border-radius: 6px;
                                  padding: 12px 16px; margin-bottom: 20px; }
.output-section { background: #f0fff4; }
.input-item { margin: 6px 0; white-space: pre-wrap; word-break: break-word; }
.step { border: 1px solid #dfe6e9; border-radius: 6px; overflow: hidden; }
.step-header { display: flex; justify-content: space-between; align-items: center;
               padding: 10px 14px; background: #f8f9fa; cursor: pointer; }
.step-number { display: inline-block; width: 24px; height: 24px; line-height: 24px;
               text-align: center; border-radius: 50%; background: #0984e3;
               color: #fff; margin-right: 8px; }
.step-content { padding: 12px 16px; }
.step.collapsed .step-content { display: none; }
.step-detail { margin-bottom: 10px; }
.step-detail-label { font-weight: bold; }
.step-detail-content, .json-content, .prompt-content, .search-content {
    white-space: pre-wrap; word-break: break-word; background: #f8f9fa;
    border-radius: 4px; padding: 8px 10px; margin-top: 4px; }
.json-content, .prompt-content { font-family: Consolas, Menlo, monospace; font-size: 13px; }
.search-result { margin-top: 8px; }
.search-query { font-weight: bold; }
.prompt-toggle-btn { border: 1px solid #b2bec3; background: #fff; border-radius: 4px;
                     padding: 4px 10px; cursor: pointer; }
.prompt-content { display: none; }
.prompt-content.show { display: block; }
.arrow { text-align: center; color: #b2bec3; font-size: 20px; margin: 6px 0; }
</style>
</head>
<body>
<div class="container">
<h1>知识获取工作流可视化</h1>
<div class="tabs">
'''

_PAGE_TAIL = '''</div>
<script>
function switchTab(index) {
    document.querySelectorAll('.tab').forEach(function (tab, i) {
        tab.classList.toggle('active', i === index);
    });
    document.querySelectorAll('.tab-content').forEach(function (pane, i) {
        pane.classList.toggle('active', i === index);
    });
}
function toggleStep(header) {
    header.parentElement.classList.toggle('collapsed');
}
function togglePrompt(button) {
    var prompt = document.getElementById(button.getAttribute('data-target'));
    if (!prompt) { return; }
    var visible = prompt.classList.toggle('show');
    button.textContent = visible ? '隐藏Prompt' : '显示Prompt';
}
</script>
</body>
</html>
'''


def load_data_files(file_paths):
    """Read and parse the JSON execution records listed in *file_paths*.

    Relative paths are resolved against the directory containing this script.
    Missing, unreadable or malformed files are reported on stdout and skipped.

    Args:
        file_paths: Iterable of path strings.

    Returns:
        A list with the parsed JSON content of every file that loaded
        successfully, in input order.
    """
    script_dir = Path(__file__).parent
    data_list = []
    for file_path in file_paths:
        abs_path = (script_dir / file_path).resolve()
        if not abs_path.exists():
            print(f"警告: 文件不存在: {abs_path}")
            continue
        try:
            with open(abs_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (file is not valid UTF-8).
            print(f"错误: 读取文件失败 {abs_path}: {e}")
        else:
            data_list.append(data)
    return data_list


def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _tool_call_result(tool_call):
    """Pick the value to display for a ``tool_call`` record.

    ``result`` wins; otherwise fall back to ``response`` (unwrapping a nested
    ``result`` key when ``response`` is a dict).
    """
    result = tool_call.get('result', '')
    response = tool_call.get('response')
    if not result and response:
        if isinstance(response, dict):
            result = response.get('result', response)
        else:
            result = response
    return result or tool_call.get('response', '')


def parse_workflow_data(data):
    """Extract the displayable parts of one execution record.

    The record is expected to hold its steps directly under ``execution``
    (there is no ``modules.function_knowledge`` level). When ``select_tool``
    yielded a tool name, the tool branch (``extract_params``, ``tool_call``)
    is read; otherwise the ``knowledge_search`` branch is read.

    Args:
        data: Parsed JSON record (a dict).

    Returns:
        A dict with keys ``input`` (dict), ``steps`` (list of step dicts, each
        carrying ``step`` and ``name``) and ``output`` (dict).
    """
    workflow = {'input': {}, 'steps': [], 'output': {}}
    steps = workflow['steps']

    if 'input' in data:
        source_input = _as_dict(data['input'])
        workflow['input'] = {
            'question': source_input.get('question', ''),
            'post_info': source_input.get('post_info', ''),
            'persona_info': source_input.get('persona_info', ''),
        }

    execution = _as_dict(data.get('execution'))

    # Step 1: query generation.
    if 'generate_query' in execution:
        generate_query = _as_dict(execution['generate_query'])
        steps.append({
            'step': 'generate_query',
            'name': '生成查询',
            'query': generate_query.get('query', '') or generate_query.get('response', ''),
            'prompt': generate_query.get('prompt', ''),
        })

    # Step 2: tool selection. An empty or non-dict response means no tool.
    has_tool = False
    if 'select_tool' in execution:
        select_tool = _as_dict(execution['select_tool'])
        response = _as_dict(select_tool.get('response', {}))
        tool_name = response.get('工具名', '')
        has_tool = bool(tool_name)
        steps.append({
            'step': 'select_tool',
            'name': '选择工具',
            'prompt': select_tool.get('prompt', ''),
            'tool_name': tool_name,
            'tool_id': response.get('工具调用ID', ''),
            'tool_usage': response.get('使用方法', ''),
        })

    if has_tool:
        # Step 3: parameter extraction.
        if 'extract_params' in execution:
            extract_params = _as_dict(execution['extract_params'])
            steps.append({
                'step': 'extract_params',
                'name': '提取参数',
                'prompt': extract_params.get('prompt', ''),
                'params': extract_params.get('params', {}),
            })
        # Step 4: tool execution (stored under ``tool_call`` in the record).
        if 'tool_call' in execution:
            steps.append({
                'step': 'execute_tool',
                'name': '执行工具',
                'response': _tool_call_result(_as_dict(execution['tool_call'])),
            })
    elif 'knowledge_search' in execution:
        knowledge_search = _as_dict(execution['knowledge_search'])
        # Step 3: LLM search.
        if 'llm_search' in knowledge_search:
            llm_search = _as_dict(knowledge_search['llm_search'])
            steps.append({
                'step': 'llm_search',
                'name': 'LLM搜索',
                'search_results': llm_search.get('search_results', []),
            })
        # Step 4: merge of the multi-channel search results.
        if 'multi_search_merge' in knowledge_search:
            merge = _as_dict(knowledge_search['multi_search_merge'])
            steps.append({
                'step': 'multi_search_merge',
                'name': '多渠道搜索结果整合',
                'prompt': merge.get('prompt', ''),
                'response': merge.get('response', ''),
                'sources_count': merge.get('sources_count', 0),
                'valid_sources_count': merge.get('valid_sources_count', 0),
            })

    if 'output' in data:
        workflow['output'] = {'result': _as_dict(data['output']).get('result', '')}

    return workflow


def escape_html(text):
    """Escape ``&``, ``<``, ``>``, ``"`` and ``'`` for safe embedding in HTML.

    Non-string values are converted with ``str()`` first.
    """
    if not isinstance(text, str):
        text = str(text)
    return _escape_markup(text, quote=True)


def format_json_for_display(obj):
    """Return a human-readable string for *obj*.

    Dicts and lists are pretty-printed as JSON. Strings that contain valid
    JSON are re-indented; other strings are returned unchanged. Anything else
    is converted with ``str()``.
    """
    if isinstance(obj, (dict, list)):
        return json.dumps(obj, ensure_ascii=False, indent=2)
    if isinstance(obj, str):
        try:
            parsed = json.loads(obj)
        except ValueError:
            return obj
        return json.dumps(parsed, ensure_ascii=False, indent=2)
    return str(obj)


def _detail(label, inner_html, content_class='step-detail-content'):
    """Wrap an already-escaped HTML fragment in a labelled detail block."""
    return (
        '<div class="step-detail">\n'
        f'  <span class="step-detail-label">{label}</span>\n'
        f'  <div class="{content_class}">{inner_html}</div>\n'
        '</div>\n'
    )


def _json_detail(label, value):
    """Render *value* through format_json_for_display inside a detail block."""
    return _detail(label, escape_html(format_json_for_display(value)), 'json-content')


def _render_step_details(step):
    """Return the HTML for the type-specific details of one step."""
    kind = step['step']
    parts = []

    if kind == 'generate_query':
        if step.get('query'):
            parts.append(_detail('生成的Query:', escape_html(step['query'])))

    elif kind == 'select_tool':
        if step.get('tool_name'):
            parts.append(_detail('工具名称:', escape_html(step['tool_name'])))
            if step.get('tool_id'):
                parts.append(_detail('工具调用ID:', escape_html(step['tool_id'])))
            if step.get('tool_usage'):
                parts.append(_detail('使用方法:', escape_html(step['tool_usage'])))
        else:
            # No tool was selected: show an explicit hint instead of nothing.
            parts.append(_detail('选择结果:', '无匹配工具'))

    elif kind == 'extract_params':
        if step.get('params'):
            parts.append(_json_detail('提取的参数:', step['params']))

    elif kind == 'execute_tool':
        if step.get('response'):
            parts.append(_json_detail('执行结果:', step['response']))

    elif kind == 'llm_search':
        search_results = step.get('search_results') or []
        if search_results:
            items = []
            for idx, result in enumerate(search_results, 1):
                # Tolerate entries that are plain values rather than dicts.
                entry = result if isinstance(result, dict) else {'content': result}
                query = escape_html(entry.get('query', ''))
                content = escape_html(entry.get('content', ''))
                items.append(
                    '<div class="search-result">\n'
                    f'  <div class="search-query">查询 {idx}: {query}</div>\n'
                    f'  <div class="search-content">{content}</div>\n'
                    '</div>\n'
                )
            parts.append(
                '<div class="step-detail">\n'
                '  <span class="step-detail-label">搜索结果:</span>\n'
                + ''.join(items)
                + '</div>\n'
            )

    elif kind == 'multi_search_merge':
        if step.get('sources_count') is not None:
            total = escape_html(step.get('sources_count', 0))
            valid = escape_html(step.get('valid_sources_count', 0))
            parts.append(_detail('来源统计:', f'总来源数: {total}, 有效来源数: {valid}'))
        if step.get('response'):
            parts.append(_json_detail('整合结果:', step['response']))

    return ''.join(parts)


def _render_step(step_id, number, step):
    """Return the HTML card for one workflow step."""
    prompt_id = f'prompt-{step_id}'
    name = escape_html(step['name'])
    parts = [
        f'<div class="step" id="{step_id}">\n',
        '<div class="step-header" onclick="toggleStep(this)">\n',
        '<div class="step-title">\n',
        f'  <span class="step-number">{number}</span>\n',
        f'  <span class="step-name">{name}</span>\n',
        '</div>\n',
        '  <span class="toggle-icon">▼</span>\n',
        '</div>\n',
        '<div class="step-content">\n',
        _render_step_details(step),
    ]
    # The prompt goes last and stays hidden until its button is clicked.
    if step.get('prompt'):
        prompt = escape_html(step['prompt'])
        parts.append(
            f'  <button class="prompt-toggle-btn" data-target="{prompt_id}" '
            'onclick="togglePrompt(this)">显示Prompt</button>\n'
        )
        parts.append(f'<div class="prompt-content" id="{prompt_id}">{prompt}</div>\n')
    parts.append('</div>\n')
    parts.append('</div>\n')
    return ''.join(parts)


def _render_workflow(index, workflow):
    """Return the HTML tab pane (input, steps, output) for one workflow."""
    active = ' active' if index == 0 else ''
    inputs = workflow['input']
    question = escape_html(inputs.get('question', ''))
    post_info = inputs.get('post_info', '')
    post_info_display = escape_html(post_info) if post_info else '(无)'
    persona_info = inputs.get('persona_info', '')
    persona_info_display = escape_html(persona_info) if persona_info else '(无)'

    parts = [
        f'<div class="tab-content{active}" id="tab-{index}">\n',
        '<div class="input-section">\n',
        '<h2>输入信息</h2>\n',
        f'<div class="input-item"><strong>问题:</strong> {question}</div>\n',
        f'<div class="input-item"><strong>帖子信息:</strong> {post_info_display}</div>\n',
        f'<div class="input-item"><strong>人设信息:</strong> {persona_info_display}</div>\n',
        '</div>\n',
        '<div class="workflow">\n',
    ]

    steps = workflow['steps']
    last = len(steps) - 1
    for j, step in enumerate(steps):
        parts.append(_render_step(f'step-{index}-{j}', j + 1, step))
        # Arrow between consecutive steps, but not after the last one.
        if j < last:
            parts.append('<div class="arrow">↓</div>\n')
    parts.append('</div>\n')

    result = workflow['output'].get('result')
    if result:
        result_str = escape_html(format_json_for_display(result))
        parts.append('<div class="output-section">\n')
        parts.append('<h2>最终输出</h2>\n')
        parts.append(f'<div class="json-content">{result_str}</div>\n')
        parts.append('</div>\n')

    parts.append('</div>\n')
    return ''.join(parts)


def generate_html(workflows):
    """Build the complete HTML page for *workflows*.

    Args:
        workflows: List of dicts as returned by parse_workflow_data().

    Returns:
        The HTML document as a string: one tab button and one tab pane per
        workflow, with the first tab active. All record-derived text is
        HTML-escaped.
    """
    parts = [_PAGE_HEAD]

    # Tab buttons. Fall back to a numbered label when the question is empty.
    for i, workflow in enumerate(workflows):
        label = escape_html(workflow['input'].get('question') or f'问题 {i+1}')
        active = ' active' if i == 0 else ''
        parts.append(
            f'  <button class="tab{active}" onclick="switchTab({i})">{label}</button>\n'
        )
    parts.append('</div>\n')

    # Tab panes.
    for i, workflow in enumerate(workflows):
        parts.append(_render_workflow(i, workflow))

    parts.append(_PAGE_TAIL)
    return ''.join(parts)


def main():
    """Load the configured records, render them and write the HTML file."""
    # Work from the script's directory so the output lands next to it.
    script_dir = Path(__file__).parent
    os.chdir(script_dir)

    print("正在读取数据文件...")
    data_list = load_data_files(DATA_FILE_PATHS)

    if not data_list:
        print("错误: 没有成功读取任何数据文件")
        return

    print(f"成功读取 {len(data_list)} 个数据文件")

    print("正在解析工作流数据...")
    workflows = [parse_workflow_data(data) for data in data_list]

    print("正在生成HTML页面...")
    page = generate_html(workflows)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"workflow_visualization_{timestamp}.html"

    with open(output_filename, 'w', encoding='utf-8') as f:
        f.write(page)

    print(f"HTML页面已生成: {output_filename}")
    print(f"文件路径: {os.path.abspath(output_filename)}")


if __name__ == '__main__':
    main()