signals.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. """
  2. 信号总线 - Agent 间异步通讯
  3. 提供简单的信号发送和缓冲池检查机制
  4. """
  5. from dataclasses import dataclass
  6. from typing import Any, List, Dict
  7. from collections import defaultdict
  8. @dataclass
  9. class Signal:
  10. """信号基类"""
  11. type: str # 信号类型,如 "subagent.start", "subagent.complete"
  12. trace_id: str # 发送信号的 trace ID
  13. data: Dict[str, Any] # 信号数据
  14. class SignalBus:
  15. """
  16. 信号总线 - 简化版
  17. 只提供两个核心接口:
  18. 1. emit() - 发送信号到缓冲池
  19. 2. check_buffer() - 检查并清空缓冲池
  20. """
  21. def __init__(self):
  22. # 缓冲池:parent_trace_id -> List[Signal]
  23. self._buffer: Dict[str, List[Signal]] = defaultdict(list)
  24. def emit(self, signal: Signal) -> None:
  25. """
  26. 发送信号到缓冲池
  27. 信号会根据 parent_trace_id 存入对应的缓冲池
  28. Args:
  29. signal: 要发送的信号
  30. """
  31. parent_trace_id = signal.data.get("parent_trace_id")
  32. if parent_trace_id:
  33. self._buffer[parent_trace_id].append(signal)
  34. def check_buffer(self, trace_id: str) -> List[Signal]:
  35. """
  36. 检查并清空指定 trace 的缓冲池
  37. Args:
  38. trace_id: 要检查的 trace ID
  39. Returns:
  40. 该 trace 的所有待处理信号(检查后会清空)
  41. """
  42. signals = self._buffer.get(trace_id, [])
  43. if signals:
  44. self._buffer[trace_id] = []
  45. return signals