| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- """
- Agent Runner - Agent 执行引擎
- 核心职责:
- 1. 执行 Agent 任务(循环调用 LLM + 工具)
- 2. 记录执行轨迹(Trace + Messages + GoalTree)
- 3. 检索和注入记忆(Experience + Skill)
- 4. 管理执行计划(GoalTree)
- 5. 收集反馈,提取经验
- """
- import logging
- from datetime import datetime
- from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Union
- from agent.core.config import AgentConfig, CallResult
- from agent.execution.models import Trace, Message
- from agent.execution.protocols import TraceStore
- from agent.goal.models import GoalTree
- from agent.goal.tool import goal_tool
- from agent.memory.models import Experience, Skill
- from agent.memory.protocols import MemoryStore, StateStore
- from agent.memory.skill_loader import load_skills_from_dir
- from agent.tools import ToolRegistry, get_tool_registry
logger = logging.getLogger(__name__)

# Names of the built-in tools. AgentRunner always starts from a copy of this
# list and appends any caller-supplied tool names that are not already in it.
BUILTIN_TOOLS = [
    # File operations
    *("read_file", "edit_file", "write_file", "glob_files", "grep_content"),
    # System
    *("bash_command",),
    # Skill and goal management
    *("skill", "list_skills", "goal"),
    # Search
    *("search_posts", "get_search_suggestions"),
    # Sandbox
    *(
        "sandbox_create_environment",
        "sandbox_run_shell",
        "sandbox_rebuild_with_ports",
        "sandbox_destroy_environment",
    ),
    # Browser: navigation
    *(
        "browser_navigate_to_url",
        "browser_search_web",
        "browser_go_back",
        "browser_wait",
    ),
    # Browser: page interaction
    *(
        "browser_click_element",
        "browser_input_text",
        "browser_send_keys",
        "browser_upload_file",
        "browser_scroll_page",
        "browser_find_text",
        "browser_screenshot",
    ),
    # Browser: tabs and dropdowns
    *(
        "browser_switch_tab",
        "browser_close_tab",
        "browser_get_dropdown_options",
        "browser_select_dropdown_option",
    ),
    # Browser: content extraction and scripting
    *(
        "browser_extract_content",
        "browser_get_page_html",
        "browser_get_selector_map",
        "browser_evaluate",
    ),
    # Browser: session control
    *(
        "browser_ensure_login_with_cookies",
        "browser_wait_for_user_action",
        "browser_done",
    ),
]
class AgentRunner:
    """
    Agent execution engine.

    Two entry points are offered:
    1. call(): a single LLM invocation (minimal API).
    2. run(): agent mode (LLM/tool loop with memory injection and tracing).
    """

    def __init__(
        self,
        trace_store: Optional[TraceStore] = None,
        memory_store: Optional[MemoryStore] = None,
        state_store: Optional[StateStore] = None,
        tool_registry: Optional[ToolRegistry] = None,
        llm_call: Optional[Callable] = None,
        config: Optional[AgentConfig] = None,
        skills_dir: Optional[str] = None,
        goal_tree: Optional[GoalTree] = None,
        debug: bool = False,
    ):
        """
        Initialise the runner.

        Args:
            trace_store: Trace storage (optional; nothing is recorded without it).
            memory_store: Memory storage (optional; no experiences are loaded without it).
            state_store: State storage (optional; only stored on the instance here).
            tool_registry: Tool registry (optional; defaults to the global registry).
            llm_call: Async callable that performs the LLM request. call() and
                run() raise ValueError when it is missing.
            config: Agent configuration (defaults to ``AgentConfig()``).
            skills_dir: Directory of user-defined skills (optional).
            goal_tree: Execution plan (optional; run() creates one from the task
                when it is not supplied).
            debug: Reserved parameter (deprecated; use the API Server
                visualisation instead).
        """
        self.trace_store = trace_store
        self.memory_store = memory_store
        self.state_store = state_store
        self.tools = tool_registry or get_tool_registry()
        self.llm_call = llm_call
        self.config = config or AgentConfig()
        self.skills_dir = skills_dir
        self.goal_tree = goal_tree
        self.debug = debug

    def _generate_id(self) -> str:
        """Return a fresh random UUID4 as a string."""
        import uuid

        return str(uuid.uuid4())

    # ===== Shared helpers for call() and run() =====

    def _resolve_tool_schemas(self, tools: Optional[List[str]] = None) -> Any:
        """
        Merge the built-in tool names with ``tools`` and fetch their schemas.

        Built-in names come first; extra names keep their first-seen order and
        duplicates are dropped.
        """
        merged_names = list(dict.fromkeys([*BUILTIN_TOOLS, *(tools or [])]))
        return self.tools.get_schemas(merged_names)

    @staticmethod
    def _parse_tool_arguments(raw_args: Any) -> Any:
        """
        Normalise the ``arguments`` field of a tool call.

        A JSON string is decoded, a blank string or ``None`` becomes ``{}``,
        and any other value is returned unchanged.

        Raises:
            json.JSONDecodeError: when a non-blank string is not valid JSON.
        """
        import json

        if raw_args is None:
            return {}
        if isinstance(raw_args, str):
            return json.loads(raw_args) if raw_args.strip() else {}
        return raw_args

    # ===== Single call =====

    async def call(
        self,
        messages: List[Dict],
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        uid: Optional[str] = None,
        trace: bool = True,
        **kwargs
    ) -> CallResult:
        """
        Perform a single LLM call.

        Args:
            messages: Message list sent to the LLM.
            model: Model name.
            tools: Extra tool names, added to the built-in tools.
            uid: User ID.
            trace: Whether to record a Trace (only effective with a trace store).
            **kwargs: Forwarded to the LLM callable.

        Returns:
            CallResult

        Raises:
            ValueError: when no ``llm_call`` function was provided.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        tool_schemas = self._resolve_tool_schemas(tools)

        trace_id = None
        message_id = None
        if trace and self.trace_store:
            trace_obj = Trace.create(
                mode="call",
                uid=uid,
                model=model,
                tools=tool_schemas,  # keep the tool definitions with the trace
                llm_params=kwargs,  # keep the LLM parameters with the trace
            )
            trace_id = await self.trace_store.create_trace(trace_obj)

        try:
            result = await self.llm_call(
                messages=messages,
                model=model,
                tools=tool_schemas,
                **kwargs
            )
        except Exception as e:
            # Mirror run(): a failed LLM call must not leave the trace unfinished.
            if trace_id:
                await self.trace_store.update_trace(
                    trace_id,
                    status="failed",
                    error_message=str(e),
                    completed_at=datetime.now(),
                )
            raise

        # trace_id is only set when tracing is enabled and a store exists.
        if trace_id:
            # Single-call mode does not use a GoalTree, hence goal_id=None.
            msg = Message.create(
                trace_id=trace_id,
                role="assistant",
                sequence=1,
                goal_id=None,
                content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
                prompt_tokens=result.get("prompt_tokens", 0),
                completion_tokens=result.get("completion_tokens", 0),
                finish_reason=result.get("finish_reason"),
                cost=result.get("cost", 0),
            )
            message_id = await self.trace_store.add_message(msg)
            await self.trace_store.update_trace(
                trace_id,
                status="completed",
                completed_at=datetime.now(),
            )

        return CallResult(
            reply=result.get("content", ""),
            tool_calls=result.get("tool_calls"),
            trace_id=trace_id,
            step_id=message_id,  # field name kept for compatibility
            tokens={
                "prompt": result.get("prompt_tokens", 0),
                "completion": result.get("completion_tokens", 0),
            },
            cost=result.get("cost", 0)
        )

    # ===== Agent mode =====

    async def run(
        self,
        task: str,
        messages: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        max_iterations: Optional[int] = None,
        enable_memory: Optional[bool] = None,
        auto_execute_tools: Optional[bool] = None,
        **kwargs
    ) -> AsyncIterator[Union[Trace, Message]]:
        """
        Execute a task in agent mode.

        Args:
            task: Task description; appended as the final user message.
            messages: Initial messages (optional). The list is copied, never mutated.
            system_prompt: System prompt (optional). Skills and experiences are
                only injected when a system prompt is given.
            model: Model name.
            tools: Extra tool names, added to the built-in tools.
            agent_type: Agent type (defaults to the config value).
            uid: User ID.
            max_iterations: Maximum number of LLM rounds (defaults to the config value).
            enable_memory: Whether to load experiences (defaults to the config value).
            auto_execute_tools: Whether tool calls are executed automatically
                (defaults to the config value).
            **kwargs: Forwarded to the LLM callable.

        Yields:
            Union[Trace, Message]: Trace objects on status changes, Message
            objects for each recorded step.

        Raises:
            ValueError: when no ``llm_call`` function was provided.
            Exception: any error raised during execution is re-raised after the
                trace has been marked as failed.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        # Fall back to the configured defaults for anything not supplied.
        agent_type = agent_type or self.config.agent_type
        max_iterations = max_iterations or self.config.max_iterations
        if enable_memory is None:
            enable_memory = self.config.enable_memory
        if auto_execute_tools is None:
            auto_execute_tools = self.config.auto_execute_tools

        # Resolved up front because the schemas are stored on the Trace.
        tool_schemas = self._resolve_tool_schemas(tools)

        trace_id = self._generate_id()
        trace_obj = Trace(
            trace_id=trace_id,
            mode="agent",
            task=task,
            agent_type=agent_type,
            uid=uid,
            model=model,
            tools=tool_schemas,  # keep the tool definitions with the trace
            llm_params=kwargs,  # keep the LLM parameters with the trace
            status="running"
        )

        # The plan exists in memory even without a trace store, so a
        # caller-supplied goal_tree is never silently dropped.
        goal_tree = self.goal_tree or GoalTree(mission=task)
        if self.trace_store:
            await self.trace_store.create_trace(trace_obj)
            await self.trace_store.update_goal_tree(trace_id, goal_tree)

        # Emit the Trace to signal that the run has started.
        yield trace_obj

        try:
            # Load experiences from the memory store.
            experiences_text = ""
            if enable_memory and self.memory_store:
                scope = f"agent:{agent_type}"
                experiences = await self.memory_store.search_experiences(scope, task)
                experiences_text = self._format_experiences(experiences)
                logger.info(f"加载 {len(experiences)} 条经验")

            # Load skills (built-in plus user-defined).
            skills_text = ""
            skills = load_skills_from_dir(self.skills_dir)
            if skills:
                skills_text = self._format_skills(skills)
                if self.skills_dir:
                    logger.info(f"加载 {len(skills)} 个 skills (内置 + 自定义: {self.skills_dir})")
                else:
                    logger.info(f"加载 {len(skills)} 个内置 skills")

            # Work on a private copy so the caller's list is never mutated.
            messages = list(messages) if messages is not None else []

            # Sequence number of the next message recorded in the trace.
            sequence = 1

            if system_prompt:
                # Inject skills and experiences into the system prompt.
                full_system = system_prompt
                if skills_text:
                    full_system += f"\n\n## Skills\n{skills_text}"
                if experiences_text:
                    full_system += f"\n\n## 相关经验\n{experiences_text}"
                messages = [{"role": "system", "content": full_system}] + messages

                if self.trace_store:
                    system_msg = Message.create(
                        trace_id=trace_id,
                        role="system",
                        sequence=sequence,
                        goal_id=None,  # initial messages belong to no goal
                        content=full_system,
                    )
                    await self.trace_store.add_message(system_msg)
                    yield system_msg
                    sequence += 1

            # The task description is the final user message.
            messages.append({"role": "user", "content": task})
            if self.trace_store:
                user_msg = Message.create(
                    trace_id=trace_id,
                    role="user",
                    sequence=sequence,
                    goal_id=None,  # initial messages belong to no goal
                    content=task,
                )
                await self.trace_store.add_message(user_msg)
                yield user_msg
                sequence += 1

            # Prefer the stored GoalTree; keep the in-memory one when there is
            # no store or the store returns nothing.
            if self.trace_store:
                stored_tree = await self.trace_store.get_goal_tree(trace_id)
                if stored_tree is not None:
                    goal_tree = stored_tree

            # Hand the tree to the goal tool so the LLM can operate on it.
            from agent.tools.builtin.goal import set_goal_tree
            set_goal_tree(goal_tree)

            for _ in range(max_iterations):
                # The plan is appended as a trailing system message on a
                # per-request copy; it never enters the persistent history.
                llm_messages = list(messages)
                if goal_tree and goal_tree.goals:
                    plan_text = f"\n## Current Plan\n\n{goal_tree.to_prompt()}"
                    llm_messages.append({"role": "system", "content": plan_text})

                result = await self.llm_call(
                    messages=llm_messages,
                    model=model,
                    tools=tool_schemas,
                    **kwargs
                )

                response_content = result.get("content", "")
                tool_calls = result.get("tool_calls")

                # Goal the assistant message (and its tool results) belong to.
                current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None

                assistant_msg = Message.create(
                    trace_id=trace_id,
                    role="assistant",
                    sequence=sequence,
                    goal_id=current_goal_id,
                    content={"text": response_content, "tool_calls": tool_calls},
                    prompt_tokens=result.get("prompt_tokens", 0),
                    completion_tokens=result.get("completion_tokens", 0),
                    finish_reason=result.get("finish_reason"),
                    cost=result.get("cost", 0),
                )
                if self.trace_store:
                    await self.trace_store.add_message(assistant_msg)
                yield assistant_msg
                sequence += 1

                # No tool calls (or auto-execution disabled): the run is done.
                if not (tool_calls and auto_execute_tools):
                    break

                # Add the assistant turn to the history before its tool results.
                messages.append({
                    "role": "assistant",
                    "content": response_content,
                    "tool_calls": tool_calls,
                })

                for tc in tool_calls:
                    tool_name = tc["function"]["name"]
                    tool_args = self._parse_tool_arguments(tc["function"]["arguments"])

                    # Every tool is executed the same way and receives the
                    # trace store and trace id through ``context``.
                    tool_result = await self.tools.execute(
                        tool_name,
                        tool_args,
                        uid=uid or "",
                        context={
                            "store": self.trace_store,
                            "trace_id": trace_id
                        }
                    )

                    tool_msg = Message.create(
                        trace_id=trace_id,
                        role="tool",
                        sequence=sequence,
                        goal_id=current_goal_id,
                        tool_call_id=tc["id"],
                        content={"tool_name": tool_name, "result": tool_result},
                    )
                    if self.trace_store:
                        await self.trace_store.add_message(tool_msg)
                    yield tool_msg
                    sequence += 1

                    # Feed the tool result back into the conversation.
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "name": tool_name,
                        "content": str(tool_result),
                    })

            # Mark the Trace as completed and emit its refreshed state.
            if self.trace_store:
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    await self.trace_store.update_trace(
                        trace_id,
                        status="completed",
                        completed_at=datetime.now(),
                    )
                    trace_obj = await self.trace_store.get_trace(trace_id)
                    if trace_obj:
                        yield trace_obj

        except Exception as e:
            # logger.exception keeps the traceback in the log.
            logger.exception("Agent run failed: %s", e)

            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="failed",
                    error_message=str(e),
                    completed_at=datetime.now()
                )
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    yield trace_obj
            raise

    # ===== Formatting helpers =====

    def _format_skills(self, skills: List[Skill]) -> str:
        """Join each skill's prompt text with blank lines; empty input gives ""."""
        if not skills:
            return ""
        return "\n\n".join(s.to_prompt_text() for s in skills)

    def _format_experiences(self, experiences: List[Experience]) -> str:
        """Render each experience as a "- " bullet line; empty input gives ""."""
        if not experiences:
            return ""
        return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)
|