| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
- """
- Step 树 RESTful API
- 提供 Trace 和 Step 的查询接口,支持懒加载
- """
- from typing import List, Optional, Dict, Any
- from fastapi import APIRouter, HTTPException, Query
- from pydantic import BaseModel
- from agent.trace.protocols import TraceStore
# Router for all trace endpoints; every route below is mounted under /api/traces.
router = APIRouter(prefix="/api/traces", tags=["traces"])
- # ===== Response 模型 =====
class TraceListResponse(BaseModel):
    """Response body for the trace listing endpoint."""

    # One dict per trace, as produced by each trace's to_dict() in list_traces.
    traces: List[Dict[str, Any]]
class TraceResponse(BaseModel):
    """Response body carrying the metadata of a single Trace.

    get_trace builds it by unpacking the dict returned by the trace's
    to_dict(), so the field names here must match keys of that dict.
    """

    trace_id: str
    mode: str
    # Optional fields default to None when absent from the trace dict.
    task: Optional[str] = None
    agent_type: Optional[str] = None
    status: str
    total_steps: int
    total_tokens: int
    total_cost: float
    created_at: str
    completed_at: Optional[str] = None
class StepNode(BaseModel):
    """A Step node in the tree (recursive structure).

    _build_tree creates instances by unpacking a step's to_dict() output
    after setting its "children" entry.
    """

    step_id: str
    step_type: str
    status: str
    description: str
    sequence: int
    # None for root-level steps (_build_tree selects roots by parent_id is None).
    parent_id: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    summary: Optional[str] = None
    duration_ms: Optional[int] = None
    tokens: Optional[int] = None
    cost: Optional[float] = None
    created_at: str
    # Nested child nodes; left empty when children were not loaded.
    children: List["StepNode"] = []
class TreeResponse(BaseModel):
    """Response body carrying the complete Step tree of a Trace."""

    trace_id: str
    # Top-level steps of the trace, each with its nested children.
    root_steps: List[StepNode]
class NodeResponse(BaseModel):
    """Response body for a single node together with its children.

    get_node passes None for step_id, step_type and description when the
    virtual root was requested; in that case children holds the root steps.
    """

    step_id: Optional[str]
    step_type: Optional[str]
    description: Optional[str]
    children: List[StepNode]
- # ===== 全局 TraceStore(由 api_server.py 注入)=====
# Module-level TraceStore instance; stays None until set_trace_store() is called.
_trace_store: Optional[TraceStore] = None
def set_trace_store(store: TraceStore) -> None:
    """Install the TraceStore instance used by every route in this module.

    Args:
        store: The store to expose through get_trace_store().
    """
    global _trace_store
    _trace_store = store
def get_trace_store() -> TraceStore:
    """Return the TraceStore previously installed with set_trace_store().

    Returns:
        The module-level TraceStore instance.

    Raises:
        RuntimeError: If no store has been installed yet.
    """
    store = _trace_store
    if store is not None:
        return store
    raise RuntimeError("TraceStore not initialized")
- # ===== 路由 =====
@router.get("", response_model=TraceListResponse)
async def list_traces(
    mode: Optional[str] = None,
    agent_type: Optional[str] = None,
    uid: Optional[str] = None,
    status: Optional[str] = None,
    limit: int = Query(20, le=100)
):
    """List Traces, optionally narrowed by the given filters.

    Args:
        mode: Mode filter (call/agent).
        agent_type: Agent type filter.
        uid: User ID filter.
        status: Status filter (running/completed/failed).
        limit: Maximum number of traces to return (capped at 100).

    Returns:
        TraceListResponse whose ``traces`` holds each trace's to_dict() output.
    """
    # NOTE(review): limit has an upper bound only; zero or negative values are
    # forwarded to the store unchanged.
    filters = {
        "mode": mode,
        "agent_type": agent_type,
        "uid": uid,
        "status": status,
        "limit": limit,
    }
    matched = await get_trace_store().list_traces(**filters)
    serialized = [trace.to_dict() for trace in matched]
    return TraceListResponse(traces=serialized)
@router.get("/{trace_id}", response_model=TraceResponse)
async def get_trace(trace_id: str):
    """Return the metadata of a single Trace.

    Args:
        trace_id: Trace ID.

    Returns:
        TraceResponse built from the trace's to_dict() output.

    Raises:
        HTTPException: 404 when the store has no trace for ``trace_id``.
    """
    found = await get_trace_store().get_trace(trace_id)
    if found:
        return TraceResponse(**found.to_dict())
    raise HTTPException(status_code=404, detail="Trace not found")
@router.get("/{trace_id}/tree", response_model=TreeResponse)
async def get_full_tree(trace_id: str):
    """Return the complete Step tree of a Trace (recommended for small Traces).

    Args:
        trace_id: Trace ID.

    Returns:
        TreeResponse with every root step and its nested children.

    Raises:
        HTTPException: 404 when the store has no trace for ``trace_id``.
    """
    store = get_trace_store()

    # Verify the Trace exists before walking its steps.
    trace = await store.get_trace(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")

    # _build_tree loads the steps it needs itself, so no separate pre-fetch of
    # the trace's steps is done here (the result was never used).
    # max_depth=999 serves as a practically unbounded depth limit.
    root_nodes = await _build_tree(store, trace_id, None, expand=True, max_depth=999)

    return TreeResponse(
        trace_id=trace_id,
        root_steps=root_nodes
    )
@router.get("/{trace_id}/node/{step_id}", response_model=NodeResponse)
async def get_node(
    trace_id: str,
    step_id: str,
    expand: bool = Query(False, description="是否加载子节点"),
    max_depth: int = Query(1, ge=1, le=10, description="递归深度")
):
    """Lazily load one node plus its children (recommended for large Traces).

    Args:
        trace_id: Trace ID.
        step_id: Step ID; the literal string "null" selects the virtual root.
        expand: Whether to load descendants below the returned children.
        max_depth: Recursion depth limit passed to _build_tree.

    Returns:
        NodeResponse describing the requested step and its children. For the
        virtual root, the step fields are None and ``children`` holds the
        trace's root steps.

    Raises:
        HTTPException: 404 when the trace does not exist, or when the step
            does not exist or belongs to a different trace.
    """
    store = get_trace_store()

    # Verify the Trace exists.
    trace = await store.get_trace(trace_id)
    if not trace:
        raise HTTPException(status_code=404, detail="Trace not found")

    # step_id == "null" denotes the virtual root node.
    actual_step_id = None if step_id == "null" else step_id

    # Verify the Step exists and belongs to this trace (non-root only). The
    # fetched step is kept and reused for the response, so the store is queried
    # once and the response never dereferences an unchecked lookup result.
    step = None
    if actual_step_id is not None:
        step = await store.get_step(actual_step_id)
        if not step or step.trace_id != trace_id:
            raise HTTPException(status_code=404, detail="Step not found")

    # Build the subtree below the requested node.
    children = await _build_tree(store, trace_id, actual_step_id, expand, max_depth)

    # Virtual root: there is no step to describe, only the root steps.
    if step is None:
        return NodeResponse(
            step_id=None,
            step_type=None,
            description=None,
            children=children
        )

    # Otherwise return the current node together with its children.
    return NodeResponse(
        step_id=step.step_id,
        step_type=step.step_type,
        description=step.description,
        children=children
    )
- # ===== 核心算法:懒加载树构建 =====
async def _build_tree(
    store: TraceStore,
    trace_id: str,
    step_id: Optional[str],
    expand: bool = False,
    max_depth: int = 1,
    current_depth: int = 0
) -> List[StepNode]:
    """Build one layer of the Step tree, optionally recursing into children.

    The layer consists of the trace's root steps when ``step_id`` is None,
    otherwise of the direct children of ``step_id``. Each node in that layer
    gets its own children loaded only when ``expand`` is true and
    ``current_depth < max_depth``; otherwise its ``children`` list is empty.

    Args:
        store: TraceStore instance.
        trace_id: Trace ID.
        step_id: Current Step ID (None selects the root layer).
        expand: Whether to load the children of the nodes in this layer.
        max_depth: Maximum recursion depth.
        current_depth: Depth of this call; callers normally leave it at 0.

    Returns:
        List[StepNode]: The nodes of the requested layer.
    """
    # 1. Fetch the nodes of the current layer.
    if step_id is None:
        # Root layer: all steps of the trace whose parent_id is None.
        steps = await store.get_trace_steps(trace_id)
        current_nodes = [s for s in steps if s.parent_id is None]
    else:
        # Non-root layer: direct children of the given step.
        current_nodes = await store.get_step_children(step_id)

    descend = expand and current_depth < max_depth

    # 2. Build the response nodes.
    result_nodes = []
    for step in current_nodes:
        node_dict = step.to_dict()
        node_dict["children"] = []

        # 3. Optionally load the children. The recursive call fetches the
        # children itself and returns [] for a leaf, so the children are not
        # queried separately first; that would hit the store twice per node.
        if descend:
            node_dict["children"] = await _build_tree(
                store, trace_id, step.step_id,
                expand=True, max_depth=max_depth,
                current_depth=current_depth + 1
            )

        result_nodes.append(StepNode(**node_dict))

    return result_nodes
|