""" Agent Runner - Agent 执行引擎 核心职责: 1. 执行 Agent 任务(循环调用 LLM + 工具) 2. 记录执行图(Trace + Steps) 3. 检索和注入记忆(Experience + Skill) 4. 收集反馈,提取经验 """ import logging from dataclasses import dataclass, field from datetime import datetime from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal from reson_agent.events import AgentEvent from reson_agent.models.trace import Trace, Step from reson_agent.models.memory import Experience, Skill from reson_agent.storage.protocols import TraceStore, MemoryStore, StateStore from reson_agent.tools import ToolRegistry, get_tool_registry logger = logging.getLogger(__name__) @dataclass class AgentConfig: """Agent 配置""" agent_type: str = "default" max_iterations: int = 10 enable_memory: bool = True auto_execute_tools: bool = True @dataclass class CallResult: """单次调用结果""" reply: str tool_calls: Optional[List[Dict]] = None trace_id: Optional[str] = None step_id: Optional[str] = None tokens: Optional[Dict[str, int]] = None cost: float = 0.0 class AgentRunner: """ Agent 执行引擎 支持两种模式: 1. call(): 单次 LLM 调用(简洁 API) 2. run(): Agent 模式(循环 + 记忆 + 追踪) """ def __init__( self, trace_store: Optional[TraceStore] = None, memory_store: Optional[MemoryStore] = None, state_store: Optional[StateStore] = None, tool_registry: Optional[ToolRegistry] = None, llm_call: Optional[Callable] = None, config: Optional[AgentConfig] = None, ): """ 初始化 AgentRunner Args: trace_store: Trace 存储(可选,不提供则不记录) memory_store: Memory 存储(可选,不提供则不使用记忆) state_store: State 存储(可选,用于任务状态) tool_registry: 工具注册表(可选,默认使用全局注册表) llm_call: LLM 调用函数(必须提供,用于实际调用 LLM) config: Agent 配置 """ self.trace_store = trace_store self.memory_store = memory_store self.state_store = state_store self.tools = tool_registry or get_tool_registry() self.llm_call = llm_call self.config = config or AgentConfig() def _generate_id(self) -> str: """生成唯一 ID""" import uuid return str(uuid.uuid4()) # ===== 单次调用 ===== async def call( self, messages: List[Dict], model: str = "gpt-4o", tools: Optional[List[str]] = None, uid: Optional[str] = None, trace: bool = True, **kwargs ) -> CallResult: """ 单次 LLM 调用 Args: messages: 消息列表 model: 模型名称 tools: 工具名称列表 uid: 用户 ID trace: 是否记录 Trace **kwargs: 其他参数传递给 LLM Returns: CallResult """ if not self.llm_call: raise ValueError("llm_call function not provided") trace_id = None step_id = None # 创建 Trace if trace and self.trace_store: trace_obj = Trace.create( mode="call", uid=uid, context={"model": model} ) trace_id = await self.trace_store.create_trace(trace_obj) # 准备工具 Schema tool_schemas = None if tools: tool_schemas = self.tools.get_schemas(tools) # 调用 LLM result = await self.llm_call( messages=messages, model=model, tools=tool_schemas, **kwargs ) # 记录 Step if trace and self.trace_store and trace_id: step = Step.create( trace_id=trace_id, step_type="llm_call", sequence=0, data={ "messages": messages, "response": result.get("content", ""), "model": model, "tool_calls": result.get("tool_calls"), "prompt_tokens": result.get("prompt_tokens", 0), "completion_tokens": result.get("completion_tokens", 0), "cost": result.get("cost", 0), } ) step_id = await self.trace_store.add_step(step) # 完成 Trace await self.trace_store.update_trace( trace_id, status="completed", completed_at=datetime.now(), total_tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0), total_cost=result.get("cost", 0) ) return CallResult( reply=result.get("content", ""), tool_calls=result.get("tool_calls"), trace_id=trace_id, step_id=step_id, tokens={ "prompt": result.get("prompt_tokens", 0), "completion": result.get("completion_tokens", 0), }, cost=result.get("cost", 0) ) # ===== Agent 模式 ===== async def run( self, task: str, messages: Optional[List[Dict]] = None, system_prompt: Optional[str] = None, model: str = "gpt-4o", tools: Optional[List[str]] = None, agent_type: Optional[str] = None, uid: Optional[str] = None, max_iterations: Optional[int] = None, enable_memory: Optional[bool] = None, auto_execute_tools: Optional[bool] = None, **kwargs ) -> AsyncIterator[AgentEvent]: """ Agent 模式执行 Args: task: 任务描述 messages: 初始消息(可选) system_prompt: 系统提示(可选) model: 模型名称 tools: 工具名称列表 agent_type: Agent 类型 uid: 用户 ID max_iterations: 最大迭代次数 enable_memory: 是否启用记忆 auto_execute_tools: 是否自动执行工具 **kwargs: 其他参数 Yields: AgentEvent """ if not self.llm_call: raise ValueError("llm_call function not provided") # 使用配置默认值 agent_type = agent_type or self.config.agent_type max_iterations = max_iterations or self.config.max_iterations enable_memory = enable_memory if enable_memory is not None else self.config.enable_memory auto_execute_tools = auto_execute_tools if auto_execute_tools is not None else self.config.auto_execute_tools # 创建 Trace trace_id = self._generate_id() if self.trace_store: trace_obj = Trace( trace_id=trace_id, mode="agent", task=task, agent_type=agent_type, uid=uid, context={"model": model, **kwargs} ) await self.trace_store.create_trace(trace_obj) yield AgentEvent("trace_started", { "trace_id": trace_id, "task": task, "agent_type": agent_type }) try: # 加载记忆 skills_text = "" experiences_text = "" if enable_memory and self.memory_store: scope = f"agent:{agent_type}" skills = await self.memory_store.search_skills(scope, task) experiences = await self.memory_store.search_experiences(scope, task) skills_text = self._format_skills(skills) experiences_text = self._format_experiences(experiences) # 记录 memory_read Step if self.trace_store: mem_step = Step.create( trace_id=trace_id, step_type="memory_read", sequence=0, data={ "skills_count": len(skills), "experiences_count": len(experiences), "skills": [s.to_dict() for s in skills], "experiences": [e.to_dict() for e in experiences], } ) await self.trace_store.add_step(mem_step) yield AgentEvent("memory_loaded", { "skills_count": len(skills), "experiences_count": len(experiences) }) # 构建初始消息 if messages is None: messages = [] if system_prompt: # 注入记忆到 system prompt full_system = system_prompt if skills_text: full_system += f"\n\n## 相关技能\n{skills_text}" if experiences_text: full_system += f"\n\n## 相关经验\n{experiences_text}" messages = [{"role": "system", "content": full_system}] + messages # 添加任务描述 messages.append({"role": "user", "content": task}) # 准备工具 tool_schemas = None if tools: tool_schemas = self.tools.get_schemas(tools) # 执行循环 parent_step_ids = [] sequence = 1 total_tokens = 0 total_cost = 0.0 for iteration in range(max_iterations): yield AgentEvent("step_started", { "iteration": iteration, "step_type": "llm_call" }) # 调用 LLM result = await self.llm_call( messages=messages, model=model, tools=tool_schemas, **kwargs ) response_content = result.get("content", "") tool_calls = result.get("tool_calls") tokens = result.get("prompt_tokens", 0) + result.get("completion_tokens", 0) cost = result.get("cost", 0) total_tokens += tokens total_cost += cost # 记录 LLM 调用 Step llm_step_id = self._generate_id() if self.trace_store: llm_step = Step( step_id=llm_step_id, trace_id=trace_id, step_type="llm_call", sequence=sequence, parent_ids=parent_step_ids, data={ "messages": messages, "response": response_content, "model": model, "tool_calls": tool_calls, "prompt_tokens": result.get("prompt_tokens", 0), "completion_tokens": result.get("completion_tokens", 0), "cost": cost, } ) await self.trace_store.add_step(llm_step) sequence += 1 parent_step_ids = [llm_step_id] yield AgentEvent("llm_call_completed", { "step_id": llm_step_id, "content": response_content, "tool_calls": tool_calls, "tokens": tokens, "cost": cost }) # 处理工具调用 if tool_calls and auto_execute_tools: # 检查是否需要用户确认 if self.tools.check_confirmation_required(tool_calls): yield AgentEvent("awaiting_user_action", { "tool_calls": tool_calls, "confirmation_flags": self.tools.get_confirmation_flags(tool_calls), "editable_params": self.tools.get_editable_params_map(tool_calls) }) # TODO: 等待用户确认 break # 执行工具 messages.append({"role": "assistant", "content": response_content, "tool_calls": tool_calls}) for tc in tool_calls: tool_name = tc["function"]["name"] tool_args = tc["function"]["arguments"] if isinstance(tool_args, str): import json tool_args = json.loads(tool_args) yield AgentEvent("tool_executing", { "tool_name": tool_name, "arguments": tool_args }) # 执行工具 tool_result = await self.tools.execute( tool_name, tool_args, uid=uid or "" ) # 记录 tool_call Step tool_step_id = self._generate_id() if self.trace_store: tool_step = Step( step_id=tool_step_id, trace_id=trace_id, step_type="tool_call", sequence=sequence, parent_ids=[llm_step_id], data={ "tool_name": tool_name, "arguments": tool_args, "result": tool_result, } ) await self.trace_store.add_step(tool_step) sequence += 1 parent_step_ids.append(tool_step_id) yield AgentEvent("tool_result", { "step_id": tool_step_id, "tool_name": tool_name, "result": tool_result }) # 添加到消息 messages.append({ "role": "tool", "tool_call_id": tc["id"], "content": tool_result }) continue # 继续循环 # 无工具调用,任务完成 # 记录 conclusion Step conclusion_step_id = self._generate_id() if self.trace_store: conclusion_step = Step( step_id=conclusion_step_id, trace_id=trace_id, step_type="conclusion", sequence=sequence, parent_ids=parent_step_ids, data={ "content": response_content, "is_final": True } ) await self.trace_store.add_step(conclusion_step) yield AgentEvent("conclusion", { "step_id": conclusion_step_id, "content": response_content, "is_final": True }) break # 完成 Trace if self.trace_store: await self.trace_store.update_trace( trace_id, status="completed", completed_at=datetime.now(), total_tokens=total_tokens, total_cost=total_cost ) yield AgentEvent("trace_completed", { "trace_id": trace_id, "total_tokens": total_tokens, "total_cost": total_cost }) except Exception as e: logger.error(f"Agent run failed: {e}") if self.trace_store: await self.trace_store.update_trace( trace_id, status="failed", completed_at=datetime.now() ) yield AgentEvent("trace_failed", { "trace_id": trace_id, "error": str(e) }) raise # ===== 反馈 ===== async def add_feedback( self, trace_id: str, target_step_id: str, feedback_type: Literal["positive", "negative", "correction"], content: str, extract_experience: bool = True ) -> Optional[str]: """ 添加人工反馈 Args: trace_id: Trace ID target_step_id: 反馈针对的 Step ID feedback_type: 反馈类型 content: 反馈内容 extract_experience: 是否自动提取经验 Returns: experience_id: 如果提取了经验 """ if not self.trace_store: return None # 获取 Trace trace = await self.trace_store.get_trace(trace_id) if not trace: logger.warning(f"Trace not found: {trace_id}") return None # 创建 feedback Step steps = await self.trace_store.get_trace_steps(trace_id) max_seq = max(s.sequence for s in steps) if steps else 0 feedback_step = Step.create( trace_id=trace_id, step_type="feedback", sequence=max_seq + 1, parent_ids=[target_step_id], data={ "target_step_id": target_step_id, "feedback_type": feedback_type, "content": content } ) await self.trace_store.add_step(feedback_step) # 提取经验 exp_id = None if extract_experience and self.memory_store and feedback_type in ("positive", "correction"): exp = Experience.create( scope=f"agent:{trace.agent_type}" if trace.agent_type else "agent:default", condition=f"执行类似 '{trace.task}' 任务时" if trace.task else "通用场景", rule=content, evidence=[target_step_id, feedback_step.step_id], source="feedback", confidence=0.8 if feedback_type == "positive" else 0.6 ) exp_id = await self.memory_store.add_experience(exp) # 记录 memory_write Step mem_step = Step.create( trace_id=trace_id, step_type="memory_write", sequence=max_seq + 2, parent_ids=[feedback_step.step_id], data={ "experience_id": exp_id, "condition": exp.condition, "rule": exp.rule } ) await self.trace_store.add_step(mem_step) return exp_id # ===== 辅助方法 ===== def _format_skills(self, skills: List[Skill]) -> str: """格式化技能为 Prompt 文本""" if not skills: return "" return "\n\n".join(s.to_prompt_text() for s in skills) def _format_experiences(self, experiences: List[Experience]) -> str: """格式化经验为 Prompt 文本""" if not experiences: return "" return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)