"""
Agent Runner - the agent execution engine.

Core responsibilities:
1. Execute agent tasks (loop over LLM calls + tool executions)
2. Record the execution graph (Trace + Steps)
3. Retrieve and inject memory (Experience + Skill)
4. Collect feedback and extract experience
"""

import json
import logging
import uuid
from dataclasses import field
from datetime import datetime
from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Union

from agent.core.config import AgentConfig, CallResult
from agent.execution import Trace, Step, TraceStore
from agent.memory.models import Experience, Skill
from agent.memory.protocols import MemoryStore, StateStore
from agent.memory.skill_loader import load_skills_from_dir
from agent.tools import ToolRegistry, get_tool_registry

logger = logging.getLogger(__name__)

# Built-in tools (always loaded automatically, before any user-specified tools)
BUILTIN_TOOLS = [
    "read_file",
    "edit_file",
    "write_file",
    "glob_files",
    "grep_content",
    "bash_command",
    "skill",
    "list_skills",
]


class AgentRunner:
    """
    Agent execution engine.

    Supports two modes:
    1. call(): a single LLM call (simple API)
    2. run(): agent mode (loop + memory + tracing)
    """

    def __init__(
        self,
        trace_store: Optional[TraceStore] = None,
        memory_store: Optional[MemoryStore] = None,
        state_store: Optional[StateStore] = None,
        tool_registry: Optional[ToolRegistry] = None,
        llm_call: Optional[Callable] = None,
        config: Optional[AgentConfig] = None,
        skills_dir: Optional[str] = None,
        debug: bool = False,
    ):
        """
        Initialize the AgentRunner.

        Args:
            trace_store: Trace storage (optional; nothing is recorded without it)
            memory_store: Memory storage (optional; no experiences are loaded without it)
            state_store: State storage (optional; stored on the instance only)
            tool_registry: Tool registry (optional; defaults to the global registry)
            llm_call: Async LLM call function. call() and run() raise ValueError
                when it is missing.
            config: Agent configuration (defaults to ``AgentConfig()``)
            skills_dir: Directory of user-defined skills (optional)
            debug: Reserved parameter (deprecated; use the API Server visualization)
        """
        self.trace_store = trace_store
        self.memory_store = memory_store
        self.state_store = state_store
        self.tools = tool_registry or get_tool_registry()
        self.llm_call = llm_call
        self.config = config or AgentConfig()
        self.skills_dir = skills_dir
        self.debug = debug

    def _generate_id(self) -> str:
        """Generate a unique ID (random UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    async def _dump_debug(self, trace_id: str) -> None:
        """Debug hook (deprecated - replaced by the API visualization); a no-op."""
        # tree.txt / tree.md / tree.json are no longer generated automatically.
        # Use the API Server for visualization instead: python3 api_server.py
        pass

    @staticmethod
    def _merge_tool_names(tools: Optional[List[str]]) -> List[str]:
        """Return built-in tool names followed by user tools, order-preserving and de-duplicated."""
        names = BUILTIN_TOOLS.copy()
        for tool in tools or []:
            if tool not in names:
                names.append(tool)
        return names

    @staticmethod
    def _preview(text: Optional[str], limit: int = 100) -> str:
        """Return ``text`` cut to ``limit`` characters plus "..." when longer; None becomes ""."""
        # LLM responses that carry tool calls may have no textual content at all.
        text = text or ""
        return text[:limit] + "..." if len(text) > limit else text

    # ===== Single call =====

    async def call(
        self,
        messages: List[Dict],
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        uid: Optional[str] = None,
        trace: bool = True,
        **kwargs
    ) -> CallResult:
        """
        Perform a single LLM call.

        Args:
            messages: Message list
            model: Model name
            tools: Tool names to expose in addition to the built-in tools
            uid: User ID
            trace: Whether to record a Trace (only effective with a trace_store)
            **kwargs: Extra arguments forwarded to the LLM call function

        Returns:
            CallResult

        Raises:
            ValueError: If no llm_call function was provided.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        trace_id = None
        step_id = None

        # Create the Trace
        if trace and self.trace_store:
            trace_obj = Trace.create(
                mode="call",
                uid=uid,
                context={"model": model},
            )
            trace_id = await self.trace_store.create_trace(trace_obj)

        # Prepare tool schemas: built-in tools + user-specified tools
        tool_schemas = self.tools.get_schemas(self._merge_tool_names(tools))

        # Call the LLM
        result = await self.llm_call(
            messages=messages,
            model=model,
            tools=tool_schemas,
            **kwargs
        )

        prompt_tokens = result.get("prompt_tokens", 0)
        completion_tokens = result.get("completion_tokens", 0)
        total_tokens = prompt_tokens + completion_tokens
        cost = result.get("cost", 0)

        # Record the Step and complete the Trace
        if trace and self.trace_store and trace_id:
            step = Step.create(
                trace_id=trace_id,
                step_type="thought",
                sequence=0,
                status="completed",
                description=f"LLM 调用 ({model})",
                data={
                    "messages": messages,
                    "response": result.get("content", ""),
                    "model": model,
                    "tools": tool_schemas,  # the tool schemas handed to the model
                    "tool_calls": result.get("tool_calls"),
                },
                tokens=total_tokens,
                cost=cost,
            )
            step_id = await self.trace_store.add_step(step)
            await self._dump_debug(trace_id)

            # Complete the Trace
            await self.trace_store.update_trace(
                trace_id,
                status="completed",
                completed_at=datetime.now(),
                total_tokens=total_tokens,
                total_cost=cost,
            )

        return CallResult(
            reply=result.get("content", ""),
            tool_calls=result.get("tool_calls"),
            trace_id=trace_id,
            step_id=step_id,
            tokens={
                "prompt": prompt_tokens,
                "completion": completion_tokens,
            },
            cost=cost,
        )

    # ===== Agent mode =====

    async def run(
        self,
        task: str,
        messages: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        max_iterations: Optional[int] = None,
        enable_memory: Optional[bool] = None,
        auto_execute_tools: Optional[bool] = None,
        **kwargs
    ) -> AsyncIterator[Union[Trace, Step]]:
        """
        Execute a task in agent mode.

        Args:
            task: Task description (appended as the final user message)
            messages: Initial messages (optional). When given without a
                system_prompt, this list is appended to in place.
            system_prompt: System prompt (optional); skills and experiences
                are injected into it
            model: Model name
            tools: Tool names to expose in addition to the built-in tools
            agent_type: Agent type (falls back to the config value)
            uid: User ID
            max_iterations: Maximum loop iterations (falls back to the config
                value when None or 0)
            enable_memory: Whether to load experiences (falls back to the config value)
            auto_execute_tools: Whether to execute tool calls automatically
                (falls back to the config value)
            **kwargs: Extra arguments forwarded to the LLM call function and
                stored in the Trace context

        Yields:
            Union[Trace, Step]: Trace objects (state changes) or Step objects
            (execution progress). Recorded steps are only yielded when a
            trace_store is configured; the awaiting-approval step is yielded
            regardless.

        Raises:
            ValueError: If no llm_call function was provided.
            Exception: Any error raised during execution is re-raised after the
                Trace has been marked as failed.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        # Fall back to configuration defaults
        agent_type = agent_type or self.config.agent_type
        max_iterations = max_iterations or self.config.max_iterations
        if enable_memory is None:
            enable_memory = self.config.enable_memory
        if auto_execute_tools is None:
            auto_execute_tools = self.config.auto_execute_tools

        # Create the Trace
        trace_id = self._generate_id()
        trace_obj = None
        if self.trace_store:
            trace_obj = Trace(
                trace_id=trace_id,
                mode="agent",
                task=task,
                agent_type=agent_type,
                uid=uid,
                context={"model": model, **kwargs},
            )
            await self.trace_store.create_trace(trace_obj)

            # Emit the Trace (signals the start of the run)
            yield trace_obj

        try:
            # Load memory (Experience and Skill)
            experiences_text = ""
            skills_text = ""

            if enable_memory and self.memory_store:
                scope = f"agent:{agent_type}"
                experiences = await self.memory_store.search_experiences(scope, task)
                experiences_text = self._format_experiences(experiences)

                # Record the memory_read Step
                if self.trace_store:
                    mem_step = Step.create(
                        trace_id=trace_id,
                        step_type="memory_read",
                        sequence=0,
                        status="completed",
                        description=f"加载 {len(experiences)} 条经验",
                        data={
                            "experiences_count": len(experiences),
                            "experiences": [e.to_dict() for e in experiences],
                        },
                    )
                    await self.trace_store.add_step(mem_step)
                    await self._dump_debug(trace_id)

                    # Emit the Step (memory loading finished)
                    yield mem_step

            # Load skills (built-in + user-defined).
            # Per the original comments, load_skills_from_dir() automatically
            # loads the built-in skills from agent/skills/ and additionally the
            # user-defined ones when skills_dir is provided.
            # NOTE(review): skills are loaded regardless of enable_memory — confirm.
            skills = load_skills_from_dir(self.skills_dir)
            if skills:
                skills_text = self._format_skills(skills)
                if self.skills_dir:
                    logger.info(f"加载 {len(skills)} 个 skills (内置 + 自定义: {self.skills_dir})")
                else:
                    logger.info(f"加载 {len(skills)} 个内置 skills")

            # Build the initial messages
            if messages is None:
                messages = []

            if system_prompt:
                # Inject skills and experiences into the system prompt
                full_system = system_prompt
                if skills_text:
                    full_system += f"\n\n## Skills\n{skills_text}"
                if experiences_text:
                    full_system += f"\n\n## 相关经验\n{experiences_text}"

                messages = [{"role": "system", "content": full_system}] + messages

            # Append the task description
            messages.append({"role": "user", "content": task})

            # Prepare tool schemas: built-in tools + user-specified tools
            tool_schemas = self.tools.get_schemas(self._merge_tool_names(tools))

            # Execution loop
            current_goal_id = None  # the goal currently in focus
            sequence = 1
            total_tokens = 0
            total_cost = 0.0

            for iteration in range(max_iterations):
                # Call the LLM
                result = await self.llm_call(
                    messages=messages,
                    model=model,
                    tools=tool_schemas,
                    **kwargs
                )

                response_content = result.get("content", "")
                tool_calls = result.get("tool_calls")
                step_tokens = result.get("prompt_tokens", 0) + result.get("completion_tokens", 0)
                step_cost = result.get("cost", 0)

                total_tokens += step_tokens
                total_cost += step_cost

                # Record the LLM call Step
                llm_step_id = self._generate_id()
                if self.trace_store:
                    # A reply without tool calls after the first iteration is
                    # likely the final response; everything else is a thought.
                    step_type = "response" if not tool_calls and iteration > 0 else "thought"

                    llm_step = Step(
                        step_id=llm_step_id,
                        trace_id=trace_id,
                        step_type=step_type,
                        status="completed",
                        sequence=sequence,
                        parent_id=current_goal_id,
                        description=self._preview(response_content),
                        data={
                            "messages": messages,  # full messages (including the system prompt)
                            "content": response_content,
                            "model": model,
                            "tools": tool_schemas,  # the tool schemas handed to the model
                            "tool_calls": tool_calls,
                        },
                        tokens=step_tokens,
                        cost=step_cost,
                    )
                    await self.trace_store.add_step(llm_step)
                    await self._dump_debug(trace_id)

                    # Emit the Step (LLM thinking finished)
                    yield llm_step

                sequence += 1

                # Handle tool calls
                if tool_calls and auto_execute_tools:
                    # Check whether user confirmation is required
                    if self.tools.check_confirmation_required(tool_calls):
                        # Create the awaiting-approval Step
                        await_step = Step.create(
                            trace_id=trace_id,
                            step_type="action",
                            status="awaiting_approval",
                            sequence=sequence,
                            parent_id=llm_step_id,
                            description="等待用户确认工具调用",
                            data={
                                "tool_calls": tool_calls,
                                "confirmation_flags": self.tools.get_confirmation_flags(tool_calls),
                                "editable_params": self.tools.get_editable_params_map(tool_calls),
                            },
                        )
                        if self.trace_store:
                            await self.trace_store.add_step(await_step)
                            await self._dump_debug(trace_id)

                        yield await_step
                        # TODO: wait for user confirmation
                        break

                    # Execute the tools
                    messages.append({
                        "role": "assistant",
                        "content": response_content,
                        "tool_calls": tool_calls,
                    })

                    for tc in tool_calls:
                        tool_name = tc["function"]["name"]
                        tool_args = tc["function"]["arguments"]
                        if isinstance(tool_args, str):
                            # Arguments may arrive JSON-encoded; an empty string
                            # is treated as "no arguments".
                            tool_args = json.loads(tool_args) if tool_args.strip() else {}

                        # Execute the tool
                        tool_result = await self.tools.execute(
                            tool_name,
                            tool_args,
                            uid=uid or ""
                        )

                        # Record the action Step
                        action_step_id = self._generate_id()
                        if self.trace_store:
                            # Show at most the first two arguments in the description
                            arg_preview = ", ".join(
                                f"{k}={v}" for k, v in list(tool_args.items())[:2]
                            )
                            action_step = Step(
                                step_id=action_step_id,
                                trace_id=trace_id,
                                step_type="action",
                                status="completed",
                                sequence=sequence,
                                parent_id=llm_step_id,
                                description=f"{tool_name}({arg_preview})",
                                data={
                                    "tool_name": tool_name,
                                    "arguments": tool_args,
                                },
                            )
                            await self.trace_store.add_step(action_step)
                            await self._dump_debug(trace_id)

                            # Emit the Step (tool call)
                            yield action_step

                        sequence += 1

                        # Record the result Step
                        result_step_id = self._generate_id()
                        if self.trace_store:
                            result_step = Step(
                                step_id=result_step_id,
                                trace_id=trace_id,
                                step_type="result",
                                status="completed",
                                sequence=sequence,
                                parent_id=action_step_id,
                                description=str(tool_result)[:100] if tool_result else "",
                                data={
                                    "tool_name": tool_name,
                                    "output": tool_result,
                                },
                            )
                            await self.trace_store.add_step(result_step)
                            await self._dump_debug(trace_id)

                            # Emit the Step (tool result)
                            yield result_step

                        sequence += 1

                        # Append to the messages (Gemini requires the name field!)
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tc["id"],
                            "name": tool_name,
                            "content": tool_result,
                        })

                    continue  # next iteration: feed the tool results back to the LLM

                # No tool calls to execute: the task is finished.
                # Record the response Step
                response_step_id = self._generate_id()
                if self.trace_store:
                    response_step = Step(
                        step_id=response_step_id,
                        trace_id=trace_id,
                        step_type="response",
                        status="completed",
                        sequence=sequence,
                        parent_id=current_goal_id,
                        description=self._preview(response_content),
                        data={
                            "content": response_content,
                            "is_final": True,
                        },
                    )
                    await self.trace_store.add_step(response_step)
                    await self._dump_debug(trace_id)

                    # Emit the Step (final reply)
                    yield response_step

                break

            # Complete the Trace
            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="completed",
                    completed_at=datetime.now(),
                    total_tokens=total_tokens,
                    total_cost=total_cost,
                )
                # Re-fetch the updated Trace and emit it
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    yield trace_obj

        except Exception as e:
            logger.error(f"Agent run failed: {e}")

            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="failed",
                    completed_at=datetime.now(),
                )
                # Re-fetch the updated Trace and emit it
                trace_obj = await self.trace_store.get_trace(trace_id)
                if trace_obj:
                    yield trace_obj
            raise

    # ===== Helper methods =====

    def _format_skills(self, skills: List[Skill]) -> str:
        """Format skills as prompt text, separated by blank lines."""
        if not skills:
            return ""
        return "\n\n".join(s.to_prompt_text() for s in skills)

    def _format_experiences(self, experiences: List[Experience]) -> str:
        """Format experiences as prompt text, one "- " bullet per experience."""
        if not experiences:
            return ""
        return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)