| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295 |
- """
- Agent Runner - Agent 执行引擎
- 核心职责:
- 1. 执行 Agent 任务(循环调用 LLM + 工具)
- 2. 记录执行轨迹(Trace + Messages + GoalTree)
- 3. 加载和注入技能(Skill)
- 4. 管理执行计划(GoalTree)
- 5. 支持续跑(continue)和回溯重跑(rewind)
- 参数分层:
- - Infrastructure: AgentRunner 构造时设置(trace_store, llm_call 等)
- - RunConfig: 每次 run 时指定(model, trace_id, after_sequence 等)
- - Messages: OpenAI SDK 格式的任务消息
- """
- import asyncio
- import json
- import logging
- import os
- import uuid
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Tuple, Union
- from agent.trace.models import Trace, Message
- from agent.trace.protocols import TraceStore
- from agent.trace.goal_models import GoalTree
- from agent.trace.compaction import (
- CompressionConfig,
- filter_by_goal_status,
- estimate_tokens,
- needs_level2_compression,
- build_compression_prompt,
- )
- from agent.skill.models import Skill
- from agent.skill.skill_loader import load_skills_from_dir
- from agent.tools import ToolRegistry, get_tool_registry
- from agent.tools.builtin.knowledge import KnowledgeConfig
- from agent.core.prompts import (
- DEFAULT_SYSTEM_PREFIX,
- TRUNCATION_HINT,
- TOOL_INTERRUPTED_MESSAGE,
- AGENT_INTERRUPTED_SUMMARY,
- AGENT_CONTINUE_HINT_TEMPLATE,
- TASK_NAME_GENERATION_SYSTEM_PROMPT,
- TASK_NAME_FALLBACK,
- SUMMARY_HEADER_TEMPLATE,
- build_summary_header,
- build_tool_interrupted_message,
- build_agent_continue_hint,
- )
logger = logging.getLogger(__name__)


@dataclass
class ContextUsage:
    """Snapshot of context-window usage for one trace."""

    trace_id: str
    # Number of messages currently in the LLM history.
    message_count: int
    # Estimated token count of the history.
    token_count: int
    # Model context limit used for the percentage calculation.
    max_tokens: int
    # token_count / max_tokens * 100.
    usage_percent: float
    # Number of image content parts found in the history.
    image_count: int = 0
@dataclass
class SideBranchContext:
    """State of an active side branch (compression or reflection)."""

    type: Literal["compression", "reflection"]
    branch_id: str
    start_head_seq: int  # head_seq at the point the side branch was entered
    start_sequence: int  # sequence of the first message on the side branch
    start_history_length: int  # history length when the branch started
    start_iteration: int  # iteration counter when the branch started
    max_turns: int = 5  # maximum number of turns allowed on the branch

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for persistence and for passing to tools.

        Note: start_history_length is intentionally not serialized.
        """
        payload: Dict[str, Any] = dict(
            type=self.type,
            branch_id=self.branch_id,
            start_head_seq=self.start_head_seq,
            start_sequence=self.start_sequence,
            start_iteration=self.start_iteration,
            max_turns=self.max_turns,
        )
        payload["is_side_branch"] = True
        payload["started_at"] = datetime.now().isoformat()
        return payload
# ===== Run configuration =====
@dataclass
class RunConfig:
    """
    Run parameters — control how the Agent executes.

    Split into model-level parameters (chosen by the upstream agent or the
    user) and framework-level parameters (injected by the system).
    """

    # --- Model-level parameters ---
    model: str = "gpt-4o"
    temperature: float = 0.3
    max_iterations: int = 200
    tools: Optional[List[str]] = None  # None = all registered tools
    side_branch_max_turns: int = 5  # max turns for a side branch (compression/reflection)
    # --- Forced side branches (manual API trigger or the automatic compression flow) ---
    # A queue of side branches; after each branch completes, pop(0) takes the next.
    force_side_branch: Optional[List[Literal["compression", "reflection"]]] = None
    # --- Framework-level parameters ---
    agent_type: str = "default"
    uid: Optional[str] = None
    system_prompt: Optional[str] = None  # None = build automatically from skills
    skills: Optional[List[str]] = None  # skill names injected into the system prompt; None = decided by preset
    enable_memory: bool = True
    auto_execute_tools: bool = True
    name: Optional[str] = None  # display name (auto-generated via utility_llm when empty)
    enable_prompt_caching: bool = True  # enable Anthropic Prompt Caching (Claude models only)
    # --- Trace control ---
    trace_id: Optional[str] = None  # None = create a new trace
    parent_trace_id: Optional[str] = None  # sub-agent use only
    parent_goal_id: Optional[str] = None
    # --- Continuation control ---
    after_sequence: Optional[int] = None  # message sequence to continue after
    # --- Extra LLM parameters (forwarded to llm_call as **kwargs) ---
    extra_llm_params: Dict[str, Any] = field(default_factory=dict)
    # --- Custom metadata context ---
    context: Dict[str, Any] = field(default_factory=dict)
    # --- Research flow control ---
    enable_research_flow: bool = True  # enable the automatic research flow (knowledge search → experience search → research → plan)
    # --- Knowledge management configuration ---
    knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
# Built-in tool list (always auto-loaded).
BUILTIN_TOOLS = [
    # File operation tools
    "read_file",
    "edit_file",
    "write_file",
    "glob_files",
    "grep_content",
    # System tools
    "bash_command",
    # Skill and goal management
    "skill",
    "list_skills",
    "goal",
    "agent",
    "evaluate",
    "get_current_context",
    # Search tools
    "search_posts",
    "get_search_suggestions",
    # Knowledge management tools
    "knowledge_search",
    "knowledge_save",
    "knowledge_update",
    "knowledge_batch_update",
    "knowledge_list",
    "knowledge_slim",
    # Sandbox tools (currently disabled)
    # "sandbox_create_environment",
    # "sandbox_run_shell",
    # "sandbox_rebuild_with_ports",
    # "sandbox_destroy_environment",
    # Browser tools
    "browser_get_live_url",
    "browser_navigate_to_url",
    "browser_search_web",
    "browser_go_back",
    "browser_wait",
    "browser_click_element",
    "browser_input_text",
    "browser_send_keys",
    "browser_upload_file",
    "browser_scroll_page",
    "browser_find_text",
    "browser_screenshot",
    "browser_switch_tab",
    "browser_close_tab",
    "browser_get_dropdown_options",
    "browser_select_dropdown_option",
    "browser_extract_content",
    "browser_read_long_content",
    "browser_download_direct_url",
    "browser_get_page_html",
    "browser_get_visual_selector_map",
    "browser_evaluate",
    "browser_ensure_login_with_cookies",
    # Can be temporarily replaced by Feishu messages
    # "browser_wait_for_user_action",
    "browser_done",
    "browser_export_cookies",
    "browser_load_cookies",
    # Feishu tools
    "feishu_send_message_to_contact",
    "feishu_get_chat_history",
    "feishu_get_contact_replies",
    "feishu_get_contact_list",
]
@dataclass
class CallResult:
    """Result of a single (non-looping) LLM call."""

    reply: str
    # Tool calls requested by the model, if any.
    tool_calls: Optional[List[Dict]] = None
    # Trace created for this call (when tracing is enabled).
    trace_id: Optional[str] = None
    # Id of the persisted assistant message.
    step_id: Optional[str] = None
    # {"prompt": ..., "completion": ...} token counts.
    tokens: Optional[Dict[str, int]] = None
    cost: float = 0.0
# ===== Execution engine =====
CONTEXT_INJECTION_INTERVAL = 10  # re-inject GoalTree + Collaborators every N iterations
class AgentRunner:
    """
    Agent execution engine.

    Supports three run modes (selected via RunConfig):
    1. New run:   trace_id=None
    2. Continue:  trace_id=<existing id>, after_sequence=None or == head
    3. Rewind:    trace_id=<existing id>, after_sequence=N (N < head_sequence)
    """
    def __init__(
        self,
        trace_store: Optional[TraceStore] = None,
        tool_registry: Optional[ToolRegistry] = None,
        llm_call: Optional[Callable] = None,
        utility_llm_call: Optional[Callable] = None,
        skills_dir: Optional[str] = None,
        goal_tree: Optional[GoalTree] = None,
        debug: bool = False,
    ):
        """
        Initialize the AgentRunner.

        Args:
            trace_store: Trace storage backend.
            tool_registry: Tool registry (defaults to the global registry).
            llm_call: Main LLM call function.
            utility_llm_call: Lightweight LLM (task titles, etc.), optional.
            skills_dir: Path to the skills directory.
            goal_tree: Initial GoalTree (optional).
            debug: Retained parameter (deprecated).
        """
        self.trace_store = trace_store
        self.tools = tool_registry or get_tool_registry()
        self.llm_call = llm_call
        self.utility_llm_call = utility_llm_call
        self.skills_dir = skills_dir
        self.goal_tree = goal_tree
        self.debug = debug
        self._cancel_events: Dict[str, asyncio.Event] = {}  # trace_id → cancel event
        # Knowledge-save tracking (independent per trace).
        self._saved_knowledge_ids: Dict[str, List[str]] = {}  # trace_id → [knowledge_ids]
        # Context-usage tracking.
        self._context_warned: Dict[str, set] = {}  # trace_id → {30, 50, 80} thresholds already warned
        self._context_usage: Dict[str, ContextUsage] = {}  # trace_id → latest usage snapshot
- # ===== 核心公开方法 =====
- def get_context_usage(self, trace_id: str) -> Optional[ContextUsage]:
- """获取指定 trace 的 context 使用情况"""
- return self._context_usage.get(trace_id)
- async def run(
- self,
- messages: List[Dict],
- config: Optional[RunConfig] = None,
- ) -> AsyncIterator[Union[Trace, Message]]:
- """
- Agent 模式执行(核心方法)
- Args:
- messages: OpenAI SDK 格式的输入消息
- 新建: 初始任务消息 [{"role": "user", "content": "..."}]
- 续跑: 追加的新消息
- 回溯: 在插入点之后追加的消息
- config: 运行配置
- Yields:
- Union[Trace, Message]: Trace 对象(状态变化)或 Message 对象(执行过程)
- """
- if not self.llm_call:
- raise ValueError("llm_call function not provided")
- config = config or RunConfig()
- trace = None
- try:
- # Phase 1: PREPARE TRACE
- trace, goal_tree, sequence = await self._prepare_trace(messages, config)
- # 注册取消事件
- self._cancel_events[trace.trace_id] = asyncio.Event()
- yield trace
- # 检查是否有未完成的侧分支(用于用户追加消息场景)
- side_branch_ctx_for_build: Optional[SideBranchContext] = None
- if trace.context.get("active_side_branch") and messages:
- side_branch_data = trace.context["active_side_branch"]
- # 创建侧分支上下文(用于标记用户追加的消息)
- side_branch_ctx_for_build = SideBranchContext(
- type=side_branch_data["type"],
- branch_id=side_branch_data["branch_id"],
- start_head_seq=side_branch_data["start_head_seq"],
- start_sequence=side_branch_data["start_sequence"],
- start_history_length=0,
- start_iteration=side_branch_data.get("start_iteration", 0),
- max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
- )
- # Phase 2: BUILD HISTORY
- history, sequence, created_messages, head_seq = await self._build_history(
- trace.trace_id, messages, goal_tree, config, sequence, side_branch_ctx_for_build
- )
- # Update trace's head_sequence in memory
- trace.head_sequence = head_seq
- for msg in created_messages:
- yield msg
- # Phase 3: AGENT LOOP
- async for event in self._agent_loop(trace, history, goal_tree, config, sequence):
- yield event
- except Exception as e:
- logger.error(f"Agent run failed: {e}")
- tid = config.trace_id or (trace.trace_id if trace else None)
- if self.trace_store and tid:
- # 读取当前 last_sequence 作为 head_sequence,确保续跑时能加载完整历史
- current = await self.trace_store.get_trace(tid)
- head_seq = current.last_sequence if current else None
- await self.trace_store.update_trace(
- tid,
- status="failed",
- head_sequence=head_seq,
- error_message=str(e),
- completed_at=datetime.now()
- )
- trace_obj = await self.trace_store.get_trace(tid)
- if trace_obj:
- yield trace_obj
- raise
- finally:
- # 清理取消事件
- if trace:
- self._cancel_events.pop(trace.trace_id, None)
    async def run_result(
        self,
        messages: List[Dict],
        config: Optional[RunConfig] = None,
        on_event: Optional[Callable] = None,
    ) -> Dict[str, Any]:
        """
        Result mode — consume run() and return a structured result.

        Mainly used internally by the agent/evaluate tools.

        Args:
            messages: OpenAI-SDK-format input messages (same as run()).
            config: Run configuration (same as run()).
            on_event: Optional callback invoked once per Trace/Message event,
                used to stream a sub-agent's execution in real time.

        Returns:
            Dict with status, summary (last non-empty assistant text),
            trace_id, error, saved_knowledge_ids, and aggregate stats.
        """
        last_assistant_text = ""
        final_trace: Optional[Trace] = None
        async for item in self.run(messages=messages, config=config):
            if on_event:
                on_event(item)
            if isinstance(item, Message) and item.role == "assistant":
                content = item.content
                text = ""
                if isinstance(content, dict):
                    text = content.get("text", "") or ""
                elif isinstance(content, str):
                    text = content
                # Keep only the latest non-blank assistant text as the summary.
                if text and text.strip():
                    last_assistant_text = text
            elif isinstance(item, Trace):
                final_trace = item
        config = config or RunConfig()
        # Fallback: run() yielded no Trace (e.g. very early failure) — load it.
        if not final_trace and config.trace_id and self.trace_store:
            final_trace = await self.trace_store.get_trace(config.trace_id)
        status = final_trace.status if final_trace else "unknown"
        error = final_trace.error_message if final_trace else None
        summary = last_assistant_text
        if not summary:
            status = "failed"
            error = error or "Agent 没有产生 assistant 文本结果"
        # Knowledge ids saved during this run (tracked per trace).
        trace_id = final_trace.trace_id if final_trace else config.trace_id
        saved_knowledge_ids = self._saved_knowledge_ids.get(trace_id, [])
        return {
            "status": status,
            "summary": summary,
            "trace_id": trace_id,
            "error": error,
            "saved_knowledge_ids": saved_knowledge_ids,  # knowledge ids saved during the run
            "stats": {
                "total_messages": final_trace.total_messages if final_trace else 0,
                "total_tokens": final_trace.total_tokens if final_trace else 0,
                "total_cost": final_trace.total_cost if final_trace else 0.0,
            },
        }
- async def stop(self, trace_id: str) -> bool:
- """
- 停止运行中的 Trace
- 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。
- Trace 状态置为 "stopped"。
- Returns:
- True 如果成功发送停止信号,False 如果该 trace 不在运行中
- """
- cancel_event = self._cancel_events.get(trace_id)
- if cancel_event is None:
- return False
- cancel_event.set()
- return True
    # ===== Single call (retained) =====
    async def call(
        self,
        messages: List[Dict],
        model: str = "gpt-4o",
        tools: Optional[List[str]] = None,
        uid: Optional[str] = None,
        trace: bool = True,
        **kwargs
    ) -> CallResult:
        """
        Single LLM call (no agent loop).

        Args:
            messages: OpenAI-SDK-format messages.
            model: Model name.
            tools: Tool names to expose; None = all registered tools.
            uid: Optional user id recorded on the trace.
            trace: Persist the call as a "call"-mode trace when a store is set.
            **kwargs: Extra parameters forwarded to llm_call.

        Raises:
            ValueError: When no llm_call function was provided.
        """
        if not self.llm_call:
            raise ValueError("llm_call function not provided")
        trace_id = None
        message_id = None
        tool_schemas = self._get_tool_schemas(tools)
        if trace and self.trace_store:
            trace_obj = Trace.create(mode="call", uid=uid, model=model, tools=tool_schemas, llm_params=kwargs)
            trace_id = await self.trace_store.create_trace(trace_obj)
        result = await self.llm_call(messages=messages, model=model, tools=tool_schemas, **kwargs)
        if trace and self.trace_store and trace_id:
            # Persist the assistant reply and close the trace.
            msg = Message.create(
                trace_id=trace_id, role="assistant", sequence=1, goal_id=None,
                content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
                prompt_tokens=result.get("prompt_tokens", 0),
                completion_tokens=result.get("completion_tokens", 0),
                finish_reason=result.get("finish_reason"),
                cost=result.get("cost", 0),
            )
            message_id = await self.trace_store.add_message(msg)
            await self.trace_store.update_trace(trace_id, status="completed", completed_at=datetime.now())
        return CallResult(
            reply=result.get("content", ""),
            tool_calls=result.get("tool_calls"),
            trace_id=trace_id,
            step_id=message_id,
            tokens={"prompt": result.get("prompt_tokens", 0), "completion": result.get("completion_tokens", 0)},
            cost=result.get("cost", 0)
        )
- # ===== Phase 1: PREPARE TRACE =====
- async def _prepare_trace(
- self,
- messages: List[Dict],
- config: RunConfig,
- ) -> Tuple[Trace, Optional[GoalTree], int]:
- """
- 准备 Trace:创建新的或加载已有的
- Returns:
- (trace, goal_tree, next_sequence)
- """
- if config.trace_id:
- return await self._prepare_existing_trace(config)
- else:
- return await self._prepare_new_trace(messages, config)
    async def _prepare_new_trace(
        self,
        messages: List[Dict],
        config: RunConfig,
    ) -> Tuple[Trace, Optional[GoalTree], int]:
        """Create a brand-new Trace (and its GoalTree); next sequence is 1."""
        trace_id = str(uuid.uuid4())
        # Generate a display name (LLM-assisted when config.name is empty).
        task_name = config.name or await self._generate_task_name(messages)
        # Prepare tool schemas.
        tool_schemas = self._get_tool_schemas(config.tools)
        trace_obj = Trace(
            trace_id=trace_id,
            mode="agent",
            task=task_name,
            agent_type=config.agent_type,
            parent_trace_id=config.parent_trace_id,
            parent_goal_id=config.parent_goal_id,
            uid=config.uid,
            model=config.model,
            tools=tool_schemas,
            llm_params={"temperature": config.temperature, **config.extra_llm_params},
            context=config.context,
            status="running",
        )
        goal_tree = self.goal_tree or GoalTree(mission=task_name)
        if self.trace_store:
            await self.trace_store.create_trace(trace_obj)
            await self.trace_store.update_goal_tree(trace_id, goal_tree)
        return trace_obj, goal_tree, 1
    async def _prepare_existing_trace(
        self,
        config: RunConfig,
    ) -> Tuple[Trace, Optional[GoalTree], int]:
        """
        Load an existing Trace (continue or rewind mode).

        Raises:
            ValueError: When no trace_store is configured or the trace id
                cannot be found.
        """
        if not self.trace_store:
            raise ValueError("trace_store required for continue/rewind")
        trace_obj = await self.trace_store.get_trace(config.trace_id)
        if not trace_obj:
            raise ValueError(f"Trace not found: {config.trace_id}")
        goal_tree = await self.trace_store.get_goal_tree(config.trace_id)
        if goal_tree is None:
            # Defensive fallback: the trace exists but goal.json was lost —
            # create an empty tree.
            goal_tree = GoalTree(mission=trace_obj.task or "Agent task")
            await self.trace_store.update_goal_tree(config.trace_id, goal_tree)
        # Mode is inferred automatically: after_sequence None or == head →
        # continue; < head → rewind.
        after_seq = config.after_sequence
        # If after_seq > head_sequence, the stored head_sequence was not
        # updated before the generator was force-closed (it still holds the
        # initial value written in Phase 2). Correct it from last_sequence so
        # the full history is visible when continuing.
        if after_seq is not None and after_seq > trace_obj.head_sequence:
            trace_obj.head_sequence = trace_obj.last_sequence
            await self.trace_store.update_trace(
                config.trace_id, head_sequence=trace_obj.head_sequence
            )
        if after_seq is not None and after_seq < trace_obj.head_sequence:
            # Rewind mode.
            sequence = await self._rewind(config.trace_id, after_seq, goal_tree)
        else:
            # Continue mode: start from last_sequence + 1.
            sequence = trace_obj.last_sequence + 1
        # Mark the trace as running again.
        await self.trace_store.update_trace(
            config.trace_id,
            status="running",
            completed_at=None,
        )
        trace_obj.status = "running"
        # Broadcast the status change to the frontend (best effort).
        try:
            from agent.trace.websocket import broadcast_trace_status_changed
            await broadcast_trace_status_changed(config.trace_id, "running")
        except Exception:
            pass
        return trace_obj, goal_tree, sequence
    # ===== Phase 2: BUILD HISTORY =====

    async def _build_history(
        self,
        trace_id: str,
        new_messages: List[Dict],
        goal_tree: Optional[GoalTree],
        config: RunConfig,
        sequence: int,
        side_branch_ctx: Optional[SideBranchContext] = None,
    ) -> Tuple[List[Dict], int, List[Message], int]:
        """
        Build the full LLM message history.

        1. Load main-path messages along the parent chain from head_sequence
           (continue/rewind scenarios).
        2. Build the system prompt (inject skills on a fresh run).
        3. Append the input messages (parent_sequence linked to the current
           head); while inside a side branch, appended messages are tagged as
           side-branch messages.
        4. Persist the new head_sequence on the trace.

        Returns:
            (history, next_sequence, created_messages, head_sequence)
            created_messages: Message objects created and persisted by this
                call, for run() to yield to the caller.
            head_sequence: sequence of the current main-path head node.
        """
        history: List[Dict] = []
        created_messages: List[Message] = []
        head_seq: Optional[int] = None  # sequence of the current main-path head
        # 1. Load existing messages (main-path traversal).
        if config.trace_id and self.trace_store:
            trace_obj = await self.trace_store.get_trace(trace_id)
            if trace_obj and trace_obj.head_sequence > 0:
                main_path = await self.trace_store.get_main_path_messages(
                    trace_id, trace_obj.head_sequence
                )
                # Heal orphaned tool_calls (tool_call left without a
                # tool_result after an interruption).
                main_path, sequence = await self._heal_orphaned_tool_calls(
                    main_path, trace_id, goal_tree, sequence,
                )
                history = [msg.to_llm_dict() for msg in main_path]
                if main_path:
                    head_seq = main_path[-1].sequence
        # 2. Build/inject skills into the system prompt.
        has_system = any(m.get("role") == "system" for m in history)
        has_system_in_new = any(m.get("role") == "system" for m in new_messages)
        if not has_system:
            if has_system_in_new:
                # The input already contains a system message: inject skills
                # into it (before persistence in step 3).
                augmented = []
                for msg in new_messages:
                    if msg.get("role") == "system":
                        base = msg.get("content") or ""
                        enriched = await self._build_system_prompt(config, base_prompt=base)
                        augmented.append({**msg, "content": enriched or base})
                    else:
                        augmented.append(msg)
                new_messages = augmented
            else:
                # No system message anywhere: build one and prepend it.
                system_prompt = await self._build_system_prompt(config)
                if system_prompt:
                    history = [{"role": "system", "content": system_prompt}] + history
                    if self.trace_store:
                        system_msg = Message.create(
                            trace_id=trace_id, role="system", sequence=sequence,
                            goal_id=None, content=system_prompt,
                            parent_sequence=None,  # the system message is the root
                        )
                        await self.trace_store.add_message(system_msg)
                        created_messages.append(system_msg)
                    head_seq = sequence
                    sequence += 1
        # 3. Append the new messages (parent_sequence linked to the current head).
        for msg_dict in new_messages:
            history.append(msg_dict)
            if self.trace_store:
                # While inside a side branch, tag the message accordingly.
                if side_branch_ctx:
                    stored_msg = Message.create(
                        trace_id=trace_id,
                        role=msg_dict["role"],
                        sequence=sequence,
                        goal_id=goal_tree.current_id if goal_tree else None,
                        parent_sequence=head_seq,
                        branch_type=side_branch_ctx.type,
                        branch_id=side_branch_ctx.branch_id,
                        content=msg_dict.get("content"),
                    )
                    logger.info(f"用户在侧分支 {side_branch_ctx.type} 中追加消息")
                else:
                    stored_msg = Message.from_llm_dict(
                        msg_dict, trace_id=trace_id, sequence=sequence,
                        goal_id=None, parent_sequence=head_seq,
                    )
                await self.trace_store.add_message(stored_msg)
                created_messages.append(stored_msg)
            head_seq = sequence
            sequence += 1
        # 4. Persist the trace's new head_sequence.
        if self.trace_store and head_seq is not None:
            await self.trace_store.update_trace(trace_id, head_sequence=head_seq)
        return history, sequence, created_messages, head_seq or 0
    # ===== Phase 3: AGENT LOOP =====
    async def _manage_context_usage(
        self,
        trace_id: str,
        history: List[Dict],
        goal_tree: Optional[GoalTree],
        config: RunConfig,
        sequence: int,
        head_seq: int,
    ) -> Tuple[List[Dict], int, int, bool]:
        """
        Manage context usage: measure, warn, compress.

        Returns:
            (updated_history, new_head_seq, next_sequence, needs_enter_compression_branch)
        """
        compression_config = CompressionConfig()
        token_count = estimate_tokens(history)
        max_tokens = compression_config.get_max_tokens(config.model)
        # Usage ratio.
        progress_pct = (token_count / max_tokens * 100) if max_tokens > 0 else 0
        msg_count = len(history)
        img_count = sum(
            1 for msg in history
            if isinstance(msg.get("content"), list)
            for part in msg["content"]
            if isinstance(part, dict) and part.get("type") in ("image", "image_url")
        )
        # Update the context-usage snapshot for get_context_usage().
        self._context_usage[trace_id] = ContextUsage(
            trace_id=trace_id,
            message_count=msg_count,
            token_count=token_count,
            max_tokens=max_tokens,
            usage_percent=progress_pct,
            image_count=img_count,
        )
        # Threshold warnings (30%, 50%, 80%), each emitted at most once per trace.
        if trace_id not in self._context_warned:
            self._context_warned[trace_id] = set()
        for threshold in [30, 50, 80]:
            if progress_pct >= threshold and threshold not in self._context_warned[trace_id]:
                self._context_warned[trace_id].add(threshold)
                logger.warning(
                    f"Context 使用率达到 {threshold}%: {token_count:,} / {max_tokens:,} tokens ({msg_count} 条消息)"
                )
        # Is compression needed? (token count or message count over the limit)
        needs_compression_by_tokens = token_count > max_tokens
        needs_compression_by_count = (
            compression_config.max_messages > 0 and
            msg_count > compression_config.max_messages
        )
        needs_compression = needs_compression_by_tokens or needs_compression_by_count
        if not needs_compression:
            return history, head_seq, sequence, False
        # Knowledge extraction: before any compression happens, reflect over
        # the full history (enter the reflection side branch first).
        if config.knowledge.enable_extraction and not config.force_side_branch:
            # Side-branch queue: reflection first, then compression.
            config.force_side_branch = ["reflection", "compression"]
            return history, head_seq, sequence, True
        # Level 1 compression: GoalTree filtering.
        if self.trace_store and goal_tree:
            if head_seq > 0:
                main_path_msgs = await self.trace_store.get_main_path_messages(
                    trace_id, head_seq
                )
                filtered_msgs = filter_by_goal_status(main_path_msgs, goal_tree)
                if len(filtered_msgs) < len(main_path_msgs):
                    logger.info(
                        "Level 1 压缩: %d -> %d 条消息",
                        len(main_path_msgs), len(filtered_msgs),
                    )
                    history = [msg.to_llm_dict() for msg in filtered_msgs]
                else:
                    logger.info(
                        "Level 1 压缩: 无可过滤消息 (%d 条全部保留)",
                        len(main_path_msgs),
                    )
        elif needs_compression:
            # NOTE(review): needs_compression is always True here (the early
            # return above handles the False case), so this elif behaves like
            # a plain else.
            logger.warning(
                "消息数 (%d) 或 token 数 (%d) 超过阈值,但无法执行 Level 1 压缩(缺少 store 或 goal_tree)",
                msg_count, token_count,
            )
        # Level 2 compression: still over the limit after Level 1?
        token_count_after = estimate_tokens(history)
        msg_count_after = len(history)
        needs_level2_by_tokens = token_count_after > max_tokens
        needs_level2_by_count = (
            compression_config.max_messages > 0 and
            msg_count_after > compression_config.max_messages
        )
        needs_level2 = needs_level2_by_tokens or needs_level2_by_count
        if needs_level2:
            logger.info(
                "Level 1 后仍超阈值 (消息数=%d/%d, token=%d/%d),需要进入压缩侧分支",
                msg_count_after, compression_config.max_messages, token_count_after, max_tokens,
            )
            # No side branch queued yet (knowledge extraction disabled):
            # go straight to compression.
            if not config.force_side_branch:
                config.force_side_branch = ["compression"]
            # Signal the main loop to enter the side branch.
            return history, head_seq, sequence, True
        # Compression done — log the final message list sent to the model.
        logger.info("Level 1 压缩完成,发送给模型的消息列表:")
        for idx, msg in enumerate(history):
            role = msg.get("role", "unknown")
            content = msg.get("content", "")
            if isinstance(content, str):
                preview = content[:100] + ("..." if len(content) > 100 else "")
            elif isinstance(content, list):
                preview = f"[{len(content)} blocks]"
            else:
                preview = str(content)[:100]
            logger.info(f" [{idx}] {role}: {preview}")
        return history, head_seq, sequence, False
    async def _single_turn_compress(
        self,
        trace_id: str,
        history: List[Dict],
        goal_tree: Optional[GoalTree],
        config: RunConfig,
        sequence: int,
        start_head_seq: int,
    ) -> Tuple[List[Dict], int, int]:
        """Single-shot LLM compression (fallback path).

        Used when the interactive compression side branch exceeds its turn
        budget: one tool-less LLM call summarizes the whole history, and the
        summary replaces it as a new main-path message.

        Args:
            trace_id: Trace being compressed.
            history: Current LLM message history (dict form).
            goal_tree: Goal tree used to build the compression prompt.
            config: Run configuration (model, temperature, caching, ...).
            sequence: Next free sequence number for the summary message.
            start_head_seq: Main-path head the summary should attach to.

        Returns:
            (new_history, new_head_seq, next_sequence); the inputs are
            returned unchanged when the LLM yields no usable summary.
        """
        logger.info("执行单次 LLM 压缩(fallback)")
        # Build the compression prompt and append it as a user turn.
        compress_prompt = build_compression_prompt(goal_tree)
        compress_messages = list(history) + [
            {"role": "user", "content": compress_prompt}
        ]
        # Apply prompt caching markers before the call.
        compress_messages = self._add_cache_control(
            compress_messages, config.model, config.enable_prompt_caching
        )
        # Single LLM call with no tools exposed.
        result = await self.llm_call(
            messages=compress_messages,
            model=config.model,
            tools=[],  # no tools: force a plain-text summary
            temperature=config.temperature,
            **config.extra_llm_params,
        )
        summary_text = result.get("content", "").strip()
        # Keep only the text after the [[SUMMARY]] marker, if present.
        if "[[SUMMARY]]" in summary_text:
            summary_text = summary_text[
                summary_text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):
            ].strip()
        if not summary_text:
            logger.warning("单次压缩未返回有效内容,跳过压缩")
            return history, start_head_seq, sequence
        # Persist the summary as a new user message on the main path.
        from agent.core.prompts import build_summary_header
        summary_msg = Message.create(
            trace_id=trace_id,
            role="user",
            sequence=sequence,
            parent_sequence=start_head_seq,
            branch_type=None,  # main path
            content=build_summary_header(summary_text),
        )
        if self.trace_store:
            await self.trace_store.add_message(summary_msg)
        # Rebuild history as [system message (if any), summary].
        system_msg = history[0] if history and history[0].get("role") == "system" else None
        new_history = [system_msg, summary_msg.to_llm_dict()] if system_msg else [summary_msg.to_llm_dict()]
        new_head_seq = sequence
        sequence += 1
        logger.info(f"单次压缩完成: {len(history)} → {len(new_history)} 条消息")
        return new_history, new_head_seq, sequence
    async def _agent_loop(
        self,
        trace: Trace,
        history: List[Dict],
        goal_tree: Optional[GoalTree],
        config: RunConfig,
        sequence: int,
    ) -> AsyncIterator[Union[Trace, Message]]:
        """ReAct loop: alternate LLM calls and tool execution until done.

        Drives the trace to completion, yielding each persisted assistant /
        tool Message and finally the completed Trace. Also manages context
        compression and "side branches" (reflection / compression detours
        whose messages attach to a branch_id instead of the main path).
        """
        trace_id = trace.trace_id
        tool_schemas = self._get_tool_schemas(config.tools)
        # Sequence of the current main-path head (used as parent_sequence).
        head_seq = trace.head_sequence
        # Side-branch state (None = on the main path).
        side_branch_ctx: Optional[SideBranchContext] = None
        # Resume an unfinished side branch, if one was persisted.
        if trace.context.get("active_side_branch"):
            side_branch_data = trace.context["active_side_branch"]
            branch_id = side_branch_data["branch_id"]
            # Reload the side branch's messages from the store.
            if self.trace_store:
                all_messages = await self.trace_store.get_trace_messages(trace_id)
                side_messages = [
                    m for m in all_messages
                    if m.branch_id == branch_id
                ]
                # Rebuild the side-branch context.
                side_branch_ctx = SideBranchContext(
                    type=side_branch_data["type"],
                    branch_id=branch_id,
                    start_head_seq=side_branch_data["start_head_seq"],
                    start_sequence=side_branch_data["start_sequence"],
                    start_history_length=0,  # recomputed below
                    start_iteration=side_branch_data.get("start_iteration", 0),
                    max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
                )
                logger.info(
                    f"恢复未完成的侧分支: {side_branch_ctx.type}, "
                    f"max_turns={side_branch_ctx.max_turns}"
                )
                # Append the side-branch messages to the working history.
                for m in side_messages:
                    history.append(m.to_llm_dict())
                # Recompute where the side branch started in history.
                side_branch_ctx.start_history_length = len(history) - len(side_messages)
        for iteration in range(config.max_iterations):
            # Touch the activity timestamp (marks the trace as live).
            if self.trace_store:
                await self.trace_store.update_trace(
                    trace_id,
                    last_activity_at=datetime.now()
                )
            # Honor a user-requested cancellation.
            cancel_event = self._cancel_events.get(trace_id)
            if cancel_event and cancel_event.is_set():
                logger.info(f"Trace {trace_id} stopped by user")
                if self.trace_store:
                    await self.trace_store.update_trace(
                        trace_id,
                        status="stopped",
                        head_sequence=head_seq,
                        completed_at=datetime.now(),
                    )
                    # Best-effort broadcast of the status change to the frontend.
                    try:
                        from agent.trace.websocket import broadcast_trace_status_changed
                        await broadcast_trace_status_changed(trace_id, "stopped")
                    except Exception:
                        pass
                    trace_obj = await self.trace_store.get_trace(trace_id)
                    if trace_obj:
                        yield trace_obj
                return
            # Context management (main path only).
            needs_enter_side_branch = False
            if not side_branch_ctx:
                # A side branch may be forced via the API.
                if config.force_side_branch:
                    needs_enter_side_branch = True
                    logger.info(f"强制进入侧分支: {config.force_side_branch}")
                else:
                    # Regular context-size management.
                    history, head_seq, sequence, needs_enter_side_branch = await self._manage_context_usage(
                        trace_id, history, goal_tree, config, sequence, head_seq
                    )
            # Enter a side branch.
            if needs_enter_side_branch and not side_branch_ctx:
                # Pop the first queued side-branch type, if a queue exists.
                if config.force_side_branch and isinstance(config.force_side_branch, list) and len(config.force_side_branch) > 0:
                    branch_type = config.force_side_branch.pop(0)
                    logger.info(f"从队列取出侧分支: {branch_type}, 剩余队列: {config.force_side_branch}")
                elif config.knowledge.enable_extraction:
                    # Legacy single-value mode (force_side_branch as a string).
                    branch_type = "reflection"
                else:
                    # Automatic trigger: compression.
                    branch_type = "compression"
                branch_id = f"{branch_type}_{uuid.uuid4().hex[:8]}"
                side_branch_ctx = SideBranchContext(
                    type=branch_type,
                    branch_id=branch_id,
                    start_head_seq=head_seq,
                    start_sequence=sequence,
                    start_history_length=len(history),
                    start_iteration=iteration,
                    max_turns=config.side_branch_max_turns,
                )
                # Persist the side-branch state for crash/stop recovery.
                if self.trace_store:
                    trace.context["active_side_branch"] = {
                        "type": side_branch_ctx.type,
                        "branch_id": side_branch_ctx.branch_id,
                        "start_head_seq": side_branch_ctx.start_head_seq,
                        "start_sequence": side_branch_ctx.start_sequence,
                        "start_iteration": side_branch_ctx.start_iteration,
                        "max_turns": side_branch_ctx.max_turns,
                        "started_at": datetime.now().isoformat(),
                    }
                    await self.trace_store.update_trace(
                        trace_id,
                        context=trace.context
                    )
                # Append the side branch's opening prompt.
                if branch_type == "reflection":
                    prompt = config.knowledge.get_reflect_prompt()
                else:  # compression
                    from agent.trace.compaction import build_compression_prompt
                    prompt = build_compression_prompt(goal_tree)
                branch_user_msg = Message.create(
                    trace_id=trace_id,
                    role="user",
                    sequence=sequence,
                    parent_sequence=head_seq,
                    goal_id=goal_tree.current_id if goal_tree else None,
                    branch_type=branch_type,
                    branch_id=branch_id,
                    content=prompt,
                )
                if self.trace_store:
                    await self.trace_store.add_message(branch_user_msg)
                history.append(branch_user_msg.to_llm_dict())
                head_seq = sequence
                sequence += 1
                logger.info(f"进入侧分支: {branch_type}, branch_id={branch_id}")
                continue  # skip this turn; the branch runs from the next one
            # Build the LLM message list.
            llm_messages = list(history)
            # Apply prompt caching markers to the history.
            llm_messages = self._add_cache_control(
                llm_messages,
                config.model,
                config.enable_prompt_caching
            )
            # Call the LLM (cancellation is checked only between calls,
            # never by interrupting an in-flight call).
            result = await self.llm_call(
                messages=llm_messages,
                model=config.model,
                tools=tool_schemas,
                temperature=config.temperature,
                **config.extra_llm_params,
            )
            response_content = result.get("content", "")
            tool_calls = result.get("tool_calls")
            finish_reason = result.get("finish_reason")
            prompt_tokens = result.get("prompt_tokens", 0)
            completion_tokens = result.get("completion_tokens", 0)
            step_cost = result.get("cost", 0)
            cache_creation_tokens = result.get("cache_creation_tokens")
            cache_read_tokens = result.get("cache_read_tokens")
            # Periodic automatic context injection (main path only).
            if not side_branch_ctx and iteration % CONTEXT_INJECTION_INTERVAL == 0:
                # Skip if the model already asked for get_current_context.
                if tool_calls:
                    has_context_call = any(
                        tc.get("function", {}).get("name") == "get_current_context"
                        for tc in tool_calls
                    )
                else:
                    has_context_call = False
                    tool_calls = []
                if not has_context_call:
                    # Inject a synthetic get_current_context tool call.
                    import uuid
                    context_call_id = f"call_context_{uuid.uuid4().hex[:8]}"
                    tool_calls.append({
                        "id": context_call_id,
                        "type": "function",
                        "function": {"name": "get_current_context", "arguments": "{}"}
                    })
                    logger.info(f"[周期性注入] 自动添加 get_current_context 工具调用 (iteration={iteration})")
            # Auto-create a root goal on demand (main path only).
            if not side_branch_ctx and goal_tree and not goal_tree.goals and tool_calls:
                has_goal_call = any(
                    tc.get("function", {}).get("name") == "goal"
                    for tc in tool_calls
                )
                logger.debug(f"[Auto Root Goal] Before tool execution: goal_tree.goals={len(goal_tree.goals)}, has_goal_call={has_goal_call}, tool_calls={[tc.get('function', {}).get('name') for tc in tool_calls]}")
                if not has_goal_call:
                    mission = goal_tree.mission
                    root_desc = mission[:200] if len(mission) > 200 else mission
                    goal_tree.add_goals(
                        descriptions=[root_desc],
                        reasons=["系统自动创建:Agent 未显式创建目标"],
                        parent_id=None
                    )
                    if self.trace_store:
                        await self.trace_store.add_goal(trace_id, goal_tree.goals[0])
                        await self.trace_store.update_goal_tree(trace_id, goal_tree)
                    logger.info(f"自动创建 root goal: {goal_tree.goals[0].id}(未自动 focus,等待模型决定)")
                else:
                    logger.debug(f"[Auto Root Goal] 检测到 goal 工具调用,跳过自动创建")
            # Current focused goal, if any.
            current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
            # Persist the assistant Message (parent_sequence = current head).
            assistant_msg = Message.create(
                trace_id=trace_id,
                role="assistant",
                sequence=sequence,
                goal_id=current_goal_id,
                parent_sequence=head_seq if head_seq > 0 else None,
                branch_type=side_branch_ctx.type if side_branch_ctx else None,
                branch_id=side_branch_ctx.branch_id if side_branch_ctx else None,
                content={"text": response_content, "tool_calls": tool_calls},
                prompt_tokens=prompt_tokens,
                completion_tokens=completion_tokens,
                cache_creation_tokens=cache_creation_tokens,
                cache_read_tokens=cache_read_tokens,
                finish_reason=finish_reason,
                cost=step_cost,
            )
            if self.trace_store:
                await self.trace_store.add_message(assistant_msg)
                # Account model usage for this step.
                await self.trace_store.record_model_usage(
                    trace_id=trace_id,
                    sequence=sequence,
                    role="assistant",
                    model=config.model,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    cache_read_tokens=cache_read_tokens or 0,
                )
            # On a side branch the message is already persisted with its
            # branch_id; no extra bookkeeping needed.
            yield assistant_msg
            head_seq = sequence
            sequence += 1
            # Check whether the side branch should exit.
            if side_branch_ctx:
                # Turns spent inside the branch so far.
                turns_in_branch = iteration - side_branch_ctx.start_iteration
                # Hard limit reached?
                if turns_in_branch >= side_branch_ctx.max_turns:
                    logger.warning(
                        f"侧分支 {side_branch_ctx.type} 达到最大轮次 "
                        f"{side_branch_ctx.max_turns},强制退出"
                    )
                    if side_branch_ctx.type == "compression":
                        # Compression branch: fall back to a single LLM call.
                        logger.info("Fallback 到单次 LLM 压缩")
                        # Clear the persisted side-branch state.
                        trace.context.pop("active_side_branch", None)
                        if self.trace_store:
                            await self.trace_store.update_trace(
                                trace_id, context=trace.context
                            )
                        # Restore the history from before the branch started.
                        if self.trace_store:
                            main_path_messages = await self.trace_store.get_main_path_messages(
                                trace_id, side_branch_ctx.start_head_seq
                            )
                            history = [m.to_llm_dict() for m in main_path_messages]
                        # Run the single-shot compression.
                        history, head_seq, sequence = await self._single_turn_compress(
                            trace_id, history, goal_tree, config, sequence,
                            side_branch_ctx.start_head_seq
                        )
                        # Drop any forced side-branch config.
                        config.force_side_branch = None
                        side_branch_ctx = None
                        continue
                    elif side_branch_ctx.type == "reflection":
                        # Reflection branch: just exit, regardless of result.
                        logger.info("反思侧分支超时,直接退出")
                        # Clear the persisted side-branch state.
                        trace.context.pop("active_side_branch", None)
                        # Keep force_side_branch if more branches are queued.
                        if not config.force_side_branch or len(config.force_side_branch) == 0:
                            config.force_side_branch = None
                            logger.info("反思超时,队列为空")
                        if self.trace_store:
                            await self.trace_store.update_trace(
                                trace_id, context=trace.context
                            )
                        # Restore the history from before the branch started.
                        if self.trace_store:
                            main_path_messages = await self.trace_store.get_main_path_messages(
                                trace_id, side_branch_ctx.start_head_seq
                            )
                            history = [m.to_llm_dict() for m in main_path_messages]
                        head_seq = side_branch_ctx.start_head_seq
                        # Drop any forced side-branch config.
                        config.force_side_branch = None
                        side_branch_ctx = None
                        continue
                # No tool calls means the side branch finished naturally.
                if not tool_calls:
                    logger.info(f"侧分支 {side_branch_ctx.type} 完成(无工具调用)")
                    # Harvest the branch's result.
                    if side_branch_ctx.type == "compression":
                        # Pull the branch messages and extract the summary.
                        summary_text = ""
                        if self.trace_store:
                            all_messages = await self.trace_store.get_trace_messages(trace_id)
                            side_messages = [
                                m for m in all_messages
                                if m.branch_id == side_branch_ctx.branch_id
                            ]
                            for msg in side_messages:
                                if msg.role == "assistant" and isinstance(msg.content, dict):
                                    text = msg.content.get("text", "")
                                    if "[[SUMMARY]]" in text:
                                        summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
                                        break
                                    elif text:
                                        summary_text = text
                        if not summary_text:
                            logger.warning("侧分支未生成有效 summary,使用默认")
                            summary_text = "压缩完成"
                        # Build the main-path summary message (with the
                        # detailed GoalTree appended right after compression).
                        from agent.core.prompts import build_summary_header
                        summary_content = build_summary_header(summary_text)
                        if goal_tree and goal_tree.goals:
                            goal_tree_detail = goal_tree.to_prompt(include_summary=True)
                            summary_content += f"\n\n## Current Plan\n\n{goal_tree_detail}"
                        summary_msg = Message.create(
                            trace_id=trace_id,
                            role="user",
                            sequence=sequence,
                            parent_sequence=side_branch_ctx.start_head_seq,
                            branch_type=None,  # back on the main path
                            content=summary_content,
                        )
                        if self.trace_store:
                            await self.trace_store.add_message(summary_msg)
                        # Rebuild history from the pre-branch main path.
                        if self.trace_store:
                            main_path_messages = await self.trace_store.get_main_path_messages(
                                trace_id, side_branch_ctx.start_head_seq
                            )
                            history = [m.to_llm_dict() for m in main_path_messages]
                        history.append(summary_msg.to_llm_dict())
                        head_seq = sequence
                        sequence += 1
                        logger.info(f"压缩侧分支完成,history 长度: {len(history)}")
                        # Clear the side-branch queue.
                        config.force_side_branch = None
                    elif side_branch_ctx.type == "reflection":
                        # Reflection branch: simply restore the main path.
                        logger.info("反思侧分支完成")
                        if self.trace_store:
                            main_path_messages = await self.trace_store.get_main_path_messages(
                                trace_id, side_branch_ctx.start_head_seq
                            )
                            history = [m.to_llm_dict() for m in main_path_messages]
                        head_seq = side_branch_ctx.start_head_seq
                        # Keep force_side_branch if more branches are queued.
                        if not config.force_side_branch or len(config.force_side_branch) == 0:
                            config.force_side_branch = None
                            logger.info("反思完成,队列为空")
                    # Clear the persisted side-branch state.
                    trace.context.pop("active_side_branch", None)
                    if self.trace_store:
                        await self.trace_store.update_trace(
                            trace_id,
                            context=trace.context,
                            head_sequence=head_seq,
                        )
                    # NOTE: force_side_branch is NOT cleared here — a
                    # reflection branch may have queued the next branch;
                    # each branch type clears it itself.
                    side_branch_ctx = None
                    continue
            # Tool-call handling.
            # Truncation guard: finish_reason == "length" means the response
            # was cut off by max_tokens, so the tool-call arguments are most
            # likely incomplete — do not execute them; ask the model to
            # operate in smaller batches instead.
            if tool_calls and finish_reason == "length":
                logger.warning(
                    "[Runner] 响应被 max_tokens 截断,跳过 %d 个不完整的 tool calls",
                    len(tool_calls),
                )
                truncation_hint = TRUNCATION_HINT
                history.append({
                    "role": "assistant",
                    "content": response_content,
                    "tool_calls": tool_calls,
                })
                # Return an error result for every truncated tool call.
                for tc in tool_calls:
                    history.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": truncation_hint,
                    })
                continue
            if tool_calls and config.auto_execute_tools:
                history.append({
                    "role": "assistant",
                    "content": response_content,
                    "tool_calls": tool_calls,
                })
                for tc in tool_calls:
                    current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
                    tool_name = tc["function"]["name"]
                    tool_args = tc["function"]["arguments"]
                    if isinstance(tool_args, str):
                        tool_args = json.loads(tool_args) if tool_args.strip() else {}
                    elif tool_args is None:
                        tool_args = {}
                    # Log the tool call at INFO level with (truncated) args.
                    args_str = json.dumps(tool_args, ensure_ascii=False)
                    args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
                    logger.info(f"[Tool Call] {tool_name}({args_display})")
                    tool_result = await self.tools.execute(
                        tool_name,
                        tool_args,
                        uid=config.uid or "",
                        context={
                            "store": self.trace_store,
                            "trace_id": trace_id,
                            "goal_id": current_goal_id,
                            "runner": self,
                            "goal_tree": goal_tree,
                            "knowledge_config": config.knowledge,
                            # Side-branch info for tools that care.
                            "side_branch": {
                                "type": side_branch_ctx.type,
                                "branch_id": side_branch_ctx.branch_id,
                                "is_side_branch": True,
                                "max_turns": side_branch_ctx.max_turns,
                            } if side_branch_ctx else None,
                        },
                    )
                    # For the goal tool, log the post-execution state.
                    if tool_name == "goal" and goal_tree:
                        logger.debug(f"[Goal Tool] After execution: goal_tree.goals={len(goal_tree.goals)}, current_id={goal_tree.current_id}")
                    # Track saved knowledge IDs.
                    if tool_name == "knowledge_save" and isinstance(tool_result, dict):
                        metadata = tool_result.get("metadata", {})
                        knowledge_id = metadata.get("knowledge_id")
                        if knowledge_id:
                            if trace_id not in self._saved_knowledge_ids:
                                self._saved_knowledge_ids[trace_id] = []
                            self._saved_knowledge_ids[trace_id].append(knowledge_id)
                            logger.info(f"[Knowledge Tracking] 记录保存的知识 ID: {knowledge_id}")
                    # --- Multimodal tool feedback support ---
                    # execute() returns dict{"text","images","tool_usage"} or
                    # a plain str; normalize to the dict form.
                    if isinstance(tool_result, str):
                        tool_result = {"text": tool_result}
                    tool_text = tool_result.get("text", str(tool_result))
                    tool_images = tool_result.get("images", [])
                    tool_usage = tool_result.get("tool_usage")  # optional model-usage accounting
                    # Build the multimodal message if images are present.
                    if tool_images:
                        tool_result_text = tool_text
                        # Multimodal content: text block + image_url blocks.
                        tool_content_for_llm = [{"type": "text", "text": tool_text}]
                        for img in tool_images:
                            if img.get("type") == "base64" and img.get("data"):
                                media_type = img.get("media_type", "image/png")
                                tool_content_for_llm.append({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:{media_type};base64,{img['data']}"
                                    }
                                })
                        img_count = len(tool_content_for_llm) - 1  # minus the text block
                        print(f"[Runner] 多模态工具反馈: tool={tool_name}, images={img_count}, text_len={len(tool_result_text)}")
                    else:
                        tool_result_text = tool_text
                        tool_content_for_llm = tool_text
                    tool_msg = Message.create(
                        trace_id=trace_id,
                        role="tool",
                        sequence=sequence,
                        goal_id=current_goal_id,
                        parent_sequence=head_seq,
                        tool_call_id=tc["id"],
                        branch_type=side_branch_ctx.type if side_branch_ctx else None,
                        branch_id=side_branch_ctx.branch_id if side_branch_ctx else None,
                        # Store the full content: a list (with image_url
                        # blocks) when images exist, otherwise a plain string.
                        content={"tool_name": tool_name, "result": tool_content_for_llm},
                    )
                    if self.trace_store:
                        await self.trace_store.add_message(tool_msg)
                        # Account model usage incurred inside the tool.
                        if tool_usage:
                            await self.trace_store.record_model_usage(
                                trace_id=trace_id,
                                sequence=sequence,
                                role="tool",
                                tool_name=tool_name,
                                model=tool_usage.get("model"),
                                prompt_tokens=tool_usage.get("prompt_tokens", 0),
                                completion_tokens=tool_usage.get("completion_tokens", 0),
                                cache_read_tokens=tool_usage.get("cache_read_tokens", 0),
                            )
                        # Save the first screenshot as a PNG next to the message.
                        if tool_images:
                            import base64 as b64mod
                            for img in tool_images:
                                if img.get("data"):
                                    png_path = self.trace_store._get_messages_dir(trace_id) / f"{tool_msg.message_id}.png"
                                    png_path.write_bytes(b64mod.b64decode(img["data"]))
                                    print(f"[Runner] 截图已保存: {png_path.name}")
                                    break  # only the first image
                    # On a side branch, tool_msg already carries its branch_id.
                    yield tool_msg
                    head_seq = sequence
                    sequence += 1
                    history.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "name": tool_name,
                        "content": tool_content_for_llm,
                    })
                continue  # next ReAct turn
            # No tool calls: the task is finished.
            break
        # Post-completion retrospective extraction.
        if config.knowledge.enable_completion_extraction:
            await self._extract_knowledge_on_completion(trace_id, history, config)
        # Drop per-trace tracking state.
        self._context_warned.pop(trace_id, None)
        self._context_usage.pop(trace_id, None)
        self._saved_knowledge_ids.pop(trace_id, None)
        # Persist the final head_sequence and mark the trace completed.
        if self.trace_store:
            await self.trace_store.update_trace(
                trace_id,
                status="completed",
                head_sequence=head_seq,
                completed_at=datetime.now(),
            )
            trace_obj = await self.trace_store.get_trace(trace_id)
            if trace_obj:
                yield trace_obj
    # ===== Level 2: LLM compression =====
    async def _compress_history(
        self,
        trace_id: str,
        history: List[Dict],
        goal_tree: Optional[GoalTree],
        config: RunConfig,
        sequence: int,
        head_seq: int,
    ) -> Tuple[List[Dict], int, int]:
        """Level 2 compression: LLM-generated summary.

        Step 1: reflection — extract reusable experience from the history.
        Step 2: summarization — one tool-less LLM call produces a summary.
        Step 3: store the summary as a new message whose parent_sequence
                jumps to the system message, detaching everything in between.
        Step 4: rebuild history as [system message, summary message].

        Returns:
            (new_history, new_head_seq, next_sequence); the inputs are
            returned unchanged when compression is skipped.
        """
        logger.info("Level 2 压缩开始: trace=%s, 当前 history 长度=%d", trace_id, len(history))
        # Locate the system message (first message on the main path).
        system_msg_seq = None
        system_msg_dict = None
        if self.trace_store:
            trace_obj = await self.trace_store.get_trace(trace_id)
            if trace_obj and trace_obj.head_sequence > 0:
                main_path = await self.trace_store.get_main_path_messages(
                    trace_id, trace_obj.head_sequence
                )
                for msg in main_path:
                    if msg.role == "system":
                        system_msg_seq = msg.sequence
                        system_msg_dict = msg.to_llm_dict()
                        break
        # Fallback: look for a system message in the in-memory history.
        if system_msg_dict is None:
            for msg_dict in history:
                if msg_dict.get("role") == "system":
                    system_msg_dict = msg_dict
                    break
        if system_msg_dict is None:
            logger.warning("Level 2 压缩跳过:未找到 system message")
            return history, head_seq, sequence
        # --- Step 1: experience extraction (reflect) ---
        try:
            from agent.tools.builtin.knowledge import generate_and_save_reflection
            await generate_and_save_reflection(
                trace_id=trace_id,
                messages=history,
                llm_call_fn=self.llm_call,
                model=config.model
            )
        except Exception as e:
            logger.error(f"Level 2 经验提取失败: {e}")
        # --- Step 2: compression summary + experience evaluation ---
        compress_prompt = build_compression_prompt(goal_tree)
        compress_messages = list(history) + [{"role": "user", "content": compress_prompt}]
        # Apply prompt caching markers to the compression request.
        compress_messages = self._add_cache_control(
            compress_messages,
            config.model,
            config.enable_prompt_caching
        )
        compress_result = await self.llm_call(
            messages=compress_messages,
            model=config.model,
            tools=[],
            temperature=config.temperature,
            **config.extra_llm_params,
        )
        raw_output = compress_result.get("content", "").strip()
        if not raw_output:
            logger.warning("Level 2 压缩跳过:LLM 未返回内容")
            return history, head_seq, sequence
        # Extract the [[SUMMARY]] block; default to the raw output.
        summary_text = raw_output
        if "[[SUMMARY]]" in raw_output:
            summary_text = raw_output[raw_output.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
        if not summary_text:
            logger.warning("Level 2 压缩跳过:LLM 未返回 summary")
            return history, head_seq, sequence
        # --- Step 3: persist the summary message ---
        summary_with_header = build_summary_header(summary_text)
        summary_msg = Message.create(
            trace_id=trace_id,
            role="user",
            sequence=sequence,
            goal_id=None,
            parent_sequence=system_msg_seq,  # jump to the system msg, skipping everything in between
            content=summary_with_header,
        )
        if self.trace_store:
            await self.trace_store.add_message(summary_msg)
        new_head_seq = sequence
        sequence += 1
        # --- Step 4: rebuild history ---
        new_history = [system_msg_dict, summary_msg.to_llm_dict()]
        # Advance the trace head to the summary message.
        if self.trace_store:
            await self.trace_store.update_trace(
                trace_id,
                head_sequence=new_head_seq,
            )
        logger.info(
            "Level 2 压缩完成: 旧 history %d 条 → 新 history %d 条, summary 长度=%d",
            len(history), len(new_history), len(summary_text),
        )
        return new_history, new_head_seq, sequence
    async def _run_reflect(
        self,
        trace_id: str,
        history: List[Dict],
        config: RunConfig,
        reflect_prompt: str,
        source_name: str,
    ) -> None:
        """Reflection extraction: let the LLM review the history and persist
        lessons learned by calling the knowledge_save tool directly.

        Never raises: all failures are logged and swallowed (best-effort).

        Args:
            trace_id: Trace ID (stored as the knowledge item's message_id).
            history: Current conversation history.
            config: Run configuration.
            reflect_prompt: Reflection prompt appended to the history.
            source_name: Origin tag (distinguishes compression-time vs
                completion-time extraction).
        """
        try:
            reflect_messages = list(history) + [{"role": "user", "content": reflect_prompt}]
            reflect_messages = self._add_cache_control(
                reflect_messages, config.model, config.enable_prompt_caching
            )
            # Expose only the knowledge_save tool so the LLM saves directly.
            knowledge_save_schema = self._get_tool_schemas(["knowledge_save"])
            reflect_result = await self.llm_call(
                messages=reflect_messages,
                model=config.model,
                tools=knowledge_save_schema,
                temperature=0.2,
                **config.extra_llm_params,
            )
            tool_calls = reflect_result.get("tool_calls") or []
            if not tool_calls:
                logger.info("反思阶段无经验保存 (source=%s)", source_name)
                return
            saved_count = 0
            for tc in tool_calls:
                tool_name = tc.get("function", {}).get("name")
                if tool_name != "knowledge_save":
                    continue
                tool_args = tc.get("function", {}).get("arguments") or {}
                if isinstance(tool_args, str):
                    tool_args = json.loads(tool_args) if tool_args.strip() else {}
                # Inject provenance fields (the LLM does not fill these in).
                tool_args.setdefault("source_name", source_name)
                tool_args.setdefault("source_category", "exp")
                tool_args.setdefault("message_id", trace_id)
                try:
                    await self.tools.execute(
                        "knowledge_save",
                        tool_args,
                        uid=config.uid or "",
                        context={
                            "store": self.trace_store,
                            "trace_id": trace_id,
                            "knowledge_config": config.knowledge,
                        },
                    )
                    saved_count += 1
                except Exception as e:
                    logger.warning("保存经验失败: %s", e)
            logger.info("已提取并保存 %d 条经验 (source=%s)", saved_count, source_name)
        except Exception as e:
            logger.error("知识反思提取失败 (source=%s): %s", source_name, e)
- async def _extract_knowledge_on_completion(
- self,
- trace_id: str,
- history: List[Dict],
- config: RunConfig,
- ) -> None:
- """任务完成后执行全局复盘,提取经验保存到知识库。"""
- logger.info("任务完成后复盘提取: trace=%s", trace_id)
- await self._run_reflect(
- trace_id, history, config,
- reflect_prompt=config.knowledge.get_completion_reflect_prompt(),
- source_name="completion_reflection",
- )
    # ===== Rewind =====
    async def _rewind(
        self,
        trace_id: str,
        after_sequence: int,
        goal_tree: Optional[GoalTree],
    ) -> int:
        """Rewind: snapshot the GoalTree, rebuild a clean tree and move head_sequence.

        New messages attach their parent_sequence to the rewind point, so the
        old messages naturally drop off the main path via the tree structure
        rather than being deleted.

        Returns:
            The next available sequence number.

        Raises:
            ValueError: when no trace_store is configured.
        """
        if not self.trace_store:
            raise ValueError("trace_store required for rewind")
        # 1. Load all messages (needed for the safe cutoff and max sequence).
        all_messages = await self.trace_store.get_trace_messages(trace_id)
        if not all_messages:
            return 1
        # 2. Find a safe cut point (never between a tool_call and its tool response).
        cutoff = self._find_safe_cutoff(all_messages, after_sequence)
        # 3. Snapshot and rebuild the GoalTree.
        if goal_tree:
            # Use the cutoff message's created_at as the time boundary.
            cutoff_msg = None
            for msg in all_messages:
                if msg.sequence == cutoff:
                    cutoff_msg = msg
                    break
            cutoff_time = cutoff_msg.created_at if cutoff_msg else datetime.now()
            # Snapshot into events (head_sequence lets the frontend detect the branch switch).
            await self.trace_store.append_event(trace_id, "rewind", {
                "after_sequence": cutoff,
                "head_sequence": cutoff,
                "goal_tree_snapshot": goal_tree.to_dict(),
            })
            # Rebuild a clean GoalTree bounded by the cutoff time.
            new_tree = goal_tree.rebuild_for_rewind(cutoff_time)
            await self.trace_store.update_goal_tree(trace_id, new_tree)
            # Refresh the in-memory references.
            goal_tree.goals = new_tree.goals
            goal_tree.current_id = new_tree.current_id
        # 4. Move head_sequence to the rewind point.
        await self.trace_store.update_trace(trace_id, head_sequence=cutoff)
        # 5. Return the next sequence (globally increasing, never reused).
        max_seq = max((m.sequence for m in all_messages), default=0)
        return max_seq + 1
- def _find_safe_cutoff(self, messages: List[Message], after_sequence: int) -> int:
- """
- 找到安全的截断点。
- 如果 after_sequence 指向一条带 tool_calls 的 assistant message,
- 则自动扩展到其所有对应的 tool response 之后。
- """
- cutoff = after_sequence
- # 找到 after_sequence 对应的 message
- target_msg = None
- for msg in messages:
- if msg.sequence == after_sequence:
- target_msg = msg
- break
- if not target_msg:
- return cutoff
- # 如果是 assistant 且有 tool_calls,找到所有对应的 tool responses
- if target_msg.role == "assistant":
- content = target_msg.content
- if isinstance(content, dict) and content.get("tool_calls"):
- tool_call_ids = set()
- for tc in content["tool_calls"]:
- if isinstance(tc, dict) and tc.get("id"):
- tool_call_ids.add(tc["id"])
- # 找到这些 tool_call 对应的 tool messages
- for msg in messages:
- if (msg.role == "tool" and msg.tool_call_id
- and msg.tool_call_id in tool_call_ids):
- cutoff = max(cutoff, msg.sequence)
- return cutoff
    async def _heal_orphaned_tool_calls(
        self,
        messages: List[Message],
        trace_id: str,
        goal_tree: Optional[GoalTree],
        sequence: int,
    ) -> tuple:
        """Detect and repair orphaned tool_calls in the message history.

        When the agent is stopped or crashes, an assistant message's
        tool_calls may lack matching tool results (including partially
        completed multi-tool-call turns). Sending such history straight to
        the LLM causes a 400 error.

        Repair strategy: insert a synthetic "interrupted" notification for
        each missing tool result instead of trimming history:
        - ordinary tools get a short interruption notice;
        - agent/evaluate get sub_trace_id, execution stats and a
          continue_from hint.
        Synthetic messages are persisted to the store, making the repair
        idempotent (a later resume will not trigger it again).

        Returns:
            (healed_messages, next_sequence)
        """
        if not messages:
            return messages, sequence
        # Map tool_call id -> (assistant message, tool_call dict); collect
        # the ids that already have a tool response.
        tc_map: Dict[str, tuple] = {}
        result_ids: set = set()
        for msg in messages:
            if msg.role == "assistant":
                content = msg.content
                if isinstance(content, dict) and content.get("tool_calls"):
                    for tc in content["tool_calls"]:
                        tc_id = tc.get("id")
                        if tc_id:
                            tc_map[tc_id] = (msg, tc)
            elif msg.role == "tool" and msg.tool_call_id:
                result_ids.add(msg.tool_call_id)
        orphaned_ids = [tc_id for tc_id in tc_map if tc_id not in result_ids]
        if not orphaned_ids:
            return messages, sequence
        logger.info(
            "检测到 %d 个 orphaned tool_calls,生成合成中断通知",
            len(orphaned_ids),
        )
        healed = list(messages)
        head_seq = messages[-1].sequence
        for tc_id in orphaned_ids:
            assistant_msg, tc = tc_map[tc_id]
            tool_name = tc.get("function", {}).get("name", "unknown")
            # agent/evaluate get a rich synthetic result; other tools a short notice.
            if tool_name in ("agent", "evaluate"):
                result_text = self._build_agent_interrupted_result(
                    tc, goal_tree, assistant_msg,
                )
            else:
                result_text = build_tool_interrupted_message(tool_name)
            synthetic_msg = Message.create(
                trace_id=trace_id,
                role="tool",
                sequence=sequence,
                goal_id=assistant_msg.goal_id,
                parent_sequence=head_seq,
                tool_call_id=tc_id,
                content={"tool_name": tool_name, "result": result_text},
            )
            if self.trace_store:
                await self.trace_store.add_message(synthetic_msg)
            healed.append(synthetic_msg)
            head_seq = sequence
            sequence += 1
        # Advance the trace head/last sequence past the synthetic messages.
        if self.trace_store:
            await self.trace_store.update_trace(
                trace_id,
                head_sequence=head_seq,
                last_sequence=max(head_seq, sequence - 1),
            )
        return healed, sequence
    def _build_agent_interrupted_result(
        self,
        tc: Dict,
        goal_tree: Optional[GoalTree],
        assistant_msg: Message,
    ) -> str:
        """Build a synthetic result for an interrupted agent/evaluate call.

        The JSON mirrors the tool's normal return shape (mode/status/summary/
        task, plus sub_trace_id, a continue hint and stats when available) so
        the model can resume the delegated work.
        """
        args_str = tc.get("function", {}).get("arguments", "{}")
        try:
            args = json.loads(args_str) if isinstance(args_str, str) else args_str
        except json.JSONDecodeError:
            args = {}
        task = args.get("task", "未知任务")
        if isinstance(task, list):
            task = "; ".join(task)
        tool_name = tc.get("function", {}).get("name", "agent")
        mode = "evaluate" if tool_name == "evaluate" else "delegate"
        # Look up sub-trace info from the goal tree.
        sub_trace_id = None
        stats = None
        if goal_tree and assistant_msg.goal_id:
            goal = goal_tree.find(assistant_msg.goal_id)
            if goal and goal.sub_trace_ids:
                # sub_trace_ids entries may be dicts or bare trace-id strings.
                first = goal.sub_trace_ids[0]
                if isinstance(first, dict):
                    sub_trace_id = first.get("trace_id")
                elif isinstance(first, str):
                    sub_trace_id = first
                if goal.cumulative_stats:
                    s = goal.cumulative_stats
                    if s.message_count > 0:
                        stats = {
                            "message_count": s.message_count,
                            "total_tokens": s.total_tokens,
                            "total_cost": round(s.total_cost, 4),
                        }
        result: Dict[str, Any] = {
            "mode": mode,
            "status": "interrupted",
            "summary": AGENT_INTERRUPTED_SUMMARY,
            "task": task,
        }
        if sub_trace_id:
            result["sub_trace_id"] = sub_trace_id
            result["hint"] = build_agent_continue_hint(sub_trace_id)
        if stats:
            result["stats"] = stats
        return json.dumps(result, ensure_ascii=False, indent=2)
    # ===== Context injection =====
- def _build_context_injection(
- self,
- trace: Trace,
- goal_tree: Optional[GoalTree],
- ) -> str:
- """构建周期性注入的上下文(GoalTree + Active Collaborators + Focus 提醒)"""
- parts = []
- # GoalTree
- if goal_tree and goal_tree.goals:
- parts.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")
- if goal_tree.current_id:
- # 检测 focus 在有子节点的父目标上:提醒模型 focus 到具体子目标
- children = goal_tree.get_children(goal_tree.current_id)
- pending_children = [c for c in children if c.status in ("pending", "in_progress")]
- if pending_children:
- child_ids = ", ".join(
- goal_tree._generate_display_id(c) for c in pending_children[:3]
- )
- parts.append(
- f"**提醒**:当前焦点在父目标上,建议用 `goal(focus=\"...\")` "
- f"切换到具体子目标(如 {child_ids})再执行。"
- )
- else:
- # 无焦点:提醒模型 focus
- parts.append(
- "**提醒**:当前没有焦点目标。请用 `goal(focus=\"...\")` 选择一个目标开始执行。"
- )
- # Active Collaborators
- collaborators = trace.context.get("collaborators", [])
- if collaborators:
- lines = ["## Active Collaborators"]
- for c in collaborators:
- status_str = c.get("status", "unknown")
- ctype = c.get("type", "agent")
- summary = c.get("summary", "")
- name = c.get("name", "unnamed")
- lines.append(f"- {name} [{ctype}, {status_str}]: {summary}")
- parts.append("\n".join(lines))
- return "\n\n".join(parts)
- # ===== 辅助方法 =====
def _add_cache_control(
    self,
    messages: List[Dict],
    model: str,
    enable: bool
) -> List[Dict]:
    """
    Add Prompt Caching markers for models that support them.

    Strategy: fixed positions + deferred lookup
    1. Cache the system message (when it is long enough).
    2. Fixed-position cache points (20, 40, 60, 80), keeping consecutive
       cache points at least ~1024 estimated tokens apart.
    3. At most 4 cache points in total (including the system one).

    Args:
        messages: original message list
        model: model name
        enable: whether caching is enabled

    Returns:
        Message list with cache_control added (a deep copy; the input
        list is never mutated).
    """
    if not enable:
        return messages
    # Only enable for Claude models; other providers ignore/reject the marker.
    if "claude" not in model.lower():
        return messages
    # Deep copy so the caller's message objects are never mutated.
    import copy
    messages = copy.deepcopy(messages)
    # Strategy 1: cache the system message (only the first one found,
    # and only when it is long enough to be worth caching).
    system_cached = False
    for msg in messages:
        if msg.get("role") == "system":
            content = msg.get("content", "")
            if isinstance(content, str) and len(content) > 1000:
                # Convert to block form so cache_control can be attached.
                msg["content"] = [{
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }]
                system_cached = True
                logger.debug(f"[Cache] 为 system message 添加缓存标记 (len={len(content)})")
            break
    # Strategy 2: fixed-position cache points.
    CACHE_INTERVAL = 20
    # The system marker consumes one of the 4 available cache slots.
    MAX_POINTS = 3 if system_cached else 4
    # 1024 matches Anthropic's minimum cacheable span — see their docs.
    MIN_TOKENS = 1024
    # Rough per-message heuristic, not a measurement.
    AVG_TOKENS_PER_MSG = 70
    total_msgs = len(messages)
    if total_msgs == 0:
        return messages
    cache_positions = []
    last_cache_pos = 0
    for i in range(1, MAX_POINTS + 1):
        target_pos = i * CACHE_INTERVAL - 1  # 19, 39, 59, 79
        if target_pos >= total_msgs:
            break
        # Deferred lookup: starting at the target position, scan forward
        # for a suitable user/assistant message.
        for j in range(target_pos, total_msgs):
            msg = messages[j]
            if msg.get("role") not in ("user", "assistant"):
                continue
            content = msg.get("content", "")
            if not content:
                continue
            # Verify the content is non-empty (str, or a block list with
            # at least one non-empty text block).
            is_valid = False
            if isinstance(content, str):
                is_valid = len(content) > 0
            elif isinstance(content, list):
                is_valid = any(
                    isinstance(block, dict) and
                    block.get("type") == "text" and
                    len(block.get("text", "")) > 0
                    for block in content
                )
            if not is_valid:
                continue
            # Check the estimated token distance from the previous cache
            # point; keep scanning forward until the gap is large enough.
            msg_count = j - last_cache_pos
            estimated_tokens = msg_count * AVG_TOKENS_PER_MSG
            if estimated_tokens >= MIN_TOKENS:
                cache_positions.append(j)
                last_cache_pos = j
                logger.debug(f"[Cache] 在位置 {j} 添加缓存点 (估算 {estimated_tokens} tokens)")
                break
    # Apply the cache markers at the chosen positions.
    for idx in cache_positions:
        msg = messages[idx]
        content = msg.get("content", "")
        if isinstance(content, str):
            # Convert plain string content to block form to hold the marker.
            msg["content"] = [{
                "type": "text",
                "text": content,
                "cache_control": {"type": "ephemeral"}
            }]
            logger.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 添加缓存标记")
        elif isinstance(content, list):
            # Attach cache_control to the last text block.
            for block in reversed(content):
                if isinstance(block, dict) and block.get("type") == "text":
                    block["cache_control"] = {"type": "ephemeral"}
                    logger.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 添加缓存标记")
                    break
    logger.debug(
        f"[Cache] 总消息: {total_msgs}, "
        f"缓存点: {len(cache_positions)} at {cache_positions}"
    )
    return messages
- def _get_tool_schemas(self, tools: Optional[List[str]]) -> List[Dict]:
- """
- 获取工具 Schema
- - tools=None: 使用 registry 中全部已注册工具(含内置 + 外部注册的)
- - tools=["a", "b"]: 在 BUILTIN_TOOLS 基础上追加指定工具
- """
- if tools is None:
- # 全部已注册工具
- tool_names = self.tools.get_tool_names()
- else:
- # BUILTIN_TOOLS + 显式指定的额外工具
- tool_names = BUILTIN_TOOLS.copy()
- for t in tools:
- if t not in tool_names:
- tool_names.append(t)
- return self.tools.get_schemas(tool_names)
- # 默认 system prompt 前缀(当 config.system_prompt 和前端都未提供 system message 时使用)
- # 注意:此常量已迁移到 agent.core.prompts,这里保留引用以保持向后兼容
async def _build_system_prompt(self, config: "RunConfig", base_prompt: Optional[str] = None) -> Optional[str]:
    """Build the system prompt with skills injected.

    Priority for deciding which skills to load:
    1. config.skills explicitly set  -> filter by those names
    2. config.skills is None         -> use the preset's default skill list
    3. preset has no skill list      -> load everything (backward compat)

    Args:
        config: run configuration carrying agent_type and optional skills.
        base_prompt: pre-existing system content (from the messages or
            config.system_prompt); when None, config.system_prompt is used.
    """
    from agent.core.presets import AGENT_PRESETS

    prompt = config.system_prompt if base_prompt is None else base_prompt

    # Work out which skills to load (None means "load everything").
    wanted: Optional[List[str]] = config.skills
    if wanted is None:
        preset = AGENT_PRESETS.get(config.agent_type)
        if preset is not None:
            wanted = preset.skills  # may still be None -> load all

    loaded = load_skills_from_dir(self.skills_dir)
    if wanted is not None:
        loaded = [skill for skill in loaded if skill.name in wanted]

    skills_text = self._format_skills(loaded) if loaded else ""

    # Fall back to the default prefix when no system content was supplied.
    if not prompt:
        prompt = DEFAULT_SYSTEM_PREFIX
    if skills_text:
        prompt += f"\n\n## Skills\n{skills_text}"
    return prompt
- async def _generate_task_name(self, messages: List[Dict]) -> str:
- """生成任务名称:优先使用 utility_llm,fallback 到文本截取"""
- # 提取 messages 中的文本内容
- text_parts = []
- for msg in messages:
- content = msg.get("content", "")
- if isinstance(content, str):
- text_parts.append(content)
- elif isinstance(content, list):
- for part in content:
- if isinstance(part, dict) and part.get("type") == "text":
- text_parts.append(part.get("text", ""))
- raw_text = " ".join(text_parts).strip()
- if not raw_text:
- return TASK_NAME_FALLBACK
- # 尝试使用 utility_llm 生成标题
- if self.utility_llm_call:
- try:
- result = await self.utility_llm_call(
- messages=[
- {"role": "system", "content": TASK_NAME_GENERATION_SYSTEM_PROMPT},
- {"role": "user", "content": raw_text[:2000]},
- ],
- model="gpt-4o-mini", # 使用便宜模型
- )
- title = result.get("content", "").strip()
- if title and len(title) < 100:
- return title
- except Exception:
- pass
- # Fallback: 截取前 50 字符
- return raw_text[:50] + ("..." if len(raw_text) > 50 else "")
- def _format_skills(self, skills: List[Skill]) -> str:
- if not skills:
- return ""
- return "\n\n".join(s.to_prompt_text() for s in skills)
|