问题: 现有Trace存储在本地文件系统,跨设备时如何访问?
场景1:云端Agent调用终端Agent
云端Agent (云端存储)
↓ 调用
终端Agent (终端存储)
↓ 返回 sub_trace_id
云端Agent 想 continue_from sub_trace_id
↓ 问题:sub_trace在终端,云端访问不到!

场景2:终端Agent调用云端Agent
终端Agent (终端存储)
↓ 调用
云端Agent (云端存储)
↓ 返回 sub_trace_id
终端Agent 想 continue_from sub_trace_id
↓ 问题:sub_trace在云端,终端访问不到!
Trace ID包含位置信息,通过HTTP API访问远程Trace
本地Trace: abc-123
远程Trace: agent://terminal-agent-456/abc-123
格式: agent://<Agent地址>/<本地ID>
- 协议: agent://
- Agent地址: terminal-agent-456
- 本地ID: abc-123
# agent/trace/remote_store.py
class RemoteTraceStore:
    """Proxy for a trace store that lives on another agent, reached over HTTP.

    The ``_get`` / ``_post`` helpers used below are not shown in this snippet.
    ``get_trace`` treats a falsy ``_get`` result as "not found"; the other
    methods follow the same convention instead of failing on ``None``.
    """

    def __init__(self, agent_url: str, api_key: str):
        """Remember the remote agent's base URL and the API key for it."""
        self.agent_url = agent_url
        self.api_key = api_key

    async def get_trace(self, trace_id: str) -> "Optional[Trace]":
        """Fetch a remote trace by ID.

        Returns:
            The trace built from the response payload, or ``None`` when the
            request produced no (or an empty) response.
        """
        response = await self._get(f"/api/traces/{trace_id}")
        if not response:
            return None
        return Trace(**response)

    async def get_main_path_messages(
        self, trace_id: str, head_sequence: int
    ) -> "List[Message]":
        """Fetch the main-path messages of a remote trace.

        Args:
            trace_id: ID of the trace on the remote agent.
            head_sequence: Forwarded as the ``head_sequence`` query parameter.

        Returns:
            The messages from the response's ``"messages"`` entry, or an empty
            list when the request produced no response.
        """
        response = await self._get(
            f"/api/traces/{trace_id}/messages",
            params={"mode": "main_path", "head_sequence": head_sequence},
        )
        if not response:
            # Same "no response" convention as get_trace, instead of a
            # TypeError from subscripting None.
            return []
        return [Message(**m) for m in response["messages"]]

    async def add_message(self, message: "Message") -> str:
        """Append a message to a remote trace (used to continue a run).

        Returns:
            The ``"message_id"`` reported by the remote agent.

        Raises:
            RuntimeError: If the remote agent returned no response.
        """
        response = await self._post(
            f"/api/traces/{message.trace_id}/messages",
            data=message.to_dict(),
        )
        if not response:
            raise RuntimeError(
                f"No response when appending to remote trace {message.trace_id!r}"
            )
        return response["message_id"]
# agent/trace/hybrid_store.py
class HybridTraceStore:
    """Hybrid store: routes each call to the local store or to a remote agent.

    Trace IDs of the form ``agent://<agent-address>/<local-id>`` are remote;
    every other ID is handled by the local store. ``_get_api_key`` is not
    shown in this snippet.
    """

    _REMOTE_PREFIX = "agent://"

    def __init__(self, local_store: "FileSystemTraceStore"):
        self.local_store = local_store
        self.remote_stores = {}  # agent address -> RemoteTraceStore

    def _parse_trace_id(self, trace_id: str) -> tuple[str, str]:
        """Split a trace ID into ``(location, local_id)``.

        Returns:
            ``("local", trace_id)`` for a plain ID, otherwise the agent
            address and the ID as known on that agent, e.g.
            ``agent://terminal-agent-456/abc-123`` ->
            ``("terminal-agent-456", "abc-123")``.

        Raises:
            ValueError: If a remote ID lacks the agent address or the local ID.
        """
        if not trace_id.startswith(self._REMOTE_PREFIX):
            return "local", trace_id

        remainder = trace_id[len(self._REMOTE_PREFIX):]
        location, _, local_id = remainder.partition("/")
        if not location or not local_id:
            raise ValueError(f"Malformed remote trace id: {trace_id!r}")
        return location, local_id

    def _get_remote_store(self, location: str) -> "RemoteTraceStore":
        """Return the proxy for ``location``, creating and caching it on first use."""
        store = self.remote_stores.get(location)
        if store is None:
            store = RemoteTraceStore(
                agent_url=f"https://{location}",
                api_key=self._get_api_key(location),
            )
            self.remote_stores[location] = store
        return store

    async def get_trace(self, trace_id: str) -> "Optional[Trace]":
        """Fetch a trace, routed to the local store or the owning remote agent."""
        location, local_id = self._parse_trace_id(trace_id)
        if location == "local":
            return await self.local_store.get_trace(local_id)
        return await self._get_remote_store(location).get_trace(local_id)

    async def add_message(self, message: "Message") -> str:
        """Append a message, routed to the local store or the owning remote agent."""
        location, local_id = self._parse_trace_id(message.trace_id)
        if location == "local":
            return await self.local_store.add_message(message)

        import copy

        # The remote agent only knows the trace by its own local ID, and
        # RemoteTraceStore.add_message builds the URL from message.trace_id,
        # so send a copy that carries the local ID.
        # NOTE(review): assumes Message is a plain mutable object — confirm.
        remote_message = copy.copy(message)
        remote_message.trace_id = local_id
        # Create the proxy on demand; add_message may be the first call for
        # this agent, in which case a direct dict lookup would raise KeyError.
        return await self._get_remote_store(location).add_message(remote_message)
@tool(description="创建 Agent 执行任务")
async def agent(
    task: Union[str, List[str]],
    continue_from: Optional[str] = None,  # may be a remote trace ID
    agent_url: Optional[str] = None,  # address of a remote agent
    context: Optional[dict] = None,
) -> Dict[str, Any]:
    """
    Create an agent to execute a task.

    Args:
        task: Task description(s).
        continue_from: Existing trace to continue; remote trace IDs
            (``agent://<agent-address>/<local-id>``) are supported.
        agent_url: Address of the remote agent, when the task should run there.
        context: Context injected by the framework; its ``"store"`` entry is
            used for local execution.

    Returns:
        For a remote call, the remote result with ``sub_trace_id`` rewritten
        to the ``agent://`` form and ``"remote": True``. For a local call with
        an unknown ``continue_from`` (or no store), a ``{"status": "failed"}``
        dict. The rest of the local path is elided in this snippet.
    """
    if agent_url:
        remote_prefix = f"agent://{agent_url}/"
        # The remote agent knows the trace only by its own local ID, so strip
        # the location prefix this function added on an earlier call.
        if continue_from and continue_from.startswith(remote_prefix):
            continue_from = continue_from[len(remote_prefix):]

        # Call the remote agent
        result = await _call_remote_agent(agent_url, task, continue_from)
        # Hand back a location-qualified trace ID
        return {
            **result,
            "sub_trace_id": f"{remote_prefix}{result['sub_trace_id']}",
            "remote": True,
        }

    # Local execution (existing logic)
    # `context` defaults to None, so do not call .get on it unconditionally.
    store = (context or {}).get("store")  # HybridTraceStore
    if continue_from:
        if store is None:
            return {"status": "failed", "error": "Trace store not available"}
        # May be a remote trace; the hybrid store routes the lookup
        existing = await store.get_trace(continue_from)
        if not existing:
            return {"status": "failed", "error": "Trace not found"}
    # ... existing logic
# Cloud agent calls the terminal agent
result1 = await agent(
task="分析本地项目",
agent_url="terminal-agent-456.local"
)
# Returns: {"sub_trace_id": "agent://terminal-agent-456.local/abc-123"}
# Continue the conversation, passing the returned remote trace ID back
result2 = await agent(
task="重点分析core模块",
continue_from=result1["sub_trace_id"], # remote trace ID
agent_url="terminal-agent-456.local"
)
# NOTE: with agent_url set, the call is sent to the remote agent; the hybrid store's HTTP lookup applies when a remote trace ID is continued locally (no agent_url)
所有Agent共享同一个Trace存储(数据库)
云端Agent ──┐
├──> 中心化Trace存储(PostgreSQL/MongoDB)
终端Agent ──┘
# agent/trace/db_store.py
class DatabaseTraceStore:
    """Trace store backed by a database shared by all agents.

    ``connect`` is not shown in this snippet; the collection calls used here
    (``insert_one`` / ``find_one``) are document-store style.
    """

    def __init__(self, db_url: str):
        self.db = connect(db_url)

    async def create_trace(self, trace: "Trace") -> str:
        """Insert the trace's dict form and return its ``trace_id``."""
        await self.db.traces.insert_one(trace.to_dict())
        return trace.trace_id

    async def get_trace(self, trace_id: str) -> "Optional[Trace]":
        """Look up a trace by ``trace_id``; return ``None`` when there is no match."""
        doc = await self.db.traces.find_one({"trace_id": trace_id})
        if not doc:
            return None
        # MongoDB-style drivers add an `_id` field to stored documents; drop
        # it so it is not passed to Trace as an unexpected keyword.
        # NOTE(review): assumes Trace has no `_id` field of its own — confirm.
        fields = {key: value for key, value in doc.items() if key != "_id"}
        return Trace(**fields)

    # ... other methods
# config/storage.yaml
trace_store:
  type: database
  url: postgresql://user:pass@db.cloud/traces
  # Alternative (MongoDB) - configure exactly one `url`:
  # url: mongodb://db.cloud/traces
按需同步远程Trace到本地
class CachedRemoteStore:
    """Remote trace store fronted by a local store and an in-memory cache.

    ``_fetch_remote`` and ``_sync_to_remote`` are not shown in this snippet.
    """

    def __init__(self, local_store, remote_url):
        self.local_store = local_store
        self.remote_url = remote_url
        self.cache = {}  # trace_id -> Trace

    async def get_trace(self, trace_id: str) -> "Optional[Trace]":
        """Return a trace, looking locally first and fetching remotely on a miss.

        Lookup order: local store, then in-memory cache, then the remote side.
        A trace fetched remotely is cached and also written to the local store.
        """
        # 1. Local store
        trace = await self.local_store.get_trace(trace_id)
        if trace:
            return trace

        # 2. In-memory cache
        if trace_id in self.cache:
            return self.cache[trace_id]

        # 3. Fetch remotely and cache the result
        trace = await self._fetch_remote(trace_id)
        if trace:
            self.cache[trace_id] = trace
            # Optional: persist locally
            await self.local_store.create_trace(trace)
        return trace

    async def add_message(self, message: "Message") -> str:
        """Write the message locally, then sync it to the remote side.

        Returns:
            The ID returned by the local store.
        """
        local_id = await self.local_store.add_message(message)
        await self._sync_to_remote(message)
        return local_id
结合方案1和方案2的优点
组织内部:
云端Agent ──┐
├──> 组织数据库
职能Agent ──┘
跨组织:
组织A Agent <──HTTP API──> 组织B Agent
# config/storage.yaml
trace_store:
  # Inside the organization: shared database
  internal:
    type: database
    url: postgresql://org-db/traces
  # Across organizations: remote access
  external:
    type: remote
    agents:
      - url: https://partner-org.com
        api_key: xxx
理由:
实现步骤:
理由:
# 本地Trace
"abc-123"
# 远程Trace(完整格式)
"agent://terminal-agent-456.local:8000/abc-123"
# 远程Trace(简化格式,使用注册的agent_id)
"@terminal-agent-456/abc-123"
# config/agents.yaml
agents:
  terminal-agent-456:
    url: https://terminal-agent-456.local:8000
    api_key: ak_xxx
    type: terminal
# 必需的API端点(用于远程访问)
GET /api/traces/{trace_id} # 获取Trace
GET /api/traces/{trace_id}/messages # 获取消息
POST /api/traces/{trace_id}/run # 续跑
POST /api/traces/{trace_id}/messages # 追加消息
# Each agent has its own API key; it is sent as a Bearer token
headers = {
"Authorization": f"Bearer {api_key}"
}
# Permission check: reject requests for a trace whose uid differs from the requesting user
if trace.uid != request.user_id:
    raise Forbidden("Cannot access other user's trace")
# 批量获取消息
GET /api/traces/{trace_id}/messages?limit=100&offset=0
# 增量同步
GET /api/traces/{trace_id}/messages?since_sequence=50
# 压缩传输
headers = {"Accept-Encoding": "gzip"}
推荐路线:
Phase 1(MVP) - 实现方案1(远程Trace访问)
Phase 2 - 优化和增强
Phase 3(可选) - 迁移到方案4(混合模式)
关键优势: