| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886 |
- """
- Agent Runner - Agent 执行引擎
- 核心职责:
- 1. 执行 Agent 任务(循环调用 LLM + 工具)
- 2. 记录执行轨迹(Trace + Messages + GoalTree)
- 3. 检索和注入记忆(Experience + Skill)
- 4. 管理执行计划(GoalTree)
- 5. 支持续跑(continue)和回溯重跑(rewind)
- 参数分层:
- - Infrastructure: AgentRunner 构造时设置(trace_store, llm_call 等)
- - RunConfig: 每次 run 时指定(model, trace_id, insert_after 等)
- - Messages: OpenAI SDK 格式的任务消息
- """
- import json
- import logging
- import uuid
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Tuple, Union
- from agent.trace.models import Trace, Message
- from agent.trace.protocols import TraceStore
- from agent.trace.goal_models import GoalTree
- from agent.memory.models import Experience, Skill
- from agent.memory.protocols import MemoryStore, StateStore
- from agent.memory.skill_loader import load_skills_from_dir
- from agent.tools import ToolRegistry, get_tool_registry
- logger = logging.getLogger(__name__)
- # ===== 运行配置 =====
@dataclass
class RunConfig:
    """
    Per-run parameters that control how the agent executes.

    Fields are grouped into model-level parameters (decided by the upstream
    agent or the user) and framework-level parameters (injected by the system).
    """
    # --- Model-level parameters ---
    model: str = "gpt-4o"
    temperature: float = 0.3
    max_iterations: int = 200  # upper bound on agent-loop iterations
    tools: Optional[List[str]] = None  # extra tool names added on top of BUILTIN_TOOLS; None = built-ins only
    # --- Framework-level parameters ---
    agent_type: str = "default"
    uid: Optional[str] = None
    system_prompt: Optional[str] = None  # None = built automatically from skills/experiences
    enable_memory: bool = True
    auto_execute_tools: bool = True
    name: Optional[str] = None  # display name; when empty a task name is generated from the messages
    # --- Trace control ---
    trace_id: Optional[str] = None  # None = create a new trace
    parent_trace_id: Optional[str] = None  # only used by sub-agents
    parent_goal_id: Optional[str] = None
    # --- Continue / rewind control ---
    insert_after: Optional[int] = None  # rewind insertion point (message sequence)
    # --- Extra LLM parameters (forwarded to llm_call as **kwargs) ---
    extra_llm_params: Dict[str, Any] = field(default_factory=dict)
# Built-in tool names (always loaded automatically)
BUILTIN_TOOLS = [
    # File operation tools
    "read_file",
    "edit_file",
    "write_file",
    "glob_files",
    "grep_content",
    # System tools
    "bash_command",
    # Skill and goal management
    "skill",
    "list_skills",
    "goal",
    "agent",
    "evaluate",
    # Search tools
    "search_posts",
    "get_search_suggestions",
    # Sandbox tools
    "sandbox_create_environment",
    "sandbox_run_shell",
    "sandbox_rebuild_with_ports",
    "sandbox_destroy_environment",
    # Browser tools
    "browser_navigate_to_url",
    "browser_search_web",
    "browser_go_back",
    "browser_wait",
    "browser_click_element",
    "browser_input_text",
    "browser_send_keys",
    "browser_upload_file",
    "browser_scroll_page",
    "browser_find_text",
    "browser_screenshot",
    "browser_switch_tab",
    "browser_close_tab",
    "browser_get_dropdown_options",
    "browser_select_dropdown_option",
    "browser_extract_content",
    "browser_read_long_content",
    "browser_get_page_html",
    "browser_get_selector_map",
    "browser_evaluate",
    "browser_ensure_login_with_cookies",
    "browser_wait_for_user_action",
    "browser_done",
]
- # ===== 向后兼容 =====
@dataclass
class AgentConfig:
    """[Backward compatibility] Agent configuration; new code should use RunConfig."""
    agent_type: str = "default"
    max_iterations: int = 200
    enable_memory: bool = True
    auto_execute_tools: bool = True
@dataclass
class CallResult:
    """Result of a single (non-looping) LLM call."""
    reply: str  # text content returned by the LLM
    tool_calls: Optional[List[Dict]] = None  # tool calls requested by the LLM, if any
    trace_id: Optional[str] = None  # id of the trace created for the call (None when not traced)
    step_id: Optional[str] = None  # id returned by the trace store for the recorded assistant message
    tokens: Optional[Dict[str, int]] = None  # {"prompt": ..., "completion": ...}
    cost: float = 0.0
- # ===== 执行引擎 =====
CONTEXT_INJECTION_INTERVAL = 10  # inject GoalTree + collaborators once every N loop iterations
class AgentRunner:
    """
    Agent execution engine.

    Supports three run modes, selected through RunConfig:
    1. New run:  trace_id=None
    2. Continue: trace_id=<existing id>, insert_after=None
    3. Rewind:   trace_id=<existing id>, insert_after=N
    """
def __init__(
    self,
    trace_store: Optional[TraceStore] = None,
    memory_store: Optional[MemoryStore] = None,
    state_store: Optional[StateStore] = None,
    tool_registry: Optional[ToolRegistry] = None,
    llm_call: Optional[Callable] = None,
    utility_llm_call: Optional[Callable] = None,
    config: Optional[AgentConfig] = None,
    skills_dir: Optional[str] = None,
    goal_tree: Optional[GoalTree] = None,
    debug: bool = False,
):
    """
    Wire up the infrastructure an AgentRunner needs.

    Args:
        trace_store: Trace storage backend.
        memory_store: Memory storage backend (optional).
        state_store: State storage backend (optional).
        tool_registry: Tool registry; the global registry is used when omitted.
        llm_call: Main LLM call function.
        utility_llm_call: Lightweight LLM used e.g. for task titles (optional).
        config: [Backward compatibility] AgentConfig; a default one is created
            when omitted.
        skills_dir: Path of the skills directory.
        goal_tree: Initial GoalTree (optional).
        debug: Retained for compatibility (deprecated).
    """
    # Storage backends
    self.trace_store = trace_store
    self.memory_store = memory_store
    self.state_store = state_store

    # Tooling and LLM entry points
    self.tools = tool_registry if tool_registry else get_tool_registry()
    self.llm_call = llm_call
    self.utility_llm_call = utility_llm_call

    # Configuration and initial state
    self.config = config if config else AgentConfig()
    self.skills_dir = skills_dir
    self.goal_tree = goal_tree
    self.debug = debug
- # ===== 核心公开方法 =====
async def run(
    self,
    messages: List[Dict],
    config: Optional[RunConfig] = None,
) -> AsyncIterator[Union[Trace, Message]]:
    """
    Execute in agent mode (the core entry point).

    Args:
        messages: Input messages in OpenAI SDK format.
            New run: the initial task, e.g. [{"role": "user", "content": "..."}]
            Continue: the new messages to append
            Rewind: the messages to append after the insertion point
        config: Run configuration; a default RunConfig is used when omitted.

    Yields:
        Trace objects (state changes) and Message objects (execution steps).

    Raises:
        ValueError: If no llm_call function was provided.
        Exception: Any failure during the run is re-raised after the trace
            has been marked as failed.
    """
    if not self.llm_call:
        raise ValueError("llm_call function not provided")

    cfg = config or RunConfig()
    trace = None
    try:
        # Phase 1: create or load the trace
        trace, goal_tree, next_seq = await self._prepare_trace(messages, cfg)
        yield trace

        # Phase 2: assemble the LLM history and surface newly stored messages
        history, next_seq, stored_messages = await self._build_history(
            trace.trace_id, messages, goal_tree, cfg, next_seq
        )
        for stored in stored_messages:
            yield stored

        # Phase 3: run the agent loop
        async for event in self._agent_loop(trace, history, goal_tree, cfg, next_seq):
            yield event
    except Exception as exc:
        logger.error(f"Agent run failed: {exc}")

        # Prefer the configured trace id, otherwise the one created in phase 1.
        if cfg.trace_id:
            failed_id = cfg.trace_id
        elif trace:
            failed_id = trace.trace_id
        else:
            failed_id = None

        if self.trace_store and failed_id:
            await self.trace_store.update_trace(
                failed_id,
                status="failed",
                error_message=str(exc),
                completed_at=datetime.now(),
            )
            failed_trace = await self.trace_store.get_trace(failed_id)
            if failed_trace:
                yield failed_trace
        raise
async def run_result(
    self,
    messages: List[Dict],
    config: Optional[RunConfig] = None,
) -> Dict[str, Any]:
    """
    Result mode: consume run() and return a structured summary.

    Mainly used internally by the agent/evaluate tools.

    Returns:
        A dict with "status", "summary" (the last non-blank assistant text),
        "trace_id", "error" and "stats" (message/token/cost totals taken from
        the last Trace seen). When no assistant text was produced the status
        is forced to "failed".
    """
    summary = ""
    final_trace: Optional[Trace] = None

    async for item in self.run(messages=messages, config=config):
        if isinstance(item, Message) and item.role == "assistant":
            payload = item.content
            if isinstance(payload, dict):
                candidate = payload.get("text", "") or ""
            elif isinstance(payload, str):
                candidate = payload
            else:
                candidate = ""
            # Keep only non-blank text so the last meaningful reply wins.
            if candidate and candidate.strip():
                summary = candidate
        elif isinstance(item, Trace):
            final_trace = item

    cfg = config or RunConfig()
    # No Trace was yielded: fall back to loading it from the store.
    if not final_trace and cfg.trace_id and self.trace_store:
        final_trace = await self.trace_store.get_trace(cfg.trace_id)

    if final_trace:
        status = final_trace.status
        error = final_trace.error_message
        trace_id = final_trace.trace_id
        stats = {
            "total_messages": final_trace.total_messages,
            "total_tokens": final_trace.total_tokens,
            "total_cost": final_trace.total_cost,
        }
    else:
        status = "unknown"
        error = None
        trace_id = cfg.trace_id
        stats = {"total_messages": 0, "total_tokens": 0, "total_cost": 0.0}

    if not summary:
        status = "failed"
        error = error or "Agent 没有产生 assistant 文本结果"

    return {
        "status": status,
        "summary": summary,
        "trace_id": trace_id,
        "error": error,
        "stats": stats,
    }
- # ===== 单次调用(保留)=====
async def call(
    self,
    messages: List[Dict],
    model: str = "gpt-4o",
    tools: Optional[List[str]] = None,
    uid: Optional[str] = None,
    trace: bool = True,
    **kwargs
) -> CallResult:
    """
    Perform a single LLM call (no agent loop).

    Args:
        messages: Messages passed to the LLM.
        model: Model name.
        tools: Extra tool names merged with BUILTIN_TOOLS.
        uid: User id recorded on the trace.
        trace: Whether to record the call when a trace store is configured.
        **kwargs: Extra parameters forwarded to llm_call and stored on the
            trace as llm_params.

    Returns:
        CallResult holding the reply, tool calls, trace/step ids, token usage
        and cost.

    Raises:
        ValueError: If no llm_call function was provided.
    """
    if not self.llm_call:
        raise ValueError("llm_call function not provided")

    # Built-in tools plus any extra names requested by the caller.
    tool_schemas = self._get_tool_schemas(tools)

    trace_id = None
    if trace and self.trace_store:
        call_trace = Trace.create(
            mode="call", uid=uid, model=model, tools=tool_schemas, llm_params=kwargs
        )
        trace_id = await self.trace_store.create_trace(call_trace)

    result = await self.llm_call(messages=messages, model=model, tools=tool_schemas, **kwargs)

    step_id = None
    if trace and self.trace_store and trace_id:
        reply_msg = Message.create(
            trace_id=trace_id,
            role="assistant",
            sequence=1,
            goal_id=None,
            content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
            prompt_tokens=result.get("prompt_tokens", 0),
            completion_tokens=result.get("completion_tokens", 0),
            finish_reason=result.get("finish_reason"),
            cost=result.get("cost", 0),
        )
        step_id = await self.trace_store.add_message(reply_msg)
        await self.trace_store.update_trace(
            trace_id, status="completed", completed_at=datetime.now()
        )

    token_usage = {
        "prompt": result.get("prompt_tokens", 0),
        "completion": result.get("completion_tokens", 0),
    }
    return CallResult(
        reply=result.get("content", ""),
        tool_calls=result.get("tool_calls"),
        trace_id=trace_id,
        step_id=step_id,
        tokens=token_usage,
        cost=result.get("cost", 0),
    )
- # ===== Phase 1: PREPARE TRACE =====
async def _prepare_trace(
    self,
    messages: List[Dict],
    config: RunConfig,
) -> Tuple[Trace, Optional[GoalTree], int]:
    """
    Prepare the trace for this run: load an existing one or create a new one.

    Returns:
        (trace, goal_tree, next_sequence)
    """
    # No trace id means a brand-new run; otherwise continue or rewind.
    if not config.trace_id:
        return await self._prepare_new_trace(messages, config)
    return await self._prepare_existing_trace(config)
async def _prepare_new_trace(
    self,
    messages: List[Dict],
    config: RunConfig,
) -> Tuple[Trace, Optional[GoalTree], int]:
    """
    Create a brand-new trace (and its goal tree) for this run.

    Returns:
        (trace, goal_tree, 1) — a new trace always starts at sequence 1.
    """
    new_id = str(uuid.uuid4())

    # Use the configured display name, otherwise derive one from the messages.
    task_name = config.name or await self._generate_task_name(messages)

    schemas = self._get_tool_schemas(config.tools)

    # Explicit temperature first; extra params may override it, as before.
    llm_params = {"temperature": config.temperature}
    llm_params.update(config.extra_llm_params)

    new_trace = Trace(
        trace_id=new_id,
        mode="agent",
        task=task_name,
        agent_type=config.agent_type,
        parent_trace_id=config.parent_trace_id,
        parent_goal_id=config.parent_goal_id,
        uid=config.uid,
        model=config.model,
        tools=schemas,
        llm_params=llm_params,
        status="running",
    )

    # Reuse the runner-level goal tree when one was supplied.
    tree = self.goal_tree or GoalTree(mission=task_name)

    if self.trace_store:
        await self.trace_store.create_trace(new_trace)
        await self.trace_store.update_goal_tree(new_id, tree)

    return new_trace, tree, 1
async def _prepare_existing_trace(
    self,
    config: RunConfig,
) -> Tuple[Trace, Optional[GoalTree], int]:
    """
    Load an existing trace for continue or rewind mode.

    Returns:
        (trace, goal_tree, next_sequence)

    Raises:
        ValueError: If no trace store is configured or the trace is missing.
    """
    store = self.trace_store
    if not store:
        raise ValueError("trace_store required for continue/rewind")

    existing = await store.get_trace(config.trace_id)
    if not existing:
        raise ValueError(f"Trace not found: {config.trace_id}")

    tree = await store.get_goal_tree(config.trace_id)

    if config.insert_after is None:
        # Continue mode: start right after the highest sequence ever used,
        # including abandoned messages.
        recorded = await store.get_trace_messages(
            config.trace_id, include_abandoned=True
        )
        next_seq = max((m.sequence for m in recorded), default=0) + 1
    else:
        # Rewind mode
        next_seq = await self._rewind(config.trace_id, config.insert_after, tree)

    # Flip the trace back to running, both in the store and on the object.
    await store.update_trace(
        config.trace_id,
        status="running",
        completed_at=None,
    )
    existing.status = "running"

    return existing, tree, next_seq
- # ===== Phase 2: BUILD HISTORY =====
async def _build_history(
    self,
    trace_id: str,
    new_messages: List[Dict],
    goal_tree: Optional[GoalTree],
    config: RunConfig,
    sequence: int,
) -> Tuple[List[Dict], int, List[Message]]:
    """
    Assemble the complete LLM message history for this run.

    1. Load the already stored active messages (continue/rewind).
    2. Build a system prompt when neither the history nor the new messages
       contain a system message.
    3. Append the input messages.

    Returns:
        (history, next_sequence, created_messages) where created_messages are
        the Message objects created and persisted by this call, so run() can
        yield them to the caller.
    """
    store = self.trace_store
    history: List[Dict] = []
    created: List[Message] = []

    # 1. Existing messages
    if config.trace_id and store:
        stored = await store.get_trace_messages(trace_id)
        history = [m.to_llm_dict() for m in stored]

    # 2. System prompt, only when no system message exists anywhere
    system_present = any(
        m.get("role") == "system" for m in [*history, *new_messages]
    )
    if not system_present:
        system_prompt = await self._build_system_prompt(config)
        if system_prompt:
            history.insert(0, {"role": "system", "content": system_prompt})
            if store:
                system_msg = Message.create(
                    trace_id=trace_id, role="system", sequence=sequence,
                    goal_id=None, content=system_prompt,
                )
                await store.add_message(system_msg)
                created.append(system_msg)
                # NOTE(review): the source's indentation was lost; the sequence
                # is assumed to advance only when a message is persisted — confirm.
                sequence += 1

    # 3. New input messages
    for incoming in new_messages:
        history.append(incoming)
        if store:
            stored_msg = Message.from_llm_dict(
                incoming, trace_id=trace_id, sequence=sequence, goal_id=None
            )
            await store.add_message(stored_msg)
            created.append(stored_msg)
            sequence += 1

    return history, sequence, created
- # ===== Phase 3: AGENT LOOP =====
async def _agent_loop(
    self,
    trace: Trace,
    history: List[Dict],
    goal_tree: Optional[GoalTree],
    config: RunConfig,
    sequence: int,
) -> AsyncIterator[Union[Trace, Message]]:
    """
    Run the ReAct loop: call the LLM, execute requested tools, repeat.

    Each iteration sends the history (plus periodically injected context) to
    the LLM, records and yields the assistant message, then executes every
    requested tool call and yields one tool message per call. The loop ends
    when the LLM requests no tools, when auto_execute_tools is disabled, or
    after config.max_iterations iterations. The trace is then marked
    completed and yielded when a trace store is configured.

    Args:
        trace: The trace being executed.
        history: LLM message history; extended in place with assistant and
            tool messages.
        goal_tree: Goal tree of the run (optional).
        config: Run configuration.
        sequence: Sequence number for the next message.

    Yields:
        Message objects for each assistant/tool step, then the final Trace.
    """
    trace_id = trace.trace_id
    tool_schemas = self._get_tool_schemas(config.tools)

    # Make the goal tree available to the goal tool.
    if goal_tree and self.trace_store:
        from agent.trace.goal_tool import set_goal_tree
        set_goal_tree(goal_tree)

    for iteration in range(config.max_iterations):
        # Work on a copy so injected context never ends up in the history.
        llm_messages = list(history)

        # Periodically inject GoalTree + collaborators
        if iteration % CONTEXT_INJECTION_INTERVAL == 0:
            context_injection = self._build_context_injection(trace, goal_tree)
            if context_injection:
                llm_messages.append({"role": "system", "content": context_injection})

        # Call the LLM
        result = await self.llm_call(
            messages=llm_messages,
            model=config.model,
            tools=tool_schemas,
            temperature=config.temperature,
            **config.extra_llm_params,
        )

        response_content = result.get("content", "")
        tool_calls = result.get("tool_calls")
        finish_reason = result.get("finish_reason")
        prompt_tokens = result.get("prompt_tokens", 0)
        completion_tokens = result.get("completion_tokens", 0)
        step_cost = result.get("cost", 0)

        # Create a root goal on demand: the agent started calling tools
        # without ever creating a goal itself.
        if goal_tree and not goal_tree.goals and tool_calls:
            has_goal_call = any(
                tc.get("function", {}).get("name") == "goal"
                for tc in tool_calls
            )
            if not has_goal_call:
                mission = goal_tree.mission
                root_desc = mission[:200] if len(mission) > 200 else mission
                goal_tree.add_goals(
                    descriptions=[root_desc],
                    reasons=["系统自动创建:Agent 未显式创建目标"],
                    parent_id=None
                )
                goal_tree.focus(goal_tree.goals[0].id)
                if self.trace_store:
                    await self.trace_store.update_goal_tree(trace_id, goal_tree)
                    await self.trace_store.add_goal(trace_id, goal_tree.goals[0])
                logger.info(f"自动创建 root goal: {goal_tree.goals[0].id}")

        # Goal the assistant message belongs to
        current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None

        # Record the assistant message
        assistant_msg = Message.create(
            trace_id=trace_id,
            role="assistant",
            sequence=sequence,
            goal_id=current_goal_id,
            content={"text": response_content, "tool_calls": tool_calls},
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            finish_reason=finish_reason,
            cost=step_cost,
        )
        if self.trace_store:
            await self.trace_store.add_message(assistant_msg)
        yield assistant_msg
        sequence += 1

        # No tool calls (or auto-execution disabled): the task is finished.
        if not (tool_calls and config.auto_execute_tools):
            break

        history.append({
            "role": "assistant",
            "content": response_content,
            "tool_calls": tool_calls,
        })

        for tc in tool_calls:
            # Re-read per call: an earlier tool call may have moved the focus.
            current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
            tool_name = tc["function"]["name"]
            tool_args = tc["function"]["arguments"]

            # Arguments are produced by the LLM and may be malformed JSON.
            # Report that back as the tool result instead of aborting the
            # whole run, so the model can correct itself on the next turn.
            parse_error = None
            if isinstance(tool_args, str):
                try:
                    tool_args = json.loads(tool_args) if tool_args.strip() else {}
                except json.JSONDecodeError as exc:
                    parse_error = (
                        f"Error: invalid JSON arguments for tool '{tool_name}': {exc}"
                    )
                    tool_args = {}
            elif tool_args is None:
                tool_args = {}

            if parse_error is None:
                tool_result = await self.tools.execute(
                    tool_name,
                    tool_args,
                    uid=config.uid or "",
                    context={
                        "store": self.trace_store,
                        "trace_id": trace_id,
                        "goal_id": current_goal_id,
                        "runner": self,
                    }
                )
            else:
                logger.warning("Trace %s: %s", trace_id, parse_error)
                tool_result = parse_error

            # Every tool call gets a tool message, including failed parses,
            # so the assistant's tool_calls stay paired with responses.
            tool_msg = Message.create(
                trace_id=trace_id,
                role="tool",
                sequence=sequence,
                goal_id=current_goal_id,
                tool_call_id=tc["id"],
                content={"tool_name": tool_name, "result": tool_result},
            )
            if self.trace_store:
                await self.trace_store.add_message(tool_msg)
            yield tool_msg
            sequence += 1

            history.append({
                "role": "tool",
                "tool_call_id": tc["id"],
                "name": tool_name,
                "content": str(tool_result),
            })
    else:
        # The loop ran out of iterations while the LLM was still requesting
        # tools; the trace is still marked completed below, as before.
        logger.warning(
            "Trace %s stopped after reaching max_iterations=%s",
            trace_id,
            config.max_iterations,
        )

    # Finish the trace
    if self.trace_store:
        await self.trace_store.update_trace(
            trace_id,
            status="completed",
            completed_at=datetime.now(),
        )
        trace_obj = await self.trace_store.get_trace(trace_id)
        if trace_obj:
            yield trace_obj
- # ===== 回溯(Rewind)=====
async def _rewind(
    self,
    trace_id: str,
    insert_after: int,
    goal_tree: Optional[GoalTree],
) -> int:
    """
    Rewind a trace: abandon the messages and goals after insert_after.

    Args:
        trace_id: Trace to rewind.
        insert_after: Message sequence to rewind to; extended by
            _find_safe_cutoff so a tool call is never separated from its
            tool responses.
        goal_tree: Goal tree of the trace (optional); updated in place.

    Returns:
        The next free sequence number (highest existing sequence + 1, or 1
        when the trace has no messages).

    Raises:
        ValueError: If no trace store is configured.
    """
    store = self.trace_store
    if not store:
        raise ValueError("trace_store required for rewind")

    # 1. Load every message, including already abandoned ones
    all_messages = await store.get_trace_messages(
        trace_id, include_abandoned=True
    )
    if not all_messages:
        return 1

    # 2. Find a cutoff that does not split a tool call from its responses
    cutoff = self._find_safe_cutoff(all_messages, insert_after)

    # Snapshot which messages are newly abandoned by this rewind BEFORE the
    # store call: a store that updates the loaded objects in place would
    # otherwise make every message look already abandoned.
    newly_abandoned = [
        m.sequence for m in all_messages
        if m.sequence > cutoff and m.status != "abandoned"
    ]

    # 3. Mark the messages after the cutoff as abandoned
    abandoned_ids = await store.abandon_messages_after(trace_id, cutoff)

    # 4. Goals: keep only completed goals that still own a message at or
    #    before the cutoff; abandon everything else not already abandoned.
    if goal_tree:
        kept_goal_ids = {
            m.goal_id for m in all_messages
            if m.sequence <= cutoff and m.goal_id
        }
        for goal in goal_tree.goals:
            if goal.status == "abandoned":
                continue
            if goal.status == "completed" and goal.id in kept_goal_ids:
                continue
            goal.status = "abandoned"
            goal.summary = "回溯导致放弃"
        # Reset the focus; no goal is current after a rewind.
        goal_tree._current_id = None
        await store.update_goal_tree(trace_id, goal_tree)

    # 5. Record the rewind event (only the first 20 sequences are kept)
    await store.append_event(trace_id, "rewind", {
        "insert_after_sequence": cutoff,
        "abandoned_message_count": len(abandoned_ids),
        "abandoned_sequences": newly_abandoned[:20],
    })

    # 6. Sequences are never reused, so continue after the overall maximum
    return max((m.sequence for m in all_messages), default=0) + 1
def _find_safe_cutoff(self, messages: List[Message], insert_after: int) -> int:
    """
    Find a cutoff sequence that never splits a tool call from its responses.

    When insert_after points at an assistant message carrying tool_calls, the
    cutoff is extended to the last tool response answering one of those
    calls. In every other case insert_after is returned unchanged.
    """
    target = next((m for m in messages if m.sequence == insert_after), None)
    if not target or target.role != "assistant":
        return insert_after

    payload = target.content
    if not (isinstance(payload, dict) and payload.get("tool_calls")):
        return insert_after

    # Ids of the tool calls issued by the target assistant message
    call_ids = {
        call["id"]
        for call in payload["tool_calls"]
        if isinstance(call, dict) and call.get("id")
    }

    # Sequences of the tool messages answering those calls
    response_sequences = [
        m.sequence
        for m in messages
        if m.role == "tool" and m.tool_call_id and m.tool_call_id in call_ids
    ]
    return max([insert_after, *response_sequences])
- # ===== 上下文注入 =====
def _build_context_injection(
    self,
    trace: Trace,
    goal_tree: Optional[GoalTree],
) -> str:
    """
    Build the periodically injected context (GoalTree + active collaborators).

    Returns:
        The sections joined by blank lines, or an empty string when there is
        neither a goal nor a collaborator to report.
    """
    sections = []

    # Current plan, only when the tree already has goals
    if goal_tree and goal_tree.goals:
        sections.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")

    # Active collaborators recorded in the trace context
    collaborators = trace.context.get("collaborators", [])
    if collaborators:
        entries = [
            f"- {c.get('name', 'unnamed')} "
            f"[{c.get('type', 'agent')}, {c.get('status', 'unknown')}]: "
            f"{c.get('summary', '')}"
            for c in collaborators
        ]
        sections.append("\n".join(["## Active Collaborators", *entries]))

    return "\n\n".join(sections)
- # ===== 辅助方法 =====
def _get_tool_schemas(self, tools: Optional[List[str]]) -> List[Dict]:
    """
    Return the schemas of the built-in tools plus any extra requested tools.

    Extra names are appended after the built-ins in the order given; names
    already present are skipped.
    """
    names = list(BUILTIN_TOOLS)
    for extra in tools or ():
        if extra not in names:
            names.append(extra)
    return self.tools.get_schemas(names)
async def _build_system_prompt(self, config: RunConfig) -> Optional[str]:
    """
    Build the system prompt, appending skills and experiences.

    Returns:
        config.system_prompt followed by a "## Skills" and/or "## 相关经验"
        section when such content exists; only those sections when no prompt
        is configured; otherwise config.system_prompt unchanged (possibly
        None).
    """
    prompt = config.system_prompt

    # Skills loaded from the configured skills directory
    skills = load_skills_from_dir(self.skills_dir)
    skills_text = self._format_skills(skills) if skills else ""

    # Experiences, searched with the configured system prompt as the query
    experiences_text = ""
    if config.enable_memory and self.memory_store:
        experiences = await self.memory_store.search_experiences(
            f"agent:{config.agent_type}", prompt or ""
        )
        experiences_text = self._format_experiences(experiences)

    # Optional sections, in fixed order
    sections = []
    if skills_text:
        sections.append(f"## Skills\n{skills_text}")
    if experiences_text:
        sections.append(f"## 相关经验\n{experiences_text}")

    if prompt:
        return "\n\n".join([prompt, *sections])
    if sections:
        return "\n\n".join(sections)
    return prompt
async def _generate_task_name(self, messages: List[Dict]) -> str:
    """
    Generate a task name: prefer utility_llm_call, fall back to truncation.

    Args:
        messages: Input messages; text is taken from string contents and from
            {"type": "text"} parts of list contents.

    Returns:
        "未命名任务" when the messages contain no text; the title produced by
        utility_llm_call when it is non-empty and shorter than 100
        characters; otherwise the first 50 characters of the text (with
        "..." appended when truncated).
    """
    # Collect the text content of the messages
    text_parts = []
    for msg in messages:
        content = msg.get("content", "")
        if isinstance(content, str):
            text_parts.append(content)
        elif isinstance(content, list):
            text_parts.extend(
                # A text part may carry text=None; treat it as empty.
                part.get("text") or ""
                for part in content
                if isinstance(part, dict) and part.get("type") == "text"
            )
    raw_text = " ".join(text_parts).strip()
    if not raw_text:
        return "未命名任务"

    # Try the utility LLM; title generation is best-effort and must never
    # fail the run, so any error falls through to the truncation fallback.
    if self.utility_llm_call:
        try:
            result = await self.utility_llm_call(
                messages=[
                    {"role": "system", "content": "用中文为以下任务生成一个简短标题(10-30字),只输出标题本身:"},
                    {"role": "user", "content": raw_text[:2000]},
                ],
                model="gpt-4o-mini",  # cheap model
            )
            # content may be None; normalise before stripping.
            title = (result.get("content") or "").strip()
            if title and len(title) < 100:
                return title
        except Exception:
            # Resolves to the same logger as the module-level `logger`.
            logging.getLogger(__name__).warning(
                "Task title generation failed; falling back to truncated text",
                exc_info=True,
            )

    # Fallback: first 50 characters
    return raw_text[:50] + ("..." if len(raw_text) > 50 else "")
def _format_skills(self, skills: List[Skill]) -> str:
    """Render skills as prompt text, separated by blank lines ("" when empty)."""
    return "\n\n".join(skill.to_prompt_text() for skill in skills) if skills else ""
def _format_experiences(self, experiences: List[Experience]) -> str:
    """Render experiences as a "- " bullet list, one per line ("" when empty)."""
    if experiences:
        bullets = [f"- {item.to_prompt_text()}" for item in experiences]
        return "\n".join(bullets)
    return ""
|