""" Step 树 RESTful API 提供 Trace 和 Step 的查询接口,支持懒加载和 compact/full 视图 """ from typing import List, Optional, Dict, Any from fastapi import APIRouter, HTTPException, Query from fastapi.responses import PlainTextResponse from pydantic import BaseModel from agent.execution.protocols import TraceStore router = APIRouter(prefix="/api/traces", tags=["traces"]) # ===== Response 模型 ===== class TraceListResponse(BaseModel): """Trace 列表响应""" traces: List[Dict[str, Any]] class TraceResponse(BaseModel): """Trace 元数据响应""" trace_id: str mode: str task: Optional[str] = None agent_type: Optional[str] = None status: str total_steps: int total_tokens: int total_cost: float created_at: str completed_at: Optional[str] = None class StepNode(BaseModel): """Step 节点(递归结构)""" step_id: str step_type: str status: str description: str sequence: int parent_id: Optional[str] = None data: Optional[Dict[str, Any]] = None summary: Optional[str] = None duration_ms: Optional[int] = None tokens: Optional[int] = None cost: Optional[float] = None created_at: str children: List["StepNode"] = [] class TreeResponse(BaseModel): """完整树响应""" trace_id: str root_steps: List[StepNode] class NodeResponse(BaseModel): """节点响应""" step_id: Optional[str] step_type: Optional[str] description: Optional[str] children: List[StepNode] # ===== 全局 TraceStore(由 api_server.py 注入)===== _trace_store: Optional[TraceStore] = None def set_trace_store(store: TraceStore): """设置 TraceStore 实例""" global _trace_store _trace_store = store def get_trace_store() -> TraceStore: """获取 TraceStore 实例""" if _trace_store is None: raise RuntimeError("TraceStore not initialized") return _trace_store # ===== 路由 ===== @router.get("", response_model=TraceListResponse) async def list_traces( mode: Optional[str] = None, agent_type: Optional[str] = None, uid: Optional[str] = None, status: Optional[str] = None, limit: int = Query(20, le=100) ): """ 列出 Traces Args: mode: 模式过滤(call/agent) agent_type: Agent 类型过滤 uid: 用户 ID 过滤 status: 状态过滤(running/completed/failed) limit: 最大返回数量 """ store = get_trace_store() traces = await store.list_traces( mode=mode, agent_type=agent_type, uid=uid, status=status, limit=limit ) return TraceListResponse( traces=[t.to_dict() for t in traces] ) @router.get("/{trace_id}", response_model=TraceResponse) async def get_trace(trace_id: str): """ 获取 Trace 元数据 Args: trace_id: Trace ID """ store = get_trace_store() trace = await store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail="Trace not found") return TraceResponse(**trace.to_dict()) @router.get("/{trace_id}/tree", response_model=TreeResponse) async def get_full_tree( trace_id: str, view: str = Query("compact", regex="^(compact|full)$"), max_depth: int = Query(999, ge=1, le=999) ): """ 获取完整 Step 树(小型 Trace 推荐) Args: trace_id: Trace ID view: compact(默认,不含 blob)| full(含 blob) max_depth: 最大深度 """ store = get_trace_store() # 验证 Trace 存在 trace = await store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail="Trace not found") # 获取所有 Steps steps = await store.get_trace_steps(trace_id) # 构建树结构 root_nodes = await _build_tree(store, trace_id, None, view=view, expand=True, max_depth=max_depth) return TreeResponse( trace_id=trace_id, root_steps=root_nodes ) @router.get("/{trace_id}/node/{step_id}", response_model=NodeResponse) async def get_node( trace_id: str, step_id: str, view: str = Query("compact", regex="^(compact|full)$"), expand: bool = Query(False, description="是否加载子节点"), max_depth: int = Query(1, ge=1, le=10, description="递归深度") ): """ 懒加载节点 + 子节点(大型 Trace 推荐) Args: trace_id: Trace ID step_id: Step ID("null" 表示根节点) view: compact | full expand: 是否加载子节点 max_depth: 递归深度 """ store = get_trace_store() # 验证 Trace 存在 trace = await store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail="Trace not found") # step_id = "null" 表示根节点 actual_step_id = None if step_id == "null" else step_id # 验证 Step 存在(非根节点) if actual_step_id: step = await store.get_step(actual_step_id) if not step or step.trace_id != trace_id: raise HTTPException(status_code=404, detail="Step not found") # 构建节点树 children = await _build_tree(store, trace_id, actual_step_id, view=view, expand=expand, max_depth=max_depth) # 如果是根节点,返回所有根 Steps if actual_step_id is None: return NodeResponse( step_id=None, step_type=None, description=None, children=children ) # 否则返回当前节点 + 子节点 step = await store.get_step(actual_step_id) return NodeResponse( step_id=step.step_id, step_type=step.step_type, description=step.description, children=children ) @router.get("/{trace_id}/node/{step_id}/children") async def get_children_paginated( trace_id: str, step_id: str, cursor: Optional[int] = Query(None, description="上次最后的 sequence"), limit: int = Query(20, ge=1, le=100), view: str = Query("compact", regex="^(compact|full)$") ): """ 分页获取子节点(基于 sequence 游标) Args: trace_id: Trace ID step_id: Step ID cursor: 上次最后的 sequence(None 表示从头开始) limit: 每页数量 view: compact | full Returns: { "children": [...], "next_cursor": 123, # 下一页游标(None 表示没有更多) "has_more": true } """ store = get_trace_store() # 验证 trace 存在 trace = await store.get_trace(trace_id) if not trace: raise HTTPException(status_code=404, detail="Trace not found") # 验证 step 存在 step = await store.get_step(step_id) if not step or step.trace_id != trace_id: raise HTTPException(status_code=404, detail="Step not found") # 获取所有子节点 children = await store.get_step_children(step_id) # 过滤 cursor 之后的节点 if cursor is not None: children = [s for s in children if s.sequence > cursor] # 分页 has_more = len(children) > limit page = children[:limit] next_cursor = page[-1].sequence if page and has_more else None # 序列化 children_dicts = [s.to_dict(view=view) for s in page] return { "children": children_dicts, "next_cursor": next_cursor, "has_more": has_more } # ===== 核心算法:懒加载树构建 ===== async def _build_tree( store: TraceStore, trace_id: str, step_id: Optional[str], view: str = "compact", # 新增参数 expand: bool = False, max_depth: int = 1, current_depth: int = 0 ) -> List[StepNode]: """ 懒加载核心逻辑(简洁版本) 没有"批次计算"、没有"同层完整性检查" 只有简单的递归遍历 Args: store: TraceStore 实例 trace_id: Trace ID step_id: 当前 Step ID(None 表示根节点) view: "compact" | "full" expand: 是否展开子节点 max_depth: 最大递归深度 current_depth: 当前递归深度 Returns: List[StepNode]: 节点列表 """ # 1. 获取当前层节点 if step_id is None: # 根节点:获取所有 parent_id=None 的 Steps steps = await store.get_trace_steps(trace_id) current_nodes = [s for s in steps if s.parent_id is None] else: # 非根节点:获取子节点 current_nodes = await store.get_step_children(step_id) # 2. 构建响应 result_nodes = [] for step in current_nodes: node_dict = step.to_dict(view=view) # 使用 view 参数 node_dict["children"] = [] # 3. 递归加载子节点(可选) if expand and current_depth < max_depth: children = await store.get_step_children(step.step_id) if children: node_dict["children"] = await _build_tree( store, trace_id, step.step_id, view=view, expand=True, max_depth=max_depth, current_depth=current_depth + 1 ) result_nodes.append(StepNode(**node_dict)) return result_nodes