runner.py 139 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123
  1. """
  2. Agent Runner - Agent 执行引擎
  3. 核心职责:
  4. 1. 执行 Agent 任务(循环调用 LLM + 工具)
  5. 2. 记录执行轨迹(Trace + Messages + GoalTree)
  6. 3. 加载和注入技能(Skill)
  7. 4. 管理执行计划(GoalTree)
  8. 5. 支持续跑(continue)和回溯重跑(rewind)
  9. 参数分层:
  10. - Infrastructure: AgentRunner 构造时设置(trace_store, llm_call 等)
  11. - RunConfig: 每次 run 时指定(model, trace_id, after_sequence 等)
  12. - Messages: OpenAI SDK 格式的任务消息
  13. """
  14. import asyncio
  15. import json
  16. import logging
  17. import os
  18. import uuid
  19. from dataclasses import dataclass, field
  20. from datetime import datetime
  21. from typing import AsyncIterator, Optional, Dict, Any, List, Callable, Literal, Tuple, Union
  22. from agent.trace.models import Trace, Message
  23. from agent.trace.protocols import TraceStore
  24. from agent.trace.goal_models import GoalTree
  25. from agent.trace.compaction import (
  26. CompressionConfig,
  27. compress_completed_goals,
  28. estimate_tokens,
  29. needs_level2_compression,
  30. build_compression_prompt,
  31. )
  32. from agent.skill.models import Skill
  33. from agent.skill.skill_loader import load_skills_from_dir
  34. from agent.tools import ToolRegistry, get_tool_registry
  35. from agent.tools.builtin.knowledge import KnowledgeConfig
  36. from agent.core.memory import MemoryConfig
  37. from agent.core.prompts import (
  38. DEFAULT_SYSTEM_PREFIX,
  39. TRUNCATION_HINT,
  40. TOOL_INTERRUPTED_MESSAGE,
  41. AGENT_INTERRUPTED_SUMMARY,
  42. AGENT_CONTINUE_HINT_TEMPLATE,
  43. TASK_NAME_GENERATION_SYSTEM_PROMPT,
  44. TASK_NAME_FALLBACK,
  45. SUMMARY_HEADER_TEMPLATE,
  46. build_summary_header,
  47. build_tool_interrupted_message,
  48. build_agent_continue_hint,
  49. )
  50. logger = logging.getLogger(__name__)
@dataclass
class ContextUsage:
    """Context usage snapshot for one trace."""
    trace_id: str
    message_count: int
    token_count: int
    max_tokens: int
    usage_percent: float
    image_count: int = 0
  60. @dataclass
  61. class SideBranchContext:
  62. """侧分支上下文(压缩/反思/知识评估)"""
  63. type: Literal["compression", "reflection", "knowledge_eval"]
  64. branch_id: str
  65. start_head_seq: int # 侧分支起点的 head_seq
  66. start_sequence: int # 侧分支第一条消息的 sequence
  67. start_history_length: int # 侧分支起点的 history 长度
  68. start_iteration: int # 侧分支开始时的 iteration
  69. max_turns: int = 5 # 最大轮次
  70. def to_dict(self) -> Dict[str, Any]:
  71. """转换为字典(用于持久化和传递给工具)"""
  72. return {
  73. "type": self.type,
  74. "branch_id": self.branch_id,
  75. "start_head_seq": self.start_head_seq,
  76. "start_sequence": self.start_sequence,
  77. "start_iteration": self.start_iteration,
  78. "max_turns": self.max_turns,
  79. "is_side_branch": True,
  80. "started_at": datetime.now().isoformat(),
  81. }
  82. # ===== 运行配置 =====
@dataclass
class RunConfig:
    """
    Run parameters — control how the Agent executes.

    Split into model-level parameters (decided by the upstream agent or the user)
    and framework-level parameters (injected by the system).
    """
    # --- Model-level parameters ---
    model: str = "gpt-4o"
    temperature: float = 0.3
    max_iterations: int = 200
    tools: Optional[List[str]] = None  # None = filter by tool_groups; explicit list = exact selection
    tool_groups: Optional[List[str]] = field(default_factory=lambda: ["core"])  # tool-group whitelist; defaults to core only, projects append as needed
    exclude_tools: List[str] = field(default_factory=list)  # tool names removed from the tools / tool_groups result (e.g. a remote agent disabling agent/evaluate)
    side_branch_max_turns: int = 5  # maximum turns of a side branch (compression / reflection)
    goal_compression: Literal["none", "on_complete", "on_overflow"] = "on_overflow"  # Goal compression mode
    # --- Forced side branches (for manual API triggering or the automatic compression flow) ---
    # The list acts as a side-branch queue: after each side branch finishes, pop(0) yields the next one
    force_side_branch: Optional[List[Literal["compression", "reflection"]]] = None
    # --- Framework-level parameters ---
    agent_type: str = "default"
    uid: Optional[str] = None
    system_prompt: Optional[str] = None  # None = built automatically from skills
    skills: Optional[List[str]] = None  # skill names injected into the system prompt; None = decided by the preset
    enable_memory: bool = True
    auto_execute_tools: bool = True
    name: Optional[str] = None  # display name (when empty, generated automatically by utility_llm)
    enable_prompt_caching: bool = True  # enable Anthropic Prompt Caching (only effective for Claude models)
    parallel_tool_execution: bool = False  # enable concurrent tool-call execution (use with care; requires no resource conflicts)
    context_injection_interval: int = 5  # inject get_current_context (GoalTree + collaborators + IM notifications) every N rounds; 0 = fully disabled (including round 0)
    # --- Trace control ---
    trace_id: Optional[str] = None  # None = create a new trace
    parent_trace_id: Optional[str] = None  # for sub-agents only
    parent_goal_id: Optional[str] = None
    # --- Continue control ---
    after_sequence: Optional[int] = None  # message sequence after which to continue
    # --- Extra LLM parameters (passed to llm_call as **kwargs) ---
    extra_llm_params: Dict[str, Any] = field(default_factory=dict)
    # --- Custom metadata context ---
    context: Dict[str, Any] = field(default_factory=dict)
    # --- Research flow control ---
    enable_research_flow: bool = True  # enable the automatic research flow (knowledge retrieval -> experience retrieval -> research -> plan)
    # --- Knowledge management configuration ---
    knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
    # --- Memory configuration (see agent/docs/memory.md) ---
    # None = default Agent (no long-term memory); assigning a MemoryConfig makes this Agent memory-bearing
    memory: Optional["MemoryConfig"] = None
  129. # BUILTIN_TOOLS 硬编码列表已移除(2026-04)。
  130. # 工具可用性现在由 @tool(groups=[...]) 声明 + RunConfig.tool_groups 过滤控制。
@dataclass
class CallResult:
    """Result of a single call."""
    reply: str
    tool_calls: Optional[List[Dict]] = None
    trace_id: Optional[str] = None
    step_id: Optional[str] = None
    tokens: Optional[Dict[str, int]] = None
    cost: float = 0.0
  140. # ===== 执行引擎 =====
CONTEXT_INJECTION_INTERVAL = 5  # inject GoalTree + Collaborators + IM notifications every N rounds
  142. class AgentRunner:
  143. """
  144. Agent 执行引擎
  145. 支持三种运行模式(通过 RunConfig 区分):
  146. 1. 新建:trace_id=None
  147. 2. 续跑:trace_id=已有ID, after_sequence=None 或 == head
  148. 3. 回溯:trace_id=已有ID, after_sequence=N(N < head_sequence)
  149. """
  150. def __init__(
  151. self,
  152. trace_store: Optional[TraceStore] = None,
  153. tool_registry: Optional[ToolRegistry] = None,
  154. llm_call: Optional[Callable] = None,
  155. utility_llm_call: Optional[Callable] = None,
  156. skills_dir: Optional[str] = None,
  157. goal_tree: Optional[GoalTree] = None,
  158. debug: bool = False,
  159. logger_name: Optional[str] = None,
  160. ):
  161. """
  162. 初始化 AgentRunner
  163. Args:
  164. trace_store: Trace 存储
  165. tool_registry: 工具注册表(默认使用全局注册表)
  166. llm_call: 主 LLM 调用函数
  167. utility_llm_call: 轻量 LLM(用于生成任务标题等),可选
  168. skills_dir: Skills 目录路径
  169. goal_tree: 初始 GoalTree(可选)
  170. debug: 保留参数(已废弃)
  171. logger_name: 自定义日志名称(如 "agents.knowledge_manager"),默认用模块名
  172. """
  173. self.trace_store = trace_store
  174. self.tools = tool_registry or get_tool_registry()
  175. self.llm_call = llm_call
  176. self.utility_llm_call = utility_llm_call
  177. self.skills_dir = skills_dir
  178. self.goal_tree = goal_tree
  179. self.debug = debug
  180. self.log = logging.getLogger(logger_name) if logger_name else logger
  181. self.stdin_check: Optional[Callable] = None # 由外部设置,用于子 agent 执行期间检查 stdin
  182. self._cancel_events: Dict[str, asyncio.Event] = {} # trace_id → cancel event
  183. # 知识保存跟踪(每个 trace 独立)
  184. self._saved_knowledge_ids: Dict[str, List[str]] = {} # trace_id → [knowledge_ids]
  185. # Context 使用跟踪
  186. self._context_warned: Dict[str, set] = {} # trace_id → {30, 50, 80} 已警告过的阈值
  187. self._context_usage: Dict[str, ContextUsage] = {} # trace_id → 当前用量快照
  188. # 图片优化缓存(避免重复处理)
  189. # key: 图片内容的 hash, value: {"downscaled": ..., "description": ...}
  190. self._image_opt_cache: Dict[str, Dict[str, Any]] = {}
  191. # 当前 run 的 MemoryConfig(由 run() 根据 RunConfig.memory 设置)
  192. # dream 工具从 context.runner 读取此字段,判断是否 memory-bearing
  193. self._current_memory_config: Optional[MemoryConfig] = None
  194. # ===== 核心公开方法 =====
  195. def get_context_usage(self, trace_id: str) -> Optional[ContextUsage]:
  196. """获取指定 trace 的 context 使用情况"""
  197. return self._context_usage.get(trace_id)
  198. async def dream(
  199. self,
  200. memory_config: MemoryConfig,
  201. trace_filter: Optional[Callable[["Trace"], bool]] = None,
  202. reflect_model: str = "gpt-4o-mini",
  203. dream_model: str = "gpt-4o",
  204. ) -> "DreamReport":
  205. """执行 dream(整理长期记忆)——外部调度入口。
  206. Agent 主动调用走 dream 工具;外部调度(定时器、CLI)走这个方法。
  207. Args:
  208. memory_config: 记忆配置
  209. trace_filter: 可选 trace 过滤(按 agent_type/owner 等)
  210. reflect_model: per-trace 反思模型
  211. dream_model: 跨 trace 整合模型
  212. """
  213. from agent.core.dream import run_dream
  214. if not self.trace_store or not self.llm_call:
  215. raise RuntimeError("dream 需要 trace_store 和 llm_call 均已配置")
  216. return await run_dream(
  217. store=self.trace_store,
  218. llm_call=self.llm_call,
  219. memory_config=memory_config,
  220. trace_filter=trace_filter,
  221. reflect_model=reflect_model,
  222. dream_model=dream_model,
  223. )
  224. async def run(
  225. self,
  226. messages: List[Dict],
  227. config: Optional[RunConfig] = None,
  228. inject_skills: Optional[List[str]] = None,
  229. skill_recency_threshold: int = 10,
  230. ) -> AsyncIterator[Union[Trace, Message]]:
  231. """
  232. Agent 模式执行(核心方法)
  233. Args:
  234. messages: OpenAI SDK 格式的输入消息
  235. 新建: 初始任务消息 [{"role": "user", "content": "..."}]
  236. 续跑: 追加的新消息
  237. 回溯: 在插入点之后追加的消息
  238. config: 运行配置
  239. inject_skills: 本次调用需要指定注入的 skill 列表(skill 名称)
  240. skill_recency_threshold: 最近 N 条消息内有该 skill 就不重复注入
  241. Yields:
  242. Union[Trace, Message]: Trace 对象(状态变化)或 Message 对象(执行过程)
  243. """
  244. if not self.llm_call:
  245. raise ValueError("llm_call function not provided")
  246. config = config or RunConfig()
  247. trace = None
  248. # Memory 模式开关(dream 工具会读取此字段)
  249. self._current_memory_config = config.memory
  250. try:
  251. # Phase 1: PREPARE TRACE
  252. trace, goal_tree, sequence = await self._prepare_trace(messages, config)
  253. # 注册取消事件
  254. self._cancel_events[trace.trace_id] = asyncio.Event()
  255. yield trace
  256. # 检查是否有未完成的侧分支(用于用户追加消息场景)
  257. side_branch_ctx_for_build: Optional[SideBranchContext] = None
  258. if trace.context.get("active_side_branch") and messages:
  259. side_branch_data = trace.context["active_side_branch"]
  260. # 创建侧分支上下文(用于标记用户追加的消息)
  261. side_branch_ctx_for_build = SideBranchContext(
  262. type=side_branch_data["type"],
  263. branch_id=side_branch_data["branch_id"],
  264. start_head_seq=side_branch_data["start_head_seq"],
  265. start_sequence=side_branch_data["start_sequence"],
  266. start_history_length=0,
  267. start_iteration=side_branch_data.get("start_iteration", 0),
  268. max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
  269. )
  270. # Phase 2: BUILD HISTORY
  271. history, sequence, created_messages, head_seq = await self._build_history(
  272. trace.trace_id, messages, goal_tree, config, sequence, side_branch_ctx_for_build
  273. )
  274. # Update trace's head_sequence in memory
  275. trace.head_sequence = head_seq
  276. for msg in created_messages:
  277. yield msg
  278. # Phase 3: AGENT LOOP
  279. async for event in self._agent_loop(
  280. trace, history, goal_tree, config, sequence,
  281. inject_skills=inject_skills,
  282. skill_recency_threshold=skill_recency_threshold,
  283. ):
  284. yield event
  285. except Exception as e:
  286. self.log.error(f"Agent run failed: {e}")
  287. tid = config.trace_id or (trace.trace_id if trace else None)
  288. if self.trace_store and tid:
  289. # 读取当前 last_sequence 作为 head_sequence,确保续跑时能加载完整历史
  290. current = await self.trace_store.get_trace(tid)
  291. head_seq = current.last_sequence if current else None
  292. await self.trace_store.update_trace(
  293. tid,
  294. status="failed",
  295. head_sequence=head_seq,
  296. error_message=str(e),
  297. completed_at=datetime.now()
  298. )
  299. trace_obj = await self.trace_store.get_trace(tid)
  300. if trace_obj:
  301. yield trace_obj
  302. raise
  303. finally:
  304. # 清理取消事件
  305. if trace:
  306. self._cancel_events.pop(trace.trace_id, None)
  307. async def run_result(
  308. self,
  309. messages: List[Dict],
  310. config: Optional[RunConfig] = None,
  311. on_event: Optional[Callable] = None,
  312. inject_skills: Optional[List[str]] = None,
  313. ) -> Dict[str, Any]:
  314. """
  315. 结果模式 — 消费 run(),返回结构化结果。
  316. 主要用于 agent/evaluate 工具内部。
  317. Args:
  318. on_event: 可选回调,每个 Trace/Message 事件触发一次,用于实时输出子 Agent 执行过程。
  319. inject_skills: 本次调用需要指定注入的 skill 列表(透传给 run())。
  320. """
  321. last_assistant_text = ""
  322. final_trace: Optional[Trace] = None
  323. async for item in self.run(messages=messages, config=config, inject_skills=inject_skills):
  324. if on_event:
  325. on_event(item)
  326. if isinstance(item, Message) and item.role == "assistant":
  327. content = item.content
  328. text = ""
  329. if isinstance(content, dict):
  330. text = content.get("text", "") or ""
  331. elif isinstance(content, str):
  332. text = content
  333. if text and text.strip():
  334. last_assistant_text = text
  335. elif isinstance(item, Trace):
  336. final_trace = item
  337. config = config or RunConfig()
  338. if not final_trace and config.trace_id and self.trace_store:
  339. final_trace = await self.trace_store.get_trace(config.trace_id)
  340. status = final_trace.status if final_trace else "unknown"
  341. error = final_trace.error_message if final_trace else None
  342. summary = last_assistant_text
  343. if not summary:
  344. status = "failed"
  345. error = error or "Agent 没有产生 assistant 文本结果"
  346. # 获取保存的知识 ID
  347. trace_id = final_trace.trace_id if final_trace else config.trace_id
  348. saved_knowledge_ids = self._saved_knowledge_ids.get(trace_id, [])
  349. return {
  350. "status": status,
  351. "summary": summary,
  352. "trace_id": trace_id,
  353. "error": error,
  354. "saved_knowledge_ids": saved_knowledge_ids, # 新增:返回保存的知识 ID
  355. "stats": {
  356. "total_messages": final_trace.total_messages if final_trace else 0,
  357. "total_tokens": final_trace.total_tokens if final_trace else 0,
  358. "total_cost": final_trace.total_cost if final_trace else 0.0,
  359. },
  360. }
  361. async def stop(self, trace_id: str) -> bool:
  362. """
  363. 停止运行中的 Trace
  364. 设置取消信号,agent loop 在下一个 LLM 调用前检查并退出。
  365. Trace 状态置为 "stopped"。
  366. Returns:
  367. True 如果成功发送停止信号,False 如果该 trace 不在运行中
  368. """
  369. cancel_event = self._cancel_events.get(trace_id)
  370. if cancel_event is None:
  371. return False
  372. cancel_event.set()
  373. return True
  374. # ===== 单次调用(保留)=====
  375. async def call(
  376. self,
  377. messages: List[Dict],
  378. model: str = "gpt-4o",
  379. tools: Optional[List[str]] = None,
  380. uid: Optional[str] = None,
  381. trace: bool = True,
  382. **kwargs
  383. ) -> CallResult:
  384. """
  385. 单次 LLM 调用(无 Agent Loop)
  386. """
  387. if not self.llm_call:
  388. raise ValueError("llm_call function not provided")
  389. trace_id = None
  390. message_id = None
  391. tool_schemas = self._get_tool_schemas(tools)
  392. if trace and self.trace_store:
  393. trace_obj = Trace.create(mode="call", uid=uid, model=model, tools=tool_schemas, llm_params=kwargs)
  394. trace_id = await self.trace_store.create_trace(trace_obj)
  395. result = await self.llm_call(messages=messages, model=model, tools=tool_schemas, **kwargs)
  396. if trace and self.trace_store and trace_id:
  397. msg = Message.create(
  398. trace_id=trace_id, role="assistant", sequence=1, goal_id=None,
  399. content={"text": result.get("content", ""), "tool_calls": result.get("tool_calls")},
  400. prompt_tokens=result.get("prompt_tokens", 0),
  401. completion_tokens=result.get("completion_tokens", 0),
  402. finish_reason=result.get("finish_reason"),
  403. cost=result.get("cost", 0),
  404. )
  405. message_id = await self.trace_store.add_message(msg)
  406. await self.trace_store.update_trace(trace_id, status="completed", completed_at=datetime.now())
  407. return CallResult(
  408. reply=result.get("content", ""),
  409. tool_calls=result.get("tool_calls"),
  410. trace_id=trace_id,
  411. step_id=message_id,
  412. tokens={"prompt": result.get("prompt_tokens", 0), "completion": result.get("completion_tokens", 0)},
  413. cost=result.get("cost", 0)
  414. )
  415. # ===== Phase 1: PREPARE TRACE =====
  416. async def _prepare_trace(
  417. self,
  418. messages: List[Dict],
  419. config: RunConfig,
  420. ) -> Tuple[Trace, Optional[GoalTree], int]:
  421. """
  422. 准备 Trace:创建新的或加载已有的
  423. Returns:
  424. (trace, goal_tree, next_sequence)
  425. """
  426. if config.trace_id:
  427. return await self._prepare_existing_trace(config)
  428. else:
  429. return await self._prepare_new_trace(messages, config)
  430. async def _prepare_new_trace(
  431. self,
  432. messages: List[Dict],
  433. config: RunConfig,
  434. ) -> Tuple[Trace, Optional[GoalTree], int]:
  435. """创建新 Trace"""
  436. trace_id = str(uuid.uuid4())
  437. # 生成任务名称
  438. task_name = config.name or await self._generate_task_name(messages)
  439. # 准备工具 Schema
  440. tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups, config.exclude_tools)
  441. trace_obj = Trace(
  442. trace_id=trace_id,
  443. mode="agent",
  444. task=task_name,
  445. agent_type=config.agent_type,
  446. parent_trace_id=config.parent_trace_id,
  447. parent_goal_id=config.parent_goal_id,
  448. uid=config.uid,
  449. model=config.model,
  450. tools=tool_schemas,
  451. llm_params={"temperature": config.temperature, **config.extra_llm_params},
  452. context=config.context,
  453. status="running",
  454. )
  455. goal_tree = self.goal_tree or GoalTree(mission=task_name)
  456. if self.trace_store:
  457. await self.trace_store.create_trace(trace_obj)
  458. await self.trace_store.update_goal_tree(trace_id, goal_tree)
  459. return trace_obj, goal_tree, 1
  460. async def _prepare_existing_trace(
  461. self,
  462. config: RunConfig,
  463. ) -> Tuple[Trace, Optional[GoalTree], int]:
  464. """加载已有 Trace(续跑或回溯)"""
  465. if not self.trace_store:
  466. raise ValueError("trace_store required for continue/rewind")
  467. trace_obj = await self.trace_store.get_trace(config.trace_id)
  468. if not trace_obj:
  469. raise ValueError(f"Trace not found: {config.trace_id}")
  470. goal_tree = await self.trace_store.get_goal_tree(config.trace_id)
  471. if goal_tree is None:
  472. # 防御性兜底:trace 存在但 goal.json 丢失时,创建空树
  473. goal_tree = GoalTree(mission=trace_obj.task or "Agent task")
  474. await self.trace_store.update_goal_tree(config.trace_id, goal_tree)
  475. # 自动判断行为:after_sequence 为 None 或 == head → 续跑;< head → 回溯
  476. after_seq = config.after_sequence
  477. # 如果 after_seq > head_sequence,说明 generator 被强制关闭时 store 的
  478. # head_sequence 未来得及更新(仍停在 Phase 2 写入的初始值)。
  479. # 用 last_sequence 修正 head_sequence,确保续跑时能看到完整历史。
  480. if after_seq is not None and after_seq > trace_obj.head_sequence:
  481. trace_obj.head_sequence = trace_obj.last_sequence
  482. await self.trace_store.update_trace(
  483. config.trace_id, head_sequence=trace_obj.head_sequence
  484. )
  485. if after_seq is not None and after_seq < trace_obj.head_sequence:
  486. # 回溯模式
  487. sequence = await self._rewind(config.trace_id, after_seq, goal_tree)
  488. else:
  489. # 续跑模式:从 last_sequence + 1 开始
  490. sequence = trace_obj.last_sequence + 1
  491. # 状态置为 running
  492. await self.trace_store.update_trace(
  493. config.trace_id,
  494. status="running",
  495. completed_at=None,
  496. )
  497. trace_obj.status = "running"
  498. # 广播状态变化给前端
  499. try:
  500. from agent.trace.websocket import broadcast_trace_status_changed
  501. await broadcast_trace_status_changed(config.trace_id, "running")
  502. except Exception:
  503. pass
  504. return trace_obj, goal_tree, sequence
  505. # ===== Phase 2: BUILD HISTORY =====
  506. async def _build_history(
  507. self,
  508. trace_id: str,
  509. new_messages: List[Dict],
  510. goal_tree: Optional[GoalTree],
  511. config: RunConfig,
  512. sequence: int,
  513. side_branch_ctx: Optional[SideBranchContext] = None,
  514. ) -> Tuple[List[Dict], int, List[Message], int]:
  515. """
  516. 构建完整的 LLM 消息历史
  517. 1. 从 head_sequence 沿 parent chain 加载主路径消息(续跑/回溯场景)
  518. 2. 构建 system prompt(新建时注入 skills)
  519. 3. 新建时:在第一条 user message 末尾注入当前经验
  520. 4. 追加 input messages(设置 parent_sequence 链接到当前 head)
  521. 5. 如果在侧分支中,追加的消息自动标记为侧分支消息
  522. Returns:
  523. (history, next_sequence, created_messages, head_sequence)
  524. created_messages: 本次新创建并持久化的 Message 列表,供 run() yield 给调用方
  525. head_sequence: 当前主路径头节点的 sequence
  526. """
  527. history: List[Dict] = []
  528. created_messages: List[Message] = []
  529. head_seq: Optional[int] = None # 当前主路径的头节点 sequence
  530. # 1. 加载已有 messages(通过主路径遍历)
  531. if config.trace_id and self.trace_store:
  532. trace_obj = await self.trace_store.get_trace(trace_id)
  533. if trace_obj and trace_obj.head_sequence > 0:
  534. main_path = await self.trace_store.get_main_path_messages(
  535. trace_id, trace_obj.head_sequence
  536. )
  537. # 修复 orphaned tool_calls(中断导致的 tool_call 无 tool_result)
  538. main_path, sequence = await self._heal_orphaned_tool_calls(
  539. main_path, trace_id, goal_tree, sequence,
  540. )
  541. history = [msg.to_llm_dict() for msg in main_path]
  542. if main_path:
  543. head_seq = main_path[-1].sequence
  544. # 2. 构建/注入 skills 到 system prompt
  545. has_system = any(m.get("role") == "system" for m in history)
  546. has_system_in_new = any(m.get("role") == "system" for m in new_messages)
  547. if not has_system:
  548. if has_system_in_new:
  549. # 入参消息已含 system,将 skills 注入其中(在 step 4 持久化之前)
  550. augmented = []
  551. for msg in new_messages:
  552. if msg.get("role") == "system":
  553. base = msg.get("content") or ""
  554. enriched = await self._build_system_prompt(config, base_prompt=base)
  555. augmented.append({**msg, "content": enriched or base})
  556. else:
  557. augmented.append(msg)
  558. new_messages = augmented
  559. else:
  560. # 没有 system,自动构建并插入历史
  561. system_prompt = await self._build_system_prompt(config)
  562. if system_prompt:
  563. history = [{"role": "system", "content": system_prompt}] + history
  564. if self.trace_store:
  565. system_msg = Message.create(
  566. trace_id=trace_id, role="system", sequence=sequence,
  567. goal_id=None, content=system_prompt,
  568. parent_sequence=None, # system message 是 root
  569. )
  570. await self.trace_store.add_message(system_msg)
  571. created_messages.append(system_msg)
  572. head_seq = sequence
  573. sequence += 1
  574. # 3. 追加新 messages(设置 parent_sequence 链接到当前 head)
  575. for msg_dict in new_messages:
  576. history.append(msg_dict)
  577. if self.trace_store:
  578. # 如果在侧分支中,标记为侧分支消息
  579. if side_branch_ctx:
  580. stored_msg = Message.create(
  581. trace_id=trace_id,
  582. role=msg_dict["role"],
  583. sequence=sequence,
  584. goal_id=goal_tree.current_id if goal_tree else None,
  585. parent_sequence=head_seq,
  586. branch_type=side_branch_ctx.type,
  587. branch_id=side_branch_ctx.branch_id,
  588. content=msg_dict.get("content"),
  589. )
  590. self.log.info(f"用户在侧分支 {side_branch_ctx.type} 中追加消息")
  591. else:
  592. stored_msg = Message.from_llm_dict(
  593. msg_dict, trace_id=trace_id, sequence=sequence,
  594. goal_id=None, parent_sequence=head_seq,
  595. )
  596. await self.trace_store.add_message(stored_msg)
  597. created_messages.append(stored_msg)
  598. head_seq = sequence
  599. sequence += 1
  600. # 5. 更新 trace 的 head_sequence
  601. if self.trace_store and head_seq is not None:
  602. await self.trace_store.update_trace(trace_id, head_sequence=head_seq)
  603. return history, sequence, created_messages, head_seq or 0
  604. # ===== Phase 3: AGENT LOOP =====
  async def _manage_context_usage(
      self,
      trace_id: str,
      history: List[Dict],
      goal_tree: Optional[GoalTree],
      config: RunConfig,
      sequence: int,
      head_seq: int,
  ) -> Tuple[List[Dict], int, int, bool]:
      """
      Track context usage and decide whether (and how) to compress.

      The method records a ``ContextUsage`` snapshot for the trace, logs a
      one-time warning per trace at 30/50/80 % usage, and, once the estimated
      token count exceeds the model limit, either queues side branches on
      ``config.force_side_branch`` or applies Level 1 (completed-goal)
      compression directly.

      Returns:
          (history, head_seq, sequence, needs_enter_side_branch)
          ``history`` is replaced only when Level 1 compression removed
          messages; ``head_seq`` and ``sequence`` are returned as passed in.
      """
      def _preview(content) -> str:
          # Short, log-friendly rendering of a message's content
          if isinstance(content, str):
              return content[:100] + ("..." if len(content) > 100 else "")
          if isinstance(content, list):
              return f"[{len(content)} blocks]"
          return str(content)[:100]

      limits = CompressionConfig()
      used_tokens = estimate_tokens(history)
      token_budget = limits.get_max_tokens(config.model)

      # Usage ratio in percent (0 when the budget is not positive)
      usage_pct = (used_tokens / token_budget * 100) if token_budget > 0 else 0
      total_messages = len(history)

      total_images = 0
      for entry in history:
          if not isinstance(entry.get("content"), list):
              continue
          for part in entry["content"]:
              if isinstance(part, dict) and part.get("type") in ("image", "image_url"):
                  total_images += 1

      # Refresh the context-usage snapshot for this trace
      self._context_usage[trace_id] = ContextUsage(
          trace_id=trace_id,
          message_count=total_messages,
          token_count=used_tokens,
          max_tokens=token_budget,
          usage_percent=usage_pct,
          image_count=total_images,
      )

      # Threshold warnings, each emitted at most once per trace
      already_warned = self._context_warned.setdefault(trace_id, set())
      for threshold in (30, 50, 80):
          if usage_pct < threshold or threshold in already_warned:
              continue
          already_warned.add(threshold)
          self.log.warning(
              f"Context 使用率达到 {threshold}%: {used_tokens:,} / {token_budget:,} tokens ({total_messages} 条消息)"
          )

      # Compression is driven by the token count alone
      if not used_tokens > token_budget:
          return history, head_seq, sequence, False

      # Pending knowledge entries must be evaluated before compressing
      if self.trace_store and not config.force_side_branch:
          pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
          if pending:
              # Queue: reflection -> knowledge evaluation -> compression.
              # Reflection goes first so knowledge produced by goals finished
              # during reflection is also evaluated before compression.
              branch_queue = ["knowledge_eval", "compression"]
              if config.knowledge.enable_extraction:
                  branch_queue.insert(0, "reflection")
              config.force_side_branch = branch_queue

              # Record what triggered the evaluation in trace.context
              stored_trace = await self.trace_store.get_trace(trace_id)
              if stored_trace:
                  if not stored_trace.context:
                      stored_trace.context = {}
                  stored_trace.context["knowledge_eval_trigger"] = "compression"
                  await self.trace_store.update_trace(
                      trace_id, context=stored_trace.context
                  )
              self.log.info(f"[Knowledge Eval] 压缩前触发知识评估,待评估: {len(pending)} 条")
              return history, head_seq, sequence, True

      # Knowledge extraction: reflect on the full history before compressing
      if config.knowledge.enable_extraction and not config.force_side_branch:
          config.force_side_branch = ["reflection", "compression"]
          return history, head_seq, sequence, True

      # From here on reflection is not queued: run Level 1 directly, then
      # check whether Level 2 (a compression side branch) is still needed.
      level1_possible = config.goal_compression != "none" and self.trace_store and goal_tree
      if not level1_possible:
          self.log.warning(
              "Token 数 (%d) 超过阈值,但无法执行 Level 1 压缩(缺少 store 或 goal_tree,或 goal_compression=none)",
              used_tokens,
          )
      elif head_seq > 0:
          path = await self.trace_store.get_main_path_messages(trace_id, head_seq)
          kept = compress_completed_goals(path, goal_tree)
          if len(kept) < len(path):
              self.log.info(
                  "Level 1 压缩: %d -> %d 条消息",
                  len(path), len(kept),
              )
              history = [stored.to_llm_dict() for stored in kept]
          else:
              self.log.info(
                  "Level 1 压缩: 无可过滤消息 (%d 条全部保留)",
                  len(path),
              )

      # Level 2: re-optimise images and re-estimate tokens after Level 1
      optimized = await self._optimize_images(history, config.model)
      tokens_after = estimate_tokens(optimized)
      if tokens_after > token_budget:
          self.log.info(
              "Level 1 后仍超阈值 (token=%d/%d),需要进入压缩侧分支",
              tokens_after, token_budget,
          )
          # No queue set yet: go straight to compression
          if not config.force_side_branch:
              config.force_side_branch = ["compression"]
          # Signal the main loop to enter the side branch
          return history, head_seq, sequence, True

      # Log the message list that will be sent to the model
      self.log.info("Level 1 压缩完成,发送给模型的消息列表:")
      for idx, entry in enumerate(history):
          role = entry.get("role", "unknown")
          preview = _preview(entry.get("content", ""))
          self.log.info(f" [{idx}] {role}: {preview}")

      return history, head_seq, sequence, False
  729. async def _build_knowledge_eval_prompt(
  730. self,
  731. trace_id: str,
  732. goal_tree: Optional[GoalTree]
  733. ) -> str:
  734. """构建知识评估 prompt"""
  735. if not self.trace_store:
  736. return ""
  737. pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
  738. if not pending:
  739. return ""
  740. # 获取mission
  741. trace = await self.trace_store.get_trace(trace_id)
  742. mission = trace.task if trace else "未知任务"
  743. # 获取当前Goal
  744. current_goal = goal_tree.find(goal_tree.current_id) if goal_tree and goal_tree.current_id else None
  745. goal_desc = current_goal.description if current_goal else "无当前目标"
  746. # 构建知识列表
  747. knowledge_list = []
  748. for idx, entry in enumerate(pending, 1):
  749. knowledge_list.append(
  750. f"### 知识 {idx}: {entry['knowledge_id']}\n"
  751. f"- task: {entry['task']}\n"
  752. f"- content: {entry['content']}\n"
  753. f"- 注入于: sequence {entry['injected_at_sequence']}, goal {entry['goal_id']}"
  754. )
  755. prompt = f"""你是知识评估助手。请评估以下知识在本次任务执行中的实际效果。
  756. ## 当前任务(Mission)
  757. {mission}
  758. ## 当前 Goal
  759. {goal_desc}
  760. ## 待评估知识列表
  761. {chr(10).join(knowledge_list)}
  762. ## 评估维度
  763. 1. **helpfulness**: 知识内容是否对完成任务有实质帮助?
  764. 2. **relevance**: 执行过程中是否体现了该知识的内容?
  765. ## 评估分类
  766. - irrelevant: task与当前任务无关
  767. - unused: 相关但未使用
  768. - helpful: 有帮助
  769. - harmful: 有负面作用
  770. - neutral: 无明显作用
  771. ## 输出格式
  772. 请直接输出评估结果,使用JSON格式:
  773. {{
  774. "evaluations": [
  775. {{
  776. "knowledge_id": "knowledge-xxx",
  777. "eval_status": "helpful",
  778. "reason": "1-2句评估理由"
  779. }}
  780. ]
  781. }}
  782. """
  783. return prompt
  async def _single_turn_compress(
      self,
      trace_id: str,
      history: List[Dict],
      goal_tree: Optional[GoalTree],
      config: RunConfig,
  ) -> str:
      """
      Produce a compression summary with a single, tool-less LLM call.

      The compression prompt (built from the goal tree when one is given) is
      appended as a user message to a copy of ``history``; prompt caching is
      applied and the model is called with an empty tool list.

      Args:
          trace_id: Trace identifier (not used by this method).
          history: Messages to summarise; the list itself is not modified.
          goal_tree: Optional goal tree rendered into the compression prompt.
          config: Run configuration supplying model, temperature, caching flag
              and extra LLM parameters.

      Returns:
          The summary text. When the reply contains a ``[[SUMMARY]]`` marker,
          only the text after its first occurrence is returned. An empty string
          is returned when the model reply has no textual content.
      """
      self.log.info("执行单次 LLM 压缩")

      # Build the compression prompt
      from agent.core.prompts import build_single_turn_prompt

      goal_prompt = goal_tree.to_prompt(include_summary=True) if goal_tree else ""
      compress_prompt = build_single_turn_prompt(goal_prompt)
      compress_messages = list(history) + [
          {"role": "user", "content": compress_prompt}
      ]

      # Apply prompt caching
      compress_messages = self._add_cache_control(
          compress_messages, config.model, config.enable_prompt_caching
      )

      # Single LLM call without tools
      result = await self.llm_call(
          messages=compress_messages,
          model=config.model,
          tools=[],
          temperature=config.temperature,
          **config.extra_llm_params,
      )

      # The reply may carry ``content: None`` (or a non-string payload);
      # calling .strip() on that would raise, so treat it as "no summary".
      raw_content = result.get("content")
      summary_text = raw_content.strip() if isinstance(raw_content, str) else ""

      # Keep only what follows the [[SUMMARY]] marker, when present
      marker = "[[SUMMARY]]"
      if marker in summary_text:
          summary_text = summary_text[
              summary_text.index(marker) + len(marker):
          ].strip()

      return summary_text
  819. @staticmethod
  820. def _try_fix_json(s: str) -> Optional[dict]:
  821. """尝试修复常见的 JSON 截断/格式问题,返回 dict 或 None"""
  822. import re
  823. fixed = s.strip()
  824. # 1. 修复值中未转义的引号(如 "key": "he said "hello" to me")
  825. # 策略:找到 key-value 模式中值字符串内部的裸引号并转义
  826. def _fix_inner_quotes(text: str) -> str:
  827. # 匹配 ": "..." 模式,修复值内部的未转义引号
  828. result = []
  829. i = 0
  830. while i < len(text):
  831. # 找到 ": " 后面的值字符串开头
  832. if text[i] == '"':
  833. # 找到这个引号对应的字符串结束位置
  834. j = i + 1
  835. while j < len(text):
  836. if text[j] == '\\':
  837. j += 2 # 跳过转义字符
  838. continue
  839. if text[j] == '"':
  840. break
  841. j += 1
  842. # 检查引号后面是否是合法的 JSON 分隔符
  843. if j < len(text):
  844. after = j + 1
  845. # 跳过空白
  846. while after < len(text) and text[after] in ' \t\n\r':
  847. after += 1
  848. if after < len(text) and text[after] not in ':,}]\n\r':
  849. # 这个引号不是真正的结束引号,继续往后找
  850. # 找到下一个后面跟合法分隔符的引号
  851. k = j + 1
  852. found_end = False
  853. while k < len(text):
  854. if text[k] == '"':
  855. peek = k + 1
  856. while peek < len(text) and text[peek] in ' \t\n\r':
  857. peek += 1
  858. if peek >= len(text) or text[peek] in ':,}]':
  859. # 这才是真正的结束引号,转义中间的引号
  860. inner = text[i+1:k].replace('"', '\\"')
  861. result.append('"' + inner + '"')
  862. i = k + 1
  863. found_end = True
  864. break
  865. k += 1
  866. if found_end:
  867. continue
  868. result.append(text[i])
  869. i += 1
  870. return ''.join(result)
  871. fixed = _fix_inner_quotes(fixed)
  872. # 2. 去掉尾部多余逗号
  873. fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
  874. # 3. 尝试补全截断的字符串和括号
  875. for suffix in ['', '"', '"}', '"]', '"}]', '"}}']:
  876. try:
  877. attempt = fixed + suffix
  878. open_braces = attempt.count('{') - attempt.count('}')
  879. open_brackets = attempt.count('[') - attempt.count(']')
  880. attempt += '}' * max(0, open_braces) + ']' * max(0, open_brackets)
  881. result = json.loads(attempt)
  882. if isinstance(result, dict):
  883. self.log.info(f"[JSON Fix] 成功修复 JSON (suffix={repr(suffix)})")
  884. return result
  885. except json.JSONDecodeError:
  886. continue
  887. return None
  888. async def _agent_loop(
  889. self,
  890. trace: Trace,
  891. history: List[Dict],
  892. goal_tree: Optional[GoalTree],
  893. config: RunConfig,
  894. sequence: int,
  895. inject_skills: Optional[List[str]] = None,
  896. skill_recency_threshold: int = 10,
  897. ) -> AsyncIterator[Union[Trace, Message]]:
  898. """ReAct 循环"""
  899. trace_id = trace.trace_id
  900. tool_schemas = self._get_tool_schemas(config.tools, config.tool_groups, config.exclude_tools)
  901. # 当前主路径头节点的 sequence(用于设置 parent_sequence)
  902. head_seq = trace.head_sequence
  903. # 侧分支状态(None = 主路径)
  904. side_branch_ctx: Optional[SideBranchContext] = None
  905. # 检查是否有未完成的侧分支需要恢复
  906. if trace.context.get("active_side_branch"):
  907. side_branch_data = trace.context["active_side_branch"]
  908. branch_id = side_branch_data["branch_id"]
  909. start_sequence = side_branch_data["start_sequence"]
  910. # 从数据库查询侧分支消息(按 sequence 范围)
  911. if self.trace_store:
  912. all_messages = await self.trace_store.get_trace_messages(trace_id)
  913. side_messages = [
  914. m for m in all_messages
  915. if m.sequence >= start_sequence
  916. ]
  917. # 恢复侧分支上下文
  918. side_branch_ctx = SideBranchContext(
  919. type=side_branch_data["type"],
  920. branch_id=branch_id,
  921. start_head_seq=side_branch_data["start_head_seq"],
  922. start_sequence=side_branch_data["start_sequence"],
  923. start_history_length=0, # 稍后重新计算
  924. start_iteration=side_branch_data.get("start_iteration", 0),
  925. max_turns=side_branch_data.get("max_turns", config.side_branch_max_turns),
  926. )
  927. self.log.info(
  928. f"恢复未完成的侧分支: {side_branch_ctx.type}, "
  929. f"max_turns={side_branch_ctx.max_turns}"
  930. )
  931. # 将侧分支消息追加到 history
  932. for m in side_messages:
  933. history.append(m.to_llm_dict())
  934. # 重新计算 start_history_length
  935. side_branch_ctx.start_history_length = len(history) - len(side_messages)
  936. break_after_side_branch = False # 侧分支退出后是否 break 主循环
  937. for iteration in range(config.max_iterations):
  938. # 更新活动时间(表明trace正在活跃运行)
  939. if self.trace_store:
  940. await self.trace_store.update_trace(
  941. trace_id,
  942. last_activity_at=datetime.now()
  943. )
  944. # 检查取消信号
  945. cancel_event = self._cancel_events.get(trace_id)
  946. if cancel_event and cancel_event.is_set():
  947. self.log.info(f"Trace {trace_id} stopped by user")
  948. if self.trace_store:
  949. await self.trace_store.update_trace(
  950. trace_id,
  951. status="stopped",
  952. head_sequence=head_seq,
  953. completed_at=datetime.now(),
  954. )
  955. # 广播状态变化给前端
  956. try:
  957. from agent.trace.websocket import broadcast_trace_status_changed
  958. await broadcast_trace_status_changed(trace_id, "stopped")
  959. except Exception:
  960. pass
  961. trace_obj = await self.trace_store.get_trace(trace_id)
  962. if trace_obj:
  963. yield trace_obj
  964. return
  965. # 检查Goal完成触发的知识评估
  966. if not side_branch_ctx and self.trace_store:
  967. trace = await self.trace_store.get_trace(trace_id)
  968. if trace and trace.context and trace.context.get("pending_knowledge_eval"):
  969. # 清除标志
  970. trace.context.pop("pending_knowledge_eval", None)
  971. await self.trace_store.update_trace(trace_id, context=trace.context)
  972. # 设置侧分支队列
  973. config.force_side_branch = ["knowledge_eval"]
  974. self.log.info("[Knowledge Eval] 检测到Goal完成触发,进入知识评估侧分支")
  975. # Context 管理(仅主路径)
  976. needs_enter_side_branch = False
  977. if not side_branch_ctx:
  978. # 侧分支退出后需要 break 主循环
  979. if break_after_side_branch and not config.force_side_branch:
  980. break
  981. # 检查是否强制进入侧分支(API 手动触发或自动压缩流程)
  982. if config.force_side_branch:
  983. needs_enter_side_branch = True
  984. self.log.info(f"强制进入侧分支: {config.force_side_branch}")
  985. else:
  986. # 正常的 context 管理逻辑
  987. history, head_seq, sequence, needs_enter_side_branch = await self._manage_context_usage(
  988. trace_id, history, goal_tree, config, sequence, head_seq
  989. )
  990. # 进入侧分支
  991. if needs_enter_side_branch and not side_branch_ctx:
  992. # 刷新 trace,获取 _manage_context_usage 可能写入 DB 的 knowledge_eval_trigger
  993. if self.trace_store:
  994. fresh = await self.trace_store.get_trace(trace_id)
  995. if fresh:
  996. trace = fresh
  997. # 从队列中取出第一个侧分支类型
  998. branch_type: Literal["compression", "reflection", "knowledge_eval"]
  999. if config.force_side_branch and isinstance(config.force_side_branch, list) and len(config.force_side_branch) > 0:
  1000. branch_type = config.force_side_branch.pop(0) # type: ignore
  1001. self.log.info(f"从队列取出侧分支: {branch_type}, 剩余队列: {config.force_side_branch}")
  1002. elif config.knowledge.enable_extraction:
  1003. # 兼容旧的单值模式(如果 force_side_branch 是字符串)
  1004. branch_type = "reflection"
  1005. else:
  1006. # 自动触发:压缩
  1007. branch_type = "compression"
  1008. branch_id = f"{branch_type}_{uuid.uuid4().hex[:8]}"
  1009. side_branch_ctx = SideBranchContext(
  1010. type=branch_type,
  1011. branch_id=branch_id,
  1012. start_head_seq=head_seq,
  1013. start_sequence=sequence,
  1014. start_history_length=len(history),
  1015. start_iteration=iteration,
  1016. max_turns=config.side_branch_max_turns,
  1017. )
  1018. # 持久化侧分支状态
  1019. if self.trace_store:
  1020. # 获取触发事件(如果是 knowledge_eval 分支)
  1021. trigger_event = trace.context.get("knowledge_eval_trigger", "unknown") if branch_type == "knowledge_eval" else None
  1022. trace.context["active_side_branch"] = {
  1023. "type": side_branch_ctx.type,
  1024. "branch_id": side_branch_ctx.branch_id,
  1025. "start_head_seq": side_branch_ctx.start_head_seq,
  1026. "start_sequence": side_branch_ctx.start_sequence,
  1027. "start_iteration": side_branch_ctx.start_iteration,
  1028. "max_turns": side_branch_ctx.max_turns,
  1029. "started_at": datetime.now().isoformat(),
  1030. }
  1031. # 如果是 knowledge_eval 分支,添加 trigger_event
  1032. if trigger_event:
  1033. trace.context["active_side_branch"]["trigger_event"] = trigger_event
  1034. # 清除触发事件标记
  1035. trace.context.pop("knowledge_eval_trigger", None)
  1036. await self.trace_store.update_trace(
  1037. trace_id,
  1038. context=trace.context
  1039. )
  1040. # 追加侧分支 prompt
  1041. if branch_type == "reflection":
  1042. # 完成场景用全局复盘 prompt,压缩场景用阶段性反思 prompt
  1043. if break_after_side_branch:
  1044. prompt = config.knowledge.get_completion_reflect_prompt()
  1045. else:
  1046. prompt = config.knowledge.get_reflect_prompt()
  1047. elif branch_type == "knowledge_eval":
  1048. prompt = await self._build_knowledge_eval_prompt(trace_id, goal_tree)
  1049. else: # compression
  1050. from agent.trace.compaction import build_compression_prompt
  1051. prompt = build_compression_prompt(goal_tree)
  1052. branch_user_msg = Message.create(
  1053. trace_id=trace_id,
  1054. role="user",
  1055. sequence=sequence,
  1056. parent_sequence=head_seq,
  1057. goal_id=goal_tree.current_id if goal_tree else None,
  1058. branch_type=branch_type,
  1059. branch_id=branch_id,
  1060. content=prompt,
  1061. )
  1062. if self.trace_store:
  1063. await self.trace_store.add_message(branch_user_msg)
  1064. history.append(branch_user_msg.to_llm_dict())
  1065. head_seq = sequence
  1066. sequence += 1
  1067. self.log.info(f"进入侧分支: {branch_type}, branch_id={branch_id}")
  1068. continue # 跳过本轮,下一轮开始侧分支
  1069. # 构建 LLM messages(注入上下文,移除内部字段)
  1070. llm_messages = [{k: v for k, v in msg.items() if not k.startswith("_")} for msg in history]
  1071. # 优化已处理的图片(分级处理:保留/压缩/描述)
  1072. llm_messages = await self._optimize_images(llm_messages, config.model)
  1073. # 对历史消息应用 Prompt Caching
  1074. llm_messages = self._add_cache_control(
  1075. llm_messages,
  1076. config.model,
  1077. config.enable_prompt_caching
  1078. )
  1079. # 调用 LLM(等待完成后再检查 cancel 信号,不中断正在进行的调用)
  1080. result = await self.llm_call(
  1081. messages=llm_messages,
  1082. model=config.model,
  1083. tools=tool_schemas,
  1084. temperature=config.temperature,
  1085. **config.extra_llm_params,
  1086. )
  1087. response_content = result.get("content", "")
  1088. reasoning_content = result.get("reasoning_content", "")
  1089. tool_calls = result.get("tool_calls")
  1090. finish_reason = result.get("finish_reason")
  1091. prompt_tokens = result.get("prompt_tokens", 0)
  1092. completion_tokens = result.get("completion_tokens", 0)
  1093. step_cost = result.get("cost", 0)
  1094. cache_creation_tokens = result.get("cache_creation_tokens")
  1095. cache_read_tokens = result.get("cache_read_tokens")
  1096. # 周期性自动注入上下文(仅主路径;config.context_injection_interval=0 时完全关闭)
  1097. _cii = getattr(config, "context_injection_interval", CONTEXT_INJECTION_INTERVAL)
  1098. if not side_branch_ctx and _cii > 0 and iteration % _cii == 0:
  1099. # 检查是否已经调用了 get_current_context
  1100. if tool_calls:
  1101. has_context_call = any(
  1102. tc.get("function", {}).get("name") == "get_current_context"
  1103. for tc in tool_calls
  1104. )
  1105. else:
  1106. has_context_call = False
  1107. tool_calls = []
  1108. if not has_context_call:
  1109. # 手动添加 get_current_context 工具调用
  1110. context_call_id = f"call_context_{uuid.uuid4().hex[:8]}"
  1111. tool_calls.append({
  1112. "id": context_call_id,
  1113. "type": "function",
  1114. "function": {"name": "get_current_context", "arguments": "{}"}
  1115. })
  1116. self.log.info(f"[周期性注入] 自动添加 get_current_context 工具调用 (iteration={iteration})")
  1117. # Skill 指定注入(仅主路径,首轮 iteration==0 时执行)
  1118. if not side_branch_ctx and inject_skills and iteration == 0:
  1119. skills_to_inject = self._check_skills_need_injection(
  1120. trace, inject_skills, history, skill_recency_threshold
  1121. )
  1122. if skills_to_inject:
  1123. if not tool_calls:
  1124. tool_calls = []
  1125. for skill_name in skills_to_inject:
  1126. skill_call_id = f"call_skill_{skill_name}_{uuid.uuid4().hex[:8]}"
  1127. tool_calls.append({
  1128. "id": skill_call_id,
  1129. "type": "function",
  1130. "function": {
  1131. "name": "skill",
  1132. "arguments": json.dumps({"skill_name": skill_name})
  1133. }
  1134. })
  1135. self.log.info(f"[Skill 指定注入] 自动添加 skill(\"{skill_name}\") 工具调用")
  1136. # 按需自动创建 root goal(仅主路径)
  1137. if not side_branch_ctx and goal_tree and not goal_tree.goals and tool_calls:
  1138. has_goal_call = any(
  1139. tc.get("function", {}).get("name") == "goal"
  1140. for tc in tool_calls
  1141. )
  1142. self.log.debug(f"[Auto Root Goal] Before tool execution: goal_tree.goals={len(goal_tree.goals)}, has_goal_call={has_goal_call}, tool_calls={[tc.get('function', {}).get('name') for tc in tool_calls]}")
  1143. if not has_goal_call:
  1144. mission = goal_tree.mission
  1145. root_desc = mission[:200] if len(mission) > 200 else mission
  1146. goal_tree.add_goals(
  1147. descriptions=[root_desc],
  1148. reasons=["系统自动创建:Agent 未显式创建目标"],
  1149. parent_id=None
  1150. )
  1151. if self.trace_store:
  1152. await self.trace_store.add_goal(trace_id, goal_tree.goals[0])
  1153. await self.trace_store.update_goal_tree(trace_id, goal_tree)
  1154. self.log.info(f"自动创建 root goal: {goal_tree.goals[0].id}(未自动 focus,等待模型决定)")
  1155. else:
  1156. self.log.debug(f"[Auto Root Goal] 检测到 goal 工具调用,跳过自动创建")
  1157. # 获取当前 goal_id
  1158. current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
  1159. # 记录 assistant Message(parent_sequence 指向当前 head)
  1160. assistant_msg = Message.create(
  1161. trace_id=trace_id,
  1162. role="assistant",
  1163. sequence=sequence,
  1164. goal_id=current_goal_id,
  1165. parent_sequence=head_seq if head_seq > 0 else None,
  1166. branch_type=side_branch_ctx.type if side_branch_ctx else None,
  1167. branch_id=side_branch_ctx.branch_id if side_branch_ctx else None,
  1168. content={"text": response_content, "tool_calls": tool_calls, "reasoning_content": reasoning_content or None},
  1169. prompt_tokens=prompt_tokens,
  1170. completion_tokens=completion_tokens,
  1171. cache_creation_tokens=cache_creation_tokens,
  1172. cache_read_tokens=cache_read_tokens,
  1173. finish_reason=finish_reason,
  1174. cost=step_cost,
  1175. )
  1176. if self.trace_store:
  1177. await self.trace_store.add_message(assistant_msg)
  1178. # 记录模型使用
  1179. await self.trace_store.record_model_usage(
  1180. trace_id=trace_id,
  1181. sequence=sequence,
  1182. role="assistant",
  1183. model=config.model,
  1184. prompt_tokens=prompt_tokens,
  1185. completion_tokens=completion_tokens,
  1186. cache_read_tokens=cache_read_tokens or 0,
  1187. )
  1188. # 知识评估侧分支:即时检测并写入评估结果
  1189. if side_branch_ctx and side_branch_ctx.type == "knowledge_eval":
  1190. text = response_content if isinstance(response_content, str) else ""
  1191. eval_results = None
  1192. try:
  1193. eval_results = json.loads(text.strip())
  1194. if "evaluations" not in eval_results:
  1195. eval_results = None
  1196. except json.JSONDecodeError:
  1197. import re
  1198. json_match = re.search(r'```json\s*(\{.*?\})\s*```', text, re.DOTALL)
  1199. if json_match:
  1200. try:
  1201. eval_results = json.loads(json_match.group(1))
  1202. except json.JSONDecodeError:
  1203. pass
  1204. if not eval_results:
  1205. json_match = re.search(r'\{[^{]*"evaluations"[^}]*\[[^\]]*\][^}]*\}', text, re.DOTALL)
  1206. if json_match:
  1207. try:
  1208. eval_results = json.loads(json_match.group(0))
  1209. except json.JSONDecodeError:
  1210. pass
  1211. if eval_results and self.trace_store:
  1212. current_trace = await self.trace_store.get_trace(trace_id)
  1213. trigger_event = current_trace.context.get("active_side_branch", {}).get("trigger_event", "unknown")
  1214. for eval_item in eval_results.get("evaluations", []):
  1215. await self.trace_store.update_knowledge_evaluation(
  1216. trace_id=trace_id,
  1217. knowledge_id=eval_item["knowledge_id"],
  1218. eval_result={
  1219. "eval_status": eval_item["eval_status"],
  1220. "reason": eval_item.get("reason", "")
  1221. },
  1222. trigger_event=trigger_event
  1223. )
  1224. self.log.info(f"[Knowledge Eval] 已写入 {len(eval_results.get('evaluations', []))} 条评估结果")
  1225. # 如果在侧分支,记录到 assistant_msg(已持久化,不需要额外维护)
  1226. yield assistant_msg
  1227. head_seq = sequence
  1228. sequence += 1
  1229. # 检查侧分支是否应该退出
  1230. if side_branch_ctx:
  1231. # 计算侧分支已执行的轮次
  1232. turns_in_branch = iteration - side_branch_ctx.start_iteration
  1233. should_exit = turns_in_branch >= side_branch_ctx.max_turns or not tool_calls
  1234. if turns_in_branch >= side_branch_ctx.max_turns:
  1235. self.log.warning(
  1236. f"侧分支 {side_branch_ctx.type} 达到最大轮次 "
  1237. f"{side_branch_ctx.max_turns},强制退出"
  1238. )
  1239. if should_exit and side_branch_ctx.type == "compression":
  1240. # === 压缩侧分支退出(超时 + 正常完成统一处理)===
  1241. summary_text = ""
  1242. # 1. 从当前回复提取
  1243. if response_content:
  1244. if "[[SUMMARY]]" in response_content:
  1245. summary_text = response_content[
  1246. response_content.index("[[SUMMARY]]") + len("[[SUMMARY]]"):
  1247. ].strip()
  1248. elif response_content.strip():
  1249. summary_text = response_content.strip()
  1250. # 2. 从持久化存储按 sequence 范围查询
  1251. if not summary_text and self.trace_store:
  1252. all_messages = await self.trace_store.get_trace_messages(trace_id)
  1253. side_messages = [
  1254. m for m in all_messages
  1255. if m.sequence >= side_branch_ctx.start_sequence
  1256. ]
  1257. for msg in reversed(side_messages):
  1258. if msg.role == "assistant" and isinstance(msg.content, dict):
  1259. text = msg.content.get("text", "")
  1260. if "[[SUMMARY]]" in text:
  1261. summary_text = text[text.index("[[SUMMARY]]") + len("[[SUMMARY]]"):].strip()
  1262. break
  1263. elif text:
  1264. summary_text = text
  1265. break
  1266. # 3. 单次 LLM 调用
  1267. if not summary_text:
  1268. self.log.warning("侧分支未生成有效 summary,fallback 到单次 LLM 压缩")
  1269. pre_branch_history = history[:side_branch_ctx.start_history_length]
  1270. summary_text = await self._single_turn_compress(
  1271. trace_id, pre_branch_history, goal_tree, config,
  1272. )
  1273. # 创建主路径 summary 消息并重建 history
  1274. if summary_text:
  1275. # 清理侧分支指令,防止泄露到主分支
  1276. summary_text = summary_text.replace(
  1277. "**生成摘要后立即停止,不要继续执行原有任务。**", ""
  1278. ).strip()
  1279. from agent.core.prompts import build_summary_header
  1280. summary_content = build_summary_header(summary_text)
  1281. if goal_tree and goal_tree.goals:
  1282. goal_tree_detail = goal_tree.to_prompt(include_summary=True)
  1283. summary_content += f"\n\n## Current Plan\n\n{goal_tree_detail}"
  1284. # 找第一条 user message 的 sequence 作为 parent
  1285. # 续跑时 get_main_path_messages 沿 parent 链回溯,
  1286. # 指向 first_user 可以跳过所有被压缩的中间消息
  1287. first_user_seq = None
  1288. if self.trace_store:
  1289. all_msgs = await self.trace_store.get_trace_messages(trace_id)
  1290. for m in all_msgs:
  1291. if m.role == "user":
  1292. first_user_seq = m.sequence
  1293. break
  1294. summary_msg = Message.create(
  1295. trace_id=trace_id,
  1296. role="user",
  1297. sequence=sequence,
  1298. parent_sequence=first_user_seq,
  1299. branch_type=None,
  1300. content=summary_content,
  1301. )
  1302. if self.trace_store:
  1303. await self.trace_store.add_message(summary_msg)
  1304. history = self._rebuild_history_after_compression(
  1305. history, summary_msg.to_llm_dict(), label="压缩侧分支"
  1306. )
  1307. head_seq = sequence
  1308. sequence += 1
  1309. else:
  1310. self.log.error("所有压缩方案均未生成有效 summary,跳过压缩")
  1311. # 回退 history 到侧分支开始前,防止侧分支指令泄露到主分支
  1312. history = history[:side_branch_ctx.start_history_length]
  1313. head_seq = side_branch_ctx.start_head_seq
  1314. # 清理
  1315. trace.context.pop("active_side_branch", None)
  1316. config.force_side_branch = None
  1317. if self.trace_store:
  1318. await self.trace_store.update_trace(
  1319. trace_id, context=trace.context, head_sequence=head_seq,
  1320. )
  1321. side_branch_ctx = None
  1322. continue
  1323. elif should_exit and side_branch_ctx.type == "reflection":
  1324. # === 反思侧分支退出(超时 + 正常完成统一处理)===
  1325. self.log.info("反思侧分支退出")
  1326. # auto-commit hook:默认 pending 要等人工 review,
  1327. # 但 reflect_auto_commit=True 时视作全部 approved,直接批量 upload。
  1328. if (
  1329. self.trace_store
  1330. and getattr(config.knowledge, "reflect_auto_commit", False)
  1331. ):
  1332. try:
  1333. from agent.trace.extraction_review import auto_commit_branch
  1334. report = await auto_commit_branch(
  1335. self.trace_store,
  1336. trace_id,
  1337. side_branch_ctx.branch_id,
  1338. )
  1339. if report.committed or report.failed:
  1340. self.log.info(
  1341. f"[auto-commit] committed={len(report.committed)} "
  1342. f"failed={len(report.failed)} skipped={len(report.skipped)}"
  1343. )
  1344. except Exception as e:
  1345. self.log.error(f"[auto-commit] 反思分支自动提交失败: {e}")
  1346. # 恢复主路径
  1347. if self.trace_store:
  1348. main_path_messages = await self.trace_store.get_main_path_messages(
  1349. trace_id, side_branch_ctx.start_head_seq
  1350. )
  1351. history = [m.to_llm_dict() for m in main_path_messages]
  1352. head_seq = side_branch_ctx.start_head_seq
  1353. # 清理
  1354. trace.context.pop("active_side_branch", None)
  1355. if not config.force_side_branch or len(config.force_side_branch) == 0:
  1356. config.force_side_branch = None
  1357. self.log.info("反思完成,队列为空")
  1358. if self.trace_store:
  1359. await self.trace_store.update_trace(
  1360. trace_id, context=trace.context, head_sequence=head_seq,
  1361. )
  1362. side_branch_ctx = None
  1363. continue
  1364. elif should_exit and side_branch_ctx.type == "knowledge_eval":
  1365. # === 知识评估侧分支退出 ===
  1366. self.log.info("知识评估侧分支退出")
  1367. # 恢复主路径
  1368. if self.trace_store:
  1369. main_path_messages = await self.trace_store.get_main_path_messages(
  1370. trace_id, side_branch_ctx.start_head_seq
  1371. )
  1372. history = [m.to_llm_dict() for m in main_path_messages]
  1373. head_seq = side_branch_ctx.start_head_seq
  1374. # 清理
  1375. trace.context.pop("active_side_branch", None)
  1376. if not config.force_side_branch or len(config.force_side_branch) == 0:
  1377. config.force_side_branch = None
  1378. self.log.info("知识评估完成,队列为空")
  1379. if self.trace_store:
  1380. await self.trace_store.update_trace(
  1381. trace_id, context=trace.context, head_sequence=head_seq,
  1382. )
  1383. side_branch_ctx = None
  1384. continue
  1385. # 处理工具调用
  1386. # 截断兜底:finish_reason == "length" 说明响应被 max_tokens 截断,
  1387. # tool call 参数很可能不完整,不应执行,改为提示模型分批操作
  1388. if tool_calls and finish_reason == "length":
  1389. self.log.warning(
  1390. "[Runner] 响应被 max_tokens 截断,跳过 %d 个不完整的 tool calls",
  1391. len(tool_calls),
  1392. )
  1393. truncation_hint = TRUNCATION_HINT
  1394. history.append({
  1395. "role": "assistant",
  1396. "content": response_content,
  1397. "tool_calls": tool_calls,
  1398. })
  1399. # 为每个被截断的 tool call 返回错误结果
  1400. for tc in tool_calls:
  1401. history.append({
  1402. "role": "tool",
  1403. "tool_call_id": tc["id"],
  1404. "content": truncation_hint,
  1405. })
  1406. continue
  1407. if tool_calls and config.auto_execute_tools:
  1408. history.append({
  1409. "role": "assistant",
  1410. "content": response_content,
  1411. "tool_calls": tool_calls,
  1412. })
  1413. if config.parallel_tool_execution:
  1414. # === 并发执行 ===
  1415. current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
  1416. async def _execute_single_tool(tc: dict) -> tuple:
  1417. tool_name = tc["function"]["name"]
  1418. tool_args = tc["function"]["arguments"]
  1419. if isinstance(tool_args, str):
  1420. if not tool_args.strip():
  1421. tool_args = {}
  1422. else:
  1423. try:
  1424. tool_args = json.loads(tool_args)
  1425. except json.JSONDecodeError:
  1426. tool_args = self._try_fix_json(tool_args)
  1427. if tool_args is None:
  1428. self.log.warning(f"[Tool Call] JSON 解析失败: {tc['function']['arguments'][:200]}")
  1429. tc["function"]["arguments"] = json.dumps({"_error": "JSON parse failed", "_raw": tc["function"]["arguments"][:200]}, ensure_ascii=False)
  1430. return (tc, None, f"Error: 工具参数 JSON 格式错误,无法解析。原始参数: {tc['function']['arguments'][:200]}")
  1431. elif tool_args is None:
  1432. tool_args = {}
  1433. args_str = json.dumps(tool_args, ensure_ascii=False)
  1434. args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
  1435. self.log.info(f"[Tool Call] {tool_name}({args_display})")
  1436. trigger_event_for_tool = None
  1437. if side_branch_ctx and side_branch_ctx.type == "knowledge_eval" and self.trace_store:
  1438. current_trace = await self.trace_store.get_trace(trace_id)
  1439. if current_trace:
  1440. trigger_event_for_tool = current_trace.context.get("active_side_branch", {}).get("trigger_event", "unknown")
  1441. if tool_name in ("toolhub_call", "toolhub_search", "toolhub_health"):
  1442. try:
  1443. from agent.tools.builtin.toolhub import set_trace_context
  1444. set_trace_context(trace_id)
  1445. except ImportError:
  1446. pass
  1447. try:
  1448. tool_result = await self.tools.execute(
  1449. tool_name, tool_args, uid=config.uid or "",
  1450. context={"store": self.trace_store, "trace_id": trace_id, "goal_id": current_goal_id, "runner": self, "goal_tree": goal_tree, "knowledge_config": config.knowledge, "sequence": sequence, "side_branch": {"type": side_branch_ctx.type, "branch_id": side_branch_ctx.branch_id, "is_side_branch": True, "max_turns": side_branch_ctx.max_turns, "trigger_event": trigger_event_for_tool} if side_branch_ctx else None, **(config.context or {})}
  1451. )
  1452. return (tc, tool_args, tool_result)
  1453. except Exception as e:
  1454. import traceback
  1455. return (tc, tool_args, f"Error executing tool {tool_name}: {str(e)}\n{traceback.format_exc()}")
  1456. tasks = [_execute_single_tool(tc) for tc in tool_calls]
  1457. results = await asyncio.gather(*tasks)
  1458. for res in results:
  1459. tc, tool_args, tool_result = res
  1460. tool_name = tc["function"]["name"]
  1461. if tool_args is None:
  1462. history.append({"role": "tool", "tool_call_id": tc["id"], "name": tool_name, "content": tool_result})
  1463. yield Message.create(trace_id=trace_id, role="tool", sequence=sequence, parent_sequence=head_seq, tool_call_id=tc["id"], content=tool_result)
  1464. head_seq = sequence
  1465. sequence += 1
  1466. continue
  1467. if tool_name == "goal" and goal_tree:
  1468. self.log.debug(f"[Goal Tool] After execution: goal_tree.goals={len(goal_tree.goals)}, current_id={goal_tree.current_id}")
  1469. if tool_name == "upload_knowledge" and isinstance(tool_result, dict):
  1470. self.log.info(f"[Knowledge Tracking] 知识已上传")
  1471. if isinstance(tool_result, str):
  1472. tool_result = {"text": tool_result}
  1473. elif not isinstance(tool_result, dict):
  1474. tool_result = {"text": str(tool_result)}
  1475. tool_text = tool_result.get("text", str(tool_result))
  1476. tool_images = tool_result.get("images", [])
  1477. tool_usage = tool_result.get("tool_usage")
  1478. if tool_images:
  1479. tool_result_text = tool_text
  1480. tool_content_for_llm = [{"type": "text", "text": tool_text}]
  1481. for img in tool_images:
  1482. if img.get("type") == "base64" and img.get("data"):
  1483. media_type = img.get("media_type", "image/png")
  1484. tool_content_for_llm.append({"type": "image_url", "image_url": {"url": f"data:{media_type};base64,{img['data']}"}})
  1485. elif img.get("type") == "url" and img.get("url"):
  1486. tool_content_for_llm.append({"type": "image_url", "image_url": {"url": img["url"]}})
  1487. else:
  1488. tool_result_text = tool_text
  1489. tool_content_for_llm = tool_text
  1490. tool_msg = Message.create(trace_id=trace_id, role="tool", sequence=sequence, goal_id=current_goal_id, parent_sequence=head_seq, tool_call_id=tc["id"], branch_type=side_branch_ctx.type if side_branch_ctx else None, branch_id=side_branch_ctx.branch_id if side_branch_ctx else None, content={"tool_name": tool_name, "result": tool_content_for_llm})
  1491. if self.trace_store:
  1492. await self.trace_store.add_message(tool_msg)
  1493. if tool_usage:
  1494. await self.trace_store.record_model_usage(trace_id=trace_id, sequence=sequence, role="tool", tool_name=tool_name, model=tool_usage.get("model"), prompt_tokens=tool_usage.get("prompt_tokens", 0), completion_tokens=tool_usage.get("completion_tokens", 0), cache_read_tokens=tool_usage.get("cache_read_tokens", 0))
  1495. if tool_images:
  1496. import base64 as b64mod
  1497. for img in tool_images:
  1498. if img.get("data"):
  1499. png_path = self.trace_store._get_messages_dir(trace_id) / f"{tool_msg.message_id}.png"
  1500. png_path.write_bytes(b64mod.b64decode(img["data"]))
  1501. break
  1502. yield tool_msg
  1503. head_seq = sequence
  1504. sequence += 1
  1505. history.append({"role": "tool", "tool_call_id": tc["id"], "name": tool_name, "content": tool_content_for_llm, "_message_id": tool_msg.message_id})
  1506. if tool_name == "skill" and tc["id"].startswith("call_skill_"):
  1507. try:
  1508. skill_args = json.loads(tc["function"]["arguments"]) if isinstance(tc["function"]["arguments"], str) else tc["function"]["arguments"]
  1509. injected_skill_name = skill_args.get("skill_name", "")
  1510. if injected_skill_name:
  1511. await self._update_skill_injection_record(trace_id, trace, injected_skill_name, tool_msg.message_id, tool_msg.sequence)
  1512. self.log.info(f"[Skill 指定注入] 已记录 {injected_skill_name} → msg={tool_msg.message_id}")
  1513. except Exception as e:
  1514. self.log.warning(f"[Skill 指定注入] 记录追踪失败: {e}")
  1515. else:
  1516. for tc in tool_calls:
  1517. current_goal_id = goal_tree.current_id if (goal_tree and goal_tree.current_id) else None
  1518. tool_name = tc["function"]["name"]
  1519. tool_args = tc["function"]["arguments"]
  1520. if isinstance(tool_args, str):
  1521. if not tool_args.strip():
  1522. tool_args = {}
  1523. else:
  1524. try:
  1525. tool_args = json.loads(tool_args)
  1526. except json.JSONDecodeError:
  1527. # 尝试修复常见的截断/格式问题
  1528. tool_args = self._try_fix_json(tool_args)
  1529. if tool_args is None:
  1530. self.log.warning(f"[Tool Call] JSON 解析失败,跳过工具调用 {tool_name}: {tc['function']['arguments'][:200]}")
  1531. # 修复 history 中 assistant message 里的残缺 JSON,
  1532. # 避免 Qwen API 拒绝 "function.arguments must be in JSON format"
  1533. tc["function"]["arguments"] = json.dumps(
  1534. {"_error": "JSON parse failed", "_raw": tc["function"]["arguments"][:200]},
  1535. ensure_ascii=False,
  1536. )
  1537. history.append({
  1538. "role": "tool",
  1539. "tool_call_id": tc["id"],
  1540. "content": f"Error: 工具参数 JSON 格式错误,无法解析。请重新生成正确的 JSON 参数调用此工具。原始参数: {tc['function']['arguments'][:200]}",
  1541. })
  1542. # 注意:这里不 yield Message,因为缺少必需参数会导致错误
  1543. # yield Message 应该由 trace_store 统一管理
  1544. continue
  1545. elif tool_args is None:
  1546. tool_args = {}
  1547. # 记录工具调用(INFO 级别,显示参数)
  1548. args_str = json.dumps(tool_args, ensure_ascii=False)
  1549. args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
  1550. self.log.info(f"[Tool Call] {tool_name}({args_display})")
  1551. # 获取trigger_event(如果在knowledge_eval侧分支中)
  1552. trigger_event_for_tool = None
  1553. if side_branch_ctx and side_branch_ctx.type == "knowledge_eval" and self.trace_store:
  1554. current_trace = await self.trace_store.get_trace(trace_id)
  1555. if current_trace:
  1556. trigger_event_for_tool = current_trace.context.get("active_side_branch", {}).get("trigger_event", "unknown")
  1557. # 设置 trace_id 上下文供 toolhub 使用(图片保存到 outputs/{trace_id}/)
  1558. if tool_name in ("toolhub_call", "toolhub_search", "toolhub_health"):
  1559. try:
  1560. from agent.tools.builtin.toolhub import set_trace_context
  1561. set_trace_context(trace_id)
  1562. except ImportError:
  1563. pass
  1564. tool_result = await self.tools.execute(
  1565. tool_name,
  1566. tool_args,
  1567. uid=config.uid or "",
  1568. context={
  1569. "store": self.trace_store,
  1570. "trace_id": trace_id,
  1571. "goal_id": current_goal_id,
  1572. "runner": self,
  1573. "goal_tree": goal_tree,
  1574. "knowledge_config": config.knowledge,
  1575. "sequence": sequence, # 添加sequence用于知识注入记录
  1576. # 新增:侧分支信息
  1577. "side_branch": {
  1578. "type": side_branch_ctx.type,
  1579. "branch_id": side_branch_ctx.branch_id,
  1580. "is_side_branch": True,
  1581. "max_turns": side_branch_ctx.max_turns,
  1582. "trigger_event": trigger_event_for_tool,
  1583. } if side_branch_ctx else None,
  1584. # 合并用户自定义 context(RunConfig.context)
  1585. **(config.context or {}),
  1586. },
  1587. )
  1588. # 如果是 goal 工具,记录执行后的状态
  1589. if tool_name == "goal" and goal_tree:
  1590. self.log.debug(f"[Goal Tool] After execution: goal_tree.goals={len(goal_tree.goals)}, current_id={goal_tree.current_id}")
  1591. # 跟踪上传的知识(通过 upload_knowledge)
  1592. if tool_name == "upload_knowledge" and isinstance(tool_result, dict):
  1593. metadata = tool_result.get("metadata", {})
  1594. # upload_knowledge 返回的是统计信息,不是单个 knowledge_id
  1595. # 这里只记录上传动作,不跟踪具体 ID
  1596. self.log.info(f"[Knowledge Tracking] 知识已上传到 Knowledge Manager")
  1597. # --- 支持多模态工具反馈 ---
  1598. # execute() 返回 dict{"text","images","tool_usage"} 或 str
  1599. # 统一为dict格式
  1600. if isinstance(tool_result, str):
  1601. tool_result = {"text": tool_result}
  1602. tool_text = tool_result.get("text", str(tool_result))
  1603. tool_images = tool_result.get("images", [])
  1604. tool_usage = tool_result.get("tool_usage") # 新增:提取tool_usage
  1605. # 处理多模态消息
  1606. if tool_images:
  1607. tool_result_text = tool_text
  1608. # 构建多模态消息格式
  1609. tool_content_for_llm = [{"type": "text", "text": tool_text}]
  1610. for img in tool_images:
  1611. if img.get("type") == "base64" and img.get("data"):
  1612. media_type = img.get("media_type", "image/png")
  1613. tool_content_for_llm.append({
  1614. "type": "image_url",
  1615. "image_url": {
  1616. "url": f"data:{media_type};base64,{img['data']}"
  1617. }
  1618. })
  1619. elif img.get("type") == "url" and img.get("url"):
  1620. tool_content_for_llm.append({
  1621. "type": "image_url",
  1622. "image_url": {
  1623. "url": img["url"]
  1624. }
  1625. })
  1626. img_count = len(tool_content_for_llm) - 1 # 减去 text 块
  1627. print(f"[Runner] 多模态工具反馈: tool={tool_name}, images={img_count}, text_len={len(tool_result_text)}")
  1628. else:
  1629. tool_result_text = tool_text
  1630. tool_content_for_llm = tool_text
  1631. tool_msg = Message.create(
  1632. trace_id=trace_id,
  1633. role="tool",
  1634. sequence=sequence,
  1635. goal_id=current_goal_id,
  1636. parent_sequence=head_seq,
  1637. tool_call_id=tc["id"],
  1638. branch_type=side_branch_ctx.type if side_branch_ctx else None,
  1639. branch_id=side_branch_ctx.branch_id if side_branch_ctx else None,
  1640. # 存储完整内容:有图片时保留 list(含 image_url),纯文本时存字符串
  1641. content={"tool_name": tool_name, "result": tool_content_for_llm},
  1642. )
  1643. if self.trace_store:
  1644. await self.trace_store.add_message(tool_msg)
  1645. # 记录工具的模型使用
  1646. if tool_usage:
  1647. await self.trace_store.record_model_usage(
  1648. trace_id=trace_id,
  1649. sequence=sequence,
  1650. role="tool",
  1651. tool_name=tool_name,
  1652. model=tool_usage.get("model"),
  1653. prompt_tokens=tool_usage.get("prompt_tokens", 0),
  1654. completion_tokens=tool_usage.get("completion_tokens", 0),
  1655. cache_read_tokens=tool_usage.get("cache_read_tokens", 0),
  1656. )
  1657. # 截图单独存为同名 PNG 文件
  1658. if tool_images:
  1659. import base64 as b64mod
  1660. for img in tool_images:
  1661. if img.get("data"):
  1662. png_path = self.trace_store._get_messages_dir(trace_id) / f"{tool_msg.message_id}.png"
  1663. png_path.write_bytes(b64mod.b64decode(img["data"]))
  1664. print(f"[Runner] 截图已保存: {png_path.name}")
  1665. break # 只存第一张
  1666. # 如果在侧分支,tool_msg 已持久化(不需要额外维护)
  1667. yield tool_msg
  1668. head_seq = sequence
  1669. sequence += 1
  1670. history.append({
  1671. "role": "tool",
  1672. "tool_call_id": tc["id"],
  1673. "name": tool_name,
  1674. "content": tool_content_for_llm,
  1675. "_message_id": tool_msg.message_id,
  1676. })
  1677. # 更新 skill 注入追踪记录
  1678. if tool_name == "skill" and tc["id"].startswith("call_skill_"):
  1679. try:
  1680. skill_args = json.loads(tc["function"]["arguments"]) if isinstance(tc["function"]["arguments"], str) else tc["function"]["arguments"]
  1681. injected_skill_name = skill_args.get("skill_name", "")
  1682. if injected_skill_name:
  1683. await self._update_skill_injection_record(
  1684. trace_id, trace, injected_skill_name,
  1685. tool_msg.message_id, tool_msg.sequence,
  1686. )
  1687. self.log.info(f"[Skill 指定注入] 已记录 {injected_skill_name} → msg={tool_msg.message_id}")
  1688. except Exception as e:
  1689. self.log.warning(f"[Skill 指定注入] 记录追踪失败: {e}")
  1690. # on_complete 模式:goal(done=...) 后立即压缩该 goal 的消息
  1691. if (
  1692. not side_branch_ctx
  1693. and config.goal_compression == "on_complete"
  1694. and self.trace_store
  1695. and goal_tree
  1696. ):
  1697. has_goal_done = False
  1698. for tc in tool_calls:
  1699. if tc["function"]["name"] != "goal":
  1700. continue
  1701. try:
  1702. raw = tc["function"]["arguments"]
  1703. args = json.loads(raw) if isinstance(raw, str) and raw.strip() else {}
  1704. except (json.JSONDecodeError, TypeError):
  1705. args = {}
  1706. if args.get("done") is not None:
  1707. has_goal_done = True
  1708. break
  1709. if has_goal_done:
  1710. main_path_msgs = await self.trace_store.get_main_path_messages(
  1711. trace_id, head_seq
  1712. )
  1713. compressed_msgs = compress_completed_goals(main_path_msgs, goal_tree)
  1714. if len(compressed_msgs) < len(main_path_msgs):
  1715. self.log.info(
  1716. "on_complete 压缩: %d -> %d 条消息",
  1717. len(main_path_msgs), len(compressed_msgs),
  1718. )
  1719. history = [msg.to_llm_dict() for msg in compressed_msgs]
  1720. continue # 继续循环
  1721. # 无工具调用
  1722. # 如果在侧分支中,已经在上面处理过了(不会走到这里)
  1723. # 主路径无工具调用 → 任务完成,检查是否需要完成后反思或知识评估
  1724. # 检查是否有待评估的知识
  1725. if not side_branch_ctx and self.trace_store:
  1726. pending = await self.trace_store.get_pending_knowledge_entries(trace_id)
  1727. if pending:
  1728. self.log.info(f"任务即将结束,但仍有 {len(pending)} 条知识未评估,强制触发评估")
  1729. config.force_side_branch = ["knowledge_eval"]
  1730. trace = await self.trace_store.get_trace(trace_id)
  1731. if trace:
  1732. trace.context["knowledge_eval_trigger"] = "task_completion"
  1733. await self.trace_store.update_trace(trace_id, context=trace.context)
  1734. continue
  1735. if not side_branch_ctx and config.knowledge.enable_completion_extraction and not break_after_side_branch:
  1736. config.force_side_branch = ["reflection"]
  1737. break_after_side_branch = True
  1738. self.log.info("任务完成,进入完成后反思侧分支")
  1739. continue
  1740. break
  1741. # 清理 trace 相关的跟踪数据
  1742. self._context_warned.pop(trace_id, None)
  1743. self._context_usage.pop(trace_id, None)
  1744. self._saved_knowledge_ids.pop(trace_id, None)
  1745. # 更新 head_sequence 并完成 Trace
  1746. if self.trace_store:
  1747. await self.trace_store.update_trace(
  1748. trace_id,
  1749. status="completed",
  1750. head_sequence=head_seq,
  1751. completed_at=datetime.now(),
  1752. )
  1753. trace_obj = await self.trace_store.get_trace(trace_id)
  1754. if trace_obj:
  1755. yield trace_obj
  1756. # ===== 压缩辅助方法 =====
  def _rebuild_history_after_compression(
      self,
      history: List[Dict],
      summary_msg_dict: Dict,
      label: str = "压缩",
  ) -> List[Dict]:
      """
      Collapse a history down to its anchor messages plus a summary.

      The rebuilt list holds, in order: the first ``system`` message (if
      any), the first ``user`` message (if any), and ``summary_msg_dict``.
      The size change and every kept message are written to ``self.log``
      at INFO level.

      Args:
          history: Message dicts as they were before compression.
          summary_msg_dict: LLM-format dict of the summary message.
          label: Prefix used in the log lines.

      Returns:
          A newly built list; ``history`` itself is left untouched.
      """
      # A dict that answers role == "system"/"user" is never empty, so the
      # first match per role is exactly the anchor we are looking for.
      first_system = next(
          (entry for entry in history if entry.get("role") == "system"), None
      )
      first_user = next(
          (entry for entry in history if entry.get("role") == "user"), None
      )

      rebuilt = [anchor for anchor in (first_system, first_user) if anchor]
      rebuilt.append(summary_msg_dict)

      self.log.info(f"{label}完成: {len(history)} → {len(rebuilt)} 条消息")
      for position, entry in enumerate(rebuilt):
          speaker = entry.get("role", "unknown")
          body = entry.get("content", "")
          if isinstance(body, list):
              # Multimodal content: report the block count, not the blocks.
              shown = f"[{len(body)} blocks]"
          elif isinstance(body, str):
              shown = body
          else:
              shown = str(body)
          self.log.info(f" {label}后[{position}] {speaker}: {shown}")
      return rebuilt
  1799. # ===== 回溯(Rewind)=====
  async def _rewind(
      self,
      trace_id: str,
      after_sequence: int,
      goal_tree: Optional[GoalTree],
  ) -> int:
      """
      Rewind a trace: snapshot the GoalTree, rebuild it, and move the head.

      The rewind point is ``after_sequence`` adjusted by
      ``_find_safe_cutoff``. When a ``goal_tree`` is given, its current
      state is appended to the store as a ``"rewind"`` event, a rebuilt
      tree from ``rebuild_for_rewind`` is persisted, and the in-memory tree
      is updated in place. The trace's ``head_sequence`` is then set to the
      rewind point.

      Returns:
          The next usable sequence number (highest stored sequence + 1),
          or 1 when the trace has no messages.

      Raises:
          ValueError: if no trace store is configured.
      """
      store = self.trace_store
      if not store:
          raise ValueError("trace_store required for rewind")

      # All messages are needed both for the safe cutoff and the max sequence.
      stored = await store.get_trace_messages(trace_id)
      if not stored:
          return 1

      rewind_point = self._find_safe_cutoff(stored, after_sequence)

      if goal_tree:
          # The cutoff message's timestamp is the time boundary for the
          # rebuilt tree; fall back to "now" when that message is missing.
          anchor = next((m for m in stored if m.sequence == rewind_point), None)
          boundary = anchor.created_at if anchor else datetime.now()

          # Snapshot first; the event carries head_sequence as well.
          snapshot = {
              "after_sequence": rewind_point,
              "head_sequence": rewind_point,
              "goal_tree_snapshot": goal_tree.to_dict(),
          }
          await store.append_event(trace_id, "rewind", snapshot)

          pruned = goal_tree.rebuild_for_rewind(boundary)
          await store.update_goal_tree(trace_id, pruned)

          # Mutate the caller's tree object so existing references stay valid.
          goal_tree.goals = pruned.goals
          goal_tree.current_id = pruned.current_id

      await store.update_trace(trace_id, head_sequence=rewind_point)

      # Sequence numbers increase globally and are never reused.
      highest = max((m.sequence for m in stored), default=0)
      return highest + 1
  def _find_safe_cutoff(self, messages: List[Message], after_sequence: int) -> int:
      """
      Pick a cutoff that never separates tool calls from their responses.

      When ``after_sequence`` identifies an assistant message whose content
      dict carries ``tool_calls``, the cutoff is pushed forward to the
      highest sequence among the tool messages answering those calls.
      In every other case ``after_sequence`` is returned unchanged.
      """
      anchor = next((m for m in messages if m.sequence == after_sequence), None)
      if not anchor or anchor.role != "assistant":
          return after_sequence

      payload = anchor.content
      if not isinstance(payload, dict) or not payload.get("tool_calls"):
          return after_sequence

      # Ids of the calls issued by the anchor message (malformed entries skipped).
      wanted_ids = {
          call["id"]
          for call in payload["tool_calls"]
          if isinstance(call, dict) and call.get("id")
      }

      # Sequences of every tool message answering one of those calls.
      reply_sequences = [
          m.sequence
          for m in messages
          if m.role == "tool" and m.tool_call_id and m.tool_call_id in wanted_ids
      ]
      return max([after_sequence, *reply_sequences])
  async def _heal_orphaned_tool_calls(
      self,
      messages: List[Message],
      trace_id: str,
      goal_tree: Optional[GoalTree],
      sequence: int,
  ) -> tuple:
      """
      Detect and repair orphaned tool_calls in a message history.

      An assistant ``tool_calls`` entry is orphaned when no tool message
      carries its id (this includes a multi-call turn that was only
      partially answered). For each orphan a synthetic tool message is
      created and appended instead of trimming history:

      - ``agent`` / ``evaluate`` calls get the result built by
        ``_build_agent_interrupted_result``;
      - every other tool gets ``build_tool_interrupted_message(tool_name)``.

      Synthetic messages are written to the trace store when one is
      configured, and the trace's head/last sequence is updated, so a later
      pass over the stored history finds a result for every call.

      Malformed ``tool_calls`` entries (non-dict items, or a non-dict
      ``function`` field) are tolerated rather than raising, matching the
      guard used by ``_find_safe_cutoff``.

      Args:
          messages: History to inspect, in order.
          trace_id: Trace that owns the messages.
          goal_tree: Passed through to the agent/evaluate result builder.
          sequence: First free sequence number for synthetic messages.

      Returns:
          ``(healed_messages, next_sequence)``. When nothing is orphaned the
          original ``messages`` object and ``sequence`` are returned as-is.
      """
      if not messages:
          return messages, sequence

      # tool_call id -> (assistant message that issued it, tool_call dict)
      tc_map: Dict[str, tuple] = {}
      result_ids: set = set()
      for msg in messages:
          if msg.role == "assistant":
              content = msg.content
              if isinstance(content, dict) and content.get("tool_calls"):
                  for tc in content["tool_calls"]:
                      # Skip corrupt entries instead of crashing the resume.
                      if not isinstance(tc, dict):
                          continue
                      tc_id = tc.get("id")
                      if tc_id:
                          tc_map[tc_id] = (msg, tc)
          elif msg.role == "tool" and msg.tool_call_id:
              result_ids.add(msg.tool_call_id)

      orphaned_ids = [tc_id for tc_id in tc_map if tc_id not in result_ids]
      if not orphaned_ids:
          return messages, sequence

      self.log.info(
          "检测到 %d 个 orphaned tool_calls,生成合成中断通知",
          len(orphaned_ids),
      )

      healed = list(messages)
      # Synthetic results are chained after the last existing message.
      head_seq = messages[-1].sequence
      for tc_id in orphaned_ids:
          assistant_msg, tc = tc_map[tc_id]
          function = tc.get("function")
          tool_name = (
              function.get("name", "unknown")
              if isinstance(function, dict)
              else "unknown"
          )
          if tool_name in ("agent", "evaluate"):
              result_text = self._build_agent_interrupted_result(
                  tc, goal_tree, assistant_msg,
              )
          else:
              result_text = build_tool_interrupted_message(tool_name)

          synthetic_msg = Message.create(
              trace_id=trace_id,
              role="tool",
              sequence=sequence,
              goal_id=assistant_msg.goal_id,
              parent_sequence=head_seq,
              tool_call_id=tc_id,
              content={"tool_name": tool_name, "result": result_text},
          )
          if self.trace_store:
              await self.trace_store.add_message(synthetic_msg)
          healed.append(synthetic_msg)
          head_seq = sequence
          sequence += 1

      # Record the new head / last sequence on the trace.
      if self.trace_store:
          await self.trace_store.update_trace(
              trace_id,
              head_sequence=head_seq,
              last_sequence=max(head_seq, sequence - 1),
          )
      return healed, sequence
  def _build_agent_interrupted_result(
      self,
      tc: Dict,
      goal_tree: Optional[GoalTree],
      assistant_msg: Message,
  ) -> str:
      """
      Build the synthetic result for an interrupted agent/evaluate tool call.

      The result is a JSON string with ``mode``, ``status`` ("interrupted"),
      ``summary`` and ``task``. When the goal of ``assistant_msg`` is found
      in ``goal_tree`` and has sub-traces, ``sub_trace_id`` and a ``hint``
      are added; when that goal has cumulative stats with at least one
      message, ``stats`` is added as well.

      Arguments that are not valid JSON, or that decode to something other
      than an object, are treated as empty so that a malformed call still
      produces a result instead of raising.

      Args:
          tc: The tool_call dict from the assistant message.
          goal_tree: Goal tree used to look up sub-trace info; may be None.
          assistant_msg: The assistant message that issued the call.

      Returns:
          The result serialized with ``json.dumps(..., indent=2)``.
      """
      function = tc.get("function")
      if not isinstance(function, dict):
          function = {}

      raw_args = function.get("arguments", "{}")
      try:
          args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
      except json.JSONDecodeError:
          args = {}
      # json.loads may legally return a list, string, number or None.
      if not isinstance(args, dict):
          args = {}

      task = args.get("task", "未知任务")
      if isinstance(task, list):
          # Items are not guaranteed to be strings.
          task = "; ".join(str(item) for item in task)

      tool_name = function.get("name", "agent")
      mode = "evaluate" if tool_name == "evaluate" else "delegate"

      # Look up sub-trace information on the goal the call belonged to.
      sub_trace_id = None
      stats = None
      if goal_tree and assistant_msg.goal_id:
          goal = goal_tree.find(assistant_msg.goal_id)
          if goal and goal.sub_trace_ids:
              first = goal.sub_trace_ids[0]
              if isinstance(first, dict):
                  sub_trace_id = first.get("trace_id")
              elif isinstance(first, str):
                  sub_trace_id = first
              if goal.cumulative_stats:
                  s = goal.cumulative_stats
                  if s.message_count > 0:
                      stats = {
                          "message_count": s.message_count,
                          "total_tokens": s.total_tokens,
                          "total_cost": round(s.total_cost, 4),
                      }

      result: Dict[str, Any] = {
          "mode": mode,
          "status": "interrupted",
          "summary": AGENT_INTERRUPTED_SUMMARY,
          "task": task,
      }
      if sub_trace_id:
          result["sub_trace_id"] = sub_trace_id
          result["hint"] = build_agent_continue_hint(sub_trace_id)
      if stats:
          result["stats"] = stats
      return json.dumps(result, ensure_ascii=False, indent=2)
  1996. # ===== 上下文注入 =====
  1997. # ===== Skill 指定注入 =====
  def _check_skills_need_injection(
      self,
      trace: Trace,
      inject_skills: List[str],
      history: List[Dict],
      recency_threshold: int,
  ) -> List[str]:
      """
      Decide which of the requested skills have to be injected.

      A skill is kept out of the result only when
      ``trace.context["injected_skills"]`` holds a record for it whose
      ``message_id`` belongs to one of the last ``recency_threshold``
      entries of ``history`` (matched on ``message_id`` or ``_message_id``).
      A threshold of zero or less means no message counts as recent.

      Returns:
          Skill names needing injection, in the order of ``inject_skills``.
      """
      records = (trace.context or {}).get("injected_skills", {})

      window = history[-recency_threshold:] if recency_threshold > 0 else []
      candidate_ids = (
          entry.get("message_id") or entry.get("_message_id") for entry in window
      )
      live_ids = {mid for mid in candidate_ids if mid}

      def _is_stale(name):
          # No record at all, or the recorded message left the recent window.
          record = records.get(name)
          return not record or record.get("message_id") not in live_ids

      return [name for name in inject_skills if _is_stale(name)]
  async def _update_skill_injection_record(
      self,
      trace_id: str,
      trace: Trace,
      skill_name: str,
      message_id: str,
      sequence: int,
  ):
      """
      Record in ``trace.context`` which message carries an injected skill.

      Stores ``{"message_id": ..., "sequence": ...}`` under
      ``trace.context["injected_skills"][skill_name]`` (creating the
      containers when absent) and, when a trace store is configured,
      persists the updated context.
      """
      if not trace.context:
          trace.context = {}

      registry = trace.context.setdefault("injected_skills", {})
      registry[skill_name] = {"message_id": message_id, "sequence": sequence}

      if not self.trace_store:
          return
      await self.trace_store.update_trace(trace_id, context=trace.context)
  2048. # ===== 上下文注入 =====
  def _build_context_injection(
      self,
      trace: Trace,
      goal_tree: Optional[GoalTree],
  ) -> str:
      """
      Build the periodically injected context block.

      Sections are joined with blank lines and appear in this order, each
      only when its data is present:

      - current local time (always);
      - the goal tree prompt, plus a focus reminder when the focused goal
        still has pending/in-progress children or when nothing is focused;
      - active collaborators from ``trace.context["collaborators"]``;
      - IM notification state, when ``trace.context["im_config"]`` carries
        both ``contact_id`` and ``chat_id`` and the IM module is importable;
      - message-queue status, when ``trace.context["km_queue_size"]`` is set.

      A missing (``None``) ``trace.context`` is treated as empty, as the
      skill-injection helpers in this class already do.

      Args:
          trace: Trace whose ``context`` supplies the optional sections.
          goal_tree: Current goal tree, or None.

      Returns:
          The assembled text.
      """
      from datetime import datetime

      # Guard against an unset context instead of raising AttributeError.
      context = trace.context or {}

      parts = [f"## Current Time\n\n{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"]

      # GoalTree
      if goal_tree and goal_tree.goals:
          parts.append(f"## Current Plan\n\n{goal_tree.to_prompt()}")
          if goal_tree.current_id:
              # Focus sits on a goal that still has open children: nudge the
              # model towards a concrete child goal.
              children = goal_tree.get_children(goal_tree.current_id)
              pending_children = [
                  c for c in children if c.status in ("pending", "in_progress")
              ]
              if pending_children:
                  child_ids = ", ".join(
                      goal_tree._generate_display_id(c) for c in pending_children[:3]
                  )
                  parts.append(
                      f"**提醒**:当前焦点在父目标上,建议用 `goal(focus=\"...\")` "
                      f"切换到具体子目标(如 {child_ids})再执行。"
                  )
          else:
              # Nothing focused: remind the model to pick a goal.
              parts.append(
                  "**提醒**:当前没有焦点目标。请用 `goal(focus=\"...\")` 选择一个目标开始执行。"
              )

      # Active Collaborators
      collaborators = context.get("collaborators", [])
      if collaborators:
          lines = ["## Active Collaborators"]
          for c in collaborators:
              status_str = c.get("status", "unknown")
              ctype = c.get("type", "agent")
              summary = c.get("summary", "")
              name = c.get("name", "unnamed")
              lines.append(f"- {name} [{ctype}, {status_str}]: {summary}")
          parts.append("\n".join(lines))

      # IM notifications
      im_config = context.get("im_config")
      if im_config:
          contact_id = im_config.get("contact_id")
          chat_id = im_config.get("chat_id")
          if contact_id and chat_id:
              try:
                  from agent.tools.builtin.im import chat as im_chat
                  notification = im_chat._notifications.get((contact_id, chat_id))
                  if notification:
                      count = notification.get("count", 0)
                      senders = notification.get("from", [])
                      # str() keeps a non-string sender from breaking the join.
                      senders_str = ", ".join(str(s) for s in senders)
                      parts.append(
                          f"## IM 消息通知\n\n"
                          f"你有 {count} 条新消息,来自: {senders_str}\n"
                          f"使用 `im_receive_messages(contact_id=\"{contact_id}\", chat_id=\"{chat_id}\")` 查看消息内容。"
                      )
                  else:
                      parts.append("## IM 消息通知\n\n暂无新消息")
              except (ImportError, AttributeError):
                  # IM module not loaded or unavailable: omit the section.
                  pass

      # Knowledge Manager queue status
      km_queue_size = context.get("km_queue_size")
      if km_queue_size is not None:
          current_sender = context.get("current_sender", "unknown")
          if km_queue_size > 0:
              parts.append(
                  f"## 消息队列状态\n\n"
                  f"当前处理: {current_sender} 的消息\n"
                  f"队列中还有 {km_queue_size} 条待处理消息"
              )
          else:
              parts.append(
                  f"## 消息队列状态\n\n"
                  f"当前处理: {current_sender} 的消息\n"
                  f"队列为空,处理完本条消息后将进入休眠"
              )

      return "\n\n".join(parts)
  2129. # ===== 辅助方法 =====
  2130. async def _optimize_images(self, messages: List[Dict], model: str) -> List[Dict]:
  2131. """
  2132. 分级优化已处理的图片,节省 token
  2133. 策略(基于图片距离最后一条 assistant 的"轮次"):
  2134. 1. 最近 1-2 轮:保留原图
  2135. 2. 3-5 轮:降低分辨率和压缩(节省 token 但保留视觉信息)
  2136. 3. 5 轮以上:调用小模型生成文本描述 + 保留 URL
  2137. 处理结果会缓存,避免重复的 PIL 解码/编码和 LLM 调用。
  2138. Args:
  2139. messages: 原始消息列表
  2140. model: 当前使用的模型(用于选择描述生成模型)
  2141. Returns:
  2142. 优化后的消息列表(深拷贝)
  2143. """
  2144. if not messages:
  2145. return messages
  2146. # 找到最后一条 assistant message 的位置
  2147. last_assistant_idx = -1
  2148. for i in range(len(messages) - 1, -1, -1):
  2149. if messages[i].get("role") == "assistant":
  2150. last_assistant_idx = i
  2151. break
  2152. # 如果没有 assistant message,说明还没开始对话,不优化
  2153. if last_assistant_idx == -1:
  2154. return messages
  2155. # 统计从每个位置到最后一条 assistant 之间的 assistant 数量(作为"轮次")
  2156. assistant_count_after = [0] * len(messages)
  2157. count = 0
  2158. for i in range(len(messages) - 1, -1, -1):
  2159. assistant_count_after[i] = count
  2160. if messages[i].get("role") == "assistant":
  2161. count += 1
  2162. # 深拷贝避免修改原始数据
  2163. import copy
  2164. import hashlib
  2165. import asyncio
  2166. import base64 as b64mod
  2167. import httpx
  2168. import mimetypes
  2169. messages = copy.deepcopy(messages)
  2170. # 预处理:将所有 HTTP(S) URL 图片下载并转为 base64 data URL
  2171. # Qwen API 无法访问外部签名 URL(如 BFL、火山引擎 TOS),必须在本地转换
  2172. url_download_jobs = [] # [(msg_idx, block_idx, url)]
  2173. for i, msg in enumerate(messages):
  2174. if msg.get("role") != "tool":
  2175. continue
  2176. content = msg.get("content")
  2177. if not isinstance(content, list):
  2178. continue
  2179. for block_idx, block in enumerate(content):
  2180. if isinstance(block, dict) and block.get("type") == "image_url":
  2181. url = block.get("image_url", {}).get("url", "")
  2182. if url.startswith(("http://", "https://")):
  2183. url_download_jobs.append((i, block_idx, url))
  2184. if url_download_jobs:
  2185. async def _download_image_to_data_url(url: str) -> str | None:
  2186. try:
  2187. async with httpx.AsyncClient(timeout=60, trust_env=False) as client:
  2188. resp = await client.get(url)
  2189. resp.raise_for_status()
  2190. ct = resp.headers.get("content-type", "").split(";")[0].strip()
  2191. if not ct.startswith("image/"):
  2192. ct = mimetypes.guess_type(url.split("?")[0])[0] or "image/png"
  2193. b64 = b64mod.b64encode(resp.content).decode()
  2194. return f"data:{ct};base64,{b64}"
  2195. except Exception:
  2196. return None
  2197. results = await asyncio.gather(
  2198. *[_download_image_to_data_url(url) for _, _, url in url_download_jobs],
  2199. return_exceptions=True
  2200. )
  2201. converted = 0
  2202. for (msg_idx, block_idx, original_url), result in zip(url_download_jobs, results):
  2203. if isinstance(result, str) and result.startswith("data:"):
  2204. messages[msg_idx]["content"][block_idx]["image_url"]["url"] = result
  2205. converted += 1
  2206. if converted:
  2207. self.log.info(f"[Image Optimization] URL→base64 预转换: {converted}/{len(url_download_jobs)} 张")
  2208. # 统计优化情况
  2209. stats = {"kept": 0, "downscaled": 0, "described": 0, "cache_hit": 0}
  2210. # 收集需要降分辨率或尺寸补齐的图片(用于并发处理)
  2211. process_jobs = [] # [(msg_idx, block_idx, image_url, cache_key, max_size, cache_field)]
  2212. # 第一遍:扫描并收集需要处理的图片
  2213. for i in range(last_assistant_idx):
  2214. msg = messages[i]
  2215. if msg.get("role") != "tool":
  2216. continue
  2217. content = msg.get("content")
  2218. if not isinstance(content, list):
  2219. continue
  2220. rounds_ago = assistant_count_after[i]
  2221. for block_idx, block in enumerate(content):
  2222. if isinstance(block, dict) and block.get("type") == "image_url":
  2223. image_url_obj = block.get("image_url", {})
  2224. image_url = image_url_obj.get("url", "")
  2225. if image_url.startswith("data:"):
  2226. cache_key = hashlib.md5(image_url[:200].encode()).hexdigest()
  2227. else:
  2228. cache_key = hashlib.md5(image_url.encode()).hexdigest()
  2229. # 1-5 轮都需要检查尺寸
  2230. if rounds_ago <= 5:
  2231. cached = self._image_opt_cache.get(cache_key, {})
  2232. cache_field = "pad_only" if rounds_ago <= 2 else "downscaled"
  2233. if cache_field not in cached and image_url.startswith("data:"):
  2234. max_size = None if rounds_ago <= 2 else 512
  2235. process_jobs.append((i, block_idx, image_url, cache_key, max_size, cache_field))
  2236. # 并发处理所有尺寸任务
  2237. if process_jobs:
  2238. process_results = await asyncio.gather(
  2239. *[self._process_image_size(url, max_size=ms) for _, _, url, _, ms, _ in process_jobs],
  2240. return_exceptions=True
  2241. )
  2242. for (_, _, _, cache_key, _, cache_field), result in zip(process_jobs, process_results):
  2243. if not isinstance(result, Exception) and result is not None:
  2244. self._image_opt_cache.setdefault(cache_key, {})[cache_field] = result
  2245. # 第二遍:应用处理结果
  2246. for i in range(last_assistant_idx):
  2247. msg = messages[i]
  2248. if msg.get("role") != "tool":
  2249. continue
  2250. content = msg.get("content")
  2251. if not isinstance(content, list):
  2252. continue
  2253. # 计算这条消息距离最后一条 assistant 的"轮次"
  2254. rounds_ago = assistant_count_after[i]
  2255. # 处理每个 content block
  2256. new_content = []
  2257. for block in content:
  2258. if isinstance(block, dict) and block.get("type") == "image_url":
  2259. image_url_obj = block.get("image_url", {})
  2260. image_url = image_url_obj.get("url", "")
  2261. # 生成缓存 key(URL 图片用 URL 本身,base64 用前 64 字符 hash)
  2262. if image_url.startswith("data:"):
  2263. cache_key = hashlib.md5(image_url[:200].encode()).hexdigest()
  2264. else:
  2265. cache_key = hashlib.md5(image_url.encode()).hexdigest()
  2266. # 根据距离决定处理策略
  2267. if rounds_ago <= 2:
  2268. # 最近 1-2 轮:只补齐过小图片,保留原分辨率
  2269. cached = self._image_opt_cache.get(cache_key, {})
  2270. if "pad_only" in cached:
  2271. new_content.append({
  2272. "type": "image_url",
  2273. "image_url": {"url": cached["pad_only"]}
  2274. })
  2275. stats["kept"] += 1
  2276. stats["cache_hit"] += 1
  2277. elif image_url.startswith("data:"):
  2278. processed = await self._process_image_size(image_url, max_size=None)
  2279. if processed:
  2280. self._image_opt_cache.setdefault(cache_key, {})["pad_only"] = processed
  2281. new_content.append({
  2282. "type": "image_url",
  2283. "image_url": {"url": processed}
  2284. })
  2285. else:
  2286. new_content.append(block)
  2287. stats["kept"] += 1
  2288. else:
  2289. new_content.append(block)
  2290. stats["kept"] += 1
  2291. elif rounds_ago <= 5:
  2292. # 3-5 轮:降低分辨率(优先从缓存取)
  2293. cached = self._image_opt_cache.get(cache_key, {})
  2294. if "downscaled" in cached:
  2295. new_content.append({
  2296. "type": "image_url",
  2297. "image_url": {"url": cached["downscaled"]}
  2298. })
  2299. stats["downscaled"] += 1
  2300. stats["cache_hit"] += 1
  2301. elif image_url.startswith("data:"):
  2302. processed = await self._process_image_size(image_url, max_size=512)
  2303. if processed:
  2304. # 缓存结果
  2305. self._image_opt_cache.setdefault(cache_key, {})["downscaled"] = processed
  2306. new_content.append({
  2307. "type": "image_url",
  2308. "image_url": {"url": processed}
  2309. })
  2310. stats["downscaled"] += 1
  2311. else:
  2312. new_content.append(block)
  2313. stats["kept"] += 1
  2314. else:
  2315. # URL 图片:无法直接处理,保留原图
  2316. new_content.append(block)
  2317. stats["kept"] += 1
  2318. else:
  2319. # 5 轮以上:生成文本描述(优先从缓存取)
  2320. cached = self._image_opt_cache.get(cache_key, {})
  2321. if "description" in cached:
  2322. new_content.append(cached["description"])
  2323. stats["described"] += 1
  2324. stats["cache_hit"] += 1
  2325. else:
  2326. description = await self._generate_image_description(image_url, model)
  2327. url_info = f" (URL: {image_url[:100]}...)" if not image_url.startswith("data:") else ""
  2328. desc_block = {
  2329. "type": "text",
  2330. "text": f"[Image description: {description}]{url_info}"
  2331. }
  2332. # 缓存结果
  2333. self._image_opt_cache.setdefault(cache_key, {})["description"] = desc_block
  2334. new_content.append(desc_block)
  2335. stats["described"] += 1
  2336. else:
  2337. new_content.append(block)
  2338. msg["content"] = new_content
  2339. # print(f"[Image Opt Check] 扫描到 {stats['kept'] + stats['downscaled'] + stats['described']} 张图片上下文")
  2340. if stats["downscaled"] > 0 or stats["described"] > 0:
  2341. self.log.info(
  2342. f"[Image Optimization] 保留 {stats['kept']} 张,"
  2343. f"降分辨率 {stats['downscaled']} 张,"
  2344. f"文本描述 {stats['described']} 张,"
  2345. f"缓存命中 {stats['cache_hit']} 次"
  2346. )
  2347. return messages
  2348. async def _process_image_size(self, base64_url: str, max_size: Optional[int] = 512, min_size: int = 11) -> Optional[str]:
  2349. """
  2350. 处理 base64 图片的尺寸:
  2351. - 若 max_size 不为 None 且大于该值,则等比例缩放
  2352. - 若任意一边小于 min_size,则补充白边 (Padding)
  2353. """
  2354. try:
  2355. from PIL import Image
  2356. import io
  2357. import base64
  2358. # 解析 base64 数据
  2359. if not base64_url.startswith("data:"):
  2360. return None
  2361. header, data = base64_url.split(",", 1)
  2362. media_type = header.split(";")[0].split(":")[1] # image/png
  2363. # 解码图片
  2364. img_data = base64.b64decode(data)
  2365. img = Image.open(io.BytesIO(img_data))
  2366. width, height = img.size
  2367. needs_downscale = max_size is not None and (width > max_size or height > max_size)
  2368. needs_pad = width < min_size or height < min_size
  2369. # 尺寸正常,无需处理
  2370. if not needs_downscale and not needs_pad:
  2371. return base64_url
  2372. new_width, new_height = width, height
  2373. # 1. 降分辨率
  2374. if needs_downscale:
  2375. if width > height:
  2376. new_width = max_size
  2377. new_height = int(height * max_size / width)
  2378. else:
  2379. new_height = max_size
  2380. new_width = int(width * max_size / height)
  2381. if (new_width, new_height) != (width, height):
  2382. img_resized = img.resize((new_width, new_height), Image.Resampling.BILINEAR)
  2383. else:
  2384. img_resized = img
  2385. # 2. 补齐白边 (Padding)
  2386. pad_width = max(new_width, min_size)
  2387. pad_height = max(new_height, min_size)
  2388. if pad_width > new_width or pad_height > new_height:
  2389. # 创建白色背景
  2390. padded_img = Image.new("RGBA" if img_resized.mode in ("RGBA", "P") else "RGB", (pad_width, pad_height), (255, 255, 255, 255))
  2391. offset_x = (pad_width - new_width) // 2
  2392. offset_y = (pad_height - new_height) // 2
  2393. padded_img.paste(img_resized, (offset_x, offset_y))
  2394. img_resized = padded_img
  2395. # 转换为 RGB(JPEG不支持 RGBA, P 等具有透明度或索引的模式)
  2396. if img_resized.mode != "RGB":
  2397. if img_resized.mode == "RGBA" or img_resized.mode == "P":
  2398. # Create a white background for transparent images
  2399. background = Image.new("RGB", img_resized.size, (255, 255, 255))
  2400. if img_resized.mode == "P" and "transparency" in img_resized.info:
  2401. img_resized = img_resized.convert("RGBA")
  2402. if img_resized.mode == "RGBA":
  2403. background.paste(img_resized, mask=img_resized.split()[3])
  2404. img_resized = background
  2405. img_resized = img_resized.convert("RGB")
  2406. # 重新编码为 JPEG(如果只是补齐没有缩放,可以稍微保留高点质量)
  2407. buffer = io.BytesIO()
  2408. quality = 60 if needs_downscale else 85
  2409. img_resized.save(buffer, format="JPEG", quality=quality, optimize=False)
  2410. new_data = base64.b64encode(buffer.getvalue()).decode("utf-8")
  2411. return f"data:image/jpeg;base64,{new_data}"
  2412. except Exception as e:
  2413. self.log.warning(f"[Image Process] 处理图片尺寸失败: {e}")
  2414. return None
  2415. async def _generate_image_description(self, image_url: str, current_model: str) -> str:
  2416. """
  2417. 使用小模型生成图片的文本描述
  2418. Args:
  2419. image_url: 图片 URL(base64 或 http(s))
  2420. current_model: 当前使用的模型
  2421. Returns:
  2422. 图片描述文本
  2423. """
  2424. try:
  2425. # 使用 qwen-vl-max(通义千问视觉模型)生成描述
  2426. # 注意:qwen-vl 系列专门支持视觉输入
  2427. description_model = "qwen-vl-max"
  2428. # 构建描述请求
  2429. messages = [
  2430. {
  2431. "role": "user",
  2432. "content": [
  2433. {
  2434. "type": "image_url",
  2435. "image_url": {"url": image_url}
  2436. },
  2437. {
  2438. "type": "text",
  2439. "text": "请用 1-2 句话简洁描述这张图片的主要内容。"
  2440. }
  2441. ]
  2442. }
  2443. ]
  2444. # 调用 LLM
  2445. result = await self.llm_call(
  2446. messages=messages,
  2447. model=description_model,
  2448. tools=None,
  2449. temperature=0.3,
  2450. )
  2451. description = result.get("content", "").strip()
  2452. return description if description else "图片内容"
  2453. except Exception as e:
  2454. self.log.warning(f"[Image Description] 生成描述失败: {e}")
  2455. return "图片内容"
  2456. def _add_cache_control(
  2457. self,
  2458. messages: List[Dict],
  2459. model: str,
  2460. enable: bool
  2461. ) -> List[Dict]:
  2462. """
  2463. 为支持的模型添加 Prompt Caching 标记
  2464. 策略:固定位置 + 延迟缓存
  2465. 1. 如果有未处理的图片(最后一条 assistant 之后的 tool messages 中有图片),跳过缓存
  2466. 2. system message 添加缓存(如果足够长)
  2467. 3. 固定位置缓存点(20, 40, 60, 80),确保每个缓存点间隔 >= 1024 tokens
  2468. 4. 最多使用 4 个缓存点(含 system)
  2469. Args:
  2470. messages: 原始消息列表
  2471. model: 模型名称
  2472. enable: 是否启用缓存
  2473. Returns:
  2474. 添加了 cache_control 的消息列表(深拷贝)
  2475. """
  2476. if not enable:
  2477. return messages
  2478. # 只对 Claude 模型启用
  2479. if "claude" not in model.lower():
  2480. return messages
  2481. # 延迟缓存:检查是否有未处理的图片
  2482. last_assistant_idx = -1
  2483. for i in range(len(messages) - 1, -1, -1):
  2484. if messages[i].get("role") == "assistant":
  2485. last_assistant_idx = i
  2486. break
  2487. # 检查最后一条 assistant 之后是否有包含图片的 tool messages
  2488. has_unprocessed_images = False
  2489. if last_assistant_idx >= 0:
  2490. for i in range(last_assistant_idx + 1, len(messages)):
  2491. msg = messages[i]
  2492. if msg.get("role") == "tool":
  2493. content = msg.get("content")
  2494. if isinstance(content, list):
  2495. has_unprocessed_images = any(
  2496. isinstance(block, dict) and block.get("type") == "image_url"
  2497. for block in content
  2498. )
  2499. if has_unprocessed_images:
  2500. break
  2501. if has_unprocessed_images:
  2502. self.log.debug("[Cache] 检测到未处理的图片,延迟缓存建立")
  2503. return messages
  2504. # 深拷贝避免修改原始数据
  2505. import copy
  2506. messages = copy.deepcopy(messages)
  2507. # 策略 1: 为 system message 添加缓存
  2508. system_cached = False
  2509. for msg in messages:
  2510. if msg.get("role") == "system":
  2511. content = msg.get("content", "")
  2512. if isinstance(content, str) and len(content) > 1000:
  2513. msg["content"] = [{
  2514. "type": "text",
  2515. "text": content,
  2516. "cache_control": {"type": "ephemeral"}
  2517. }]
  2518. system_cached = True
  2519. self.log.debug(f"[Cache] 为 system message 添加缓存标记 (len={len(content)})")
  2520. break
  2521. # 策略 2: 固定位置缓存点
  2522. CACHE_INTERVAL = 20
  2523. MAX_POINTS = 3 if system_cached else 4
  2524. MIN_TOKENS = 1024
  2525. AVG_TOKENS_PER_MSG = 70
  2526. total_msgs = len(messages)
  2527. if total_msgs == 0:
  2528. return messages
  2529. cache_positions = []
  2530. last_cache_pos = 0
  2531. for i in range(1, MAX_POINTS + 1):
  2532. target_pos = i * CACHE_INTERVAL - 1 # 19, 39, 59, 79
  2533. if target_pos >= total_msgs:
  2534. break
  2535. # 从目标位置开始查找合适的 user/assistant 消息
  2536. for j in range(target_pos, total_msgs):
  2537. msg = messages[j]
  2538. if msg.get("role") not in ("user", "assistant"):
  2539. continue
  2540. content = msg.get("content", "")
  2541. if not content:
  2542. continue
  2543. # 检查 content 是否非空
  2544. is_valid = False
  2545. if isinstance(content, str):
  2546. is_valid = len(content) > 0
  2547. elif isinstance(content, list):
  2548. is_valid = any(
  2549. isinstance(block, dict) and
  2550. block.get("type") == "text" and
  2551. len(block.get("text", "")) > 0
  2552. for block in content
  2553. )
  2554. if not is_valid:
  2555. continue
  2556. # 检查 token 距离
  2557. msg_count = j - last_cache_pos
  2558. estimated_tokens = msg_count * AVG_TOKENS_PER_MSG
  2559. if estimated_tokens >= MIN_TOKENS:
  2560. cache_positions.append(j)
  2561. last_cache_pos = j
  2562. self.log.debug(f"[Cache] 在位置 {j} 添加缓存点 (估算 {estimated_tokens} tokens)")
  2563. break
  2564. # 应用缓存标记
  2565. for idx in cache_positions:
  2566. msg = messages[idx]
  2567. content = msg.get("content", "")
  2568. if isinstance(content, str):
  2569. msg["content"] = [{
  2570. "type": "text",
  2571. "text": content,
  2572. "cache_control": {"type": "ephemeral"}
  2573. }]
  2574. self.log.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 添加缓存标记")
  2575. elif isinstance(content, list):
  2576. # 在最后一个 text block 添加 cache_control
  2577. for block in reversed(content):
  2578. if isinstance(block, dict) and block.get("type") == "text":
  2579. block["cache_control"] = {"type": "ephemeral"}
  2580. self.log.debug(f"[Cache] 为 message[{idx}] ({msg.get('role')}) 添加缓存标记")
  2581. break
  2582. self.log.debug(
  2583. f"[Cache] 总消息: {total_msgs}, "
  2584. f"缓存点: {len(cache_positions)} at {cache_positions}"
  2585. )
  2586. return messages
  2587. def _get_tool_schemas(
  2588. self,
  2589. tools: Optional[List[str]] = None,
  2590. tool_groups: Optional[List[str]] = None,
  2591. exclude_tools: Optional[List[str]] = None,
  2592. ) -> List[Dict]:
  2593. """
  2594. 获取工具 Schema
  2595. 合并策略(取并集):
  2596. - tool_groups 非空: 按分组白名单过滤得到基础工具集
  2597. - tools 非空: 追加指定的工具名(与 tool_groups 结果取并集)
  2598. - 两者都为 None: 返回所有已注册工具
  2599. 最后再用 exclude_tools 减去禁用的工具(如远程 agent 禁止 agent/evaluate)。
  2600. """
  2601. if tool_groups is not None:
  2602. tool_names = set(self.tools.get_tool_names(groups=tool_groups))
  2603. else:
  2604. tool_names = set(self.tools.get_tool_names())
  2605. if tools is not None:
  2606. tool_names |= set(tools)
  2607. if exclude_tools:
  2608. tool_names -= set(exclude_tools)
  2609. return self.tools.get_schemas(list(tool_names))
  2610. # 默认 system prompt 前缀(当 config.system_prompt 和前端都未提供 system message 时使用)
  2611. # 注意:此常量已迁移到 agent.core.prompts,这里保留引用以保持向后兼容
  2612. async def _build_system_prompt(self, config: RunConfig, base_prompt: Optional[str] = None) -> Optional[str]:
  2613. """构建 system prompt(注入 skills)
  2614. 优先级:
  2615. 1. base_prompt(来自消息)
  2616. 2. config.system_prompt(显式指定)
  2617. 3. preset.system_prompt(预设的完整 system prompt)
  2618. 4. 默认模板 + skills
  2619. Skills 注入优先级:
  2620. 1. config.skills 显式指定 → 按名称过滤
  2621. 2. config.skills 为 None → 查 preset 的默认 skills 列表
  2622. 3. preset 也无 skills(None)→ 加载全部(向后兼容)
  2623. Args:
  2624. base_prompt: 已有 system 内容(来自消息),
  2625. None 时使用 config.system_prompt 或 preset.system_prompt
  2626. """
  2627. from agent.core.presets import AGENT_PRESETS
  2628. # 确定 system_prompt 来源
  2629. if base_prompt is not None:
  2630. system_prompt = base_prompt
  2631. elif config.system_prompt is not None:
  2632. system_prompt = config.system_prompt
  2633. else:
  2634. # 尝试从 preset 获取 system_prompt
  2635. preset = AGENT_PRESETS.get(config.agent_type)
  2636. system_prompt = preset.system_prompt if preset and preset.system_prompt else None
  2637. # 确定要加载哪些 skills
  2638. skills_filter: Optional[List[str]] = config.skills
  2639. if skills_filter is None:
  2640. preset = AGENT_PRESETS.get(config.agent_type)
  2641. if preset is not None:
  2642. skills_filter = preset.skills # 可能仍为 None(加载全部)
  2643. # 加载并过滤
  2644. all_skills = load_skills_from_dir(self.skills_dir)
  2645. if skills_filter is not None:
  2646. skills = [s for s in all_skills if s.name in skills_filter]
  2647. else:
  2648. skills = all_skills
  2649. skills_text = self._format_skills(skills) if skills else ""
  2650. if system_prompt:
  2651. if skills_text:
  2652. system_prompt += f"\n\n## Skills\n{skills_text}"
  2653. else:
  2654. system_prompt = DEFAULT_SYSTEM_PREFIX
  2655. if skills_text:
  2656. system_prompt += f"\n\n## Skills\n{skills_text}"
  2657. if config.max_iterations and config.max_iterations > 0:
  2658. system_prompt += f"\n\n## Execution Constraint\n这是一项有严格步数限制的任务。你最多可以用 {config.max_iterations} 轮交互来解决问题。\n请务必【边查边写、随时存档】!每当你收集或得出一个有价值的独立结果(如收集到一个独立 Case),请立刻调用工具写入或追加到结果文件中,绝对不要等到所有任务都做完再最后一次性输出。这样即使触达步数上限被强制打断,你已经收集的成果也能安全保留!"
  2659. # Memory 注入(memory-bearing Agent)——在 system prompt 末尾追加
  2660. # 初版选择 system prompt 追加(见 agent/docs/memory.md 待定问题 1)。
  2661. # 好处:run 启动一次性注入、所有后续轮次都能看到、与 skills 注入方式一致。
  2662. # 代价:若记忆文件很大会持续占 prompt tokens —— 待观察后决定是否切换方案。
  2663. if config.memory:
  2664. try:
  2665. from agent.core.memory import load_memory_files, format_memory_injection
  2666. files = load_memory_files(config.memory)
  2667. memory_text = format_memory_injection(files)
  2668. if memory_text:
  2669. system_prompt += f"\n\n{memory_text}"
  2670. except Exception as e:
  2671. self.log.warning(f"[Memory] 加载记忆失败,跳过注入: {e}")
  2672. return system_prompt
  2673. async def _generate_task_name(self, messages: List[Dict]) -> str:
  2674. """生成任务名称:优先使用 utility_llm,fallback 到文本截取"""
  2675. # 提取 messages 中的文本内容
  2676. text_parts = []
  2677. for msg in messages:
  2678. content = msg.get("content", "")
  2679. if isinstance(content, str):
  2680. text_parts.append(content)
  2681. elif isinstance(content, list):
  2682. for part in content:
  2683. if isinstance(part, dict) and part.get("type") == "text":
  2684. text_parts.append(part.get("text", ""))
  2685. raw_text = " ".join(text_parts).strip()
  2686. if not raw_text:
  2687. return TASK_NAME_FALLBACK
  2688. # 尝试使用 utility_llm 生成标题
  2689. if self.utility_llm_call:
  2690. try:
  2691. result = await self.utility_llm_call(
  2692. messages=[
  2693. {"role": "system", "content": TASK_NAME_GENERATION_SYSTEM_PROMPT},
  2694. {"role": "user", "content": raw_text[:2000]},
  2695. ],
  2696. model="gpt-4o-mini", # 使用便宜模型
  2697. )
  2698. title = result.get("content", "").strip()
  2699. if title and len(title) < 100:
  2700. return title
  2701. except Exception:
  2702. pass
  2703. # Fallback: 截取前 50 字符
  2704. return raw_text[:50] + ("..." if len(raw_text) > 50 else "")
  2705. def _format_skills(self, skills: List[Skill]) -> str:
  2706. if not skills:
  2707. return ""
  2708. return "\n\n".join(s.to_prompt_text() for s in skills)