delegate.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. """
  2. Delegate 工具 - 委托任务给子 Agent
  3. 将大任务委托给独立的 Sub-Trace 执行,获得完整权限。
  4. """
  5. from typing import Optional, Dict, Any
  6. from datetime import datetime
  7. from .models import Trace, Message
  8. from .trace_id import generate_sub_trace_id
  9. from .goal_models import Goal
  10. async def delegate_tool(
  11. current_trace_id: str,
  12. current_goal_id: str,
  13. task: str,
  14. store=None,
  15. run_agent=None
  16. ) -> str:
  17. """
  18. 将任务委托给独立的 Sub-Agent
  19. Args:
  20. current_trace_id: 当前主 Trace ID
  21. current_goal_id: 当前 Goal ID
  22. task: 委托的任务描述
  23. store: TraceStore 实例
  24. run_agent: 运行 Agent 的函数
  25. Returns:
  26. 任务执行结果摘要
  27. Example:
  28. >>> result = await delegate_tool(
  29. ... current_trace_id="abc123",
  30. ... current_goal_id="3",
  31. ... task="实现用户登录功能",
  32. ... store=store,
  33. ... run_agent=run_agent_func
  34. ... )
  35. """
  36. if not store:
  37. raise ValueError("store parameter is required")
  38. if not run_agent:
  39. raise ValueError("run_agent parameter is required")
  40. # 1. 创建 agent_call Goal
  41. await store.update_goal(current_trace_id, current_goal_id,
  42. type="agent_call",
  43. agent_call_mode="delegate",
  44. status="in_progress")
  45. # 2. 生成 Sub-Trace ID
  46. sub_trace_id = generate_sub_trace_id(current_trace_id, "delegate")
  47. # 3. 创建 Sub-Trace
  48. sub_trace = Trace(
  49. trace_id=sub_trace_id,
  50. mode="agent",
  51. task=task,
  52. parent_trace_id=current_trace_id,
  53. parent_goal_id=current_goal_id,
  54. agent_type="delegate",
  55. context={
  56. # delegate 模式:完整权限
  57. "allowed_tools": None, # None = 所有工具
  58. "max_turns": 50
  59. },
  60. status="running",
  61. created_at=datetime.now()
  62. )
  63. # 保存 Sub-Trace
  64. await store.create_trace(sub_trace)
  65. # 更新主 Goal 的 sub_trace_ids
  66. await store.update_goal(current_trace_id, current_goal_id, sub_trace_ids=[sub_trace_id])
  67. # 推送 sub_trace_started 事件
  68. await store.append_event(current_trace_id, "sub_trace_started", {
  69. "trace_id": sub_trace_id,
  70. "parent_trace_id": current_trace_id,
  71. "parent_goal_id": current_goal_id,
  72. "agent_type": "delegate",
  73. "task": task
  74. })
  75. # 4. 执行 Sub-Trace
  76. try:
  77. result = await run_agent(sub_trace)
  78. # 获取 Sub-Trace 的最终状态
  79. updated_trace = await store.get_trace(sub_trace_id)
  80. if isinstance(result, dict):
  81. summary = result.get("summary", "任务完成")
  82. else:
  83. summary = "任务完成"
  84. # 推送 sub_trace_completed 事件
  85. await store.append_event(current_trace_id, "sub_trace_completed", {
  86. "trace_id": sub_trace_id,
  87. "status": "completed",
  88. "summary": summary,
  89. "stats": {
  90. "total_messages": updated_trace.total_messages if updated_trace else 0,
  91. "total_tokens": updated_trace.total_tokens if updated_trace else 0,
  92. "total_cost": updated_trace.total_cost if updated_trace else 0
  93. }
  94. })
  95. # 5. 完成主 Goal
  96. await store.update_goal(current_trace_id, current_goal_id,
  97. status="completed",
  98. summary=f"已委托完成: {task}")
  99. # 格式化返回结果
  100. return f"""## 委托任务完成
  101. **任务**: {task}
  102. **结果**: {summary}
  103. **统计**:
  104. - 消息数: {updated_trace.total_messages if updated_trace else 0}
  105. - Tokens: {updated_trace.total_tokens if updated_trace else 0}
  106. - 成本: ${updated_trace.total_cost if updated_trace else 0:.4f}
  107. """
  108. except Exception as e:
  109. # 推送失败事件
  110. await store.append_event(current_trace_id, "sub_trace_completed", {
  111. "trace_id": sub_trace_id,
  112. "status": "failed",
  113. "error": str(e)
  114. })
  115. # 更新主 Goal 为失败
  116. await store.update_goal(current_trace_id, current_goal_id,
  117. status="failed",
  118. summary=f"委托任务失败: {str(e)}")
  119. return f"""## 委托任务失败
  120. **任务**: {task}
  121. **错误**: {str(e)}
  122. """
  123. def create_delegate_tool_schema() -> Dict[str, Any]:
  124. """
  125. 创建 delegate 工具的 JSON Schema
  126. Returns:
  127. 工具的 JSON Schema
  128. """
  129. return {
  130. "type": "function",
  131. "function": {
  132. "name": "delegate",
  133. "description": "将大任务委托给独立的 Sub-Agent 执行。Sub-Agent 拥有完整权限,适合执行复杂的、需要多步骤的任务。",
  134. "parameters": {
  135. "type": "object",
  136. "properties": {
  137. "task": {
  138. "type": "string",
  139. "description": "要委托的任务描述,应该清晰具体"
  140. }
  141. },
  142. "required": ["task"]
  143. }
  144. }
  145. }