""" Agent Runner - Agent 执行引擎 核心职责: 1. 执行 Agent 任务(循环调用 LLM + 工具) 2. 记录执行图(Trace + Steps) 3. 检索和注入记忆(Experience + Skill) 4. 收集反馈,提取经验 """ import logging from dataclasses import dataclass, field from datetime import datetime from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal from agent.events import AgentEvent from agent.models.trace import Trace, Step from agent.models.memory import Experience, Skill from agent.storage.protocols import TraceStore, MemoryStore, StateStore from agent.tools import ToolRegistry, get_tool_registry logger = logging.getLogger(__name__) @dataclass class AgentConfig: """Agent 配置""" agent_type: str = "default" max_iterations: int = 10 enable_memory: bool = True auto_execute_tools: bool = True @dataclass class CallResult: """单次调用结果""" reply: str tool_calls: Optional[List[Dict]] = None trace_id: Optional[str] = None step_id: Optional[str] = None tokens: Optional[Dict[str, int]] = None cost: float = 0.0 class AgentRunner: """ Agent 执行引擎 支持两种模式: 1. call(): 单次 LLM 调用(简洁 API) 2. run(): Agent 模式(循环 + 记忆 + 追踪) """ def __init__( self, trace_store: Optional[TraceStore] = None, memory_store: Optional[MemoryStore] = None, state_store: Optional[StateStore] = None, tool_registry: Optional[ToolRegistry] = None, llm_call: Optional[Callable] = None, config: Optional[AgentConfig] = None, ): """ 初始化 AgentRunner Args: trace_store: Trace 存储(可选,不提供则不记录) memory_store: Memory 存储(可选,不提供则不使用记忆) state_store: State 存储(可选,用于任务状态) tool_registry: 工具注册表(可选,默认使用全局注册表) llm_call: LLM 调用函数(必须提供,用于实际调用 LLM) config: Agent 配置 """ self.trace_store = trace_store self.memory_store = memory_store self.state_store = state_store self.tools = tool_registry or get_tool_registry() self.llm_call = llm_call self.config = config or AgentConfig() def _generate_id(self) -> str: """生成唯一 ID""" import uuid return str(uuid.uuid4()) # ===== 单次调用 ===== async def call( self, messages: List[Dict], model: str = "gpt-4o", tools: Optional[List[str]] = None, uid: Optional[str] = None, trace: bool = True, **kwargs ) -> CallResult: """ 单次 LLM 调用 Args: messages: 消息列表 model: 模型名称 tools: 工具名称列表 uid: 用户 ID trace: 是否记录 Trace **kwargs: 其他参数传递给 LLM Returns: CallResult """ if not self.llm_call: raise ValueError("llm_call function not provided") trace_id = None step_id = None # 创建 Trace if trace and self.trace_store: trace_obj = Trace.create( mode="call", uid=uid, context={"model": model} ) trace_id = await self.trace_store.create_trace(trace_obj) # 准备工具 Schema tool_schemas = None if tools: tool_schemas = self.tools.get_schemas(tools) # 调用 LLM result = await self.llm_call( messages=messages, model=model, tools=tool_schemas, **kwargs ) # 记录 Step if trace and self.trace_store and trace_id: step = Step.create( trace_id=trace_id, step_type="llm_call", sequence=0, data={ "messages": messages, "response": result.get("content", ""), "model": model, "tool_calls": result.get("tool_calls"), "prompt_tokens": result.get("prompt_tokens", 0), "completion_tokens": result.get("completion_tokens", 0), "cost": result.get("cost", 0), } ) step_id = await self.trace_store.add_step(step) # 完成 Trace await self.trace_store.update_trace( trace_id, status="completed", completed_at=datetime.now(), total_tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0), total_cost=result.get("cost", 0) ) return CallResult( reply=result.get("content", ""), tool_calls=result.get("tool_calls"), trace_id=trace_id, step_id=step_id, tokens={ "prompt": result.get("prompt_tokens", 0), "completion": result.get("completion_tokens", 0), }, cost=result.get("cost", 0) ) # ===== Agent 模式 ===== async def run( self, task: str, messages: Optional[List[Dict]] = None, system_prompt: Optional[str] = None, model: str = "gpt-4o", tools: Optional[List[str]] = None, agent_type: Optional[str] = None, uid: Optional[str] = None, max_iterations: Optional[int] = None, enable_memory: Optional[bool] = None, auto_execute_tools: Optional[bool] = None, **kwargs ) -> AsyncIterator[AgentEvent]: """ Agent 模式执行 Args: task: 任务描述 messages: 初始消息(可选) system_prompt: 系统提示(可选) model: 模型名称 tools: 工具名称列表 agent_type: Agent 类型 uid: 用户 ID max_iterations: 最大迭代次数 enable_memory: 是否启用记忆 auto_execute_tools: 是否自动执行工具 **kwargs: 其他参数 Yields: AgentEvent """ if not self.llm_call: raise ValueError("llm_call function not provided") # 使用配置默认值 agent_type = agent_type or self.config.agent_type max_iterations = max_iterations or self.config.max_iterations enable_memory = enable_memory if enable_memory is not None else self.config.enable_memory auto_execute_tools = auto_execute_tools if auto_execute_tools is not None else self.config.auto_execute_tools # 创建 Trace trace_id = self._generate_id() if self.trace_store: trace_obj = Trace( trace_id=trace_id, mode="agent", task=task, agent_type=agent_type, uid=uid, context={"model": model, **kwargs} ) await self.trace_store.create_trace(trace_obj) yield AgentEvent("trace_started", { "trace_id": trace_id, "task": task, "agent_type": agent_type }) try: # 加载记忆 skills_text = "" experiences_text = "" if enable_memory and self.memory_store: scope = f"agent:{agent_type}" skills = await self.memory_store.search_skills(scope, task) experiences = await self.memory_store.search_experiences(scope, task) skills_text = self._format_skills(skills) experiences_text = self._format_experiences(experiences) # 记录 memory_read Step if self.trace_store: mem_step = Step.create( trace_id=trace_id, step_type="memory_read", sequence=0, data={ "skills_count": len(skills), "experiences_count": len(experiences), "skills": [s.to_dict() for s in skills], "experiences": [e.to_dict() for e in experiences], } ) await self.trace_store.add_step(mem_step) yield AgentEvent("memory_loaded", { "skills_count": len(skills), "experiences_count": len(experiences) }) # 构建初始消息 if messages is None: messages = [] if system_prompt: # 注入记忆到 system prompt full_system = system_prompt if skills_text: full_system += f"\n\n## 相关技能\n{skills_text}" if experiences_text: full_system += f"\n\n## 相关经验\n{experiences_text}" messages = [{"role": "system", "content": full_system}] + messages # 添加任务描述 messages.append({"role": "user", "content": task}) # 准备工具 tool_schemas = None if tools: tool_schemas = self.tools.get_schemas(tools) # 执行循环 parent_step_ids = [] sequence = 1 total_tokens = 0 total_cost = 0.0 for iteration in range(max_iterations): yield AgentEvent("step_started", { "iteration": iteration, "step_type": "llm_call" }) # 调用 LLM result = await self.llm_call( messages=messages, model=model, tools=tool_schemas, **kwargs ) response_content = result.get("content", "") tool_calls = result.get("tool_calls") tokens = result.get("prompt_tokens", 0) + result.get("completion_tokens", 0) cost = result.get("cost", 0) total_tokens += tokens total_cost += cost # 记录 LLM 调用 Step llm_step_id = self._generate_id() if self.trace_store: llm_step = Step( step_id=llm_step_id, trace_id=trace_id, step_type="llm_call", sequence=sequence, parent_ids=parent_step_ids, data={ "messages": messages, "response": response_content, "model": model, "tool_calls": tool_calls, "prompt_tokens": result.get("prompt_tokens", 0), "completion_tokens": result.get("completion_tokens", 0), "cost": cost, } ) await self.trace_store.add_step(llm_step) sequence += 1 parent_step_ids = [llm_step_id] yield AgentEvent("llm_call_completed", { "step_id": llm_step_id, "content": response_content, "tool_calls": tool_calls, "tokens": tokens, "cost": cost }) # 处理工具调用 if tool_calls and auto_execute_tools: # 检查是否需要用户确认 if self.tools.check_confirmation_required(tool_calls): yield AgentEvent("awaiting_user_action", { "tool_calls": tool_calls, "confirmation_flags": self.tools.get_confirmation_flags(tool_calls), "editable_params": self.tools.get_editable_params_map(tool_calls) }) # TODO: 等待用户确认 break # 执行工具 messages.append({"role": "assistant", "content": response_content, "tool_calls": tool_calls}) for tc in tool_calls: tool_name = tc["function"]["name"] tool_args = tc["function"]["arguments"] if isinstance(tool_args, str): import json tool_args = json.loads(tool_args) yield AgentEvent("tool_executing", { "tool_name": tool_name, "arguments": tool_args }) # 执行工具 tool_result = await self.tools.execute( tool_name, tool_args, uid=uid or "" ) # 记录 tool_call Step tool_step_id = self._generate_id() if self.trace_store: tool_step = Step( step_id=tool_step_id, trace_id=trace_id, step_type="tool_call", sequence=sequence, parent_ids=[llm_step_id], data={ "tool_name": tool_name, "arguments": tool_args, "result": tool_result, } ) await self.trace_store.add_step(tool_step) sequence += 1 parent_step_ids.append(tool_step_id) yield AgentEvent("tool_result", { "step_id": tool_step_id, "tool_name": tool_name, "result": tool_result }) # 添加到消息(Gemini 需要 name 字段!) messages.append({ "role": "tool", "tool_call_id": tc["id"], "name": tool_name, "content": tool_result }) continue # 继续循环 # 无工具调用,任务完成 # 记录 conclusion Step conclusion_step_id = self._generate_id() if self.trace_store: conclusion_step = Step( step_id=conclusion_step_id, trace_id=trace_id, step_type="conclusion", sequence=sequence, parent_ids=parent_step_ids, data={ "content": response_content, "is_final": True } ) await self.trace_store.add_step(conclusion_step) yield AgentEvent("conclusion", { "step_id": conclusion_step_id, "content": response_content, "is_final": True }) break # 完成 Trace if self.trace_store: await self.trace_store.update_trace( trace_id, status="completed", completed_at=datetime.now(), total_tokens=total_tokens, total_cost=total_cost ) yield AgentEvent("trace_completed", { "trace_id": trace_id, "total_tokens": total_tokens, "total_cost": total_cost }) except Exception as e: logger.error(f"Agent run failed: {e}") if self.trace_store: await self.trace_store.update_trace( trace_id, status="failed", completed_at=datetime.now() ) yield AgentEvent("trace_failed", { "trace_id": trace_id, "error": str(e) }) raise # ===== 反馈 ===== async def add_feedback( self, trace_id: str, target_step_id: str, feedback_type: Literal["positive", "negative", "correction"], content: str, extract_experience: bool = True ) -> Optional[str]: """ 添加人工反馈 Args: trace_id: Trace ID target_step_id: 反馈针对的 Step ID feedback_type: 反馈类型 content: 反馈内容 extract_experience: 是否自动提取经验 Returns: experience_id: 如果提取了经验 """ if not self.trace_store: return None # 获取 Trace trace = await self.trace_store.get_trace(trace_id) if not trace: logger.warning(f"Trace not found: {trace_id}") return None # 创建 feedback Step steps = await self.trace_store.get_trace_steps(trace_id) max_seq = max(s.sequence for s in steps) if steps else 0 feedback_step = Step.create( trace_id=trace_id, step_type="feedback", sequence=max_seq + 1, parent_ids=[target_step_id], data={ "target_step_id": target_step_id, "feedback_type": feedback_type, "content": content } ) await self.trace_store.add_step(feedback_step) # 提取经验 exp_id = None if extract_experience and self.memory_store and feedback_type in ("positive", "correction"): exp = Experience.create( scope=f"agent:{trace.agent_type}" if trace.agent_type else "agent:default", condition=f"执行类似 '{trace.task}' 任务时" if trace.task else "通用场景", rule=content, evidence=[target_step_id, feedback_step.step_id], source="feedback", confidence=0.8 if feedback_type == "positive" else 0.6 ) exp_id = await self.memory_store.add_experience(exp) # 记录 memory_write Step mem_step = Step.create( trace_id=trace_id, step_type="memory_write", sequence=max_seq + 2, parent_ids=[feedback_step.step_id], data={ "experience_id": exp_id, "condition": exp.condition, "rule": exp.rule } ) await self.trace_store.add_step(mem_step) return exp_id # ===== 辅助方法 ===== def _format_skills(self, skills: List[Skill]) -> str: """格式化技能为 Prompt 文本""" if not skills: return "" return "\n\n".join(s.to_prompt_text() for s in skills) def _format_experiences(self, experiences: List[Experience]) -> str: """格式化经验为 Prompt 文本""" if not experiences: return "" return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)