| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693 |
- """
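Agent Runner - the agent execution engine.

Core responsibilities:
1. Execute agent tasks (loop over LLM calls + tool calls)
2. Record the execution trail (Trace + Messages + GoalTree)
3. Retrieve and inject memory (Experience + Skill)
4. Manage the execution plan (GoalTree)
5. Collect feedback and extract experiences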
- """
- from agent.tools.builtin.browser import browser_read_long_content
- import logging
- from dataclasses import dataclass
- from datetime import datetime
- from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Union
- from agent.trace.models import Trace, Message
- from agent.trace.protocols import TraceStore
- from agent.trace.goal_models import GoalTree
- from agent.memory.models import Experience, Skill
- from agent.memory.protocols import MemoryStore, StateStore
- from agent.memory.skill_loader import load_skills_from_dir
- from agent.tools import ToolRegistry, get_tool_registry
- logger = logging.getLogger(__name__)
@dataclass
class AgentConfig:
    """Default settings used by the runner when a call does not override them."""

    # Agent type label; also used to build the memory search scope.
    agent_type: str = "default"
    # Upper bound on LLM round-trips in a single run.
    max_iterations: int = 200
    # Whether stored experiences are searched and injected into the prompt.
    enable_memory: bool = True
    # Whether tool calls returned by the LLM are executed automatically.
    auto_execute_tools: bool = True
@dataclass
class CallResult:
    """Outcome of a single LLM call."""

    # Text content returned by the LLM.
    reply: str
    # Tool calls requested by the LLM, if any.
    tool_calls: Optional[List[Dict]] = None
    # ID of the trace the call was recorded under (None when not traced).
    trace_id: Optional[str] = None
    # ID of the recorded assistant message (field name kept for compatibility).
    step_id: Optional[str] = None
    # Token usage for the call.
    tokens: Optional[Dict[str, int]] = None
    # Cost reported for the call.
    cost: float = 0.0
# Built-in tool names (always loaded automatically, before any caller-supplied tools).
BUILTIN_TOOLS = [
    # File operations
    "read_file", "edit_file", "write_file", "glob_files", "grep_content",
    # System
    "bash_command",
    # Skill and goal management
    "skill", "list_skills", "goal", "subagent",
    # Search
    "search_posts", "get_search_suggestions",
    # Sandbox
    "sandbox_create_environment", "sandbox_run_shell",
    "sandbox_rebuild_with_ports", "sandbox_destroy_environment",
    # Browser
    "browser_navigate_to_url", "browser_search_web", "browser_go_back",
    "browser_wait", "browser_click_element", "browser_input_text",
    "browser_send_keys", "browser_upload_file", "browser_scroll_page",
    "browser_find_text", "browser_screenshot", "browser_switch_tab",
    "browser_close_tab", "browser_get_dropdown_options",
    "browser_select_dropdown_option", "browser_extract_content",
    "browser_read_long_content", "browser_get_page_html",
    "browser_get_selector_map", "browser_evaluate",
    "browser_ensure_login_with_cookies", "browser_wait_for_user_action",
    "browser_done",
    # Feishu
    "feishu_get_chat_history", "feishu_get_contact_replies",
    "feishu_send_message_to_contact", "feishu_get_contact_list",
]
class AgentRunner:
    """
    Agent execution engine.

    Two modes are supported:
    1. call(): a single LLM call (simple API)
    2. run(): agent mode (loop + memory + tracing)
    """

    def __init__(
        self,
        trace_store: Optional[TraceStore] = None,
        memory_store: Optional[MemoryStore] = None,
        state_store: Optional[StateStore] = None,
        tool_registry: Optional[ToolRegistry] = None,
        llm_call: Optional[Callable] = None,
        config: Optional[AgentConfig] = None,
        skills_dir: Optional[str] = None,
        goal_tree: Optional[GoalTree] = None,
        debug: bool = False,
    ):
        """
        Initialise the runner.

        Args:
            trace_store: Trace storage (optional; nothing is recorded without it).
            memory_store: Memory storage (optional; no experiences are loaded without it).
            state_store: State storage (optional; only stored on the instance here).
            tool_registry: Tool registry (optional; defaults to the global registry).
            llm_call: Async callable performing the actual LLM request. call() and
                run() raise ValueError when it is missing.
            config: Agent configuration (defaults to AgentConfig()).
            skills_dir: Directory passed to load_skills_from_dir() (optional).
            goal_tree: Execution plan (optional; created on demand for new traces).
            debug: Retained parameter; only stored on the instance here.
        """
        self.trace_store = trace_store
        self.memory_store = memory_store
        self.state_store = state_store
        self.tools = tool_registry or get_tool_registry()
        self.llm_call = llm_call
        self.config = config or AgentConfig()
        self.skills_dir = skills_dir
        self.goal_tree = goal_tree
        self.debug = debug

    def _generate_id(self) -> str:
        """Return a fresh UUID4 string."""
        import uuid

        return str(uuid.uuid4())

    def _resolve_tool_schemas(self, tools: Optional[List[str]]) -> Any:
        """Return registry schemas for the built-in tools plus any extra `tools`.

        Built-in names come first; extra names keep their order and duplicates
        are skipped.
        """
        tool_names = BUILTIN_TOOLS.copy()
        if tools:
            for tool in tools:
                if tool not in tool_names:
                    tool_names.append(tool)
        return self.tools.get_schemas(tool_names)

    @staticmethod
    def _parse_tool_args(raw_args: Any) -> Any:
        """Normalise the `arguments` value of a tool call.

        A non-empty string is decoded as JSON, an empty/blank string or None
        becomes an empty dict, and any other value is returned unchanged.

        Raises:
            json.JSONDecodeError: (a ValueError) when a non-empty string is not
                valid JSON.
        """
        import json

        if raw_args is None:
            return {}
        if isinstance(raw_args, str):
            if not raw_args.strip():
                return {}
            return json.loads(raw_args)
        return raw_args

    @staticmethod
    def _restore_history(stored_messages: List[Message]) -> List[Dict[str, Any]]:
        """Convert stored trace messages back into LLM chat-message dicts.

        Dict content contributes its "text" and "tool_calls" entries. Tool
        messages are persisted by run() as {"tool_name": ..., "result": ...};
        for those the result is restored as the message content (stringified,
        like the live loop does) and the tool name as the message name.
        """
        history: List[Dict[str, Any]] = []
        for msg in stored_messages:
            entry: Dict[str, Any] = {"role": msg.role}
            content = msg.content
            is_tool = msg.role == "tool"

            if isinstance(content, dict):
                if content.get("text"):
                    entry["content"] = content["text"]
                if content.get("tool_calls"):
                    entry["tool_calls"] = content["tool_calls"]
                # Without this the tool output would be dropped on resume.
                if is_tool and "content" not in entry and "result" in content:
                    entry["content"] = str(content["result"])
            else:
                entry["content"] = content

            if is_tool and msg.tool_call_id:
                entry["tool_call_id"] = msg.tool_call_id
                stored_name = content.get("tool_name") if isinstance(content, dict) else None
                entry["name"] = stored_name or msg.description or "unknown"

            history.append(entry)
        return history

    # ===== Single call =====

    async def call(
        self,
        messages: List[Dict],
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        uid: Optional[str] = None,
        trace: bool = True,
        **kwargs
    ) -> CallResult:
        """
        Perform a single LLM call.

        Args:
            messages: Chat messages sent to the LLM.
            model: Model name.
            tools: Extra tool names (added to the built-in tools).
            uid: User ID.
            trace: Whether to record a Trace (requires a trace store).
            **kwargs: Extra parameters forwarded to the LLM call.

        Returns:
            CallResult

        Raises:
            ValueError: If no llm_call function was provided.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        trace_id = None
        message_id = None
        tool_schemas = self._resolve_tool_schemas(tools)

        # Create the trace
        if trace and self.trace_store:
            trace_obj = Trace.create(
                mode="call",
                uid=uid,
                model=model,
                tools=tool_schemas,  # keep the tool definitions
                llm_params=kwargs,  # keep the LLM parameters
            )
            trace_id = await self.trace_store.create_trace(trace_obj)

        # Call the LLM
        result = await self.llm_call(
            messages=messages,
            model=model,
            tools=tool_schemas,
            **kwargs
        )

        # Record the assistant message (single-call mode has no GoalTree)
        if trace and self.trace_store and trace_id:
            msg = Message.create(
                trace_id=trace_id,
                role="assistant",
                sequence=1,
                goal_id=None,
                content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
                prompt_tokens=result.get("prompt_tokens", 0),
                completion_tokens=result.get("completion_tokens", 0),
                finish_reason=result.get("finish_reason"),
                cost=result.get("cost", 0),
            )
            message_id = await self.trace_store.add_message(msg)

            # Close the trace
            await self.trace_store.update_trace(
                trace_id,
                status="completed",
                completed_at=datetime.now(),
            )

        return CallResult(
            reply=result.get("content", ""),
            tool_calls=result.get("tool_calls"),
            trace_id=trace_id,
            step_id=message_id,  # compatibility field name
            tokens={
                "prompt": result.get("prompt_tokens", 0),
                "completion": result.get("completion_tokens", 0),
            },
            cost=result.get("cost", 0)
        )

    # ===== Agent mode =====

    async def run_result(
        self,
        task: str,
        messages: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        max_iterations: Optional[int] = None,
        enable_memory: Optional[bool] = None,
        auto_execute_tools: Optional[bool] = None,
        trace_id: Optional[str] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Run the agent and return a structured result.

        Consumes the event stream of run() and returns a dict with keys
        "status", "summary" (the last assistant message with non-blank text),
        "trace_id", "error" and "stats" (message/token/cost totals of the last
        Trace seen). When no assistant text was produced the status is forced
        to "failed". Arguments are forwarded to run() unchanged.
        """
        last_assistant_text = ""
        final_trace: Optional[Trace] = None

        async for item in self.run(
            task=task,
            messages=messages,
            system_prompt=system_prompt,
            model=model,
            tools=tools,
            agent_type=agent_type,
            uid=uid,
            max_iterations=max_iterations,
            enable_memory=enable_memory,
            auto_execute_tools=auto_execute_tools,
            trace_id=trace_id,
            **kwargs
        ):
            if isinstance(item, Message) and item.role == "assistant":
                content = item.content
                text = ""
                if isinstance(content, dict):
                    text = content.get("text", "") or ""
                elif isinstance(content, str):
                    text = content
                if text and text.strip():
                    last_assistant_text = text
            elif isinstance(item, Trace):
                final_trace = item

        if not final_trace and trace_id and self.trace_store:
            final_trace = await self.trace_store.get_trace(trace_id)

        status = final_trace.status if final_trace else "unknown"
        error = final_trace.error_message if final_trace else None
        summary = last_assistant_text

        if not summary:
            status = "failed"
            error = error or "Sub-Agent 没有产生 assistant 文本结果"

        return {
            "status": status,
            "summary": summary,
            "trace_id": final_trace.trace_id if final_trace else trace_id,
            "error": error,
            "stats": {
                "total_messages": final_trace.total_messages if final_trace else 0,
                "total_tokens": final_trace.total_tokens if final_trace else 0,
                "total_cost": final_trace.total_cost if final_trace else 0.0,
            },
        }

    async def run(
        self,
        task: str,
        messages: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        max_iterations: Optional[int] = None,
        enable_memory: Optional[bool] = None,
        auto_execute_tools: Optional[bool] = None,
        trace_id: Optional[str] = None,
        **kwargs
    ) -> AsyncIterator[Union[Trace, Message]]:
        """
        Run the agent loop.

        Args:
            task: Task description (appended as a user message when non-empty).
            messages: Initial messages (optional). When omitted and a trace
                store is available, the history of `trace_id` is restored.
            system_prompt: System prompt (optional).
            model: Model name.
            tools: Extra tool names (added to the built-in tools).
            agent_type: Agent type (defaults to the config value).
            uid: User ID.
            max_iterations: Maximum number of LLM round-trips (a falsy value
                selects the config default).
            enable_memory: Whether to load experiences (defaults to the config value).
            auto_execute_tools: Whether to execute tool calls (defaults to the config value).
            trace_id: Trace ID (optional; an existing trace is reused when given).
            **kwargs: Extra parameters forwarded to the LLM call.

        Yields:
            Union[Trace, Message]: Trace objects (state changes) or Message
            objects (execution steps).

        Raises:
            ValueError: If no llm_call function was provided, or `trace_id`
                is given but not found in the trace store.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        # Fall back to configured defaults
        agent_type = agent_type or self.config.agent_type
        max_iterations = max_iterations or self.config.max_iterations
        enable_memory = enable_memory if enable_memory is not None else self.config.enable_memory
        auto_execute_tools = auto_execute_tools if auto_execute_tools is not None else self.config.auto_execute_tools

        # Tool schemas are prepared up front because the Trace records them
        tool_schemas = self._resolve_tool_schemas(tools)

        # Create or reuse the trace
        if trace_id:
            if self.trace_store:
                trace_obj = await self.trace_store.get_trace(trace_id)
                if not trace_obj:
                    raise ValueError(f"Trace not found: {trace_id}")
            else:
                trace_obj = Trace(
                    trace_id=trace_id,
                    mode="agent",
                    task=task,
                    agent_type=agent_type,
                    uid=uid,
                    model=model,
                    tools=tool_schemas,
                    llm_params=kwargs,
                    status="running"
                )
        else:
            trace_id = self._generate_id()
            trace_obj = Trace(
                trace_id=trace_id,
                mode="agent",
                task=task,
                agent_type=agent_type,
                uid=uid,
                model=model,
                tools=tool_schemas,  # keep the tool definitions
                llm_params=kwargs,  # keep the LLM parameters
                status="running"
            )
            if self.trace_store:
                await self.trace_store.create_trace(trace_obj)
                # Initialise the GoalTree for the new trace
                goal_tree = self.goal_tree or GoalTree(mission=task)
                await self.trace_store.update_goal_tree(trace_id, goal_tree)

        # Emit the trace to signal the start of the run
        yield trace_obj

        try:
            # Load memory (experiences and skills)
            experiences_text = ""
            skills_text = ""

            if enable_memory and self.memory_store:
                scope = f"agent:{agent_type}"
                experiences = await self.memory_store.search_experiences(scope, task)
                experiences_text = self._format_experiences(experiences)
                logger.info(f"加载 {len(experiences)} 条经验")

            # Load skills (built-in + user-defined).
            # NOTE(review): placed outside the enable_memory guard because it does
            # not use memory_store — confirm against the original indentation.
            skills = load_skills_from_dir(self.skills_dir)
            if skills:
                skills_text = self._format_skills(skills)
                if self.skills_dir:
                    logger.info(f"加载 {len(skills)} 个 skills (内置 + 自定义: {self.skills_dir})")
                else:
                    logger.info(f"加载 {len(skills)} 个内置 skills")

            # Build the initial messages
            sequence = 1
            if messages is None:
                if trace_id and self.trace_store:
                    existing_messages = await self.trace_store.get_trace_messages(trace_id)
                    messages = self._restore_history(existing_messages)
                    if existing_messages:
                        sequence = existing_messages[-1].sequence + 1
                else:
                    messages = []

            # Record the initial system message in the trace
            if system_prompt and not any(m.get("role") == "system" for m in messages):
                # Inject skills and experiences into the system prompt
                full_system = system_prompt
                if skills_text:
                    full_system += f"\n\n## Skills\n{skills_text}"
                if experiences_text:
                    full_system += f"\n\n## 相关经验\n{experiences_text}"

                messages = [{"role": "system", "content": full_system}] + messages

                if self.trace_store:
                    system_msg = Message.create(
                        trace_id=trace_id,
                        role="system",
                        sequence=sequence,
                        goal_id=None,  # initial messages have no goal
                        content=full_system,
                    )
                    await self.trace_store.add_message(system_msg)
                    yield system_msg
                    sequence += 1

            # Append the task description (may be appended again when continuing a trace)
            if task:
                messages.append({"role": "user", "content": task})

                if self.trace_store:
                    user_msg = Message.create(
                        trace_id=trace_id,
                        role="user",
                        sequence=sequence,
                        goal_id=None,  # initial messages have no goal
                        content=task,
                    )
                    await self.trace_store.add_message(user_msg)
                    yield user_msg
                    sequence += 1

            # Fetch the GoalTree; fall back to the one passed to __init__ when
            # there is no store (or the store has none for this trace) so a
            # caller-supplied plan is not silently ignored.
            goal_tree = None
            if self.trace_store:
                goal_tree = await self.trace_store.get_goal_tree(trace_id)
            if goal_tree is None:
                goal_tree = self.goal_tree

            # Expose the goal tree to the goal tool (called by the LLM)
            from agent.trace.goal_tool import set_goal_tree
            set_goal_tree(goal_tree)

            # Main loop
            for iteration in range(max_iterations):
                # Inject the current plan (when goals exist) as a trailing
                # system message; it is not added to the persistent history.
                llm_messages = list(messages)
                if goal_tree and goal_tree.goals:
                    plan_text = f"\n## Current Plan\n\n{goal_tree.to_prompt()}"
                    llm_messages.append({"role": "system", "content": plan_text})

                # Call the LLM
                result = await self.llm_call(
                    messages=llm_messages,
                    model=model,
                    tools=tool_schemas,
                    **kwargs
                )

                response_content = result.get("content", "")
                tool_calls = result.get("tool_calls")
                finish_reason = result.get("finish_reason")
                prompt_tokens = result.get("prompt_tokens", 0)
                completion_tokens = result.get("completion_tokens", 0)
                step_tokens = prompt_tokens + completion_tokens
                step_cost = result.get("cost", 0)

                # Safety net: create a root goal when the LLM calls tools
                # without having created any goal itself.
                if goal_tree and not goal_tree.goals and tool_calls:
                    has_goal_call = any(
                        tc.get("function", {}).get("name") == "goal"
                        for tc in tool_calls
                    )
                    if not has_goal_call:
                        root_desc = goal_tree.mission[:200]
                        goal_tree.add_goals(
                            descriptions=[root_desc],
                            reasons=["系统自动创建:Agent 未显式创建目标"],
                            parent_id=None
                        )
                        goal_tree.focus(goal_tree.goals[0].id)
                        if self.trace_store:
                            await self.trace_store.update_goal_tree(trace_id, goal_tree)
                            await self.trace_store.add_goal(trace_id, goal_tree.goals[0])
                        logger.info(f"自动创建 root goal: {goal_tree.goals[0].id}")

                # Current goal id
                current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None

                # Record the assistant message
                assistant_msg = Message.create(
                    trace_id=trace_id,
                    role="assistant",
                    sequence=sequence,
                    goal_id=current_goal_id,
                    content={"text": response_content, "tool_calls": tool_calls},
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    finish_reason=finish_reason,
                    cost=step_cost,
                )
                if self.trace_store:
                    await self.trace_store.add_message(assistant_msg)

                yield assistant_msg
                sequence += 1

                # No tool calls (or auto execution disabled): the task is done
                if not (tool_calls and auto_execute_tools):
                    break

                # Add the assistant message to the conversation history
                messages.append({
                    "role": "assistant",
                    "content": response_content,
                    "tool_calls": tool_calls,
                })

                for tc in tool_calls:
                    # Re-read the goal id before every tool: an earlier tool
                    # call in the same batch may have changed the focus.
                    current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None

                    tool_name = tc["function"]["name"]

                    try:
                        tool_args = self._parse_tool_args(tc["function"]["arguments"])
                    except ValueError as exc:  # json.JSONDecodeError is a ValueError
                        # Report malformed arguments back to the LLM instead
                        # of aborting the whole run.
                        tool_result = f"Error: invalid JSON arguments for tool '{tool_name}': {exc}"
                    else:
                        # Execute the tool (uniform handling, context passed along)
                        tool_result = await self.tools.execute(
                            tool_name,
                            tool_args,
                            uid=uid or "",
                            context={
                                "store": self.trace_store,
                                "trace_id": trace_id,
                                "goal_id": current_goal_id,
                                "runner": self,
                            }
                        )

                    # Record the tool message
                    tool_msg = Message.create(
                        trace_id=trace_id,
                        role="tool",
                        sequence=sequence,
                        goal_id=current_goal_id,
                        tool_call_id=tc["id"],
                        content={"tool_name": tool_name, "result": tool_result},
                    )
                    if self.trace_store:
                        await self.trace_store.add_message(tool_msg)

                    yield tool_msg
                    sequence += 1

                    # Add the tool output to the conversation history
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "name": tool_name,
                        "content": str(tool_result),
                    })

            # Close the trace
            if self.trace_store:
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    await self.trace_store.update_trace(
                        trace_id,
                        status="completed",
                        completed_at=datetime.now(),
                    )
                    # Re-fetch the updated trace and emit it
                    trace_obj = await self.trace_store.get_trace(trace_id)
                    if trace_obj:
                        yield trace_obj

        except Exception as e:
            logger.exception(f"Agent run failed: {e}")

            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="failed",
                    error_message=str(e),
                    completed_at=datetime.now()
                )
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    yield trace_obj
            raise

    # ===== Helpers =====

    def _format_skills(self, skills: List[Skill]) -> str:
        """Join the prompt text of each skill with blank lines ("" when empty)."""
        if not skills:
            return ""
        return "\n\n".join(s.to_prompt_text() for s in skills)

    def _format_experiences(self, experiences: List[Experience]) -> str:
        """Render each experience as a "- " bullet line ("" when empty)."""
        if not experiences:
            return ""
        return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)
|