""" Pipeline 执行追踪可视化工具。 读取 tests/traces/{trace_id}/pipeline.jsonl,生成 HTML 可视化页面。 HTML 风格与 tests/.cache/visualize_log.py 保持一致。 用法: python pipeline_visualize.py # 读取最新 trace python pipeline_visualize.py # 指定 trace_id python pipeline_visualize.py --list # 列出所有可用 trace """ from __future__ import annotations import html as html_mod import json import re import sys from datetime import datetime from pathlib import Path TRACES_DIR = Path(__file__).parent / "tests" / "traces" # ───────────────────────────────────────────────────────────── # 工具函数 # ───────────────────────────────────────────────────────────── def _esc(s: str) -> str: return html_mod.escape(str(s)) def _ts(s: str) -> str: return s[:19] if s else "" def _duration_label(ms: int | None) -> str: if ms is None: return "" if ms < 1000: return f"{ms}ms" return f"{ms / 1000:.1f}s" def _calc_duration(start_ts: str, end_ts: str) -> str: try: t0 = datetime.strptime(start_ts[:19], "%Y-%m-%d %H:%M:%S") t1 = datetime.strptime(end_ts[:19], "%Y-%m-%d %H:%M:%S") delta = int((t1 - t0).total_seconds()) mins, secs = divmod(delta, 60) return f"{mins}分{secs}秒" if mins else f"{secs}秒" except Exception: return "N/A" def _ts_readable(ts: int) -> str: """将时间戳转为可读格式""" try: dt = datetime.fromtimestamp(ts) return dt.strftime("%Y-%m-%d %H:%M") except Exception: return str(ts) # ───────────────────────────────────────────────────────────── # JSONL 读取 # ───────────────────────────────────────────────────────────── def read_jsonl(path: Path) -> list[dict]: events = [] for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line: continue try: events.append(json.loads(line)) except json.JSONDecodeError: pass return events # ───────────────────────────────────────────────────────────── # HTML 渲染 # ───────────────────────────────────────────────────────────── _STAGE_LABELS = { "demand_analysis": "需求理解", "content_search": "内容召回", "hard_filter": "硬规则过滤", "coarse_filter": "标题粗筛", "quality_filter": "质量精排", "account_precipitate": "账号沉淀", "output_persist": "结果输出", } _GATE_LABELS = { "content_search": "SearchCompletenessGate", "quality_filter": "FilterSufficiencyGate", "output_persist": "OutputSchemaGate", } _ACTION_COLORS = { "proceed": "var(--green)", "retry_stage": "var(--yellow)", "fallback": "var(--orange)", "abort": "var(--red)", } # ───────────────────────────────────────────────────────────── # 决策数据渲染 # ───────────────────────────────────────────────────────────── def _render_decisions(stage: str, decisions: dict) -> str: """根据阶段类型,将 decisions dict 渲染为 HTML(可折叠
卡片)。""" if not decisions: return "" renderer = _DECISION_RENDERERS.get(stage) if not renderer: return "" try: inner = renderer(decisions) except Exception: return "" if not inner: return "" label = _STAGE_LABELS.get(stage, stage) # 重要阶段默认展开 open_attr = " open" if stage in ("quality_filter", "demand_analysis", "content_search", "coarse_filter") else "" return ( f'
' f'📋 {_esc(label)} 决策详情' f'
{inner}
' f'
' ) def _render_demand_analysis(d: dict) -> str: parts: list[str] = [] # 特征分层 feature_rows: list[str] = [] for key, label in [ ("substantive_features", "实质特征"), ("formal_features", "形式特征"), ("upper_features", "上层特征"), ("lower_features", "下层特征"), ]: items = d.get(key, []) if items: tags = " ".join(f'{_esc(t)}' for t in items) feature_rows.append( f'{label}{tags}' ) if feature_rows: parts.append( '
' '
🧠 特征分层
' '' + "\n".join(feature_rows) + '
' ) # 搜索策略 ss = d.get("search_strategy", {}) precise = ss.get("precise_keywords", []) topic = ss.get("topic_keywords", []) if precise or topic: rows: list[str] = [] if precise: tags = " ".join(f'{_esc(k)}' for k in precise) rows.append(f'精准词{tags}') if topic: tags = " ".join(f'{_esc(k)}' for k in topic) rows.append(f'主题词{tags}') parts.append( '
' '
🔎 搜索策略
' '' + "\n".join(rows) + '
' ) # 筛选关注点 ff = d.get("filter_focus", {}) relevance = ff.get("relevance_focus", []) risks = ff.get("elimination_risks", []) if relevance or risks: items_html = "" if relevance: items_html += '
关注点
    ' items_html += "".join(f'
  • {_esc(r)}
  • ' for r in relevance) items_html += '
' if risks: items_html += '
淘汰风险
    ' items_html += "".join(f'
  • {_esc(r)}
  • ' for r in risks) items_html += '
' parts.append( '
' '
🎯 筛选关注
' + items_html + '
' ) return "\n".join(parts) def _render_content_search(d: dict) -> str: parts: list[str] = [] stats = d.get("keyword_stats", []) total = d.get("total_candidates", 0) candidates = d.get("candidates", []) # 搜索词命中统计 if stats: rows = [] for s in stats: kw = _esc(s.get("keyword", "")) returned = s.get("returned", 0) new = s.get("new", 0) rows.append( f'{kw}' f'{returned}' f'{new}' ) parts.append( '
' '
📊 搜索词命中
' '' '' '' + "\n".join(rows) + '' '
关键词返回数新增数
' ) # 全部召回文章列表 if candidates: rows = [] for idx, c in enumerate(candidates, 1): title = _esc(c.get("title", "")) url = _esc(c.get("url", "")) kw = _esc(c.get("source_keyword", "")) pt = c.get("publish_time", 0) pt_str = _ts_readable(pt) if pt else "未知" view = c.get("view_count", 0) rows.append( f'' f'{idx}' f'{title}' f'{kw}' f'{_esc(pt_str)}' f'{view}' f'' ) parts.append( '
' f'
📋 全部召回文章({len(candidates)} 篇)
' '' '' '' + "\n".join(rows) + '' '
#标题来源关键词发布时间阅读量
' ) parts.append( f'
累计候选: {total}
' ) return "\n".join(parts) def _render_hard_filter(d: dict) -> str: count = d.get("after_filter_count", 0) return ( '
' f'
📊 过滤结果
' f'
过滤后剩余: {count}
' '
' ) def _render_coarse_filter(d: dict) -> str: log = d.get("coarse_log", []) total = d.get("total_count", len(log)) passed_cnt = d.get("passed_count", 0) rejected_cnt = d.get("rejected_count", 0) after_cnt = d.get("after_filter_count", 0) if not log: return f'
粗筛后剩余: {after_cnt} 篇
' parts: list[str] = [] # 统计概览 parts.append( '
' f'
📊 粗筛统计
' f'通过 {passed_cnt}' f'淘汰 {rejected_cnt}' '
' ) # 通过的文章 passed = [r for r in log if r.get("status") == "pass"] if passed: rows = [] for idx, r in enumerate(passed, 1): title = _esc(r.get("title", "")) url = _esc(r.get("url", "")) reason = _esc(r.get("reason", "")) src_kw = _esc(r.get("source_keyword", "")) rows.append( f'' f'{idx}' f'{title}' f'{src_kw}' f'{reason}' f'' ) parts.append( '
' f'
✅ 通过文章({len(passed)} 篇)
' '' '' '' + "\n".join(rows) + '' '
#标题来源词理由
' ) # 淘汰的文章 rejected = [r for r in log if r.get("status") == "reject"] if rejected: rows = [] for idx, r in enumerate(rejected, 1): title = _esc(r.get("title", "")) url = _esc(r.get("url", "")) reason = _esc(r.get("reason", "")) src_kw = _esc(r.get("source_keyword", "")) rows.append( f'' f'{idx}' f'{title}' f'{src_kw}' f'{reason}' f'' ) parts.append( '
' f'
❌ 淘汰文章({len(rejected)} 篇)
' '' '' '' + "\n".join(rows) + '' '
#标题来源词理由
' ) parts.append( f'
粗筛后剩余: {after_cnt}
' ) return "\n".join(parts) def _render_quality_filter(d: dict) -> str: reviews = d.get("review_log", []) accepted_cnt = d.get("accepted_count", 0) rejected_cnt = d.get("rejected_count", 0) skipped_cnt = d.get("skipped_count", 0) final_cnt = d.get("final_filtered_count", 0) score_config = d.get("score_config", {}) match_terms = d.get("match_terms", []) if not reviews: return f'
最终入选: {final_cnt} 篇
' parts: list[str] = [] # 评分配置 if score_config: cfg_rows: list[str] = [] for key, label in [ ("min_body_length", "最小正文长度"), ("high_relevance_ratio", "高相关性阈值"), ("view_count_threshold", "阅读量阈值"), ("engage_rate_threshold", "互动率阈值"), ("spam_keywords_count", "标题党关键词数"), ]: val = score_config.get(key, "") if val != "": cfg_rows.append( f'{label}' f'{_esc(str(val))}' ) if cfg_rows: parts.append( '
' '
⚙️ 评分配置
' '' + "\n".join(cfg_rows) + '
' ) # 匹配词表 if match_terms: tags = " ".join(f'{_esc(t)}' for t in match_terms) parts.append( '
' f'
🔑 匹配词表({len(match_terms)} 个)
' f'{tags}' '
' ) # 统计概览 parts.append( '
' f'
📊 审核统计
' f'入选 {accepted_cnt}' f'淘汰 {rejected_cnt}' + (f'跳过 {skipped_cnt}' if skipped_cnt else '') + '
' ) review_table_head = ( '' '#标题来源词相关性兴趣阶段' '发布日期正文长度阅读点赞分享在看' '原因' '' ) # 入选文章 accepted = [r for r in reviews if r.get("status") == "accept"] accepted.sort(key=lambda r: int(r.get("view_count", 0) or 0), reverse=True) if accepted: rows = [] for idx, r in enumerate(accepted, 1): title = _esc(r.get("title", "")) url = _esc(r.get("url", "")) relevance = _esc(r.get("relevance", "")) interest = _esc(r.get("interest", "")) reason = _esc(r.get("reason", "")) phase = "LLM" if r.get("phase") == "llm" else "启发式" pt = r.get("publish_time", 0) pt_str = _ts_readable(pt) if pt else "未知" view = r.get("view_count", 0) like = r.get("like_count", 0) share = r.get("share_count", 0) looking = r.get("looking_count", 0) body_len = r.get("body_length", "-") src_kw = _esc(r.get("source_keyword", "")) rows.append( f'' f'{idx}' f'{title}' f'{src_kw}' f'{relevance}' f'{interest}' f'{phase}' f'{_esc(pt_str)}' f'{body_len}' f'{view}' f'{like}' f'{share}' f'{looking}' f'{reason}' f'' ) parts.append( '
' f'
✅ 入选文章({len(accepted)} 篇)
' '' + review_table_head + '' + "\n".join(rows) + '' '
' ) # 淘汰文章 rejected = [r for r in reviews if r.get("status") == "reject"] if rejected: rows = [] for idx, r in enumerate(rejected, 1): title = _esc(r.get("title", "")) url = _esc(r.get("url", "")) relevance = _esc(r.get("relevance", "")) interest = _esc(r.get("interest", "")) reason = _esc(r.get("reason", "")) phase = "LLM" if r.get("phase") == "llm" else "启发式" pt = r.get("publish_time", 0) pt_str = _ts_readable(pt) if pt else "未知" view = r.get("view_count", 0) like = r.get("like_count", 0) share = r.get("share_count", 0) looking = r.get("looking_count", 0) body_len = r.get("body_length", "-") src_kw = _esc(r.get("source_keyword", "")) rows.append( f'' f'{idx}' f'{title}' f'{src_kw}' f'{relevance}' f'{interest}' f'{phase}' f'{_esc(pt_str)}' f'{body_len}' f'{view}' f'{like}' f'{share}' f'{looking}' f'{reason}' f'' ) parts.append( '
' f'
❌ 淘汰文章({len(rejected)} 篇)
' '' + review_table_head + '' + "\n".join(rows) + '' '
' ) # 跳过文章 skipped = [r for r in reviews if r.get("status") == "skip"] if skipped: rows = [] for idx, r in enumerate(skipped, 1): title = _esc(r.get("title", "")) url = _esc(r.get("url", "")) reason = _esc(r.get("reason", "")) phase = "LLM" if r.get("phase") == "llm" else "启发式" pt = r.get("publish_time", 0) pt_str = _ts_readable(pt) if pt else "未知" view = r.get("view_count", 0) src_kw = _esc(r.get("source_keyword", "")) rows.append( f'' f'{idx}' f'{title}' f'{src_kw}' f'-' f'-' f'{phase}' f'{_esc(pt_str)}' f'-' f'{view}' f'-' f'-' f'-' f'{reason}' f'' ) parts.append( '
' f'
⏭️ 跳过文章({len(skipped)} 篇)
' '' + review_table_head + '' + "\n".join(rows) + '' '
' ) parts.append( f'
最终入选: {final_cnt}
' ) return "\n".join(parts) def _render_account_precipitate(d: dict) -> str: accounts = d.get("accounts", []) if not accounts: return '
聚合账号: 0 个
' rows = [] for acc in accounts: name = _esc(acc.get("account_name", "")) count = acc.get("article_count", 0) samples = acc.get("sample_articles", []) sample_html = "" if samples: sample_html = '
' + ", ".join( _esc(s) for s in samples[:3] ) + '
' rows.append( f'{name}' f'{count} 篇' f'{sample_html}' ) return ( '
' '
👤 聚合账号
' '' '' '' + "\n".join(rows) + '' '
账号名文章数示例标题
' ) def _render_output_persist(d: dict) -> str: path = d.get("output_file", "") if not path: return "" return ( '
' f'
📄 输出文件
' f'{_esc(path)}' '
' ) _DECISION_RENDERERS = { "demand_analysis": _render_demand_analysis, "content_search": _render_content_search, "hard_filter": _render_hard_filter, "coarse_filter": _render_coarse_filter, "quality_filter": _render_quality_filter, "account_precipitate": _render_account_precipitate, "output_persist": _render_output_persist, } # ───────────────────────────────────────────────────────────── # LLM 交互追踪渲染 # ───────────────────────────────────────────────────────────── def _render_llm_interactions(interactions: list[dict]) -> str: """渲染阶段内所有 LLM 交互记录(折叠卡片,展示思考过程)。""" if not interactions: return "" sections: list[str] = [] for idx, ix in enumerate(interactions, 1): name = _esc(ix.get("name", "LLM 调用")) model = _esc(ix.get("model", "")) duration_ms = ix.get("duration_ms", 0) tokens = ix.get("tokens", 0) dur_label = _duration_label(duration_ms) parts: list[str] = [] # 输入 Prompt messages = ix.get("messages", []) for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if not content: continue content_str = str(content) if not isinstance(content, str) else content role_icon = {"system": "📜", "user": "👤", "assistant": "🤖"}.get(role, "💬") role_label = {"system": "System Prompt", "user": "User Prompt", "assistant": "Assistant"}.get(role, role) if len(content_str) > 500: parts.append( f'
' f'{role_icon} {role_label} ({len(content_str)} 字)' f'
{_esc(content_str)}
' f'
' ) else: parts.append( f'
' f'
{role_icon} {role_label}
' f'
{_esc(content_str)}
' f'
' ) # 推理过程(reasoning) reasoning = ix.get("reasoning", "") if reasoning: parts.append( f'
' f'🧠 LLM 推理过程 ({len(reasoning)} 字)' f'
{_esc(reasoning)}
' f'
' ) # 工具调用 tool_calls = ix.get("tool_calls") or [] for tc in tool_calls: tool_name = _esc(tc.get("tool_name", "")) args = tc.get("arguments", "") try: args_obj = json.loads(args) if isinstance(args, str) else args args_formatted = json.dumps(args_obj, ensure_ascii=False, indent=2) except Exception: args_formatted = str(args) result_preview = tc.get("result_preview", "") parts.append( f'
' f'
🔧 {tool_name}
' f'
{_esc(args_formatted)}
' ) if result_preview: parts.append( f'
' f'📤 返回结果 ({len(result_preview)} 字)' f'
{_esc(result_preview)}
' f'
' ) parts.append('
') # LLM 回复 response_text = ix.get("response_text", "") if response_text: if len(response_text) > 2000: parts.append( f'
' f'💬 LLM 回复 ({len(response_text)} 字)' f'
{_esc(response_text)}
' f'
' ) else: parts.append( f'
' f'
💬 LLM 回复
' f'
{_esc(response_text)}
' f'
' ) inner = "\n".join(parts) meta_parts = [] if model: meta_parts.append(f'{model}') if dur_label: meta_parts.append(dur_label) if tokens: meta_parts.append(f'{tokens} tokens') meta_html = " · ".join(meta_parts) sections.append( f'
' f'🧪 LLM 交互 #{idx}: {name}' f' ({meta_html})' f'' f'
{inner}
' f'
' ) return "\n".join(sections) # ───────────────────────────────────────────────────────────── # Agent Trace 渲染 # ───────────────────────────────────────────────────────────── def _load_agent_trace(trace_id: str) -> tuple[list[dict], dict]: """读取 agent 子任务的 events.jsonl 和 meta.json。""" agent_dir = TRACES_DIR / trace_id events: list[dict] = [] meta: dict = {} events_path = agent_dir / "events.jsonl" meta_path = agent_dir / "meta.json" if events_path.exists(): events = read_jsonl(events_path) if meta_path.exists(): try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: pass return events, meta def _render_agent_meta(meta: dict) -> str: """渲染 agent 元信息摘要。""" task = meta.get("task", "") model = meta.get("model", "") status = meta.get("status", "") total_tokens = meta.get("total_tokens", 0) total_cost = meta.get("total_cost", 0) created = meta.get("created_at", "")[:19] completed = meta.get("completed_at", "")[:19] duration = _calc_duration(created, completed) if created and completed else "N/A" status_color = "var(--green)" if status == "completed" else "var(--red)" return ( f'
' f'任务: {_esc(task)}' f'模型: {_esc(model)}' f'状态: {_esc(status)}' f'耗时: {_esc(duration)}' f'Tokens: {total_tokens}' f'Cost: ${total_cost:.4f}' f'
' ) def _render_agent_message(msg: dict) -> str: """渲染单条 agent 消息。""" role = msg.get("role", "") content = msg.get("content", "") tool_call_id = msg.get("tool_call_id", "") ts = msg.get("created_at", "")[:19] tokens = msg.get("tokens", 0) if role == "system": # System prompt 折叠显示,只展示前 200 字 text = content if isinstance(content, str) else str(content) preview = text[:200] + "..." if len(text) > 200 else text return ( f'
' f'📜 System Prompt ({len(text)} 字)' f'
{_esc(text)}
' f'
' ) elif role == "user": text = content if isinstance(content, str) else str(content) # User prompt 折叠,如果太长 if len(text) > 500: return ( f'
' f'👤 User Prompt ({len(text)} 字)' f'
{_esc(text)}
' f'
' ) return ( f'
' f'
👤 User
' f'
{_esc(text)}
' f'
' ) elif role == "assistant": parts: list[str] = [] # 提取文本和工具调用 if isinstance(content, dict): text = content.get("text", "") or "" tool_calls = content.get("tool_calls", []) or [] reasoning = content.get("reasoning_content", "") or "" else: text = str(content or "") tool_calls = [] reasoning = "" token_badge = f'{tokens} tokens' if tokens else "" # 推理内容 if reasoning: parts.append( f'
' f'🧠 推理过程' f'
{_esc(reasoning)}
' f'
' ) # 文本回复 if text: # 截断过长文本,折叠显示 if len(text) > 2000: parts.append( f'
' f'💭 回复 ({len(text)} 字)' f'
{_esc(text)}
' f'
' ) else: parts.append( f'
' f'
{_esc(text)}
' f'
' ) # 工具调用 for tc in tool_calls: fn = tc.get("function", {}) fn_name = fn.get("name", "?") fn_args = fn.get("arguments", "") # 格式化 JSON 参数 try: args_obj = json.loads(fn_args) if isinstance(fn_args, str) else fn_args fn_args_formatted = json.dumps(args_obj, ensure_ascii=False, indent=2) except Exception: fn_args_formatted = str(fn_args) parts.append( f'
' f'
🔧 {_esc(fn_name)}
' f'
{_esc(fn_args_formatted)}
' f'
' ) inner = "\n".join(parts) return ( f'
' f'
🤖 Assistant {token_badge}
' f'{inner}' f'
' ) elif role == "tool": # 工具返回 if isinstance(content, dict): tool_name = content.get("tool_name", "") result = content.get("result", "") else: tool_name = "" result = str(content or "") result_str = str(result) if len(result_str) > 1500: return ( f'
' f'📤 {_esc(tool_name)} 返回 ({len(result_str)} 字)' f'
{_esc(result_str)}
' f'
' ) return ( f'
' f'
📤 {_esc(tool_name)}
' f'
{_esc(result_str)}
' f'
' ) return "" def _render_agent_trace_section(agent_trace_ids: list[str]) -> str: """渲染阶段内所有 agent 子任务的执行详情(折叠卡片)。""" if not agent_trace_ids: return "" sections: list[str] = [] for tid in agent_trace_ids: agent_events, meta = _load_agent_trace(tid) if not agent_events and not meta: continue task_name = meta.get("task", tid[:8]) total_tokens = meta.get("total_tokens", 0) total_cost = meta.get("total_cost", 0) # 渲染元信息 meta_html = _render_agent_meta(meta) if meta else "" # 渲染消息序列 msg_blocks: list[str] = [] for ev in agent_events: if ev.get("event") == "message_added": msg = ev.get("message", {}) rendered = _render_agent_message(msg) if rendered: msg_blocks.append(rendered) elif ev.get("event") == "goal_added": goal = ev.get("goal", {}) goal_desc = goal.get("description", "") msg_blocks.append( f'
' f'🎯 目标创建: {_esc(goal_desc)}' f'
' ) messages_html = "\n".join(msg_blocks) sections.append( f'
' f'🤖 Agent 执行详情: {_esc(task_name)}' f' ({total_tokens} tokens · ${total_cost:.4f})' f'' f'
' f'{meta_html}' f'
{messages_html}
' f'
' f'
' ) return "\n".join(sections) def _parse_log_lines(log_text: str) -> list[dict]: """将 full_log.log 文本解析为结构化行列表,供 HTML 渲染。""" lines: list[dict] = [] for raw in log_text.splitlines(): level = "STDOUT" if "| DEBUG |" in raw: level = "DEBUG" elif "| INFO |" in raw: level = "INFO" elif "| WARNING |" in raw: level = "WARNING" elif "| ERROR |" in raw: level = "ERROR" elif "| CRITICAL|" in raw: level = "CRITICAL" lines.append({"level": level, "text": raw}) return lines def _render_full_log_section(log_lines: list[dict]) -> str: """生成完整日志面板的 HTML(带过滤、搜索、行号、颜色)。""" if not log_lines: return "" level_counts = {} for ln in log_lines: level_counts[ln["level"]] = level_counts.get(ln["level"], 0) + 1 rows: list[str] = [] for idx, ln in enumerate(log_lines, 1): lvl = ln["level"].lower() text = _esc(ln["text"]) rows.append( f'' f'{idx}' f'{text}' f'' ) filter_buttons: list[str] = [] filter_buttons.append( f'' ) for lvl in ("DEBUG", "INFO", "WARNING", "ERROR", "STDOUT"): cnt = level_counts.get(lvl, 0) if cnt: filter_buttons.append( f'' ) return ( '
' '

📋 完整执行日志

' '
' '
' + "".join(filter_buttons) + '
' '' '
' '
' '' + "\n".join(rows) + '
' '
' '
' ) _LOG_VIEWER_CSS = """ /* ── 日志面板 ── */ .log-panel { margin-top:32px; border:1px solid var(--border); border-radius:10px; background:var(--bg2); overflow:hidden; } .log-title { font-size:18px; color:var(--blue); padding:16px 20px 0; margin:0; letter-spacing:-0.3px; } .log-toolbar { display:flex; flex-wrap:wrap; gap:8px; align-items:center; padding:12px 20px; border-bottom:1px solid var(--border); } .log-filters { display:flex; gap:4px; flex-wrap:wrap; } .log-btn { background:rgba(139,148,158,.08); border:1px solid var(--border); border-radius:6px; padding:3px 10px; font-size:11px; color:var(--dim); cursor:pointer; transition:all .15s; } .log-btn:hover { background:rgba(88,166,255,.1); color:var(--blue); border-color:rgba(88,166,255,.3); } .log-btn-active { background:rgba(88,166,255,.15); color:var(--blue); border-color:rgba(88,166,255,.4); } .log-search { flex:1; min-width:180px; background:var(--bg); border:1px solid var(--border); border-radius:6px; padding:4px 10px; font-size:12px; color:var(--text); outline:none; } .log-search:focus { border-color:var(--blue); } .log-container { max-height:calc(100vh - 200px); overflow-y:auto; } .log-table { width:100%; border-collapse:collapse; font-family:"SF Mono",Monaco,Menlo,monospace; font-size:11px; } .log-row { border-bottom:1px solid rgba(33,38,45,.3); } .log-row.log-hidden { display:none; } .log-ln { width:50px; text-align:right; padding:2px 8px 2px 4px; color:rgba(139,148,158,.4); user-select:none; vertical-align:top; } .log-txt { padding:2px 8px; white-space:pre-wrap; word-break:break-all; line-height:1.5; } .log-debug .log-txt { color:var(--dim); } .log-info .log-txt { color:var(--text); } .log-warning .log-txt { color:var(--yellow); } .log-error .log-txt, .log-critical .log-txt { color:var(--red); } .log-stdout .log-txt { color:var(--purple); } .log-highlight { background:rgba(227,179,65,.15); } """ _LOG_VIEWER_JS = """ """ def render_html(events: list[dict], full_log_lines: list[dict] | None = None) -> str: init_ev = next((e for e in events if e["type"] == "init"), None) complete_ev = next((e for e in events if e["type"] == "complete"), None) query = init_ev.get("query", "") if init_ev else "" model = init_ev.get("model", "") if init_ev else "" demand_id = init_ev.get("demand_id", "") if init_ev else "" trace_id = init_ev.get("trace_id", "") if init_ev else "" target_count = init_ev.get("target_count", 0) if init_ev else 0 timestamps = [e.get("ts", "") for e in events if e.get("ts")] start_ts = timestamps[0] if timestamps else "" end_ts = timestamps[-1] if timestamps else "" duration = _calc_duration(start_ts, end_ts) if start_ts and end_ts else "N/A" stage_completes = [e for e in events if e["type"] == "stage_complete"] gate_checks = [e for e in events if e["type"] == "gate_check"] error_events = [e for e in events if e["type"] == "error"] final_stats = complete_ev.get("stats", {}) if complete_ev else {} candidate_count = final_stats.get("candidate_count", 0) filtered_count = final_stats.get("filtered_count", 0) account_count = final_stats.get("account_count", 0) output_file = complete_ev.get("output_file", "") if complete_ev else "" pipeline_status = complete_ev.get("status", "unknown") if complete_ev else "running" ordered_stages = list(_STAGE_LABELS.keys()) stage_order_map = {name: idx + 1 for idx, name in enumerate(ordered_stages)} # ── 构建事件块 ── blocks: list[str] = [] for ev in events: t = ev["type"] ts = _ts(ev.get("ts", "")) if t == "init": # 基础信息 init_html_parts = [ f'
', f'
🚀 Pipeline 启动
', f'
查询词{_esc(query)}
', f'
模型{_esc(model)}
', f'
需求ID{_esc(str(demand_id))}
', f'
目标条数{target_count}
', f'
trace_id{_esc(trace_id)}
', f'
时间{_esc(ts)}
', ] # 执行计划(来自 Harness) rp = ev.get("run_plan") if rp: init_html_parts.append('
') init_html_parts.append('
📋 执行计划
') init_html_parts.append( f'
超时上限' f'{rp.get("timeout_seconds", "N/A")} 秒
' ) init_html_parts.append( f'
目标文章上限' f'{rp.get("max_target_count", "N/A")} 篇
' ) init_html_parts.append( f'
最大补召回轮次' f'{rp.get("max_fallback_rounds", "N/A")} 轮
' ) plan_stages = rp.get("stages", []) if plan_stages: init_html_parts.append( '
阶段规划:
' ) for idx, ps in enumerate(plan_stages, 1): sname = _esc(ps.get("name", "")) slabel = _esc(ps.get("label", "")) sicon = _STAGE_LABELS.get(ps.get("name", ""), sname) gate = ps.get("gate", "") gate_html = ( f' ' f'└─ Gate: {_esc(gate)}' ) if gate else "" init_html_parts.append( f'
' f'{idx}. ' f'{sname} ' f'← {slabel}' f'{gate_html}
' ) init_html_parts.append('
') blocks.append("\n".join(init_html_parts)) elif t == "stage_start": stage = ev.get("stage", "") icon = ev.get("icon", "▶") label = _STAGE_LABELS.get(stage, stage) stage_no = stage_order_map.get(stage, 0) blocks.append( f'
' f'{_esc(ts)} ' f'{icon}

{stage_no}. {_esc(label)}

' f' ({_esc(stage)})' f'
' ) elif t == "stage_complete": stage = ev.get("stage", "") icon = ev.get("icon", "▶") label = _STAGE_LABELS.get(stage, stage) attempt = ev.get("attempt", 1) dur = _duration_label(ev.get("duration_ms")) stats = ev.get("stats", {}) decisions = ev.get("decisions", {}) agent_trace_ids = ev.get("agent_trace_ids", []) llm_interactions = ev.get("llm_interactions", []) attempt_badge = f'重试#{attempt}' if attempt > 1 else "" dur_badge = f'{_esc(dur)}' if dur else "" stats_html = ( f'候选 {stats.get("candidate_count", 0)}' f'入选 {stats.get("filtered_count", 0)}' f'账号 {stats.get("account_count", 0)}' ) decisions_html = _render_decisions(stage, decisions) agent_html = _render_agent_trace_section(agent_trace_ids) llm_html = _render_llm_interactions(llm_interactions) blocks.append( f'
' f'{_esc(ts)} ' f'✅ {icon} {_esc(label)} 完成 ' f'{attempt_badge}{dur_badge}' f'
{stats_html}
' f'{llm_html}' f'{decisions_html}' f'{agent_html}' f'
' ) elif t == "gate_check": gate = ev.get("gate", "") gate_label = _GATE_LABELS.get(gate, gate) passed = ev.get("passed", False) action = ev.get("action", "proceed") issues = ev.get("issues", []) fallback = ev.get("fallback_stage", "") icon = ev.get("icon", "🚦") action_color = _ACTION_COLORS.get(action, "var(--dim)") status_icon = "✅" if passed else "⚠️" issues_html = "" if issues: issues_html = ( '" ) fallback_html = ( f'→ 回退到 {_esc(fallback)}' if fallback else "" ) cls = "ev-gate-ok" if passed else "ev-gate-warn" blocks.append( f'
' f'{_esc(ts)} ' f'{status_icon} {icon} {_esc(gate_label)} ' f'[{_esc(action)}]' f'{fallback_html}' f'{issues_html}' f'
' ) elif t == "error": stage = ev.get("stage", "") msg = ev.get("msg", "") blocks.append( f'
' f'❌ 错误 @ {_esc(stage)}: {_esc(msg[:200])}' f'
{_esc(msg)}
' f'
' ) elif t == "complete": status = ev.get("status", "unknown") stats = ev.get("stats", {}) stage_count = ev.get("stage_count", 0) err_count = ev.get("error_count", 0) cls = "ev-complete-ok" if status == "completed" else "ev-complete-fail" out_html = ( f'
输出文件' f'{_esc(output_file)}
' if output_file else "" ) blocks.append( f'
' f'
🏁 Pipeline 结束
' f'
状态{_esc(status)}
' f'
trace_id{_esc(trace_id)}
' f'
阶段数{stage_count}
' f'
错误数{err_count}
' f'
候选文章{stats.get("candidate_count", 0)}
' f'
入选文章{stats.get("filtered_count", 0)}
' f'
账号数{stats.get("account_count", 0)}
' f'{out_html}' f'
' ) body = "\n".join(blocks) status_color = "var(--green)" if pipeline_status == "completed" else "var(--red)" completed_stage_names = {e.get("stage", "") for e in stage_completes} errored_stage_names = {e.get("stage", "") for e in error_events} ordered_stages = list(_STAGE_LABELS.keys()) stage_order_map = {name: idx + 1 for idx, name in enumerate(ordered_stages)} flow_items: list[str] = [] for stage_key in ordered_stages: label = _STAGE_LABELS.get(stage_key, stage_key) if stage_key in errored_stage_names: cls = "flow-step flow-error" icon = "✖" elif stage_key in completed_stage_names: cls = "flow-step flow-done" icon = "✔" else: cls = "flow-step flow-pending" icon = "○" stage_no = stage_order_map.get(stage_key, 0) flow_items.append( f'
{icon}' f'

{stage_no}. {_esc(label)}

' ) flow_html = "".join(flow_items) return f""" Pipeline 执行追踪 — {_esc(query[:40])}

🔄 Pipeline 执行追踪

查询: {_esc(query)}  |  模型: {_esc(model)}  |  状态: {_esc(pipeline_status)}  |  耗时: {_esc(duration)}  |  trace_id: {_esc(trace_id)}
{_esc(duration)}
总耗时
{candidate_count}
召回候选
{filtered_count}
入选文章
{account_count}
沉淀账号
{len(error_events)}
错误

流程总览

按阶段展示执行状态
{flow_html}

执行时间线

按事件顺序展示关键决策与结果
{body}
{_render_full_log_section(full_log_lines or [])}
{_LOG_VIEWER_JS}
""" # ───────────────────────────────────────────────────────────── # 入口 # ───────────────────────────────────────────────────────────── def list_traces() -> None: if not TRACES_DIR.exists(): print(f"traces 目录不存在: {TRACES_DIR}") return dirs = sorted( [d for d in TRACES_DIR.iterdir() if d.is_dir() and (d / "pipeline.jsonl").exists()], key=lambda d: d.stat().st_mtime, reverse=True, ) if not dirs: print("暂无可用 trace(需先运行 run_search_agent.py)") return print(f"{'trace_id':<40} {'修改时间'}") print("-" * 60) for d in dirs: mtime = datetime.fromtimestamp(d.stat().st_mtime).strftime("%Y-%m-%d %H:%M:%S") print(f"{d.name:<40} {mtime}") def find_latest_trace() -> Path | None: if not TRACES_DIR.exists(): return None dirs = sorted( [d for d in TRACES_DIR.iterdir() if d.is_dir() and (d / "pipeline.jsonl").exists()], key=lambda d: d.stat().st_mtime, reverse=True, ) return dirs[0] if dirs else None def main() -> None: args = sys.argv[1:] if "--list" in args: list_traces() return if args and not args[0].startswith("--"): trace_id = args[0] trace_dir = TRACES_DIR / trace_id else: trace_dir = find_latest_trace() if trace_dir is None: print(f"❌ 找不到任何 trace,请先运行 run_search_agent.py") print(f" traces 目录: {TRACES_DIR}") sys.exit(1) trace_id = trace_dir.name jsonl_path = trace_dir / "pipeline.jsonl" if not jsonl_path.exists(): print(f"❌ 找不到 {jsonl_path}") sys.exit(1) events = read_jsonl(jsonl_path) print(f"📄 读取了 {len(events)} 个事件 (trace_id={trace_id})") # 读取完整日志文件(如有) log_path = trace_dir / "full_log.log" full_log_lines: list[dict] | None = None if log_path.exists(): log_text = log_path.read_text(encoding="utf-8") full_log_lines = _parse_log_lines(log_text) print(f"📋 读取了 {len(full_log_lines)} 行完整日志") html_content = render_html(events, full_log_lines=full_log_lines) out_path = trace_dir / "pipeline_trace.html" out_path.write_text(html_content, encoding="utf-8") size_kb = out_path.stat().st_size / 1024 print(f"✅ 已生成: {out_path} ({size_kb:.0f} KB)") if __name__ == "__main__": main()