""" Trace 和 Message 数据模型 Trace: 一次完整的 LLM 交互(单次调用或 Agent 任务) Message: Trace 中的 LLM 消息,对应 LLM API 格式 """ from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, List, Optional, Literal import uuid @dataclass class Trace: """ 执行轨迹 - 一次完整的 LLM 交互 单次调用: mode="call" Agent 模式: mode="agent" """ trace_id: str mode: Literal["call", "agent"] # Prompt 标识(可选) prompt_name: Optional[str] = None # Agent 模式特有 task: Optional[str] = None agent_type: Optional[str] = None # 状态 status: Literal["running", "completed", "failed"] = "running" # 统计 total_messages: int = 0 # 消息总数(改名自 total_steps) total_tokens: int = 0 total_cost: float = 0.0 total_duration_ms: int = 0 # 总耗时(毫秒) # 进度追踪(head) last_sequence: int = 0 # 最新 message 的 sequence last_event_id: int = 0 # 最新事件 ID(用于 WS 续传) # 上下文 uid: Optional[str] = None context: Dict[str, Any] = field(default_factory=dict) # 当前焦点 goal current_goal_id: Optional[str] = None # 时间 created_at: datetime = field(default_factory=datetime.now) completed_at: Optional[datetime] = None @classmethod def create( cls, mode: Literal["call", "agent"], **kwargs ) -> "Trace": """创建新的 Trace""" return cls( trace_id=str(uuid.uuid4()), mode=mode, **kwargs ) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "trace_id": self.trace_id, "mode": self.mode, "prompt_name": self.prompt_name, "task": self.task, "agent_type": self.agent_type, "status": self.status, "total_messages": self.total_messages, "total_tokens": self.total_tokens, "total_cost": self.total_cost, "total_duration_ms": self.total_duration_ms, "last_sequence": self.last_sequence, "last_event_id": self.last_event_id, "uid": self.uid, "context": self.context, "current_goal_id": self.current_goal_id, "created_at": self.created_at.isoformat() if self.created_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, } @dataclass class Message: """ 执行消息 - Trace 中的 LLM 消息 对应 LLM API 消息格式(assistant/tool),通过 goal_id 和 branch_id 关联 Goal。 
description 字段自动生成规则: - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX" - tool: 使用 tool name """ message_id: str trace_id: str role: Literal["assistant", "tool"] # 和 LLM API 一致 sequence: int # 全局顺序 goal_id: str # 关联的 Goal 内部 ID description: str = "" # 消息描述(系统自动生成) branch_id: Optional[str] = None # 所属分支(null=主线, "A"/"B"=分支) tool_call_id: Optional[str] = None # tool 消息关联对应的 tool_call content: Any = None # 消息内容(和 LLM API 格式一致) # 元数据 tokens: Optional[int] = None cost: Optional[float] = None duration_ms: Optional[int] = None created_at: datetime = field(default_factory=datetime.now) @classmethod def create( cls, trace_id: str, role: Literal["assistant", "tool"], sequence: int, goal_id: str, content: Any = None, branch_id: Optional[str] = None, tool_call_id: Optional[str] = None, tokens: Optional[int] = None, cost: Optional[float] = None, duration_ms: Optional[int] = None, ) -> "Message": """创建新的 Message,自动生成 description""" description = cls._generate_description(role, content) return cls( message_id=str(uuid.uuid4()), trace_id=trace_id, role=role, sequence=sequence, goal_id=goal_id, content=content, description=description, branch_id=branch_id, tool_call_id=tool_call_id, tokens=tokens, cost=cost, duration_ms=duration_ms, ) @staticmethod def _generate_description(role: str, content: Any) -> str: """ 自动生成 description - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX" - tool: 使用 tool name """ if role == "assistant": # assistant 消息:content 是字典,可能包含 text 和 tool_calls if isinstance(content, dict): # 优先返回文本内容 if content.get("text"): text = content["text"] # 截断过长的文本 return text[:200] + "..." 
if len(text) > 200 else text # 如果没有文本,检查 tool_calls if content.get("tool_calls"): tool_calls = content["tool_calls"] if isinstance(tool_calls, list): tool_names = [] for tc in tool_calls: if isinstance(tc, dict) and tc.get("function", {}).get("name"): tool_names.append(tc["function"]["name"]) if tool_names: return f"tool call: {', '.join(tool_names)}" # 如果 content 是字符串 if isinstance(content, str): return content[:200] + "..." if len(content) > 200 else content return "assistant message" elif role == "tool": # tool 消息:从 content 中提取 tool name if isinstance(content, dict): if content.get("tool_name"): return content["tool_name"] # 如果是字符串,尝试解析 if isinstance(content, str): return content[:100] + "..." if len(content) > 100 else content return "tool result" return "" def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "message_id": self.message_id, "trace_id": self.trace_id, "branch_id": self.branch_id, "role": self.role, "sequence": self.sequence, "goal_id": self.goal_id, "tool_call_id": self.tool_call_id, "content": self.content, "description": self.description, "tokens": self.tokens, "cost": self.cost, "duration_ms": self.duration_ms, "created_at": self.created_at.isoformat() if self.created_at else None, } # ===== 已弃用:Step 模型(保留用于向后兼容)===== # Step 类型 StepType = Literal[ "goal", "thought", "evaluation", "response", "action", "result", "memory_read", "memory_write", ] # Step 状态 StepStatus = Literal[ "planned", "in_progress", "awaiting_approval", "completed", "failed", "skipped", ] @dataclass class Step: """ [已弃用] 执行步骤 - 使用 Message 模型替代 保留用于向后兼容 """ step_id: str trace_id: str step_type: StepType status: StepStatus sequence: int parent_id: Optional[str] = None description: str = "" data: Dict[str, Any] = field(default_factory=dict) summary: Optional[str] = None has_children: bool = False children_count: int = 0 duration_ms: Optional[int] = None tokens: Optional[int] = None cost: Optional[float] = None created_at: datetime = field(default_factory=datetime.now) 
@classmethod def create( cls, trace_id: str, step_type: StepType, sequence: int, status: StepStatus = "completed", description: str = "", data: Dict[str, Any] = None, parent_id: Optional[str] = None, summary: Optional[str] = None, duration_ms: Optional[int] = None, tokens: Optional[int] = None, cost: Optional[float] = None, ) -> "Step": """创建新的 Step""" return cls( step_id=str(uuid.uuid4()), trace_id=trace_id, step_type=step_type, status=status, sequence=sequence, parent_id=parent_id, description=description, data=data or {}, summary=summary, duration_ms=duration_ms, tokens=tokens, cost=cost, ) def to_dict(self, view: str = "full") -> Dict[str, Any]: """ 转换为字典 Args: view: "compact" - 不返回大字段 "full" - 返回完整数据 """ result = { "step_id": self.step_id, "trace_id": self.trace_id, "step_type": self.step_type, "status": self.status, "sequence": self.sequence, "parent_id": self.parent_id, "description": self.description, "summary": self.summary, "has_children": self.has_children, "children_count": self.children_count, "duration_ms": self.duration_ms, "tokens": self.tokens, "cost": self.cost, "created_at": self.created_at.isoformat() if self.created_at else None, } # 处理 data 字段 if view == "compact": data_copy = self.data.copy() for key in ["output", "content", "full_output", "full_content"]: data_copy.pop(key, None) result["data"] = data_copy else: result["data"] = self.data return result
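

# Usage sketch (illustrative only, not part of the original module). It shows
# how Message.create appears to derive `description` from `content` under the
# rules documented on Message. The task text, the goal ID "goal-1", the tool
# name "search", and the ID "call-1" are hypothetical values for this example.
if __name__ == "__main__":
    trace = Trace.create(mode="agent", task="demo task")

    # Assistant message with text: the text itself becomes the description.
    reply = Message.create(
        trace_id=trace.trace_id,
        role="assistant",
        sequence=1,
        goal_id="goal-1",
        content={"text": "Looking this up."},
    )
    assert reply.description == "Looking this up."

    # Assistant message with tool calls but no text: the tool names are listed.
    call = Message.create(
        trace_id=trace.trace_id,
        role="assistant",
        sequence=2,
        goal_id="goal-1",
        content={"tool_calls": [{"function": {"name": "search"}}]},
    )
    assert call.description == "tool call: search"

    # Tool message: the tool name is used as the description.
    result = Message.create(
        trace_id=trace.trace_id,
        role="tool",
        sequence=3,
        goal_id="goal-1",
        content={"tool_name": "search", "result": "..."},
        tool_call_id="call-1",
    )
    assert result.description == "search"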