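#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Knowledge-acquisition workflow visualization.

1. Read the detailed execution-record files of the knowledge-acquisition
   workflow (see .cache/9f510b2a8348/execution_record.json for an example).
2. The file paths are configurable. Each file records the execution for one
   input and there may be several of them; the paths are hard-coded in this
   module as a list.
3. Render the knowledge-acquisition workflow as an HTML page.
4. The HTML file is written to the current directory and is named
   workflow_visualization_datetime.html.
'''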
import json
import os
from datetime import datetime
from pathlib import Path
# Hard-coded list of execution-record files to visualize. Relative entries are
# resolved against the directory that contains this script.
DATA_FILE_PATHS = [
    "../.cache/6ce435bbe63c/execution_record.json",
    "../.cache/1568b0312262/execution_record.json",
    # Add more file paths here.
]
def load_data_files(file_paths):
    """Read and parse the given JSON files.

    Args:
        file_paths: Iterable of paths. Relative paths are resolved against the
            directory containing this script; absolute paths are used as-is.

    Returns:
        A list of ``(data, folder_name)`` tuples, one per file that could be
        read, where ``folder_name`` is the name of the file's parent directory
        (e.g. ``a588c7a380ee`` for
        ``../.cache/a588c7a380ee/execution_record.json``). Files that are
        missing or unreadable are reported on stdout and skipped.
    """
    script_dir = Path(__file__).parent
    loaded = []
    for file_path in file_paths:
        abs_path = (script_dir / file_path).resolve()
        if not abs_path.exists():
            print(f"警告: 文件不存在: {abs_path}")
            continue
        try:
            with open(abs_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError, so one bad file never aborts the whole run.
            print(f"错误: 读取文件失败 {abs_path}: {e}")
            continue
        loaded.append((data, abs_path.parent.name))
    return loaded
def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_workflow_data(data, folder_name=''):
    """Extract the information needed for rendering from one execution record.

    Args:
        data: Parsed execution-record JSON (a dict).
        folder_name: Name of the folder the record was loaded from; stored
            unchanged in the result.

    Returns:
        A dict with the keys ``input`` (``{'text': ...}`` or ``{}``),
        ``steps`` (list of step dicts, each with ``step`` and ``name`` plus
        step-specific fields), ``output`` (``{'result': ...}`` or ``{}``) and
        ``folder_name``.

    Sub-records that are expected to be dicts but are not (e.g. ``null``) are
    treated as empty instead of raising.
    """
    workflow = {
        'input': {},
        'steps': [],
        'output': {},
        'folder_name': folder_name,
    }
    steps = workflow['steps']

    # Input: the current format is a plain string.
    if 'input' in data:
        input_data = data['input']
        if isinstance(input_data, str):
            workflow['input'] = {'text': input_data}
        else:
            # Legacy format: a dict of separate fields, flattened into text.
            legacy = _as_dict(input_data)
            question = legacy.get('question', '')
            post_info = legacy.get('post_info', '')
            persona_info = legacy.get('persona_info', '')
            input_text = f'问题:{question}\n' if question else ''
            if post_info:
                input_text += f'{post_info}\n'
            if persona_info:
                input_text += f'账号人设信息:{persona_info}\n'
            workflow['input'] = {'text': input_text.strip()}

    # Execution: the steps live directly under "execution".
    if 'execution' in data:
        execution = _as_dict(data['execution'])

        # Look for methods/tools.
        if 'find_tools' in execution:
            find_tools = _as_dict(execution['find_tools'])
            steps.append({
                'step': 'find_tools',
                'name': '寻找方法/工具',
                'prompt': find_tools.get('prompt', ''),
                'result': find_tools.get('result', ''),
            })

        # Run the default (hot-list) tool.
        if 'call_default_hot_tool' in execution:
            hot_tool = _as_dict(execution['call_default_hot_tool'])
            steps.append({
                'step': 'call_default_hot_tool',
                'name': '执行default工具(热榜)',
                'extract_params_prompt': hot_tool.get('extract_params_prompt', ''),
                'params': hot_tool.get('params', {}),
                'hot_data': hot_tool.get('hot_data', ''),
                'analyze_prompt': hot_tool.get('analyze_prompt', ''),
                'analysis_result': hot_tool.get('analysis_result', ''),
            })

        # The "generate_query" step is intentionally not added to the list.

        # Tool selection. A tool counts as selected only when the response is
        # a dict whose tool name is non-empty and not the "no match" marker.
        has_tool = False
        if 'select_tool' in execution:
            select_tool = _as_dict(execution['select_tool'])
            response = _as_dict(select_tool.get('response', {}))
            tool_name = response.get('工具名', '')
            steps.append({
                'step': 'select_tool',
                'name': '选择工具',
                'prompt': select_tool.get('prompt', ''),
                'tool_name': tool_name,
                'tool_id': response.get('工具调用ID', ''),
                'tool_usage': response.get('使用方法', ''),
                'match_reason': response.get('匹配理由', ''),
                'application_scenario': response.get('应用场景', ''),
            })
            has_tool = bool(tool_name) and tool_name != '无工具匹配'

        if has_tool:
            # Tool-call flow: extract parameters, call the tool, evaluate.
            if 'extract_params' in execution:
                extract_params = _as_dict(execution['extract_params'])
                steps.append({
                    'step': 'extract_params',
                    'name': '提取参数',
                    'prompt': extract_params.get('prompt', ''),
                    'params': extract_params.get('params', {}),
                })

            if 'tool_call' in execution:
                tool_call = _as_dict(execution['tool_call'])
                # Prefer "result"; otherwise fall back to "response", and when
                # that is a dict, to its nested "result" if present.
                result = tool_call.get('result', '')
                if not result and tool_call.get('response'):
                    response = tool_call.get('response', {})
                    if isinstance(response, dict):
                        result = response.get('result', response)
                    else:
                        result = response
                steps.append({
                    'step': 'execute_tool',
                    'name': '执行工具',
                    'response': result or tool_call.get('response', ''),
                })

            if 'evaluate_tool_result' in execution:
                evaluation = _as_dict(execution['evaluate_tool_result'])
                eval_result = _as_dict(evaluation.get('eval_result', {}))
                steps.append({
                    'step': 'evaluate_tool_result',
                    'name': '工具结果评估',
                    'prompt': evaluation.get('prompt', ''),
                    'can_answer': eval_result.get('是否可以回答', ''),
                    'reason': eval_result.get('理由', ''),
                })

        elif 'knowledge_search' in execution:
            # No tool selected: knowledge-search flow.
            knowledge_search = _as_dict(execution['knowledge_search'])

            # LLM search: generated queries, raw results and merged answer.
            if 'llm_search' in knowledge_search:
                llm_search = _as_dict(knowledge_search['llm_search'])
                generated_queries = _as_dict(llm_search.get('generated_queries', {}))
                merge = _as_dict(llm_search.get('merge', {}))
                steps.append({
                    'step': 'llm_search',
                    'name': 'LLM搜索',
                    'prompt': generated_queries.get('prompt', ''),
                    'queries': generated_queries.get('queries', []),
                    'search_results': llm_search.get('search_results', []),
                    'merge_prompt': merge.get('prompt', ''),
                    'merge_response': merge.get('response', ''),
                    'sources_count': merge.get('sources_count', 0),
                })

            # Merge of the results from all search channels.
            if 'multi_search_merge' in knowledge_search:
                multi_merge = _as_dict(knowledge_search['multi_search_merge'])
                steps.append({
                    'step': 'multi_search_merge',
                    'name': '多渠道搜索结果整合',
                    'prompt': multi_merge.get('prompt', ''),
                    'response': multi_merge.get('response', ''),
                    'sources_count': multi_merge.get('sources_count', 0),
                    'valid_sources_count': multi_merge.get('valid_sources_count', 0),
                })

            # Newly discovered tools.
            if 'extra_tools' in knowledge_search:
                extra_tools = _as_dict(knowledge_search['extra_tools'])
                match_response = _as_dict(extra_tools.get('match_tool_response', {}))
                steps.append({
                    'step': 'extra_tools',
                    'name': '发现新工具',
                    'prompt': extra_tools.get('match_tool_prompt', ''),
                    'selected_tools': match_response.get('selected_tools', []),
                    'analysis_summary': match_response.get('analysis_summary', ''),
                })

    # Output.
    if 'output' in data:
        workflow['output'] = {
            'result': _as_dict(data['output']).get('result', ''),
        }

    return workflow
# Single-pass translation table for the five characters that are special in
# HTML text and attribute values.
# NOTE(review): the entity originally used for the apostrophe is not
# recoverable from the damaged source; "&#39;" is used here -- confirm.
_HTML_ESCAPE_TABLE = str.maketrans({
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    "'": '&#39;',
})


def escape_html(text):
    """Escape HTML special characters in *text*.

    Non-string values are converted with ``str()`` first. Returns the escaped
    string, safe to embed in element content or in a quoted attribute value.
    """
    if not isinstance(text, str):
        text = str(text)
    # translate() visits each character once, so an already-inserted "&" from
    # an entity is never escaped a second time.
    return text.translate(_HTML_ESCAPE_TABLE)
def format_json_for_display(obj):
    """Return *obj* as human-readable text for display.

    * dicts and lists are serialized as indented JSON with non-ASCII
      characters kept as-is;
    * strings that contain valid JSON are re-serialized the same way, any
      other string is returned unchanged;
    * every other value is converted with ``str()``.
    """
    if isinstance(obj, (dict, list)):
        return json.dumps(obj, ensure_ascii=False, indent=2)
    if isinstance(obj, str):
        try:
            parsed = json.loads(obj)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return obj
        return json.dumps(parsed, ensure_ascii=False, indent=2)
    return str(obj)
- def generate_html(workflows):
- """生成HTML页面"""
- html = '''<!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <title>知识获取工作流可视化</title>
- <style>
- * {
- margin: 0;
- padding: 0;
- box-sizing: border-box;
- }
-
- body {
- font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'PingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei', sans-serif;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- color: #333;
- line-height: 1.6;
- min-height: 100vh;
- }
-
- .container {
- max-width: 1400px;
- margin: 0 auto;
- padding: 30px 20px;
- }
-
- h1 {
- text-align: center;
- color: white;
- margin-bottom: 40px;
- font-size: 32px;
- font-weight: 600;
- text-shadow: 0 2px 10px rgba(0,0,0,0.2);
- letter-spacing: 1px;
- }
-
- .tabs {
- display: flex;
- background: white;
- border-radius: 12px 12px 0 0;
- box-shadow: 0 4px 20px rgba(0,0,0,0.15);
- overflow-x: auto;
- padding: 5px;
- }
-
- .tab {
- padding: 16px 28px;
- cursor: pointer;
- border: none;
- background: transparent;
- color: #666;
- font-size: 14px;
- transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
- white-space: nowrap;
- border-radius: 8px;
- margin: 0 4px;
- position: relative;
- font-weight: 500;
- }
-
- .tab:hover {
- background: #f0f0f0;
- color: #333;
- }
-
- .tab.active {
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- color: white;
- box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
- }
-
- .tab-content {
- display: none;
- background: white;
- padding: 40px;
- border-radius: 0 0 12px 12px;
- box-shadow: 0 4px 20px rgba(0,0,0,0.15);
- margin-bottom: 20px;
- animation: fadeIn 0.3s ease-in;
- }
-
- @keyframes fadeIn {
- from { opacity: 0; transform: translateY(10px); }
- to { opacity: 1; transform: translateY(0); }
- }
-
- .tab-content.active {
- display: block;
- }
-
- .input-section {
- background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
- padding: 28px;
- border-radius: 12px;
- margin-bottom: 35px;
- box-shadow: 0 4px 15px rgba(0,0,0,0.1);
- border: 1px solid rgba(255,255,255,0.5);
- }
-
- .input-section h3 {
- color: #2c3e50;
- margin-bottom: 20px;
- font-size: 20px;
- font-weight: 600;
- display: flex;
- align-items: center;
- gap: 10px;
- }
-
- .input-section h3::before {
- content: '📋';
- font-size: 24px;
- }
-
- .input-item {
- margin-bottom: 16px;
- padding: 12px;
- background: rgba(255,255,255,0.7);
- border-radius: 8px;
- transition: all 0.3s;
- }
-
- .input-item:hover {
- background: rgba(255,255,255,0.9);
- transform: translateX(5px);
- }
-
- .input-item strong {
- color: #495057;
- display: inline-block;
- width: 110px;
- font-weight: 600;
- }
-
- .input-item .placeholder {
- color: #999;
- font-style: italic;
- }
-
- .workflow {
- position: relative;
- }
-
- .workflow-step {
- background: white;
- border: 2px solid #e0e0e0;
- border-radius: 12px;
- margin-bottom: 25px;
- transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
- overflow: hidden;
- box-shadow: 0 2px 8px rgba(0,0,0,0.08);
- }
-
- .workflow-step.active {
- border-color: #667eea;
- box-shadow: 0 8px 24px rgba(102, 126, 234, 0.25);
- transform: translateY(-2px);
- }
-
- .step-header {
- padding: 20px 24px;
- background: linear-gradient(135deg, #f8f9fa 0%, #e9ecef 100%);
- cursor: pointer;
- display: flex;
- justify-content: space-between;
- align-items: center;
- user-select: none;
- transition: all 0.3s;
- }
-
- .step-header:hover {
- background: linear-gradient(135deg, #e9ecef 0%, #dee2e6 100%);
- }
-
- .workflow-step.active .step-header {
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- color: white;
- }
-
- .workflow-step.active .step-name {
- color: white;
- }
-
- .workflow-step.active .step-toggle {
- color: white;
- }
-
- .step-title {
- display: flex;
- align-items: center;
- gap: 15px;
- }
-
- .step-number {
- display: inline-flex;
- align-items: center;
- justify-content: center;
- width: 36px;
- height: 36px;
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- color: white;
- border-radius: 50%;
- font-size: 16px;
- font-weight: bold;
- box-shadow: 0 4px 12px rgba(102, 126, 234, 0.3);
- }
-
- .workflow-step.active .step-number {
- background: white;
- color: #667eea;
- box-shadow: 0 4px 12px rgba(255,255,255,0.3);
- }
-
- .step-name {
- font-size: 18px;
- font-weight: 600;
- color: #2c3e50;
- }
-
- .step-toggle {
- color: #6c757d;
- font-size: 20px;
- transition: transform 0.3s cubic-bezier(0.4, 0, 0.2, 1);
- }
-
- .step-toggle.expanded {
- transform: rotate(180deg);
- }
-
- .step-content {
- padding: 0 20px;
- max-height: 0;
- overflow: hidden;
- transition: max-height 0.3s ease-out, padding 0.3s;
- }
-
- .step-content.expanded {
- max-height: 5000px;
- padding: 20px;
- }
-
- .step-detail {
- margin-bottom: 20px;
- }
-
- .step-detail-label {
- font-weight: 600;
- color: #495057;
- margin-bottom: 10px;
- display: block;
- font-size: 14px;
- text-transform: uppercase;
- letter-spacing: 0.5px;
- }
-
- .step-detail-content {
- background: #f8f9fa;
- padding: 16px;
- border-radius: 8px;
- border-left: 4px solid #667eea;
- font-size: 14px;
- line-height: 1.8;
- white-space: pre-wrap;
- word-wrap: break-word;
- max-height: 400px;
- overflow-y: auto;
- box-shadow: 0 2px 8px rgba(0,0,0,0.05);
- }
-
- .json-content {
- font-family: 'SF Mono', 'Monaco', 'Courier New', monospace;
- background: #1e1e1e;
- color: #d4d4d4;
- padding: 20px;
- border-radius: 8px;
- overflow-x: auto;
- border-left: 4px solid #667eea;
- box-shadow: 0 4px 12px rgba(0,0,0,0.15);
- }
-
- .prompt-toggle-btn {
- background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
- color: white;
- border: none;
- padding: 10px 20px;
- border-radius: 6px;
- cursor: pointer;
- font-size: 13px;
- font-weight: 500;
- margin-top: 15px;
- transition: all 0.3s;
- box-shadow: 0 2px 8px rgba(102, 126, 234, 0.3);
- }
-
- .prompt-toggle-btn:hover {
- transform: translateY(-2px);
- box-shadow: 0 4px 12px rgba(102, 126, 234, 0.4);
- }
-
- .prompt-content {
- display: none;
- margin-top: 15px;
- padding: 16px;
- background: #fff3cd;
- border-radius: 8px;
- border-left: 4px solid #ffc107;
- font-size: 13px;
- line-height: 1.8;
- white-space: pre-wrap;
- word-wrap: break-word;
- max-height: 500px;
- overflow-y: auto;
- }
-
- .prompt-content.show {
- display: block;
- animation: slideDown 0.3s ease-out;
- }
-
- @keyframes slideDown {
- from {
- opacity: 0;
- max-height: 0;
- }
- to {
- opacity: 1;
- max-height: 500px;
- }
- }
-
- .output-section {
- background: linear-gradient(135deg, #e0f2fe 0%, #bae6fd 100%);
- padding: 28px;
- border-radius: 12px;
- margin-top: 35px;
- border-left: 4px solid #0ea5e9;
- box-shadow: 0 4px 15px rgba(14, 165, 233, 0.2);
- }
-
- .output-section h3 {
- color: #0369a1;
- margin-bottom: 20px;
- font-size: 20px;
- font-weight: 600;
- display: flex;
- align-items: center;
- gap: 10px;
- }
-
- .output-section h3::before {
- content: '✨';
- font-size: 24px;
- }
-
- .arrow {
- text-align: center;
- color: #667eea;
- font-size: 32px;
- margin: -15px 0;
- position: relative;
- z-index: 1;
- filter: drop-shadow(0 2px 4px rgba(102, 126, 234, 0.3));
- }
-
- .arrow::before {
- content: '↓';
- }
-
- @media (max-width: 768px) {
- .container {
- padding: 10px;
- }
-
- .tab {
- padding: 12px 15px;
- font-size: 13px;
- }
-
- .tab-content {
- padding: 20px;
- }
- }
- </style>
- </head>
- <body>
- <div class="container">
- <h1>知识获取工作流可视化</h1>
- <div class="tabs" id="tabs">
- '''
-
- # 生成Tab标签
- for i, workflow in enumerate(workflows):
- # 使用文件夹名称作为Tab标题
- folder_name = workflow.get('folder_name', f'问题 {i+1}')
- active_class = 'active' if i == 0 else ''
- html += f' <button class="tab {active_class}" onclick="switchTab({i})">{escape_html(folder_name)}</button>\n'
-
- html += ' </div>\n'
-
- # 生成Tab内容
- for i, workflow in enumerate(workflows):
- active_class = 'active' if i == 0 else ''
- html += f' <div class="tab-content {active_class}" id="tab-{i}">\n'
-
- # 输入信息
- html += ' <div class="input-section">\n'
- html += ' <h3>输入信息</h3>\n'
- input_text = workflow['input'].get('text', '')
- if input_text:
- html += f' <div class="input-item" style="white-space: pre-wrap; line-height: 1.8;">{escape_html(input_text)}</div>\n'
- else:
- html += ' <div class="input-item"><span class="placeholder">(无)</span></div>\n'
- html += ' </div>\n'
-
- # 工作流程
- html += ' <div class="workflow">\n'
- for j, step in enumerate(workflow['steps']):
- step_id = f"step-{i}-{j}"
- html += f' <div class="workflow-step" id="{step_id}">\n'
- html += ' <div class="step-header" onclick="toggleStep(\'' + step_id + '\')">\n'
- html += ' <div class="step-title">\n'
- html += f' <span class="step-number">{j+1}</span>\n'
- html += f' <span class="step-name">{escape_html(step["name"])}</span>\n'
- html += ' </div>\n'
- html += ' <span class="step-toggle">▼</span>\n'
- html += ' </div>\n'
- html += ' <div class="step-content" id="content-' + step_id + '">\n'
-
- # 根据步骤类型显示不同内容(prompt放在最后,默认隐藏)
- prompt_id = f"prompt-{step_id}"
-
- if step['step'] == 'find_tools':
- # 寻找结果(默认展开)
- if step.get('result'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">寻找结果:</span>\n'
- html += f' <div class="step-detail-content" style="white-space: pre-wrap; line-height: 1.8;">{escape_html(step["result"])}</div>\n'
- html += ' </div>\n'
-
- # prompt(默认收起)
- if step.get('prompt'):
- find_tools_prompt_id = f"find-tools-prompt-{step_id}"
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{find_tools_prompt_id}\')">显示 Prompt</button>\n'
- html += f' <div class="prompt-content" id="{find_tools_prompt_id}">{escape_html(step["prompt"])}</div>\n'
-
- elif step['step'] == 'call_default_hot_tool':
- # 热榜工具参数提取prompt(默认收起)
- if step.get('extract_params_prompt'):
- extract_prompt_id = f"extract-prompt-{step_id}"
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{extract_prompt_id}\')">显示 热榜工具参数提取 Prompt</button>\n'
- html += f' <div class="prompt-content" id="{extract_prompt_id}">{escape_html(step["extract_params_prompt"])}</div>\n'
-
- # 参数(默认收起)
- if step.get('params'):
- params_id = f"params-{step_id}"
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{params_id}\')">显示 参数</button>\n'
- params_str = format_json_for_display(step['params'])
- html += f' <div class="prompt-content" id="{params_id}"><div class="step-detail-content json-content">{escape_html(params_str)}</div></div>\n'
-
- # 热榜数据(默认收起)
- if step.get('hot_data'):
- hot_data_id = f"hot-data-{step_id}"
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{hot_data_id}\')">显示 热榜数据</button>\n'
- html += f' <div class="prompt-content" id="{hot_data_id}"><div class="step-detail-content" style="white-space: pre-wrap; line-height: 1.6;">{escape_html(step["hot_data"])}</div></div>\n'
-
- # 热榜数据分析prompt(默认收起)
- if step.get('analyze_prompt'):
- analyze_prompt_id = f"analyze-prompt-{step_id}"
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{analyze_prompt_id}\')">显示 热榜数据分析 Prompt</button>\n'
- html += f' <div class="prompt-content" id="{analyze_prompt_id}">{escape_html(step["analyze_prompt"])}</div>\n'
-
- # 热榜数据分析结果(默认展开)
- if step.get('analysis_result'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">热榜数据分析结果:</span>\n'
- html += f' <div class="step-detail-content" style="white-space: pre-wrap; line-height: 1.8;">{escape_html(step["analysis_result"])}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'generate_query':
- if step.get('query'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">生成的Query:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["query"])}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'select_tool':
- # 判断是否选择了工具
- tool_name = step.get('tool_name', '')
- if tool_name and tool_name != '无工具匹配':
- # 有工具选择时显示工具信息
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">工具名称:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(tool_name)}</div>\n'
- html += ' </div>\n'
- if step.get('tool_id'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">工具调用ID:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["tool_id"])}</div>\n'
- html += ' </div>\n'
- if step.get('match_reason'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">匹配理由:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["match_reason"])}</div>\n'
- html += ' </div>\n'
- if step.get('application_scenario'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">应用场景:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["application_scenario"])}</div>\n'
- html += ' </div>\n'
- if step.get('tool_usage'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">使用方法:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["tool_usage"])}</div>\n'
- html += ' </div>\n'
- else:
- # 无工具选择时显示提示和匹配理由、应用场景
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">选择结果:</span>\n'
- html += ' <div class="step-detail-content" style="color: #dc3545; font-weight: 500;">无匹配工具</div>\n'
- html += ' </div>\n'
- if step.get('match_reason'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">匹配理由:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["match_reason"])}</div>\n'
- html += ' </div>\n'
- if step.get('application_scenario'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">应用场景:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["application_scenario"])}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'extract_params':
- if step.get('params'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">提取的参数:</span>\n'
- params_str = format_json_for_display(step['params'])
- html += f' <div class="step-detail-content json-content">{escape_html(params_str)}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'execute_tool':
- if step.get('response'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">执行结果:</span>\n'
- response_str = format_json_for_display(step['response'])
- html += f' <div class="step-detail-content json-content">{escape_html(response_str)}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'evaluate_tool_result':
- can_answer = step.get('can_answer', '')
- reason = step.get('reason', '')
- if can_answer:
- # 根据是否可以回答设置颜色
- answer_color = '#28a745' if can_answer == '是' else '#dc3545'
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">是否可以回答输入需求:</span>\n'
- html += f' <div class="step-detail-content" style="color: {answer_color}; font-weight: 500;">{escape_html(can_answer)}</div>\n'
- html += ' </div>\n'
- if reason:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">理由:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(reason)}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'llm_search':
- # 显示生成的查询
- queries = step.get('queries', [])
- if queries:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">生成的查询:</span>\n'
- for idx, query in enumerate(queries, 1):
- html += ' <div style="margin-bottom: 8px; padding: 8px; background: #e8f4f8; border-radius: 4px; border-left: 3px solid #4a90e2;">\n'
- html += f' <div style="font-weight: 500; color: #2c3e50;">查询 {idx}: {escape_html(query)}</div>\n'
- html += ' </div>\n'
- html += ' </div>\n'
-
- # 显示搜索结果
- search_results = step.get('search_results', [])
- if search_results:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">搜索结果:</span>\n'
- for idx, result in enumerate(search_results, 1):
- query = result.get('query', '')
- content = result.get('content', '')
- html += ' <div style="margin-bottom: 15px; padding: 12px; background: #f0f8ff; border-radius: 6px; border-left: 3px solid #4a90e2;">\n'
- html += f' <div style="font-weight: 600; color: #2c3e50; margin-bottom: 8px;">查询 {idx}: {escape_html(query)}</div>\n'
- html += f' <div style="color: #555; line-height: 1.6; white-space: pre-wrap;">{escape_html(content)}</div>\n'
- html += ' </div>\n'
- html += ' </div>\n'
-
- # 显示合并结果
- merge_response = step.get('merge_response', '')
- if merge_response:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">合并结果:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(merge_response)}</div>\n'
- html += ' </div>\n'
-
- # 显示来源统计
- sources_count = step.get('sources_count', 0)
- if sources_count > 0:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">来源统计:</span>\n'
- html += f' <div class="step-detail-content">来源数: {sources_count}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'multi_search_merge':
- if step.get('sources_count') is not None:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">来源统计:</span>\n'
- html += f' <div class="step-detail-content">总来源数: {step.get("sources_count", 0)}, 有效来源数: {step.get("valid_sources_count", 0)}</div>\n'
- html += ' </div>\n'
- if step.get('response'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">整合结果:</span>\n'
- response_str = format_json_for_display(step['response'])
- html += f' <div class="step-detail-content">{escape_html(response_str)}</div>\n'
- html += ' </div>\n'
-
- elif step['step'] == 'extra_tools':
- selected_tools = step.get('selected_tools', [])
- if selected_tools:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">发现的新工具:</span>\n'
- for idx, tool in enumerate(selected_tools, 1):
- tool_name = tool.get('tool_name', '')
- tool_function = tool.get('function', '')
- html += ' <div style="margin-bottom: 15px; padding: 12px; background: #fff3cd; border-radius: 6px; border-left: 3px solid #ffc107;">\n'
- html += f' <div style="font-weight: 600; color: #856404; margin-bottom: 6px;">工具 {idx}: {escape_html(tool_name)}</div>\n'
- html += f' <div style="color: #856404; line-height: 1.6;">{escape_html(tool_function)}</div>\n'
- html += ' </div>\n'
- html += ' </div>\n'
- else:
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">发现结果:</span>\n'
- html += ' <div class="step-detail-content" style="color: #6c757d;">未发现合适的新工具</div>\n'
- html += ' </div>\n'
- if step.get('analysis_summary'):
- html += ' <div class="step-detail">\n'
- html += ' <span class="step-detail-label">分析摘要:</span>\n'
- html += f' <div class="step-detail-content">{escape_html(step["analysis_summary"])}</div>\n'
- html += ' </div>\n'
-
- # Prompt放在最后,默认隐藏(generate_query 和 find_tools 步骤不显示 prompt,因为它们已经单独处理了)
- if step.get('prompt') and step['step'] != 'generate_query' and step['step'] != 'find_tools':
- html += f' <button class="prompt-toggle-btn" onclick="togglePrompt(\'{prompt_id}\')">显示 Prompt</button>\n'
- html += f' <div class="prompt-content" id="{prompt_id}">{escape_html(step["prompt"])}</div>\n'
-
- html += ' </div>\n'
- html += ' </div>\n'
-
- # 添加箭头(除了最后一步)
- if j < len(workflow['steps']) - 1:
- html += ' <div class="arrow"></div>\n'
-
- html += ' </div>\n'
-
- # 输出信息(已隐藏)
- # if workflow['output'].get('result'):
- # html += ' <div class="output-section">\n'
- # html += ' <h3>最终输出</h3>\n'
- # result_str = format_json_for_display(workflow['output']['result'])
- # html += f' <div class="step-detail-content json-content">{escape_html(result_str)}</div>\n'
- # html += ' </div>\n'
-
- html += ' </div>\n'
-
- html += ''' </div>
-
- <script>
- function switchTab(index) {
- // 隐藏所有tab内容
- const contents = document.querySelectorAll('.tab-content');
- contents.forEach(content => content.classList.remove('active'));
-
- // 移除所有tab的active状态
- const tabs = document.querySelectorAll('.tab');
- tabs.forEach(tab => tab.classList.remove('active'));
-
- // 显示选中的tab内容
- document.getElementById('tab-' + index).classList.add('active');
- tabs[index].classList.add('active');
- }
-
- function toggleStep(stepId) {
- const step = document.getElementById(stepId);
- const content = document.getElementById('content-' + stepId);
- const toggle = step.querySelector('.step-toggle');
-
- if (content.classList.contains('expanded')) {
- content.classList.remove('expanded');
- toggle.classList.remove('expanded');
- step.classList.remove('active');
- } else {
- content.classList.add('expanded');
- toggle.classList.add('expanded');
- step.classList.add('active');
- }
- }
-
- function togglePrompt(promptId) {
- const promptContent = document.getElementById(promptId);
- const btn = promptContent.previousElementSibling;
-
- // 从按钮文本中提取字段名(去掉"显示 "或"隐藏 "前缀)
- let fieldName = btn.textContent;
- if (fieldName.startsWith('显示 ')) {
- fieldName = fieldName.substring(3); // 去掉"显示 "
- } else if (fieldName.startsWith('隐藏 ')) {
- fieldName = fieldName.substring(3); // 去掉"隐藏 "
- }
-
- if (promptContent.classList.contains('show')) {
- promptContent.classList.remove('show');
- btn.textContent = '显示 ' + fieldName;
- } else {
- promptContent.classList.add('show');
- btn.textContent = '隐藏 ' + fieldName;
- }
- }
-
- // 页面加载时高亮第一个步骤
- window.addEventListener('load', function() {
- const firstSteps = document.querySelectorAll('.workflow-step');
- firstSteps.forEach((step, index) => {
- if (index === 0 || index % (firstSteps.length / document.querySelectorAll('.tab-content').length) === 0) {
- step.classList.add('active');
- const content = step.querySelector('.step-content');
- const toggle = step.querySelector('.step-toggle');
- if (content) {
- content.classList.add('expanded');
- toggle.classList.add('expanded');
- }
- }
- });
- });
- </script>
- </body>
- </html>'''
-
- return html
def main():
    """Load the configured records, render them and write the HTML report.

    Reads every file in ``DATA_FILE_PATHS``, parses each into a workflow,
    generates the page and writes it as
    ``workflow_visualization_<YYYYmmdd_HHMMSS>.html``. Progress and errors are
    printed to stdout. Returns ``None``; when no file could be read, nothing
    is written.
    """
    # The output file name below is relative, so switching the working
    # directory makes the report land next to this script.
    script_dir = Path(__file__).parent
    os.chdir(script_dir)

    # Load the data files.
    print("正在读取数据文件...")
    data_list = load_data_files(DATA_FILE_PATHS)
    if not data_list:
        print("错误: 没有成功读取任何数据文件")
        return
    print(f"成功读取 {len(data_list)} 个数据文件")

    # Parse the workflow data.
    print("正在解析工作流数据...")
    workflows = [parse_workflow_data(data, folder_name) for data, folder_name in data_list]

    # Generate the HTML.
    print("正在生成HTML页面...")
    html = generate_html(workflows)

    # Save the HTML file under a timestamped name.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"workflow_visualization_{timestamp}.html"
    with open(output_filename, 'w', encoding='utf-8') as f:
        f.write(html)

    print(f"HTML页面已生成: {output_filename}")
    print(f"文件路径: {os.path.abspath(output_filename)}")
# Run the generator only when executed as a script, not when imported.
if __name__ == '__main__':
    main()