"""
Agent Runner - agent execution engine.

Core responsibilities:
1. Execute agent tasks (loop of LLM calls + tool invocations)
2. Record the execution graph (Trace + Steps)
3. Retrieve and inject memory (Experience + Skill)
4. Collect feedback and extract experiences
"""
import json
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal

from agent.events import AgentEvent
from agent.trace import Trace, Step, TraceStore
from agent.models.memory import Experience, Skill
from agent.storage.protocols import MemoryStore, StateStore
from agent.storage.skill_loader import load_skills_from_dir
from agent.tools import ToolRegistry, get_tool_registry
from agent.debug import dump_tree, dump_markdown
- logger = logging.getLogger(__name__)
# Built-in tools (always loaded automatically; caller-specified tool names
# are merged with this list, deduplicated, before schemas are fetched).
BUILTIN_TOOLS = [
    "read_file",
    "edit_file",
    "write_file",
    "glob_files",
    "grep_content",
    "bash_command",
    "skill",
    "list_skills",
]
@dataclass
class AgentConfig:
    """Configuration options for an AgentRunner.

    All fields have sensible defaults; callers may override any subset.
    """

    # Logical agent category; used to scope memory lookups ("agent:<type>").
    agent_type: str = "default"
    # Upper bound on LLM/tool iterations performed by run().
    max_iterations: int = 10
    # Whether experiences are retrieved from the memory store before running.
    enable_memory: bool = True
    # Whether tool calls returned by the LLM are executed automatically.
    auto_execute_tools: bool = True
@dataclass
class CallResult:
    """Result of a single LLM invocation made through AgentRunner.call()."""

    # Assistant reply text (may be empty when only tool calls were returned).
    reply: str
    # Raw tool-call dicts from the LLM response, if any.
    tool_calls: Optional[List[Dict]] = None
    # Trace / step identifiers; populated only when tracing was enabled.
    trace_id: Optional[str] = None
    step_id: Optional[str] = None
    # Token usage, e.g. {"prompt": ..., "completion": ...}.
    tokens: Optional[Dict[str, int]] = None
    # Monetary cost reported by the LLM backend.
    cost: float = 0.0
class AgentRunner:
    """
    Agent execution engine.

    Two modes are supported:
    1. call(): a single LLM invocation (concise API)
    2. run(): agent mode (loop + memory + tracing)
    """

    def __init__(
        self,
        trace_store: Optional[TraceStore] = None,
        memory_store: Optional[MemoryStore] = None,
        state_store: Optional[StateStore] = None,
        tool_registry: Optional[ToolRegistry] = None,
        llm_call: Optional[Callable] = None,
        config: Optional[AgentConfig] = None,
        skills_dir: Optional[str] = None,
        debug: bool = False,
    ):
        """
        Initialize the AgentRunner.

        Args:
            trace_store: Trace storage (optional; nothing is recorded if omitted)
            memory_store: Memory storage (optional; memory is disabled if omitted)
            state_store: State storage (optional; used for task state)
            tool_registry: Tool registry (optional; defaults to the global registry)
            llm_call: LLM invocation function (must be provided for any real call)
            config: Agent configuration
            skills_dir: Skills directory path (optional; skills not loaded if omitted)
            debug: Enable debug mode (dumps the step tree to .trace/tree.txt)
        """
        self.trace_store = trace_store
        self.memory_store = memory_store
        self.state_store = state_store
        self.tools = tool_registry or get_tool_registry()
        self.llm_call = llm_call
        self.config = config or AgentConfig()
        self.skills_dir = skills_dir
        self.debug = debug

    def _generate_id(self) -> str:
        """Generate a unique identifier."""
        return str(uuid.uuid4())

    def _resolve_tool_names(self, tools: Optional[List[str]]) -> List[str]:
        """Merge the built-in tool list with caller-specified tools, deduplicated.

        Built-ins always come first; caller-specified names are appended in
        order, skipping any already present.
        """
        tool_names = BUILTIN_TOOLS.copy()
        if tools:
            for tool in tools:
                if tool not in tool_names:
                    tool_names.append(tool)
        return tool_names

    async def _dump_debug(self, trace_id: str) -> None:
        """In debug mode, dump the step tree (both txt and markdown formats)."""
        if not self.debug or not self.trace_store:
            return
        trace = await self.trace_store.get_trace(trace_id)
        steps = await self.trace_store.get_trace_steps(trace_id)
        # tree.txt: compact format (backward compatible).
        dump_tree(trace, steps)
        # tree.md: full collapsible format.
        dump_markdown(trace, steps)

    # ===== Single call =====
    async def call(
        self,
        messages: List[Dict],
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        uid: Optional[str] = None,
        trace: bool = True,
        **kwargs
    ) -> CallResult:
        """
        Perform a single LLM call.

        Args:
            messages: Message list
            model: Model name
            tools: Tool name list (merged with built-ins)
            uid: User ID
            trace: Whether to record a Trace
            **kwargs: Extra parameters forwarded to the LLM

        Returns:
            CallResult

        Raises:
            ValueError: If no llm_call function was provided at construction.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        trace_id = None
        step_id = None

        # Create the Trace (only when tracing is both requested and possible).
        if trace and self.trace_store:
            trace_obj = Trace.create(
                mode="call",
                uid=uid,
                context={"model": model}
            )
            trace_id = await self.trace_store.create_trace(trace_obj)

        # Prepare tool schemas: built-ins merged with caller-specified tools.
        tool_schemas = self.tools.get_schemas(self._resolve_tool_names(tools))

        # Invoke the LLM.
        result = await self.llm_call(
            messages=messages,
            model=model,
            tools=tool_schemas,
            **kwargs
        )

        # Record the Step and finalize the Trace.
        if trace and self.trace_store and trace_id:
            step = Step.create(
                trace_id=trace_id,
                step_type="thought",
                sequence=0,
                status="completed",
                description=f"LLM 调用 ({model})",
                data={
                    "messages": messages,
                    "response": result.get("content", ""),
                    "model": model,
                    "tools": tool_schemas,  # the tool schemas passed to the model
                    "tool_calls": result.get("tool_calls"),
                },
                tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0),
                cost=result.get("cost", 0),
            )
            step_id = await self.trace_store.add_step(step)
            await self._dump_debug(trace_id)

            # Mark the Trace completed.
            await self.trace_store.update_trace(
                trace_id,
                status="completed",
                completed_at=datetime.now(),
                total_tokens=result.get("prompt_tokens", 0) + result.get("completion_tokens", 0),
                total_cost=result.get("cost", 0)
            )

        return CallResult(
            reply=result.get("content", ""),
            tool_calls=result.get("tool_calls"),
            trace_id=trace_id,
            step_id=step_id,
            tokens={
                "prompt": result.get("prompt_tokens", 0),
                "completion": result.get("completion_tokens", 0),
            },
            cost=result.get("cost", 0)
        )

    # ===== Agent mode =====
    async def run(
        self,
        task: str,
        messages: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        max_iterations: Optional[int] = None,
        enable_memory: Optional[bool] = None,
        auto_execute_tools: Optional[bool] = None,
        **kwargs
    ) -> AsyncIterator[AgentEvent]:
        """
        Execute in agent mode: loop over LLM calls and tool executions,
        streaming AgentEvents, until the model answers without tool calls
        or max_iterations is reached.

        Args:
            task: Task description
            messages: Initial messages (optional; not mutated)
            system_prompt: System prompt (optional; skills/experiences appended)
            model: Model name
            tools: Tool name list (merged with built-ins)
            agent_type: Agent type (used for memory scoping)
            uid: User ID
            max_iterations: Maximum loop iterations
            enable_memory: Whether to load experiences from the memory store
            auto_execute_tools: Whether tool calls are executed automatically
            **kwargs: Extra parameters forwarded to the LLM

        Yields:
            AgentEvent

        Raises:
            ValueError: If no llm_call function was provided at construction.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")

        # Fall back to configured defaults.
        agent_type = agent_type or self.config.agent_type
        max_iterations = max_iterations or self.config.max_iterations
        enable_memory = enable_memory if enable_memory is not None else self.config.enable_memory
        auto_execute_tools = (
            auto_execute_tools
            if auto_execute_tools is not None
            else self.config.auto_execute_tools
        )

        # Create the Trace.
        trace_id = self._generate_id()
        if self.trace_store:
            trace_obj = Trace(
                trace_id=trace_id,
                mode="agent",
                task=task,
                agent_type=agent_type,
                uid=uid,
                context={"model": model, **kwargs}
            )
            await self.trace_store.create_trace(trace_obj)

        yield AgentEvent("trace_started", {
            "trace_id": trace_id,
            "task": task,
            "agent_type": agent_type
        })

        try:
            # Load memory (experiences and skills).
            experiences_text = ""
            skills_text = ""
            if enable_memory and self.memory_store:
                scope = f"agent:{agent_type}"
                experiences = await self.memory_store.search_experiences(scope, task)
                experiences_text = self._format_experiences(experiences)

                # Record a memory_read Step.
                if self.trace_store:
                    mem_step = Step.create(
                        trace_id=trace_id,
                        step_type="memory_read",
                        sequence=0,
                        status="completed",
                        description=f"加载 {len(experiences)} 条经验",
                        data={
                            "experiences_count": len(experiences),
                            "experiences": [e.to_dict() for e in experiences],
                        }
                    )
                    await self.trace_store.add_step(mem_step)
                    await self._dump_debug(trace_id)

                yield AgentEvent("memory_loaded", {
                    "experiences_count": len(experiences)
                })

            # Load skills (only when a skills_dir was configured).
            if self.skills_dir:
                skills = load_skills_from_dir(self.skills_dir)
                if skills:
                    skills_text = self._format_skills(skills)
                    logger.info(f"加载 {len(skills)} 个 skills 从 {self.skills_dir}")

            # Build the initial message list. Copy so the caller's list is
            # never mutated (the original appended in place in some paths).
            messages = list(messages) if messages else []
            if system_prompt:
                # Inject skills and experiences into the system prompt.
                full_system = system_prompt
                if skills_text:
                    full_system += f"\n\n## Skills\n{skills_text}"
                if experiences_text:
                    full_system += f"\n\n## 相关经验\n{experiences_text}"
                messages = [{"role": "system", "content": full_system}] + messages

            # Append the task description.
            messages.append({"role": "user", "content": task})

            # Prepare tool schemas: built-ins merged with caller-specified tools.
            tool_schemas = self.tools.get_schemas(self._resolve_tool_names(tools))

            # Execution loop.
            current_goal_id = None  # current focus goal
            sequence = 1
            total_tokens = 0
            total_cost = 0.0

            for iteration in range(max_iterations):
                yield AgentEvent("step_started", {
                    "iteration": iteration,
                    "step_type": "thought"
                })

                # Invoke the LLM.
                result = await self.llm_call(
                    messages=messages,
                    model=model,
                    tools=tool_schemas,
                    **kwargs
                )

                response_content = result.get("content", "")
                tool_calls = result.get("tool_calls")
                step_tokens = result.get("prompt_tokens", 0) + result.get("completion_tokens", 0)
                step_cost = result.get("cost", 0)
                total_tokens += step_tokens
                total_cost += step_cost

                # Record the LLM-call Step.
                llm_step_id = self._generate_id()
                if self.trace_store:
                    # Infer the step type: a reply without tool calls after the
                    # first iteration is likely the final response.
                    step_type = "response" if not tool_calls and iteration > 0 else "thought"
                    llm_step = Step(
                        step_id=llm_step_id,
                        trace_id=trace_id,
                        step_type=step_type,
                        status="completed",
                        sequence=sequence,
                        parent_id=current_goal_id,
                        description=response_content[:100] + "..." if len(response_content) > 100 else response_content,
                        data={
                            "messages": messages,  # full messages, including the system prompt
                            "content": response_content,
                            "model": model,
                            "tools": tool_schemas,  # the tool schemas passed to the model
                            "tool_calls": tool_calls,
                        },
                        tokens=step_tokens,
                        cost=step_cost,
                    )
                    await self.trace_store.add_step(llm_step)
                    await self._dump_debug(trace_id)
                sequence += 1

                yield AgentEvent("llm_call_completed", {
                    "step_id": llm_step_id,
                    "content": response_content,
                    "tool_calls": tool_calls,
                    "tokens": step_tokens,
                    "cost": step_cost
                })

                # Handle tool calls.
                if tool_calls and auto_execute_tools:
                    # Check whether any tool requires user confirmation.
                    if self.tools.check_confirmation_required(tool_calls):
                        yield AgentEvent("awaiting_user_action", {
                            "tool_calls": tool_calls,
                            "confirmation_flags": self.tools.get_confirmation_flags(tool_calls),
                            "editable_params": self.tools.get_editable_params_map(tool_calls)
                        })
                        # TODO: wait for the user's confirmation instead of stopping.
                        break

                    # Execute the tools.
                    messages.append({"role": "assistant", "content": response_content, "tool_calls": tool_calls})

                    for tc in tool_calls:
                        tool_name = tc["function"]["name"]
                        tool_args = tc["function"]["arguments"]
                        if isinstance(tool_args, str):
                            # Arguments may arrive as a JSON string; decode them.
                            tool_args = json.loads(tool_args)

                        yield AgentEvent("tool_executing", {
                            "tool_name": tool_name,
                            "arguments": tool_args
                        })

                        # Execute the tool.
                        tool_result = await self.tools.execute(
                            tool_name,
                            tool_args,
                            uid=uid or ""
                        )

                        # Record the action Step.
                        action_step_id = self._generate_id()
                        if self.trace_store:
                            action_step = Step(
                                step_id=action_step_id,
                                trace_id=trace_id,
                                step_type="action",
                                status="completed",
                                sequence=sequence,
                                parent_id=llm_step_id,
                                description=f"{tool_name}({', '.join(f'{k}={v}' for k, v in list(tool_args.items())[:2])})",
                                data={
                                    "tool_name": tool_name,
                                    "arguments": tool_args,
                                }
                            )
                            await self.trace_store.add_step(action_step)
                            await self._dump_debug(trace_id)
                        sequence += 1

                        # Record the result Step.
                        result_step_id = self._generate_id()
                        if self.trace_store:
                            result_step = Step(
                                step_id=result_step_id,
                                trace_id=trace_id,
                                step_type="result",
                                status="completed",
                                sequence=sequence,
                                parent_id=action_step_id,
                                description=str(tool_result)[:100] if tool_result else "",
                                data={
                                    "tool_name": tool_name,
                                    "output": tool_result,
                                }
                            )
                            await self.trace_store.add_step(result_step)
                            await self._dump_debug(trace_id)
                        sequence += 1

                        yield AgentEvent("tool_result", {
                            "step_id": result_step_id,
                            "tool_name": tool_name,
                            "result": tool_result
                        })

                        # Feed the result back to the model (Gemini requires
                        # the "name" field on tool messages!).
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tc["id"],
                            "name": tool_name,
                            "content": tool_result
                        })

                    continue  # next loop iteration

                # No tool calls: the task is done. Record the response Step.
                response_step_id = self._generate_id()
                if self.trace_store:
                    response_step = Step(
                        step_id=response_step_id,
                        trace_id=trace_id,
                        step_type="response",
                        status="completed",
                        sequence=sequence,
                        parent_id=current_goal_id,
                        description=response_content[:100] + "..." if len(response_content) > 100 else response_content,
                        data={
                            "content": response_content,
                            "is_final": True
                        }
                    )
                    await self.trace_store.add_step(response_step)
                    await self._dump_debug(trace_id)

                yield AgentEvent("conclusion", {
                    "step_id": response_step_id,
                    "content": response_content,
                    "is_final": True
                })
                break

            # Complete the Trace.
            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="completed",
                    completed_at=datetime.now(),
                    total_tokens=total_tokens,
                    total_cost=total_cost
                )

            yield AgentEvent("trace_completed", {
                "trace_id": trace_id,
                "total_tokens": total_tokens,
                "total_cost": total_cost
            })

        except Exception as e:
            logger.error(f"Agent run failed: {e}")
            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    status="failed",
                    completed_at=datetime.now()
                )
            yield AgentEvent("trace_failed", {
                "trace_id": trace_id,
                "error": str(e)
            })
            raise

    # ===== Feedback =====
    async def add_feedback(
        self,
        trace_id: str,
        target_step_id: str,
        feedback_type: Literal["positive", "negative", "correction"],
        content: str,
        extract_experience: bool = True
    ) -> Optional[str]:
        """
        Add human feedback on a step, optionally extracting an Experience.

        Args:
            trace_id: Trace ID
            target_step_id: Step ID the feedback refers to
            feedback_type: Feedback type
            content: Feedback content
            extract_experience: Whether to auto-extract an experience
                (only for "positive" and "correction" feedback)

        Returns:
            experience_id if an experience was extracted, else None.
        """
        if not self.trace_store:
            return None

        # Look up the Trace.
        trace = await self.trace_store.get_trace(trace_id)
        if not trace:
            logger.warning(f"Trace not found: {trace_id}")
            return None

        # Create the feedback Step, sequenced after the existing steps.
        steps = await self.trace_store.get_trace_steps(trace_id)
        max_seq = max(s.sequence for s in steps) if steps else 0

        feedback_step = Step.create(
            trace_id=trace_id,
            step_type="feedback",
            sequence=max_seq + 1,
            status="completed",
            description=f"{feedback_type}: {content[:50]}...",
            parent_id=target_step_id,
            data={
                "target_step_id": target_step_id,
                "feedback_type": feedback_type,
                "content": content
            }
        )
        await self.trace_store.add_step(feedback_step)
        await self._dump_debug(trace_id)

        # Extract an experience from the feedback.
        exp_id = None
        if extract_experience and self.memory_store and feedback_type in ("positive", "correction"):
            exp = Experience.create(
                scope=f"agent:{trace.agent_type}" if trace.agent_type else "agent:default",
                condition=f"执行类似 '{trace.task}' 任务时" if trace.task else "通用场景",
                rule=content,
                evidence=[target_step_id, feedback_step.step_id],
                source="feedback",
                # Positive feedback is trusted more than corrections.
                confidence=0.8 if feedback_type == "positive" else 0.6
            )
            exp_id = await self.memory_store.add_experience(exp)

            # Record the memory_write Step.
            mem_step = Step.create(
                trace_id=trace_id,
                step_type="memory_write",
                sequence=max_seq + 2,
                status="completed",
                description=f"保存经验: {exp.condition[:30]}...",
                parent_id=feedback_step.step_id,
                data={
                    "experience_id": exp_id,
                    "condition": exp.condition,
                    "rule": exp.rule
                }
            )
            await self.trace_store.add_step(mem_step)
            await self._dump_debug(trace_id)

        return exp_id

    # ===== Helpers =====
    # NOTE: the original file defined _format_skills twice with identical
    # bodies; the shadowing duplicate has been removed.
    def _format_skills(self, skills: List[Skill]) -> str:
        """Format skills as prompt text (blank-line separated)."""
        if not skills:
            return ""
        return "\n\n".join(s.to_prompt_text() for s in skills)

    def _format_experiences(self, experiences: List[Experience]) -> str:
        """Format experiences as a bulleted prompt-text list."""
        if not experiences:
            return ""
        return "\n".join(f"- {e.to_prompt_text()}" for e in experiences)
|