| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- """
- 信号总线 - Agent 间异步通讯
- 提供简单的信号发送和缓冲池检查机制
- """
- from dataclasses import dataclass
- from typing import Any, List, Dict
- from collections import defaultdict
- @dataclass
- class Signal:
- """信号基类"""
- type: str # 信号类型,如 "subagent.start", "subagent.complete"
- trace_id: str # 发送信号的 trace ID
- data: Dict[str, Any] # 信号数据
- class SignalBus:
- """
- 信号总线 - 简化版
- 只提供两个核心接口:
- 1. emit() - 发送信号到缓冲池
- 2. check_buffer() - 检查并清空缓冲池
- """
- def __init__(self):
- # 缓冲池:parent_trace_id -> List[Signal]
- self._buffer: Dict[str, List[Signal]] = defaultdict(list)
- def emit(self, signal: Signal) -> None:
- """
- 发送信号到缓冲池
- 信号会根据 parent_trace_id 存入对应的缓冲池
- Args:
- signal: 要发送的信号
- """
- parent_trace_id = signal.data.get("parent_trace_id")
- if parent_trace_id:
- self._buffer[parent_trace_id].append(signal)
- def check_buffer(self, trace_id: str) -> List[Signal]:
- """
- 检查并清空指定 trace 的缓冲池
- Args:
- trace_id: 要检查的 trace ID
- Returns:
- 该 trace 的所有待处理信号(检查后会清空)
- """
- signals = self._buffer.get(trace_id, [])
- if signals:
- self._buffer[trace_id] = []
- return signals
|