manager.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544
  1. """
  2. Sub-Agent 管理器 - 统一管理 Sub-Agent 创建和执行
  3. 统一 evaluate、delegate、explore 三种模式的 Sub-Agent 管理
  4. """
  5. import asyncio
  6. from typing import Optional, Dict, Any, List
  7. from datetime import datetime
  8. from agent.execution.models import Trace, Message
  9. from agent.execution.trace_id import generate_sub_trace_id
  10. from agent.models.goal import Goal, GoalTree
  11. from agent.services.subagent.signals import Signal
  12. class SubAgentManager:
  13. """
  14. 统一的 Sub-Agent 管理器
  15. 负责创建、配置和执行不同模式的 Sub-Agent
  16. """
  17. def __init__(self, store, signal_bus=None):
  18. """
  19. 初始化管理器
  20. Args:
  21. store: TraceStore 实例
  22. signal_bus: SignalBus 实例(可选,用于异步通讯)
  23. """
  24. self.store = store
  25. self.signal_bus = signal_bus
  26. async def execute(
  27. self,
  28. mode: str,
  29. current_trace_id: str,
  30. current_goal_id: str,
  31. options: Dict[str, Any],
  32. continue_from: Optional[str] = None,
  33. wait: bool = True,
  34. run_agent=None
  35. ) -> Dict[str, Any]:
  36. """
  37. 统一的执行逻辑(信号驱动)
  38. Args:
  39. mode: 模式 - "evaluate" | "delegate" | "explore"
  40. current_trace_id: 当前主 Trace ID
  41. current_goal_id: 当前 Goal ID
  42. options: 模式特定的选项
  43. continue_from: 继承的 trace ID(可选)
  44. wait: True=等待完成信号, False=立即返回
  45. run_agent: 运行 Agent 的函数
  46. Returns:
  47. 根据 mode 返回不同格式的结果
  48. """
  49. if not run_agent:
  50. raise ValueError("run_agent parameter is required")
  51. # 1. 创建 Sub-Trace
  52. sub_trace_id = await self._create_sub_trace(
  53. mode, current_trace_id, current_goal_id,
  54. options, continue_from
  55. )
  56. # 2. 在后台启动 Sub-Agent
  57. task = asyncio.create_task(
  58. self._run_subagent_background(
  59. mode, sub_trace_id, current_trace_id,
  60. current_goal_id, options, run_agent
  61. )
  62. )
  63. # 3. 发送启动信号
  64. if self.signal_bus:
  65. self.signal_bus.emit(Signal(
  66. type="subagent.start",
  67. trace_id=sub_trace_id,
  68. data={
  69. "parent_trace_id": current_trace_id,
  70. "mode": mode,
  71. "task": self._get_task_summary(mode, options)
  72. }
  73. ))
  74. if wait:
  75. # 4a. 等待完成信号
  76. return await self._wait_for_completion(
  77. sub_trace_id, current_trace_id, mode
  78. )
  79. else:
  80. # 4b. 立即返回
  81. return {
  82. "subagent_id": sub_trace_id,
  83. "status": "running",
  84. "mode": mode
  85. }
  86. async def _create_sub_trace(
  87. self,
  88. mode: str,
  89. current_trace_id: str,
  90. current_goal_id: str,
  91. options: Dict[str, Any],
  92. continue_from: Optional[str] = None
  93. ) -> str:
  94. """创建 Sub-Trace(不再执行,只创建)"""
  95. # 1. 配置权限和参数
  96. allowed_tools = self._get_allowed_tools(mode)
  97. agent_type = mode if mode != "evaluation" else "evaluator"
  98. # 2. 更新当前 Goal 为 agent_call 类型
  99. update_data = {
  100. "type": "agent_call",
  101. "agent_call_mode": mode,
  102. "status": "in_progress"
  103. }
  104. # evaluation 模式特殊处理
  105. if mode == "evaluate":
  106. update_data["target_goal_id"] = options.get("target_goal_id")
  107. update_data["evaluation_input"] = options.get("evaluation_input")
  108. await self.store.update_goal(current_trace_id, current_goal_id, **update_data)
  109. # 3. 生成或复用 Sub-Trace ID
  110. if continue_from:
  111. sub_trace_id = continue_from
  112. # 验证 trace 存在
  113. existing_trace = await self.store.get_trace(sub_trace_id)
  114. if not existing_trace:
  115. raise ValueError(f"Continue-from trace not found: {continue_from}")
  116. else:
  117. sub_trace_id = generate_sub_trace_id(current_trace_id, mode)
  118. # 4. 构建任务 prompt
  119. task_prompt = await self._build_task_prompt(mode, options, current_trace_id, continue_from)
  120. # 5. 创建或复用 Sub-Trace
  121. if not continue_from:
  122. # 新建 Sub-Trace
  123. sub_trace = Trace(
  124. trace_id=sub_trace_id,
  125. mode="agent",
  126. task=task_prompt,
  127. parent_trace_id=current_trace_id,
  128. parent_goal_id=current_goal_id,
  129. agent_type=agent_type,
  130. context={
  131. "allowed_tools": allowed_tools,
  132. "max_turns": self._get_max_turns(mode)
  133. },
  134. status="running",
  135. created_at=datetime.now()
  136. )
  137. await self.store.create_trace(sub_trace)
  138. await self.store.update_goal(current_trace_id, current_goal_id, sub_trace_ids=[sub_trace_id])
  139. # 推送 sub_trace_started 事件
  140. await self.store.append_event(current_trace_id, "sub_trace_started", {
  141. "trace_id": sub_trace_id,
  142. "parent_trace_id": current_trace_id,
  143. "parent_goal_id": current_goal_id,
  144. "agent_type": agent_type,
  145. "task": self._get_task_summary(mode, options)
  146. })
  147. else:
  148. # 连续记忆:在现有 trace 上继续
  149. await self.store.append_message(sub_trace_id, Message(
  150. role="user",
  151. content=task_prompt,
  152. created_at=datetime.now()
  153. ))
  154. return sub_trace_id
  155. async def _run_subagent_background(
  156. self,
  157. mode: str,
  158. sub_trace_id: str,
  159. current_trace_id: str,
  160. current_goal_id: str,
  161. options: Dict[str, Any],
  162. run_agent
  163. ):
  164. """在后台运行 Sub-Agent,完成后发送信号"""
  165. try:
  166. # 获取 trace 对象
  167. sub_trace = await self.store.get_trace(sub_trace_id)
  168. # 运行 agent
  169. result = await run_agent(sub_trace)
  170. # 获取最终状态
  171. updated_trace = await self.store.get_trace(sub_trace_id)
  172. # 格式化结果
  173. formatted_result = await self._format_result(
  174. mode, result, updated_trace, options, current_trace_id
  175. )
  176. # 发送完成信号
  177. if self.signal_bus:
  178. self.signal_bus.emit(Signal(
  179. type="subagent.complete",
  180. trace_id=sub_trace_id,
  181. data={
  182. "parent_trace_id": current_trace_id,
  183. "result": formatted_result,
  184. "status": "completed"
  185. }
  186. ))
  187. # 推送事件
  188. await self.store.append_event(current_trace_id, "sub_trace_completed", {
  189. "trace_id": sub_trace_id,
  190. "status": "completed",
  191. "result": formatted_result,
  192. "stats": {
  193. "total_messages": updated_trace.total_messages if updated_trace else 0,
  194. "total_tokens": updated_trace.total_tokens if updated_trace else 0,
  195. "total_cost": updated_trace.total_cost if updated_trace else 0
  196. }
  197. })
  198. # 更新主 Goal
  199. await self._update_goal_after_completion(
  200. mode, current_trace_id, current_goal_id,
  201. formatted_result, options
  202. )
  203. except Exception as e:
  204. # 发送错误信号
  205. if self.signal_bus:
  206. self.signal_bus.emit(Signal(
  207. type="subagent.error",
  208. trace_id=sub_trace_id,
  209. data={
  210. "parent_trace_id": current_trace_id,
  211. "error": str(e),
  212. "mode": mode
  213. }
  214. ))
  215. # 推送失败事件
  216. await self.store.append_event(current_trace_id, "sub_trace_completed", {
  217. "trace_id": sub_trace_id,
  218. "status": "failed",
  219. "error": str(e)
  220. })
  221. # 更新主 Goal 为失败
  222. await self.store.update_goal(
  223. current_trace_id, current_goal_id,
  224. status="failed",
  225. summary=f"{mode} 失败: {str(e)}"
  226. )
  227. async def _wait_for_completion(
  228. self,
  229. sub_trace_id: str,
  230. current_trace_id: str,
  231. mode: str,
  232. timeout: float = 300.0 # 5 分钟超时
  233. ) -> Dict[str, Any]:
  234. """等待 Sub-Agent 完成信号"""
  235. start_time = asyncio.get_event_loop().time()
  236. while True:
  237. # 检查超时
  238. if asyncio.get_event_loop().time() - start_time > timeout:
  239. raise TimeoutError(f"{mode} Sub-Agent 超时({timeout}秒)")
  240. # 检查信号
  241. if self.signal_bus:
  242. signals = self.signal_bus.check_buffer(current_trace_id)
  243. for signal in signals:
  244. if signal.trace_id == sub_trace_id:
  245. if signal.type == "subagent.complete":
  246. return signal.data["result"]
  247. elif signal.type == "subagent.error":
  248. error = signal.data.get("error", "Unknown error")
  249. raise Exception(f"{mode} 失败: {error}")
  250. # 短暂休眠,避免忙等待
  251. await asyncio.sleep(0.1)
  252. def _get_allowed_tools(self, mode: str) -> Optional[List[str]]:
  253. """根据 mode 返回允许的工具列表"""
  254. if mode == "evaluate":
  255. return ["read_file", "grep_content", "glob_files"]
  256. elif mode == "explore":
  257. return ["read_file", "grep_content", "glob_files"]
  258. elif mode == "delegate":
  259. return None # 完整权限
  260. return None
  261. def _get_max_turns(self, mode: str) -> int:
  262. """根据 mode 返回最大轮次"""
  263. if mode == "evaluate":
  264. return 10
  265. elif mode == "explore":
  266. return 20
  267. elif mode == "delegate":
  268. return 50
  269. return 30
  270. def _get_task_summary(self, mode: str, options: Dict[str, Any]) -> str:
  271. """获取任务摘要(用于事件)"""
  272. if mode == "evaluate":
  273. target_goal_id = options.get("target_goal_id", "unknown")
  274. return f"评估 Goal {target_goal_id}"
  275. elif mode == "delegate":
  276. return options.get("task", "委托任务")
  277. elif mode == "explore":
  278. branches = options.get("branches", [])
  279. return f"探索 {len(branches)} 个方案"
  280. return "Sub-Agent 任务"
  281. async def _build_task_prompt(
  282. self,
  283. mode: str,
  284. options: Dict[str, Any],
  285. current_trace_id: str,
  286. continue_from: Optional[str]
  287. ) -> str:
  288. """构建任务 prompt"""
  289. if mode == "evaluate":
  290. return await self._build_evaluation_prompt(options, current_trace_id, continue_from)
  291. elif mode == "delegate":
  292. return options.get("task", "")
  293. elif mode == "explore":
  294. return self._build_exploration_prompt(options)
  295. return ""
  296. async def _build_evaluation_prompt(
  297. self,
  298. options: Dict[str, Any],
  299. current_trace_id: str,
  300. continue_from: Optional[str]
  301. ) -> str:
  302. """构建评估 prompt(参考 evaluate.py)"""
  303. target_goal_id = options.get("target_goal_id")
  304. evaluation_input = options.get("evaluation_input", {})
  305. requirements = options.get("requirements")
  306. # 获取被评估的 Goal
  307. goal_tree = await self.store.get_goal_tree(current_trace_id)
  308. if not goal_tree:
  309. raise ValueError(f"Goal tree not found for trace: {current_trace_id}")
  310. target_goal = goal_tree.find(target_goal_id)
  311. if not target_goal:
  312. raise ValueError(f"Target goal not found: {target_goal_id}")
  313. # 获取历史评估结果(如果是连续记忆)
  314. previous_results = []
  315. if continue_from and target_goal.evaluation_result:
  316. previous_results.append(target_goal.evaluation_result)
  317. # 构建 prompt
  318. lines = []
  319. lines.append("# 评估任务")
  320. lines.append("")
  321. lines.append("请评估以下任务的执行结果是否满足要求。")
  322. lines.append("")
  323. lines.append("## 目标描述")
  324. lines.append("")
  325. goal_description = evaluation_input.get("goal_description", target_goal.description)
  326. lines.append(goal_description)
  327. lines.append("")
  328. lines.append("## 执行结果")
  329. lines.append("")
  330. actual_result = evaluation_input.get("actual_result")
  331. if actual_result is not None:
  332. if isinstance(actual_result, str):
  333. lines.append(actual_result)
  334. else:
  335. import json
  336. lines.append("```json")
  337. lines.append(json.dumps(actual_result, ensure_ascii=False, indent=2))
  338. lines.append("```")
  339. else:
  340. lines.append("(无执行结果)")
  341. lines.append("")
  342. if requirements:
  343. lines.append("## 评估要求")
  344. lines.append("")
  345. lines.append(requirements)
  346. lines.append("")
  347. if previous_results:
  348. lines.append("## 历史评估记录")
  349. lines.append("")
  350. for i, prev in enumerate(previous_results, 1):
  351. lines.append(f"### 评估 #{i}")
  352. lines.append(f"- **结论**: {'通过' if prev.get('passed') else '不通过'}")
  353. lines.append(f"- **理由**: {prev.get('reason', '无')}")
  354. if prev.get('suggestions'):
  355. lines.append(f"- **建议**: {', '.join(prev.get('suggestions', []))}")
  356. lines.append("")
  357. lines.append("## 输出格式")
  358. lines.append("")
  359. lines.append("请按照以下格式输出评估结果:")
  360. lines.append("")
  361. lines.append("## 评估结论")
  362. lines.append("[通过/不通过]")
  363. lines.append("")
  364. lines.append("## 评估理由")
  365. lines.append("[详细说明为什么通过或不通过]")
  366. lines.append("")
  367. lines.append("## 修改建议(如果不通过)")
  368. lines.append("1. [具体的、可操作的建议1]")
  369. lines.append("2. [具体的、可操作的建议2]")
  370. return "\n".join(lines)
  371. def _build_exploration_prompt(self, options: Dict[str, Any]) -> str:
  372. """构建探索 prompt"""
  373. branches = options.get("branches", [])
  374. background = options.get("background", "")
  375. lines = []
  376. lines.append("# 探索任务")
  377. lines.append("")
  378. if background:
  379. lines.append(background)
  380. lines.append("")
  381. lines.append("请探索以下方案:")
  382. for i, branch in enumerate(branches, 1):
  383. lines.append(f"{i}. {branch}")
  384. return "\n".join(lines)
  385. async def _format_result(
  386. self,
  387. mode: str,
  388. result: Any,
  389. trace: Trace,
  390. options: Dict[str, Any],
  391. current_trace_id: str
  392. ) -> Dict[str, Any]:
  393. """根据 mode 格式化结果"""
  394. if mode == "evaluate":
  395. return self._parse_evaluation_result(result)
  396. elif mode == "delegate":
  397. summary = result.get("summary", "任务完成") if isinstance(result, dict) else "任务完成"
  398. return {
  399. "summary": summary,
  400. "stats": {
  401. "total_messages": trace.total_messages if trace else 0,
  402. "total_tokens": trace.total_tokens if trace else 0,
  403. "total_cost": trace.total_cost if trace else 0
  404. }
  405. }
  406. elif mode == "explore":
  407. return {"summary": result if isinstance(result, str) else "探索完成"}
  408. return {}
  409. def _parse_evaluation_result(self, agent_result: Any) -> Dict[str, Any]:
  410. """解析评估结果(参考 evaluate.py)"""
  411. last_message = agent_result if agent_result else None
  412. if not last_message:
  413. return {
  414. "passed": False,
  415. "reason": "评估 Agent 未返回结果",
  416. "suggestions": [],
  417. "details": {}
  418. }
  419. # 解析评估结论
  420. passed = False
  421. if "通过" in last_message and "不通过" not in last_message:
  422. passed = True
  423. elif "不通过" in last_message:
  424. passed = False
  425. # 提取评估理由
  426. reason = ""
  427. if "## 评估理由" in last_message:
  428. parts = last_message.split("## 评估理由")
  429. if len(parts) > 1:
  430. reason_section = parts[1].split("##")[0].strip()
  431. reason = reason_section
  432. # 提取修改建议
  433. suggestions = []
  434. if "## 修改建议" in last_message:
  435. parts = last_message.split("## 修改建议")
  436. if len(parts) > 1:
  437. suggestions_section = parts[1].split("##")[0].strip()
  438. for line in suggestions_section.split("\n"):
  439. line = line.strip()
  440. if line and (line.startswith("-") or line.startswith("*") or line[0].isdigit()):
  441. suggestion = line.lstrip("-*0123456789. ").strip()
  442. if suggestion:
  443. suggestions.append(suggestion)
  444. return {
  445. "passed": passed,
  446. "reason": reason if reason else last_message[:200],
  447. "suggestions": suggestions,
  448. "details": {"full_response": last_message}
  449. }
  450. async def _update_goal_after_completion(
  451. self,
  452. mode: str,
  453. current_trace_id: str,
  454. current_goal_id: str,
  455. result: Dict[str, Any],
  456. options: Dict[str, Any]
  457. ):
  458. """完成后更新 Goal"""
  459. if mode == "evaluate":
  460. await self.store.update_goal(
  461. current_trace_id, current_goal_id,
  462. evaluation_result=result,
  463. status="completed",
  464. summary=f"评估{'通过' if result.get('passed') else '不通过'}"
  465. )
  466. elif mode == "delegate":
  467. task = options.get("task", "任务")
  468. await self.store.update_goal(
  469. current_trace_id, current_goal_id,
  470. status="completed",
  471. summary=f"已委托完成: {task}"
  472. )
  473. elif mode == "explore":
  474. await self.store.update_goal(
  475. current_trace_id, current_goal_id,
  476. status="completed",
  477. summary="探索完成"
  478. )