#!/usr/bin/env python3
"""
Steps visualization tool.

Converts a ``steps.json`` trace (a JSON list of step records) into a
standalone HTML page.

NOTE(review): the HTML tags, CSS and script that originally lived inside the
string literals of this module were not recoverable from the reviewed copy.
The markup below is a reconstruction that keeps every visible label, heading
and section id; class names and styling should be checked against the
original design.
"""

import argparse
import json
from datetime import datetime
from html import escape
from pathlib import Path

# Maximum number of characters of raw JSON shown for an unknown step type.
_GENERIC_PREVIEW_CHARS = 500

# The page skeleton. ``{content}`` is substituted with str.replace (not
# str.format) because the CSS/JS below is full of literal curly braces.
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Query Optimization Steps 可视化</title>
<style>
body { font-family: -apple-system, "Segoe UI", "PingFang SC", sans-serif; background: #f3f4f6; color: #111827; margin: 0; }
.container { max-width: 1200px; margin: 0 auto; padding: 24px; }
.header, .step-section { background: #ffffff; border-radius: 8px; padding: 20px; margin-bottom: 20px; box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); }
.question-label, .stat-label, .section-label { color: #6b7280; font-size: 13px; }
.question-text { font-size: 18px; font-weight: 600; margin: 4px 0 12px; }
.step-header { display: flex; justify-content: space-between; align-items: center; margin-bottom: 12px; }
.step-title { font-size: 18px; font-weight: 600; }
.step-type { font-size: 12px; color: #4b5563; background: #e5e7eb; border-radius: 4px; padding: 2px 8px; }
.stats { display: flex; flex-wrap: wrap; gap: 24px; margin: 12px 0; }
.stat-value { font-size: 20px; font-weight: 600; }
.keyword-tag, .suggestion-tag, .next-tag { display: inline-block; background: #eef2ff; color: #3730a3; border-radius: 12px; padding: 2px 10px; margin: 2px 4px 2px 0; font-size: 13px; }
.text-block { white-space: pre-wrap; background: #f9fafb; border-radius: 6px; padding: 12px; margin: 8px 0; }
.query-item, .signal-item, .eval-item { border: 1px solid #e5e7eb; border-radius: 6px; padding: 10px; margin: 8px 0; }
.query-text { font-weight: 600; }
.collapsible-section { border: 1px solid #e5e7eb; border-radius: 6px; margin: 8px 0; }
.collapsible-header { cursor: pointer; padding: 8px 12px; background: #f9fafb; user-select: none; }
.collapsible-toggle { display: inline-block; transition: transform 0.2s; }
.collapsible-toggle.collapsed { transform: rotate(-90deg); }
.collapsible-content { padding: 12px; }
.collapsible-content.collapsed { display: none; }
.posts-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 12px; }
.post-card { border: 1px solid #e5e7eb; border-radius: 8px; overflow: hidden; background: #ffffff; }
.post-image-wrapper { position: relative; height: 200px; background: #f3f4f6; }
.post-image { width: 100%; height: 100%; object-fit: cover; }
.no-image { display: flex; align-items: center; justify-content: center; height: 100%; color: #9ca3af; }
.type-badge { position: absolute; top: 6px; right: 6px; background: rgba(0, 0, 0, 0.6); color: #ffffff; border-radius: 4px; padding: 2px 6px; font-size: 12px; }
.carousel-dots { position: absolute; bottom: 6px; left: 0; right: 0; text-align: center; }
.carousel-dots .dot { display: inline-block; width: 6px; height: 6px; margin: 0 2px; border-radius: 50%; background: rgba(255, 255, 255, 0.8); }
.post-info { padding: 8px 10px; }
.post-title { font-weight: 600; }
.post-desc, .post-meta { color: #6b7280; font-size: 12px; overflow: hidden; text-overflow: ellipsis; }
.confidence-badge { display: inline-block; border-radius: 4px; padding: 1px 8px; font-size: 12px; margin-top: 4px; }
.confidence-high { background: #d1fae5; color: #065f46; }
.confidence-medium { background: #fef3c7; color: #92400e; }
.confidence-low { background: #fee2e2; color: #991b1b; }
.raw-data { white-space: pre-wrap; background: #f9fafb; border-radius: 6px; padding: 12px; font-size: 12px; }
.timestamp { color: #9ca3af; font-size: 12px; margin-top: 12px; }
</style>
</head>
<body>
<div class="container">
{content}
</div>
<script>
function toggleCollapse(header) {
    var toggle = header.querySelector('.collapsible-toggle');
    var content = header.nextElementSibling;
    if (toggle) { toggle.classList.toggle('collapsed'); }
    if (content) { content.classList.toggle('collapsed'); }
}
</script>
</body>
</html>
"""


# ---------------------------------------------------------------------------
# Small private helpers
# ---------------------------------------------------------------------------

def _esc(value):
    """Return ``str(value)`` escaped for HTML text and quoted attributes."""
    return escape(str(value), quote=True)


def _as_float(value, default=0.0):
    """Coerce *value* to float, falling back to *default* for null/garbage."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _step_data(step):
    """Return the step's ``data`` dict, tolerating a missing or null value."""
    return step.get("data") or {}


def _step_number(step):
    """Return the step number used in headings and section ids."""
    return step.get("step_number", "?")


def _image_urls(image_list):
    """Flatten a list of URL strings / ``{'image_url': ...}`` dicts to URLs."""
    return [
        img if isinstance(img, str) else img.get('image_url', '')
        for img in image_list
        if img
    ]


def _cover_url(note):
    """Pick the cover image URL: first ``image_list`` entry, else ``cover_image``."""
    image_list = note.get('image_list') or []
    candidate = image_list[0] if image_list else note.get("cover_image")
    if isinstance(candidate, dict):
        return candidate.get("image_url", "")
    if isinstance(candidate, str):
        return candidate
    return ""


def _tags(items, css_class):
    """Render each item as an escaped ``<span>`` tag with the given class."""
    return "".join(
        f'<span class="{css_class}">{_esc(item)}</span>' for item in items
    )


def _stat_item(label, value, color=None):
    """Render one label/value pair of a statistics row."""
    style = f' style="color: {color};"' if color else ""
    return (
        '<div class="stat-item">'
        f'<div class="stat-label">{label}</div>'
        f'<div class="stat-value"{style}>{_esc(value)}</div>'
        '</div>'
    )


def _stats(*items):
    """Wrap already-rendered stat items in a statistics row."""
    return '<div class="stats">' + "".join(items) + '</div>'


def _intent_badge(intent_match):
    """Return ``(label, css_class)`` for an intent-match flag."""
    if intent_match:
        return "✅ 意图匹配", "confidence-high"
    return "❌ 意图不匹配", "confidence-low"


def _render_step_section(step, heading, body):
    """Wrap *body* (HTML) in the common step frame: title, type, timestamp."""
    return f"""
<div class="step-section">
    <div class="step-header">
        <div class="step-title">步骤 {_esc(_step_number(step))}: {_esc(heading)}</div>
        <div class="step-type">{_esc(step.get('step_type', ''))}</div>
    </div>
    <div class="step-content">
{body}
    </div>
    <div class="timestamp">⏰ {_esc(step.get('timestamp', ''))}</div>
</div>
"""


def _render_post_card(note, evaluation=None, confidence=None, title_prefix=""):
    """Render one note as a card.

    *evaluation* (optional dict) is embedded in the card's JSON payload;
    *confidence* (optional number) adds a confidence badge; *title_prefix*
    is already-escaped HTML placed before the title.
    """
    images = _image_urls(note.get('image_list') or [])
    cover_url = _cover_url(note)
    title = note.get('title', '无标题')

    if cover_url:
        image_html = (
            f'<img class="post-image" src="{_esc(cover_url)}" '
            f'alt="{_esc(title)}" loading="lazy">'
        )
    else:
        image_html = '<div class="no-image">无图片</div>'

    type_badge = ""
    if note.get("type") == "video":
        type_badge = '<div class="type-badge">📹 视频</div>'

    # Carousel indicator: one dot per image, only when there are several.
    dots_html = ""
    if len(images) > 1:
        dots_html = (
            '<div class="carousel-dots">'
            + '<span class="dot"></span>' * len(images)
            + '</div>'
        )

    confidence_html = ""
    if confidence is not None:
        score = _as_float(confidence)
        confidence_html = (
            f'<div class="confidence-badge {get_confidence_class(score)}">'
            f'置信度: {score:.2f}</div>'
        )

    # JSON payloads travel in data-* attributes so a detail view can read them.
    post_json = _esc(build_post_json_data(note, evaluation))
    images_json = _esc(json.dumps(images))

    return f"""
<div class="post-card" data-post="{post_json}" data-images="{images_json}">
    <div class="post-image-wrapper">
        {image_html} {type_badge} {dots_html}
    </div>
    <div class="post-info">
        <div class="post-title">{title_prefix}{_esc(title)}</div>
        <div class="post-desc">{_esc(note.get('desc', ''))}</div>
        <div class="post-meta">{_esc(note.get('note_id', ''))}</div>
        {confidence_html}
    </div>
</div>
"""


# ---------------------------------------------------------------------------
# Public building blocks
# ---------------------------------------------------------------------------

def make_collapsible(title, content, collapsed=True, section_id=None):
    """Build the HTML for a collapsible section.

    Args:
        title: Plain-text heading; it is HTML-escaped here.
        content: Already-rendered HTML placed inside the section.
        collapsed: Whether the section starts folded.
        section_id: Optional element id. When given, a ``data-title``
            attribute is also emitted (used for table-of-contents generation).

    Returns:
        The section markup as a string.
    """
    collapsed_class = " collapsed" if collapsed else ""
    safe_title = _esc(title)
    id_attr = ""
    title_attr = ""
    if section_id:
        id_attr = f' id="{_esc(section_id)}"'
        title_attr = f' data-title="{safe_title}"'
    return f"""
<div class="collapsible-section"{id_attr}{title_attr}>
    <div class="collapsible-header" onclick="toggleCollapse(this)">
        <span class="collapsible-toggle{collapsed_class}">▼</span>
        <span class="collapsible-title">{safe_title}</span>
    </div>
    <div class="collapsible-content{collapsed_class}">
{content}
    </div>
</div>
"""


def get_confidence_class(score):
    """Map a confidence score to a CSS class (>=0.7 high, >=0.5 medium)."""
    if score >= 0.7:
        return "confidence-high"
    if score >= 0.5:
        return "confidence-medium"
    return "confidence-low"


def escape_js_string(s):
    """Return *s* as a quoted JavaScript string literal (falsy values -> ``""``)."""
    return json.dumps(str(s) if s else "")


def build_post_json_data(note, evaluation=None):
    """Serialize a note (and optional evaluation) to JSON for the detail view.

    Images come from ``image_list``; when that is empty the ``cover_image``
    (dict or URL string) is used as the only image.

    Returns:
        A JSON string with non-ASCII characters left unescaped.
    """
    image_list = note.get('image_list') or []
    if not image_list and note.get('cover_image'):
        cover = note.get('cover_image')
        # cover_image may be a dict or a plain URL string.
        if isinstance(cover, dict):
            image_list = [cover.get('image_url', '')]
        else:
            image_list = [cover]

    interact = note.get('interact_info') or {}
    user = note.get('user') or {}

    data = {
        'title': note.get('title', '无标题'),
        'desc': note.get('desc', ''),
        'user': user.get('nickname', '未知'),
        'likes': interact.get('liked_count', 0),
        'collects': interact.get('collected_count', 0),
        'comments': interact.get('comment_count', 0),
        'type': note.get('type', 'normal'),
        'url': note.get('note_url', ''),
        'images': _image_urls(image_list),
    }

    if evaluation:
        data['evaluation'] = {
            'reason': evaluation.get('reason', ''),
            'title_relevance': evaluation.get('title_relevance', 0),
            'content_expectation': evaluation.get('content_expectation', 0),
            'confidence_score': evaluation.get('confidence_score', 0),
        }

    return json.dumps(data, ensure_ascii=False)


# ---------------------------------------------------------------------------
# Step renderers
# ---------------------------------------------------------------------------

def render_header(steps_data):
    """Render the page header: original question, keywords and totals."""
    original_question = ""
    keywords = []
    satisfied_notes = 0

    # The last matching step of each kind wins, as in a simple scan.
    for step in steps_data:
        step_type = step.get("step_type")
        data = _step_data(step)
        if step_type == "keyword_extraction":
            original_question = data.get("input_question", "")
            keywords = data.get("keywords") or []
        elif step_type == "final_result":
            satisfied_notes = data.get("satisfied_notes_count", 0)

    keywords_block = ""
    if keywords:
        keywords_block = (
            '<div class="keywords">' + _tags(keywords, "keyword-tag") + '</div>'
        )

    overview = _stats(
        _stat_item("总步骤数", len(steps_data)),
        _stat_item("满足需求的帖子", satisfied_notes),
    )
    return f"""
<div class="header">
    <h1>🔍 Query Optimization Steps</h1>
    <div class="question-box">
        <div class="question-label">原始问题</div>
        <div class="question-text">{_esc(original_question)}</div>
        {keywords_block}
    </div>
    {overview}
</div>
"""


def render_keyword_extraction(step):
    """Render the keyword-extraction step (keywords plus optional reasoning)."""
    data = _step_data(step)
    keywords = data.get("keywords") or []
    reasoning = data.get("reasoning", "")

    body = '<div class="keywords">' + _tags(keywords, "keyword-tag") + '</div>'
    if reasoning:
        body += f'\n<div class="text-block">{_esc(reasoning)}</div>'
    return _render_step_section(step, step.get("step_name", ""), body)


def render_level_exploration(step):
    """Render a level-exploration step: each query with its suggestions."""
    data = _step_data(step)
    level = data.get("level", 0)

    query_blocks = []
    for result in data.get("results") or []:
        suggestions = result.get("suggestions") or []
        query_blocks.append(f"""
<div class="query-item">
    <div class="query-text">{_esc(result.get("query", ""))}</div>
    <div class="section-label">推荐词 ({len(suggestions)} 个):</div>
    <div class="suggestions">{_tags(suggestions, "suggestion-tag")}</div>
</div>
""")

    body = _stats(
        _stat_item("探索query数", data.get("query_count", 0)),
        _stat_item("获得推荐词总数", data.get('total_suggestions', 0)),
    ) + "".join(query_blocks)
    return _render_step_section(step, f"Level {level} 探索", body)


def render_level_analysis(step):
    """Render a level-analysis step: findings, signals, reasoning, next queries."""
    data = _step_data(step)
    level = data.get("level", 0)
    should_evaluate = data.get("should_evaluate_now", False)
    promising_signals = data.get("promising_signals") or []
    next_combinations = data.get("next_combinations") or []
    reasoning = data.get("reasoning", "")

    signals_html = ""
    if promising_signals:
        signal_items = "".join(
            f"""
<div class="signal-item">
    <div class="query-text">{_esc(signal.get("query", ""))}</div>
    <div class="section-label">来自 Level {_esc(signal.get("from_level", ""))}</div>
    <div class="signal-reason">{_esc(signal.get("reason", ""))}</div>
</div>
"""
            for signal in promising_signals
        )
        signals_html = make_collapsible(
            f"💡 有价值的信号 ({len(promising_signals)} 个)",
            f'<div class="signals">{signal_items}</div>',
            collapsed=True,
            section_id=f"step{_step_number(step)}-signals",
        )

    reasoning_html = ""
    if reasoning:
        reasoning_html = f"""
<div class="section-label">💭 推理过程</div>
<div class="text-block">{_esc(reasoning)}</div>
"""

    next_html = ""
    if next_combinations:
        next_html = (
            '<div class="next-level"><span class="section-label">下一层探索:</span> '
            + _tags(next_combinations, "next-tag")
            + '</div>'
        )

    stats = _stats(
        _stat_item("有价值信号数", data.get("promising_signals_count", 0)),
        _stat_item("是否开始评估", '是' if should_evaluate else '否'),
    )
    body = f"""
<div class="section-label">🔎 关键发现</div>
<div class="text-block">{_esc(data.get("key_findings", ""))}</div>
{stats}
{signals_html} {reasoning_html} {next_html}
"""
    return _render_step_section(step, f"Level {level} 分析", body)


def render_search_results(step):
    """Render the search-results step: one collapsible post grid per query."""
    data = _step_data(step)
    step_num = _step_number(step)

    sections = []
    for idx, search_result in enumerate(data.get("search_results") or []):
        cards = "".join(
            _render_post_card(note)
            for note in search_result.get("notes_summary") or []
        )
        query = search_result.get("query", "")
        note_count = search_result.get("note_count", 0)
        sections.append(make_collapsible(
            f"🔎 {query} (找到 {note_count} 个帖子)",
            f'<div class="posts-grid">{cards}</div>',
            collapsed=True,
            section_id=f"step{step_num}-search-{idx}",
        ))

    body = _stats(
        _stat_item("搜索query数", data.get('qualified_count', 0)),
    ) + "".join(sections)
    return _render_step_section(step, "搜索结果", body)


def _evaluated_cards(notes):
    """Render evaluated notes as cards carrying their confidence badge."""
    cards = []
    for note in notes:
        evaluation = note.get("evaluation") or {}
        cards.append(_render_post_card(
            note,
            evaluation=evaluation,
            confidence=evaluation.get("confidence_score", 0),
        ))
    return "".join(cards)


def render_note_evaluations(step):
    """Render the note-evaluation step, split into satisfied / unsatisfied."""
    data = _step_data(step)
    step_num = _step_number(step)

    evals_html = ""
    for idx, query_eval in enumerate(data.get("note_evaluations") or []):
        evaluated_notes = query_eval.get("evaluated_notes") or []
        satisfied_notes = []
        unsatisfied_notes = []
        for note in evaluated_notes:
            verdict = (note.get('evaluation') or {}).get('need_satisfaction')
            (satisfied_notes if verdict else unsatisfied_notes).append(note)

        query_sections = ""
        if satisfied_notes:
            query_sections += make_collapsible(
                f"✅ 满足需求 ({len(satisfied_notes)} 个帖子)",
                f'<div class="posts-grid">{_evaluated_cards(satisfied_notes)}</div>',
                collapsed=True,
                section_id=f"step{step_num}-eval-{idx}-satisfied",
            )
        if unsatisfied_notes:
            query_sections += make_collapsible(
                f"❌ 不满足需求 ({len(unsatisfied_notes)} 个帖子)",
                f'<div class="posts-grid">{_evaluated_cards(unsatisfied_notes)}</div>',
                collapsed=True,
                section_id=f"step{step_num}-eval-{idx}-unsatisfied",
            )

        # Queries without any evaluated note produce no section at all.
        if query_sections:
            query = query_eval.get("query", "")
            satisfied_count = query_eval.get("satisfied_count", 0)
            evals_html += make_collapsible(
                f"📊 {query} ({satisfied_count}/{len(evaluated_notes)} 个满足需求)",
                query_sections,
                collapsed=True,
                section_id=f"step{step_num}-eval-{idx}",
            )

    body = _stats(
        _stat_item("评估的query数", data.get('query_count', 0)),
        _stat_item("总帖子数", data.get('total_notes', 0)),
        _stat_item("满足需求的帖子", data.get("total_satisfied", 0)),
    ) + evals_html
    return _render_step_section(step, "帖子评估结果", body)


def render_answer_generation(step):
    """Render the answer-generation step: answer, confidence, cited notes."""
    data = _step_data(step)
    result = data.get("result") or {}
    summary = result.get("summary", "")
    cited_notes = result.get("cited_notes") or []

    cited_cards = []
    for note in cited_notes:
        # Cited notes carry their evaluation fields at the top level.
        eval_data = {
            'reason': note.get("reason", ""),
            'title_relevance': note.get("title_relevance", 0),
            'content_expectation': note.get("content_expectation", 0),
            'confidence_score': note.get('confidence_score', 0),
        }
        cited_cards.append(_render_post_card(
            note,
            evaluation=eval_data,
            confidence=note.get('confidence_score', 0),
            title_prefix=f"[{_esc(note.get('index'))}] ",
        ))

    cited_section = ""
    if cited_cards:
        cited_section = make_collapsible(
            f"📌 引用的帖子 ({len(cited_notes)} 个)",
            f'<div class="posts-grid">{"".join(cited_cards)}</div>',
            collapsed=True,
            section_id=f"step{_step_number(step)}-cited",
        )

    summary_html = ""
    if summary:
        summary_html = (
            f'<div class="text-block"><strong>摘要:</strong> {_esc(summary)}</div>'
        )

    confidence = _as_float(result.get("confidence", 0))
    body = f"""
<div class="section-label">📝 生成的答案</div>
<div class="text-block">{_esc(result.get("answer", ""))}</div>
<div class="answer-meta">
    <span class="confidence-badge {get_confidence_class(confidence)}">置信度: {confidence:.2f}</span>
    <span class="section-label">引用帖子: {len(cited_notes)} 个</span>
</div>
{summary_html} {cited_section}
"""
    return _render_step_section(step, "生成答案", body)


def render_final_result(step):
    """Render the final-result step: status, satisfied count and message."""
    data = _step_data(step)
    success = data.get("success", False)
    status_color = "#10b981" if success else "#ef4444"
    status_text = "✅ 成功" if success else "❌ 失败"

    body = _stats(
        _stat_item("状态", status_text, color=status_color),
        _stat_item("满足需求的帖子", data.get("satisfied_notes_count", 0)),
    ) + f'\n<div class="text-block">{_esc(data.get("message", ""))}</div>'
    return _render_step_section(step, step.get("step_name", ""), body)


def render_query_suggestion_evaluation(step):
    """Render the per-candidate evaluation of suggested queries."""
    data = _step_data(step)
    results = data.get("results") or []
    step_num = _step_number(step)

    results_html = ""
    for idx, result in enumerate(results):
        evaluations = result.get("evaluations") or []
        eval_cards = ""
        for evaluation in evaluations:
            badge, badge_class = _intent_badge(evaluation.get("intent_match", False))
            relevance = _as_float(evaluation.get("relevance_score", 0))
            eval_cards += f"""
<div class="eval-item">
    <div class="query-text">{_esc(evaluation.get("query", ""))}</div>
    <div><span class="confidence-badge {badge_class}">{badge}</span> <span class="confidence-badge">相关性: {relevance:.2f}</span></div>
    <div class="eval-reason">{_esc(evaluation.get("reason", ""))}</div>
</div>
"""
        if eval_cards:
            candidate = result.get("candidate", "")
            results_html += make_collapsible(
                f"候选词: {candidate} ({len(evaluations)} 个推荐词)",
                eval_cards,
                collapsed=True,
                section_id=f"step{step_num}-candidate-{idx}",
            )

    total_evaluations = sum(len(r.get('evaluations') or []) for r in results)
    body = _stats(
        _stat_item("候选query数", data.get("candidate_count", 0)),
        _stat_item("总推荐词数", total_evaluations),
    ) + results_html
    return _render_step_section(step, step.get("step_name", ""), body)


def render_filter_qualified_queries(step):
    """Render the qualified / unqualified suggested-query filter step."""
    data = _step_data(step)
    input_count = data.get("input_evaluation_count", 0)
    qualified_count = data.get("qualified_count", 0)
    min_relevance = _as_float(data.get("min_relevance_score", 0.7))
    # Older traces only carry ``qualified_queries``.
    all_queries = data.get("all_queries") or data.get("qualified_queries") or []

    qualified_html = ""
    unqualified_html = ""
    for item in all_queries:
        # Defaults to True so legacy data (no flag) counts as qualified.
        is_qualified = item.get("is_qualified", True)
        badge, badge_class = _intent_badge(item.get("intent_match", False))
        relevance = _as_float(item.get("relevance_score", 0))

        if relevance >= 0.8:
            score_class = "confidence-high"
        elif relevance >= 0.6:
            score_class = "confidence-medium"
        else:
            score_class = "confidence-low"

        if is_qualified:
            border_color, bg_color, border_left_color = "#10b981", "#f0fdf4", "#10b981"
        else:
            border_color, bg_color, border_left_color = "#e5e7eb", "#f9fafb", "#9ca3af"

        query_html = f"""
<div class="eval-item" style="border: 1px solid {border_color}; border-left: 4px solid {border_left_color}; background: {bg_color};">
    <div class="query-text">{_esc(item.get("query", ""))}</div>
    <div class="section-label">来自候选词: {_esc(item.get("from_candidate", ""))}</div>
    <div><span class="confidence-badge {badge_class}">{badge}</span> <span class="confidence-badge {score_class}">相关性: {relevance:.2f}</span></div>
    <div class="eval-reason">{_esc(item.get("reason", ""))}</div>
</div>
"""
        if is_qualified:
            qualified_html += query_html
        else:
            unqualified_html += query_html

    step_num = _step_number(step)
    qualified_section = ""
    if qualified_html:
        qualified_section = make_collapsible(
            f"✅ 合格的推荐词 ({qualified_count})",
            qualified_html,
            collapsed=True,
            section_id=f"step{step_num}-qualified",
        )
    unqualified_section = ""
    if unqualified_html:
        unqualified_section = make_collapsible(
            f"❌ 不合格的推荐词 ({input_count - qualified_count})",
            unqualified_html,
            collapsed=True,
            section_id=f"step{step_num}-unqualified",
        )

    body = _stats(
        _stat_item("输入推荐词数", input_count),
        _stat_item("合格推荐词数", qualified_count),
        _stat_item("最低相关性", f"{min_relevance:.2f}"),
    ) + f"\n{qualified_section} {unqualified_section}"
    return _render_step_section(step, step.get("step_name", ""), body)


def render_generic_step(step):
    """Fallback renderer: show a truncated pretty-printed dump of the data."""
    data = _step_data(step)

    data_html = ""
    if data:
        # Serialize once so the truncation check matches the text displayed.
        pretty = json.dumps(data, ensure_ascii=False, indent=2)
        preview = pretty[:_GENERIC_PREVIEW_CHARS]
        if len(pretty) > _GENERIC_PREVIEW_CHARS:
            preview += "\n..."
        data_html = f'<pre class="raw-data">{_esc(preview)}</pre>'

    return _render_step_section(step, step.get("step_name", ""), data_html)


def render_step(step):
    """Render one step with the renderer registered for its ``step_type``."""
    renderers = {
        "keyword_extraction": render_keyword_extraction,
        "level_exploration": render_level_exploration,
        "level_analysis": render_level_analysis,
        "query_suggestion_evaluation": render_query_suggestion_evaluation,
        "filter_qualified_queries": render_filter_qualified_queries,
        "search_qualified_queries": render_search_results,
        "evaluate_search_notes": render_note_evaluations,
        "answer_generation": render_answer_generation,
        "final_result": render_final_result,
    }
    # Unknown step types fall back to the generic data dump.
    renderer = renderers.get(step.get("step_type", ""), render_generic_step)
    return renderer(step)


# ---------------------------------------------------------------------------
# Entry points
# ---------------------------------------------------------------------------

def generate_html(steps_json_path, output_path=None):
    """Generate the HTML visualization file.

    Args:
        steps_json_path: Path to the ``steps.json`` input (a JSON list).
        output_path: Destination file. Defaults to
            ``steps_visualization.html`` next to the input file.

    Returns:
        The path that was written (*output_path* as given, or the default
        ``Path``).

    Raises:
        ValueError: If the JSON document is not a list of steps.
    """
    with open(steps_json_path, 'r', encoding='utf-8') as f:
        steps_data = json.load(f)
    if not isinstance(steps_data, list):
        raise ValueError("steps.json must contain a JSON list of steps")

    content_parts = [render_header(steps_data)]
    content_parts.extend(render_step(step) for step in steps_data)
    content = "\n".join(content_parts)

    # str.replace instead of str.format: the template's CSS contains braces.
    page = HTML_TEMPLATE.replace("{content}", content)

    if output_path is None:
        output_path = Path(steps_json_path).parent / "steps_visualization.html"

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(page)

    return output_path


def main():
    """Command-line entry point: parse arguments and write the HTML page."""
    parser = argparse.ArgumentParser(description="Steps 可视化工具")
    parser.add_argument("steps_json", type=str, help="steps.json 文件路径")
    parser.add_argument("-o", "--output", type=str, help="输出HTML文件路径(可选)")

    args = parser.parse_args()

    output_path = generate_html(args.steps_json, args.output)

    print("✅ 可视化生成成功!")
    print(f"📄 输出文件: {output_path}")
    # Path() accepts both str and Path; as_uri() yields a valid file:// link.
    output_uri = Path(output_path).absolute().as_uri()
    print(f"\n💡 在浏览器中打开查看: {output_uri}")


if __name__ == "__main__":
    main()