| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458 |
- """
- Trace 控制 API — 新建 / 运行 / 停止 / 反思
- 提供 POST 端点触发 Agent 执行和控制。需要通过 set_runner() 注入 AgentRunner 实例。
- 执行在后台异步进行,客户端通过 WebSocket (/api/traces/{trace_id}/watch) 监听实时更新。
- 端点:
- POST /api/traces — 新建 Trace 并执行
- POST /api/traces/{id}/run — 运行(统一续跑 + 回溯)
- POST /api/traces/{id}/stop — 停止运行中的 Trace
- POST /api/traces/{id}/reflect — 反思,在 trace 末尾追加反思 prompt 运行,结果追加到 experiences 文件
- GET /api/traces/running — 列出正在运行的 Trace
- GET /api/experiences — 读取经验文件内容
- """
- import asyncio
- import logging
- import os
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel, Field
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/api/traces", tags=["run"])
- # 经验 API 使用独立 prefix
- experiences_router = APIRouter(prefix="/api", tags=["experiences"])
- # ===== 全局 Runner(由 api_server.py 注入)=====
- _runner = None
- def set_runner(runner):
- """注入 AgentRunner 实例"""
- global _runner
- _runner = runner
- def _get_runner():
- if _runner is None:
- raise HTTPException(
- status_code=503,
- detail="AgentRunner not configured. Server is in read-only mode.",
- )
- return _runner
- # ===== Request / Response 模型 =====
- class CreateRequest(BaseModel):
- """新建执行"""
- messages: List[Dict[str, Any]] = Field(
- ...,
- description="OpenAI SDK 格式的输入消息。可包含 system + user 消息;若无 system 消息则从 skills 自动构建",
- )
- model: str = Field("gpt-4o", description="模型名称")
- temperature: float = Field(0.3)
- max_iterations: int = Field(200)
- tools: Optional[List[str]] = Field(None, description="工具白名单(None = 全部)")
- name: Optional[str] = Field(None, description="任务名称(None = 自动生成)")
- uid: Optional[str] = Field(None)
- class TraceRunRequest(BaseModel):
- """运行(统一续跑 + 回溯)"""
- messages: List[Dict[str, Any]] = Field(
- default_factory=list,
- description="追加的新消息(可为空,用于重新生成场景)",
- )
- after_message_id: Optional[str] = Field(
- None,
- description="从哪条消息后续跑。None = 从末尾续跑,message_id = 从该消息后运行(自动判断续跑/回溯)",
- )
- class ReflectRequest(BaseModel):
- """反思请求"""
- focus: Optional[str] = Field(None, description="反思重点(可选)")
- class RunResponse(BaseModel):
- """操作响应(立即返回,后台执行)"""
- trace_id: str
- status: str = "started"
- message: str = ""
- class StopResponse(BaseModel):
- """停止响应"""
- trace_id: str
- status: str # "stopping" | "not_running"
- class ReflectResponse(BaseModel):
- """反思响应"""
- trace_id: str
- reflection: str
- # ===== 后台执行 =====
- _running_tasks: Dict[str, asyncio.Task] = {}
- async def _run_in_background(trace_id: str, messages: List[Dict], config):
- """后台执行 agent,消费 run() 的所有 yield"""
- runner = _get_runner()
- try:
- async for _item in runner.run(messages=messages, config=config):
- pass # WebSocket 广播由 runner 内部的 store 事件驱动
- except Exception as e:
- logger.error(f"Background run failed for {trace_id}: {e}")
- finally:
- _running_tasks.pop(trace_id, None)
- async def _run_with_trace_signal(
- messages: List[Dict], config, trace_id_future: asyncio.Future,
- ):
- """后台执行 agent,通过 Future 将 trace_id 传回给等待的 endpoint"""
- from agent.trace.models import Trace
- runner = _get_runner()
- trace_id: Optional[str] = None
- try:
- async for item in runner.run(messages=messages, config=config):
- if isinstance(item, Trace) and not trace_id_future.done():
- trace_id = item.trace_id
- trace_id_future.set_result(trace_id)
- except Exception as e:
- if not trace_id_future.done():
- trace_id_future.set_exception(e)
- logger.error(f"Background run failed: {e}")
- finally:
- if trace_id:
- _running_tasks.pop(trace_id, None)
- # ===== 路由 =====
- @router.post("", response_model=RunResponse)
- async def create_and_run(req: CreateRequest):
- """
- 新建 Trace 并开始执行
- 立即返回 trace_id,后台异步执行。
- 通过 WebSocket /api/traces/{trace_id}/watch 监听实时更新。
- """
- from agent.core.runner import RunConfig
- _get_runner() # 验证 Runner 已配置
- config = RunConfig(
- model=req.model,
- temperature=req.temperature,
- max_iterations=req.max_iterations,
- tools=req.tools,
- name=req.name,
- uid=req.uid,
- )
- # 启动后台执行,通过 Future 等待 trace_id(Phase 1 完成后即返回)
- trace_id_future: asyncio.Future[str] = asyncio.get_running_loop().create_future()
- task = asyncio.create_task(
- _run_with_trace_signal(req.messages, config, trace_id_future)
- )
- trace_id = await trace_id_future
- _running_tasks[trace_id] = task
- return RunResponse(
- trace_id=trace_id,
- status="started",
- message=f"Execution started. Watch via WebSocket: /api/traces/{trace_id}/watch",
- )
- async def _cleanup_incomplete_tool_calls(store, trace_id: str, after_sequence: int) -> int:
- """
- 找到安全的插入点,保证不会把新消息插在一个不完整的工具调用序列中间。
- 场景:
- 1. after_sequence 刚好是一条带 tool_calls 的 assistant 消息,
- 但其部分/全部 tool response 还没生成 → 回退到该 assistant 之前。
- 2. after_sequence 是某条 tool response,但同一批 tool_calls 中
- 还有其他 response 未生成 → 回退到该 assistant 之前。
- 核心逻辑:从 after_sequence 往前找,定位到包含它的那条 assistant 消息,
- 检查该 assistant 的所有 tool_calls 是否都有对应的 tool response。
- 如果不完整,就把截断点回退到该 assistant 消息之前(即其 parent_sequence)。
- Args:
- store: TraceStore
- trace_id: Trace ID
- after_sequence: 用户指定的插入位置
- Returns:
- 调整后的安全截断点(<= after_sequence)
- """
- all_messages = await store.get_trace_messages(trace_id)
- if not all_messages:
- return after_sequence
- by_seq = {msg.sequence: msg for msg in all_messages}
- target = by_seq.get(after_sequence)
- if target is None:
- return after_sequence
- # 找到"所属的 assistant 消息":
- # - 如果 target 本身是 assistant → 就是它
- # - 如果 target 是 tool → 沿 parent_sequence 往上找 assistant
- assistant_msg = None
- if target.role == "assistant":
- assistant_msg = target
- elif target.role == "tool":
- cur = target
- while cur and cur.role == "tool":
- parent_seq = cur.parent_sequence
- cur = by_seq.get(parent_seq) if parent_seq is not None else None
- if cur and cur.role == "assistant":
- assistant_msg = cur
- if assistant_msg is None:
- return after_sequence
- # 该 assistant 是否带 tool_calls?
- content = assistant_msg.content
- if not isinstance(content, dict) or not content.get("tool_calls"):
- return after_sequence
- # 收集所有 tool_call_ids
- expected_ids = set()
- for tc in content["tool_calls"]:
- if isinstance(tc, dict) and tc.get("id"):
- expected_ids.add(tc["id"])
- if not expected_ids:
- return after_sequence
- # 查找已有的 tool responses
- found_ids = set()
- for msg in all_messages:
- if msg.role == "tool" and msg.tool_call_id in expected_ids:
- found_ids.add(msg.tool_call_id)
- missing = expected_ids - found_ids
- if not missing:
- # 全部 tool response 都在,这是一个完整的序列
- return after_sequence
- # 不完整 → 回退到 assistant 之前
- safe = assistant_msg.parent_sequence
- if safe is None:
- # assistant 已经是第一条消息,没有更早的位置
- safe = assistant_msg.sequence - 1
- logger.info(
- "检测到不完整的工具调用 (assistant seq=%d, 缺少 %d/%d tool responses),"
- "自动回退插入点:%d -> %d",
- assistant_msg.sequence, len(missing), len(expected_ids),
- after_sequence, safe,
- )
- return safe
- def _parse_sequence_from_message_id(message_id: str) -> int:
- """从 message_id 末尾解析 sequence 整数(格式:{trace_id}-{sequence:04d})"""
- try:
- return int(message_id.rsplit("-", 1)[-1])
- except (ValueError, IndexError):
- raise HTTPException(
- status_code=422,
- detail=f"Invalid after_message_id format: {message_id!r}",
- )
- @router.post("/{trace_id}/run", response_model=RunResponse)
- async def run_trace(trace_id: str, req: TraceRunRequest):
- """
- 运行已有 Trace(统一续跑 + 回溯)
- - after_message_id 为 null(或省略):从末尾续跑
- - after_message_id 为 message_id 字符串:从该消息后运行(Runner 自动判断续跑/回溯)
- - messages 为空 + after_message_id 有值:重新生成(从该位置重跑,不插入新消息)
- **自动清理不完整工具调用**:
- 如果人工插入 message 的位置打断了一个工具调用过程(assistant 消息有 tool_calls
- 但缺少对应的 tool responses),框架会自动检测并调整插入位置,确保不会产生不一致的状态。
- """
- from agent.core.runner import RunConfig
- runner = _get_runner()
- # 将 message_id 转换为内部使用的 sequence 整数
- after_sequence: Optional[int] = None
- if req.after_message_id is not None:
- after_sequence = _parse_sequence_from_message_id(req.after_message_id)
- # 验证 trace 存在
- if runner.trace_store:
- trace = await runner.trace_store.get_trace(trace_id)
- if not trace:
- raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
- # 自动检查并清理不完整的工具调用
- if after_sequence is not None and req.messages:
- adjusted_seq = await _cleanup_incomplete_tool_calls(
- runner.trace_store, trace_id, after_sequence
- )
- if adjusted_seq != after_sequence:
- logger.info(
- f"已自动调整插入位置:{after_sequence} -> {adjusted_seq}"
- )
- after_sequence = adjusted_seq
- # 检查是否已在运行
- if trace_id in _running_tasks and not _running_tasks[trace_id].done():
- raise HTTPException(status_code=409, detail="Trace is already running")
- config = RunConfig(trace_id=trace_id, after_sequence=after_sequence)
- task = asyncio.create_task(_run_in_background(trace_id, req.messages, config))
- _running_tasks[trace_id] = task
- mode = "rewind" if after_sequence is not None else "continue"
- return RunResponse(
- trace_id=trace_id,
- status="started",
- message=f"Run ({mode}) started. Watch via WebSocket: /api/traces/{trace_id}/watch",
- )
- @router.post("/{trace_id}/stop", response_model=StopResponse)
- async def stop_trace(trace_id: str):
- """
- 停止运行中的 Trace
- 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。
- Trace 状态置为 "stopped"。
- """
- runner = _get_runner()
- # 通过 runner 的 stop 方法设置取消信号
- stopped = await runner.stop(trace_id)
- if not stopped:
- # 检查是否在 _running_tasks 但 runner 不知道(可能已完成)
- if trace_id in _running_tasks:
- task = _running_tasks[trace_id]
- if not task.done():
- task.cancel()
- _running_tasks.pop(trace_id, None)
- return StopResponse(trace_id=trace_id, status="stopping")
- return StopResponse(trace_id=trace_id, status="not_running")
- return StopResponse(trace_id=trace_id, status="stopping")
- @router.post("/{trace_id}/reflect", response_model=ReflectResponse)
- async def reflect_trace(trace_id: str, req: ReflectRequest):
- """
- 触发反思
- 在 trace 末尾追加一条包含反思 prompt 的 user message,单轮无工具 LLM 调用获取反思结果,
- 将结果追加到 experiences 文件(默认 ./.cache/experiences.md)。
- 反思消息作为侧枝(side branch):运行前保存 head_sequence,运行后恢复(try/finally 保证)。
- 使用 max_iterations=1, tools=[] 确保反思不会产生副作用。
- """
- from agent.core.runner import RunConfig
- from agent.trace.compaction import build_reflect_prompt
- runner = _get_runner()
- if not runner.trace_store:
- raise HTTPException(status_code=503, detail="TraceStore not configured")
- # 验证 trace 存在
- trace = await runner.trace_store.get_trace(trace_id)
- if not trace:
- raise HTTPException(status_code=404, detail=f"Trace not found: {trace_id}")
- # 检查是否仍在运行
- if trace_id in _running_tasks and not _running_tasks[trace_id].done():
- raise HTTPException(status_code=409, detail="Cannot reflect on a running trace. Stop it first.")
- # 保存当前 head_sequence(反思完成后恢复,使反思消息成为侧枝)
- saved_head_sequence = trace.head_sequence
- # 构建反思 prompt
- prompt = build_reflect_prompt()
- if req.focus:
- prompt += f"\n\n请特别关注:{req.focus}"
- # 以续跑方式运行:单轮无工具 LLM 调用
- config = RunConfig(trace_id=trace_id, max_iterations=1, tools=[])
- reflection_text = ""
- try:
- result = await runner.run_result(
- messages=[{"role": "user", "content": prompt}],
- config=config,
- )
- reflection_text = result.get("summary", "")
- finally:
- # 恢复 head_sequence(反思消息成为侧枝,不影响主路径)
- await runner.trace_store.update_trace(trace_id, head_sequence=saved_head_sequence)
- # 追加到 experiences 文件
- if reflection_text:
- experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md")
- if experiences_path:
- os.makedirs(os.path.dirname(experiences_path), exist_ok=True)
- header = f"\n\n---\n\n## {trace_id} ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
- with open(experiences_path, "a", encoding="utf-8") as f:
- f.write(header + reflection_text + "\n")
- logger.info(f"Reflection appended to {experiences_path}")
- return ReflectResponse(
- trace_id=trace_id,
- reflection=reflection_text,
- )
- @router.get("/running", tags=["run"])
- async def list_running():
- """列出正在运行的 Trace"""
- running = []
- for tid, task in list(_running_tasks.items()):
- if task.done():
- _running_tasks.pop(tid, None)
- else:
- running.append(tid)
- return {"running": running}
- # ===== 经验 API =====
- @experiences_router.get("/experiences")
- async def list_experiences():
- """读取经验文件内容"""
- runner = _get_runner()
- experiences_path = getattr(runner, "experiences_path", "./.cache/experiences.md")
- if not experiences_path or not os.path.exists(experiences_path):
- return {"content": "", "path": experiences_path}
- with open(experiences_path, "r", encoding="utf-8") as f:
- content = f.read()
- return {"content": content, "path": experiences_path}
|