|
@@ -520,10 +520,10 @@ class AgentRunner:
|
|
|
trace_id, trace_obj.head_sequence
|
|
trace_id, trace_obj.head_sequence
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
- # 清理尾部不完整的 tool_call/tool_result 对
|
|
|
|
|
- # 当 agent 被 stop 时,可能恰好存了 assistant(tool_calls) 但还没存 tool results,
|
|
|
|
|
- # 直接发给 LLM 会报 400(tool_use without tool_result)
|
|
|
|
|
- main_path = self._trim_dangling_tool_calls(main_path)
|
|
|
|
|
|
|
+ # 修复 orphaned tool_calls(中断导致的 tool_call 无 tool_result)
|
|
|
|
|
+ main_path, sequence = await self._heal_orphaned_tool_calls(
|
|
|
|
|
+ main_path, trace_id, goal_tree, sequence,
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
history = [msg.to_llm_dict() for msg in main_path]
|
|
history = [msg.to_llm_dict() for msg in main_path]
|
|
|
if main_path:
|
|
if main_path:
|
|
@@ -911,36 +911,154 @@ class AgentRunner:
|
|
|
|
|
|
|
|
return cutoff
|
|
return cutoff
|
|
|
|
|
|
|
|
- @staticmethod
|
|
|
|
|
- def _trim_dangling_tool_calls(messages: List[Message]) -> List[Message]:
|
|
|
|
|
|
|
+ async def _heal_orphaned_tool_calls(
|
|
|
|
|
+ self,
|
|
|
|
|
+ messages: List[Message],
|
|
|
|
|
+ trace_id: str,
|
|
|
|
|
+ goal_tree: Optional[GoalTree],
|
|
|
|
|
+ sequence: int,
|
|
|
|
|
+ ) -> tuple:
|
|
|
"""
|
|
"""
|
|
|
- 从消息列表尾部移除不完整的 tool_call/tool_result 对。
|
|
|
|
|
|
|
+ 检测并修复消息历史中的 orphaned tool_calls。
|
|
|
|
|
+
|
|
|
|
|
+ 当 agent 被 stop/crash 中断时,可能有 assistant 的 tool_calls 没有对应的
|
|
|
|
|
+ tool results(包括多 tool_call 部分完成的情况)。直接发给 LLM 会导致 400。
|
|
|
|
|
|
|
|
- 当 agent 被 stop 中断时,可能最后一条消息是带 tool_calls 的 assistant,
|
|
|
|
|
- 但对应的 tool results 尚未存储。直接发给 LLM 会导致 400 错误。
|
|
|
|
|
- 此方法从尾部向前回退,直到最后一条消息不是悬空的 tool_call。
|
|
|
|
|
|
|
+ 修复策略:为每个缺失的 tool_result 插入合成的"中断通知"消息,而非裁剪。
|
|
|
|
|
+ - 普通工具:简短中断提示
|
|
|
|
|
+ - agent/evaluate:包含 sub_trace_id、执行统计、continue_from 指引
|
|
|
|
|
+
|
|
|
|
|
+ 合成消息持久化到 store,确保幂等(下次续跑不再触发)。
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ (healed_messages, next_sequence)
|
|
|
"""
|
|
"""
|
|
|
if not messages:
|
|
if not messages:
|
|
|
- return messages
|
|
|
|
|
|
|
+ return messages, sequence
|
|
|
|
|
|
|
|
- while messages:
|
|
|
|
|
- last = messages[-1]
|
|
|
|
|
- if last.role != "assistant":
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ # 收集所有 tool_call IDs → (assistant_msg, tool_call_dict)
|
|
|
|
|
+ tc_map: Dict[str, tuple] = {}
|
|
|
|
|
+ result_ids: set = set()
|
|
|
|
|
|
|
|
- content = last.content
|
|
|
|
|
- if not isinstance(content, dict) or not content.get("tool_calls"):
|
|
|
|
|
- break
|
|
|
|
|
|
|
+ for msg in messages:
|
|
|
|
|
+ if msg.role == "assistant":
|
|
|
|
|
+ content = msg.content
|
|
|
|
|
+ if isinstance(content, dict) and content.get("tool_calls"):
|
|
|
|
|
+ for tc in content["tool_calls"]:
|
|
|
|
|
+ tc_id = tc.get("id")
|
|
|
|
|
+ if tc_id:
|
|
|
|
|
+ tc_map[tc_id] = (msg, tc)
|
|
|
|
|
+ elif msg.role == "tool" and msg.tool_call_id:
|
|
|
|
|
+ result_ids.add(msg.tool_call_id)
|
|
|
|
|
+
|
|
|
|
|
+ orphaned_ids = [tc_id for tc_id in tc_map if tc_id not in result_ids]
|
|
|
|
|
+ if not orphaned_ids:
|
|
|
|
|
+ return messages, sequence
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(
|
|
|
|
|
+ "检测到 %d 个 orphaned tool_calls,生成合成中断通知",
|
|
|
|
|
+ len(orphaned_ids),
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ healed = list(messages)
|
|
|
|
|
+ head_seq = messages[-1].sequence
|
|
|
|
|
+
|
|
|
|
|
+ for tc_id in orphaned_ids:
|
|
|
|
|
+ assistant_msg, tc = tc_map[tc_id]
|
|
|
|
|
+ tool_name = tc.get("function", {}).get("name", "unknown")
|
|
|
|
|
+
|
|
|
|
|
+ if tool_name in ("agent", "evaluate"):
|
|
|
|
|
+ result_text = self._build_agent_interrupted_result(
|
|
|
|
|
+ tc, goal_tree, assistant_msg,
|
|
|
|
|
+ )
|
|
|
|
|
+ else:
|
|
|
|
|
+ result_text = (
|
|
|
|
|
+ f"⚠️ 工具 {tool_name} 执行被中断(进程异常退出),"
|
|
|
|
|
+ "未获得执行结果。请根据需要重新调用。"
|
|
|
|
|
+ )
|
|
|
|
|
|
|
|
- # 最后一条是 assistant + tool_calls,检查 tool_results 是否齐全
|
|
|
|
|
- # 既然它是最后一条,后面没有 tool results → 悬空,需要移除
|
|
|
|
|
- logger.info(
|
|
|
|
|
- "移除尾部悬空的 tool_call 消息 (sequence=%d)",
|
|
|
|
|
- last.sequence,
|
|
|
|
|
|
|
+ synthetic_msg = Message.create(
|
|
|
|
|
+ trace_id=trace_id,
|
|
|
|
|
+ role="tool",
|
|
|
|
|
+ sequence=sequence,
|
|
|
|
|
+ goal_id=assistant_msg.goal_id,
|
|
|
|
|
+ parent_sequence=head_seq,
|
|
|
|
|
+ tool_call_id=tc_id,
|
|
|
|
|
+ content={"tool_name": tool_name, "result": result_text},
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ if self.trace_store:
|
|
|
|
|
+ await self.trace_store.add_message(synthetic_msg)
|
|
|
|
|
+
|
|
|
|
|
+ healed.append(synthetic_msg)
|
|
|
|
|
+ head_seq = sequence
|
|
|
|
|
+ sequence += 1
|
|
|
|
|
+
|
|
|
|
|
+ # 更新 trace head/last sequence
|
|
|
|
|
+ if self.trace_store:
|
|
|
|
|
+ await self.trace_store.update_trace(
|
|
|
|
|
+ trace_id,
|
|
|
|
|
+ head_sequence=head_seq,
|
|
|
|
|
+ last_sequence=max(head_seq, sequence - 1),
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ return healed, sequence
|
|
|
|
|
+
|
|
|
|
|
+ def _build_agent_interrupted_result(
|
|
|
|
|
+ self,
|
|
|
|
|
+ tc: Dict,
|
|
|
|
|
+ goal_tree: Optional[GoalTree],
|
|
|
|
|
+ assistant_msg: Message,
|
|
|
|
|
+ ) -> str:
|
|
|
|
|
+ """为中断的 agent/evaluate 工具调用构建合成结果(对齐正常返回值格式)"""
|
|
|
|
|
+ args_str = tc.get("function", {}).get("arguments", "{}")
|
|
|
|
|
+ try:
|
|
|
|
|
+ args = json.loads(args_str) if isinstance(args_str, str) else args_str
|
|
|
|
|
+ except json.JSONDecodeError:
|
|
|
|
|
+ args = {}
|
|
|
|
|
+
|
|
|
|
|
+ task = args.get("task", "未知任务")
|
|
|
|
|
+ if isinstance(task, list):
|
|
|
|
|
+ task = "; ".join(task)
|
|
|
|
|
+
|
|
|
|
|
+ tool_name = tc.get("function", {}).get("name", "agent")
|
|
|
|
|
+ mode = "evaluate" if tool_name == "evaluate" else "delegate"
|
|
|
|
|
+
|
|
|
|
|
+ # 从 goal_tree 查找 sub_trace 信息
|
|
|
|
|
+ sub_trace_id = None
|
|
|
|
|
+ stats = None
|
|
|
|
|
+ if goal_tree and assistant_msg.goal_id:
|
|
|
|
|
+ goal = goal_tree.find(assistant_msg.goal_id)
|
|
|
|
|
+ if goal and goal.sub_trace_ids:
|
|
|
|
|
+ first = goal.sub_trace_ids[0]
|
|
|
|
|
+ if isinstance(first, dict):
|
|
|
|
|
+ sub_trace_id = first.get("trace_id")
|
|
|
|
|
+ elif isinstance(first, str):
|
|
|
|
|
+ sub_trace_id = first
|
|
|
|
|
+ if goal.cumulative_stats:
|
|
|
|
|
+ s = goal.cumulative_stats
|
|
|
|
|
+ if s.message_count > 0:
|
|
|
|
|
+ stats = {
|
|
|
|
|
+ "message_count": s.message_count,
|
|
|
|
|
+ "total_tokens": s.total_tokens,
|
|
|
|
|
+ "total_cost": round(s.total_cost, 4),
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ result: Dict[str, Any] = {
|
|
|
|
|
+ "mode": mode,
|
|
|
|
|
+ "status": "interrupted",
|
|
|
|
|
+ "summary": "⚠️ 子Agent执行被中断(进程异常退出)",
|
|
|
|
|
+ "task": task,
|
|
|
|
|
+ }
|
|
|
|
|
+ if sub_trace_id:
|
|
|
|
|
+ result["sub_trace_id"] = sub_trace_id
|
|
|
|
|
+ result["hint"] = (
|
|
|
|
|
+ f'使用 continue_from="{sub_trace_id}" 可继续执行,保留已有进度'
|
|
|
)
|
|
)
|
|
|
- messages = messages[:-1]
|
|
|
|
|
|
|
+ if stats:
|
|
|
|
|
+ result["stats"] = stats
|
|
|
|
|
|
|
|
- return messages
|
|
|
|
|
|
|
+ return json.dumps(result, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
# ===== 上下文注入 =====
|
|
# ===== 上下文注入 =====
|
|
|
|
|
|