"""
Storage Protocols - storage interface definitions.

Interfaces are declared with ``typing.Protocol`` so that different storage
backends (in-memory, PostgreSQL, Neo4j, etc.) can be used interchangeably.
"""

from typing import Protocol, List, Optional, Dict, Any, runtime_checkable

from agent.models.trace import Trace, Step
from agent.models.memory import Experience, Skill


# NOTE: ``@runtime_checkable`` lets callers use ``isinstance(obj, TraceStore)``;
# per the ``typing`` documentation that check only verifies that the methods
# exist, not that their signatures or return types match.
@runtime_checkable
class TraceStore(Protocol):
    """Storage interface for Trace and Step objects."""

    # ===== Trace operations =====

    async def create_trace(self, trace: Trace) -> str:
        """
        Create a new Trace.

        Args:
            trace: The Trace object to store.

        Returns:
            The trace_id.
        """
        ...

    async def get_trace(self, trace_id: str) -> Optional[Trace]:
        """Fetch a Trace by its ID."""
        ...

    async def update_trace(self, trace_id: str, **updates: Any) -> None:
        """
        Update a Trace.

        Args:
            trace_id: ID of the Trace to update.
            **updates: Fields to update, passed as keyword arguments.
        """
        ...

    async def list_traces(
        self,
        mode: Optional[str] = None,
        agent_type: Optional[str] = None,
        uid: Optional[str] = None,
        status: Optional[str] = None,
        limit: int = 50
    ) -> List[Trace]:
        """
        List Traces.

        NOTE(review): the optional arguments look like filters and ``limit``
        like a cap on the number of results - confirm against implementations.
        """
        ...

    # ===== Step operations =====

    async def add_step(self, step: Step) -> str:
        """
        Add a Step.

        Args:
            step: The Step object to store.

        Returns:
            The step_id.
        """
        ...

    async def get_step(self, step_id: str) -> Optional[Step]:
        """Fetch a Step by its ID."""
        ...

    async def get_trace_steps(self, trace_id: str) -> List[Step]:
        """Return all Steps of a Trace, sorted by sequence."""
        ...

    async def get_step_children(self, step_id: str) -> List[Step]:
        """Return the child Steps of a Step."""
        ...


@runtime_checkable
class MemoryStore(Protocol):
    """Storage interface for Experience and Skill objects."""

    # ===== Experience operations =====

    async def add_experience(self, exp: Experience) -> str:
        """Add an Experience."""
        ...

    async def get_experience(self, exp_id: str) -> Optional[Experience]:
        """Fetch an Experience by its ID."""
        ...

    async def search_experiences(
        self,
        scope: str,
        context: str,
        limit: int = 10
    ) -> List[Experience]:
        """
        Search for relevant Experiences.

        Args:
            scope: Scope to search in (e.g. "agent:researcher").
            context: The current context, used for semantic matching.
            limit: Maximum number of results to return.
        """
        ...

    async def update_experience_stats(
        self,
        exp_id: str,
        success: bool
    ) -> None:
        """Update the usage statistics of an Experience."""
        ...

    # ===== Skill operations =====

    async def add_skill(self, skill: Skill) -> str:
        """Add a Skill."""
        ...

    async def get_skill(self, skill_id: str) -> Optional[Skill]:
        """Fetch a Skill by its ID."""
        ...

    async def get_skill_tree(self, scope: str) -> List[Skill]:
        """Return the skill tree for the given scope."""
        ...

    async def search_skills(
        self,
        scope: str,
        context: str,
        limit: int = 5
    ) -> List[Skill]:
        """Search for relevant Skills."""
        ...


@runtime_checkable
class StateStore(Protocol):
    """Short-term state storage interface (for Task State; typically Redis)."""

    async def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Get the state stored under ``key``."""
        ...

    async def set(
        self,
        key: str,
        value: Dict[str, Any],
        ttl: Optional[int] = None
    ) -> None:
        """
        Set the state.

        Args:
            key: The key.
            value: The value.
            ttl: Expiry time, in seconds.
        """
        ...

    async def update(self, key: str, **updates: Any) -> None:
        """Partially update the state stored under ``key``."""
        ...

    async def delete(self, key: str) -> None:
        """Delete the state stored under ``key``."""
        ...