""" 信号总线 - Agent 间异步通讯 提供简单的信号发送和缓冲池检查机制 """ from dataclasses import dataclass from typing import Any, List, Dict from collections import defaultdict @dataclass class Signal: """信号基类""" type: str # 信号类型,如 "subagent.start", "subagent.complete" trace_id: str # 发送信号的 trace ID data: Dict[str, Any] # 信号数据 class SignalBus: """ 信号总线 - 简化版 只提供两个核心接口: 1. emit() - 发送信号到缓冲池 2. check_buffer() - 检查并清空缓冲池 """ def __init__(self): # 缓冲池:parent_trace_id -> List[Signal] self._buffer: Dict[str, List[Signal]] = defaultdict(list) def emit(self, signal: Signal) -> None: """ 发送信号到缓冲池 信号会根据 parent_trace_id 存入对应的缓冲池 Args: signal: 要发送的信号 """ parent_trace_id = signal.data.get("parent_trace_id") if parent_trace_id: self._buffer[parent_trace_id].append(signal) def check_buffer(self, trace_id: str) -> List[Signal]: """ 检查并清空指定 trace 的缓冲池 Args: trace_id: 要检查的 trace ID Returns: 该 trace 的所有待处理信号(检查后会清空) """ signals = self._buffer.get(trace_id, []) if signals: self._buffer[trace_id] = [] return signals