api.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. """
  2. Step 树 RESTful API
  3. 提供 Trace 和 Step 的查询接口,支持懒加载和 compact/full 视图
  4. """
  5. from typing import List, Optional, Dict, Any
  6. from fastapi import APIRouter, HTTPException, Query
  7. from fastapi.responses import PlainTextResponse
  8. from pydantic import BaseModel
  9. from agent.execution.protocols import TraceStore
  10. router = APIRouter(prefix="/api/traces", tags=["traces"])
  11. # ===== Response 模型 =====
  12. class TraceListResponse(BaseModel):
  13. """Trace 列表响应"""
  14. traces: List[Dict[str, Any]]
  15. class TraceResponse(BaseModel):
  16. """Trace 元数据响应"""
  17. trace_id: str
  18. mode: str
  19. task: Optional[str] = None
  20. agent_type: Optional[str] = None
  21. status: str
  22. total_steps: int
  23. total_tokens: int
  24. total_cost: float
  25. created_at: str
  26. completed_at: Optional[str] = None
  27. class StepNode(BaseModel):
  28. """Step 节点(递归结构)"""
  29. step_id: str
  30. step_type: str
  31. status: str
  32. description: str
  33. sequence: int
  34. parent_id: Optional[str] = None
  35. data: Optional[Dict[str, Any]] = None
  36. summary: Optional[str] = None
  37. duration_ms: Optional[int] = None
  38. tokens: Optional[int] = None
  39. cost: Optional[float] = None
  40. created_at: str
  41. children: List["StepNode"] = []
  42. class TreeResponse(BaseModel):
  43. """完整树响应"""
  44. trace_id: str
  45. root_steps: List[StepNode]
  46. class NodeResponse(BaseModel):
  47. """节点响应"""
  48. step_id: Optional[str]
  49. step_type: Optional[str]
  50. description: Optional[str]
  51. children: List[StepNode]
  52. # ===== 全局 TraceStore(由 api_server.py 注入)=====
  53. _trace_store: Optional[TraceStore] = None
  54. def set_trace_store(store: TraceStore):
  55. """设置 TraceStore 实例"""
  56. global _trace_store
  57. _trace_store = store
  58. def get_trace_store() -> TraceStore:
  59. """获取 TraceStore 实例"""
  60. if _trace_store is None:
  61. raise RuntimeError("TraceStore not initialized")
  62. return _trace_store
  63. # ===== 路由 =====
  64. @router.get("", response_model=TraceListResponse)
  65. async def list_traces(
  66. mode: Optional[str] = None,
  67. agent_type: Optional[str] = None,
  68. uid: Optional[str] = None,
  69. status: Optional[str] = None,
  70. limit: int = Query(20, le=100)
  71. ):
  72. """
  73. 列出 Traces
  74. Args:
  75. mode: 模式过滤(call/agent)
  76. agent_type: Agent 类型过滤
  77. uid: 用户 ID 过滤
  78. status: 状态过滤(running/completed/failed)
  79. limit: 最大返回数量
  80. """
  81. store = get_trace_store()
  82. traces = await store.list_traces(
  83. mode=mode,
  84. agent_type=agent_type,
  85. uid=uid,
  86. status=status,
  87. limit=limit
  88. )
  89. return TraceListResponse(
  90. traces=[t.to_dict() for t in traces]
  91. )
  92. @router.get("/{trace_id}", response_model=TraceResponse)
  93. async def get_trace(trace_id: str):
  94. """
  95. 获取 Trace 元数据
  96. Args:
  97. trace_id: Trace ID
  98. """
  99. store = get_trace_store()
  100. trace = await store.get_trace(trace_id)
  101. if not trace:
  102. raise HTTPException(status_code=404, detail="Trace not found")
  103. return TraceResponse(**trace.to_dict())
  104. @router.get("/{trace_id}/tree", response_model=TreeResponse)
  105. async def get_full_tree(
  106. trace_id: str,
  107. view: str = Query("compact", regex="^(compact|full)$"),
  108. max_depth: int = Query(999, ge=1, le=999)
  109. ):
  110. """
  111. 获取完整 Step 树(小型 Trace 推荐)
  112. Args:
  113. trace_id: Trace ID
  114. view: compact(默认,不含 blob)| full(含 blob)
  115. max_depth: 最大深度
  116. """
  117. store = get_trace_store()
  118. # 验证 Trace 存在
  119. trace = await store.get_trace(trace_id)
  120. if not trace:
  121. raise HTTPException(status_code=404, detail="Trace not found")
  122. # 获取所有 Steps
  123. steps = await store.get_trace_steps(trace_id)
  124. # 构建树结构
  125. root_nodes = await _build_tree(store, trace_id, None, view=view, expand=True, max_depth=max_depth)
  126. return TreeResponse(
  127. trace_id=trace_id,
  128. root_steps=root_nodes
  129. )
  130. @router.get("/{trace_id}/node/{step_id}", response_model=NodeResponse)
  131. async def get_node(
  132. trace_id: str,
  133. step_id: str,
  134. view: str = Query("compact", regex="^(compact|full)$"),
  135. expand: bool = Query(False, description="是否加载子节点"),
  136. max_depth: int = Query(1, ge=1, le=10, description="递归深度")
  137. ):
  138. """
  139. 懒加载节点 + 子节点(大型 Trace 推荐)
  140. Args:
  141. trace_id: Trace ID
  142. step_id: Step ID("null" 表示根节点)
  143. view: compact | full
  144. expand: 是否加载子节点
  145. max_depth: 递归深度
  146. """
  147. store = get_trace_store()
  148. # 验证 Trace 存在
  149. trace = await store.get_trace(trace_id)
  150. if not trace:
  151. raise HTTPException(status_code=404, detail="Trace not found")
  152. # step_id = "null" 表示根节点
  153. actual_step_id = None if step_id == "null" else step_id
  154. # 验证 Step 存在(非根节点)
  155. if actual_step_id:
  156. step = await store.get_step(actual_step_id)
  157. if not step or step.trace_id != trace_id:
  158. raise HTTPException(status_code=404, detail="Step not found")
  159. # 构建节点树
  160. children = await _build_tree(store, trace_id, actual_step_id, view=view, expand=expand, max_depth=max_depth)
  161. # 如果是根节点,返回所有根 Steps
  162. if actual_step_id is None:
  163. return NodeResponse(
  164. step_id=None,
  165. step_type=None,
  166. description=None,
  167. children=children
  168. )
  169. # 否则返回当前节点 + 子节点
  170. step = await store.get_step(actual_step_id)
  171. return NodeResponse(
  172. step_id=step.step_id,
  173. step_type=step.step_type,
  174. description=step.description,
  175. children=children
  176. )
  177. @router.get("/{trace_id}/node/{step_id}/children")
  178. async def get_children_paginated(
  179. trace_id: str,
  180. step_id: str,
  181. cursor: Optional[int] = Query(None, description="上次最后的 sequence"),
  182. limit: int = Query(20, ge=1, le=100),
  183. view: str = Query("compact", regex="^(compact|full)$")
  184. ):
  185. """
  186. 分页获取子节点(基于 sequence 游标)
  187. Args:
  188. trace_id: Trace ID
  189. step_id: Step ID
  190. cursor: 上次最后的 sequence(None 表示从头开始)
  191. limit: 每页数量
  192. view: compact | full
  193. Returns:
  194. {
  195. "children": [...],
  196. "next_cursor": 123, # 下一页游标(None 表示没有更多)
  197. "has_more": true
  198. }
  199. """
  200. store = get_trace_store()
  201. # 验证 trace 存在
  202. trace = await store.get_trace(trace_id)
  203. if not trace:
  204. raise HTTPException(status_code=404, detail="Trace not found")
  205. # 验证 step 存在
  206. step = await store.get_step(step_id)
  207. if not step or step.trace_id != trace_id:
  208. raise HTTPException(status_code=404, detail="Step not found")
  209. # 获取所有子节点
  210. children = await store.get_step_children(step_id)
  211. # 过滤 cursor 之后的节点
  212. if cursor is not None:
  213. children = [s for s in children if s.sequence > cursor]
  214. # 分页
  215. has_more = len(children) > limit
  216. page = children[:limit]
  217. next_cursor = page[-1].sequence if page and has_more else None
  218. # 序列化
  219. children_dicts = [s.to_dict(view=view) for s in page]
  220. return {
  221. "children": children_dicts,
  222. "next_cursor": next_cursor,
  223. "has_more": has_more
  224. }
  225. # ===== 核心算法:懒加载树构建 =====
  226. async def _build_tree(
  227. store: TraceStore,
  228. trace_id: str,
  229. step_id: Optional[str],
  230. view: str = "compact", # 新增参数
  231. expand: bool = False,
  232. max_depth: int = 1,
  233. current_depth: int = 0
  234. ) -> List[StepNode]:
  235. """
  236. 懒加载核心逻辑(简洁版本)
  237. 没有"批次计算"、没有"同层完整性检查"
  238. 只有简单的递归遍历
  239. Args:
  240. store: TraceStore 实例
  241. trace_id: Trace ID
  242. step_id: 当前 Step ID(None 表示根节点)
  243. view: "compact" | "full"
  244. expand: 是否展开子节点
  245. max_depth: 最大递归深度
  246. current_depth: 当前递归深度
  247. Returns:
  248. List[StepNode]: 节点列表
  249. """
  250. # 1. 获取当前层节点
  251. if step_id is None:
  252. # 根节点:获取所有 parent_id=None 的 Steps
  253. steps = await store.get_trace_steps(trace_id)
  254. current_nodes = [s for s in steps if s.parent_id is None]
  255. else:
  256. # 非根节点:获取子节点
  257. current_nodes = await store.get_step_children(step_id)
  258. # 2. 构建响应
  259. result_nodes = []
  260. for step in current_nodes:
  261. node_dict = step.to_dict(view=view) # 使用 view 参数
  262. node_dict["children"] = []
  263. # 3. 递归加载子节点(可选)
  264. if expand and current_depth < max_depth:
  265. children = await store.get_step_children(step.step_id)
  266. if children:
  267. node_dict["children"] = await _build_tree(
  268. store, trace_id, step.step_id,
  269. view=view, expand=True, max_depth=max_depth,
  270. current_depth=current_depth + 1
  271. )
  272. result_nodes.append(StepNode(**node_dict))
  273. return result_nodes