""" Trace 和 Message 数据模型 Trace: 一次完整的 LLM 交互(单次调用或 Agent 任务) Message: Trace 中的 LLM 消息,对应 LLM API 格式 """ from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, List, Optional, Literal import uuid # 导入 TokenUsage(延迟导入避免循环依赖) def _get_token_usage_class(): from ..llm.usage import TokenUsage return TokenUsage @dataclass class Trace: """ 执行轨迹 - 一次完整的 LLM 交互 单次调用: mode="call" Agent 模式: mode="agent" 主 Trace 和 Sub-Trace 使用相同的数据结构。 Sub-Trace 通过 parent_trace_id 和 parent_goal_id 关联父 Trace。 """ trace_id: str mode: Literal["call", "agent"] # Prompt 标识(可选) prompt_name: Optional[str] = None # Agent 模式特有 task: Optional[str] = None agent_type: Optional[str] = None # 父子关系(Sub-Trace 特有) parent_trace_id: Optional[str] = None # 父 Trace ID parent_goal_id: Optional[str] = None # 哪个 Goal 启动的 # 状态 status: Literal["running", "completed", "failed"] = "running" # 统计 total_messages: int = 0 # 消息总数(改名自 total_steps) total_tokens: int = 0 # 总 tokens(向后兼容,= prompt + completion) total_prompt_tokens: int = 0 # 总输入 tokens total_completion_tokens: int = 0 # 总输出 tokens total_reasoning_tokens: int = 0 # 总推理 tokens(o1/o3, DeepSeek R1, Gemini thinking) total_cache_creation_tokens: int = 0 # 总缓存创建 tokens(Claude) total_cache_read_tokens: int = 0 # 总缓存读取 tokens(Claude) total_cost: float = 0.0 total_duration_ms: int = 0 # 总耗时(毫秒) # 进度追踪(head) last_sequence: int = 0 # 最新 message 的 sequence last_event_id: int = 0 # 最新事件 ID(用于 WS 续传) # 配置 uid: Optional[str] = None model: Optional[str] = None # 默认模型 tools: Optional[List[Dict]] = None # 工具定义(整个 trace 共享) llm_params: Dict[str, Any] = field(default_factory=dict) # LLM 参数(temperature 等) context: Dict[str, Any] = field(default_factory=dict) # 其他元数据 # 当前焦点 goal current_goal_id: Optional[str] = None # 结果 result_summary: Optional[str] = None # 执行结果摘要 error_message: Optional[str] = None # 错误信息 # 时间 created_at: datetime = field(default_factory=datetime.now) completed_at: Optional[datetime] = None @classmethod def create( cls, mode: Literal["call", "agent"], **kwargs ) -> "Trace": """创建新的 Trace""" return cls( trace_id=str(uuid.uuid4()), mode=mode, **kwargs ) def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "trace_id": self.trace_id, "mode": self.mode, "prompt_name": self.prompt_name, "task": self.task, "agent_type": self.agent_type, "parent_trace_id": self.parent_trace_id, "parent_goal_id": self.parent_goal_id, "status": self.status, "total_messages": self.total_messages, "total_tokens": self.total_tokens, "total_prompt_tokens": self.total_prompt_tokens, "total_completion_tokens": self.total_completion_tokens, "total_reasoning_tokens": self.total_reasoning_tokens, "total_cache_creation_tokens": self.total_cache_creation_tokens, "total_cache_read_tokens": self.total_cache_read_tokens, "total_cost": self.total_cost, "total_duration_ms": self.total_duration_ms, "last_sequence": self.last_sequence, "last_event_id": self.last_event_id, "uid": self.uid, "model": self.model, "tools": self.tools, "llm_params": self.llm_params, "context": self.context, "current_goal_id": self.current_goal_id, "result_summary": self.result_summary, "error_message": self.error_message, "created_at": self.created_at.isoformat() if self.created_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, } @dataclass class Message: """ 执行消息 - Trace 中的 LLM 消息 对应 LLM API 消息格式(system/user/assistant/tool),通过 goal_id 关联 Goal。 description 字段自动生成规则: - system: 取 content 前 200 字符 - user: 取 content 前 200 字符 - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX" - tool: 使用 tool name """ message_id: str trace_id: str role: Literal["system", "user", "assistant", "tool"] # 和 LLM API 一致 sequence: int # 全局顺序 goal_id: Optional[str] = None # 关联的 Goal 内部 ID(None = 还没有创建 Goal) description: str = "" # 消息描述(系统自动生成) tool_call_id: Optional[str] = None # tool 消息关联对应的 tool_call content: Any = None # 消息内容(和 LLM API 格式一致) # 元数据 prompt_tokens: Optional[int] = None # 输入 tokens completion_tokens: Optional[int] = None # 输出 tokens reasoning_tokens: Optional[int] = None # 推理 tokens(o1/o3, DeepSeek R1, Gemini thinking) cache_creation_tokens: Optional[int] = None # 缓存创建 tokens(Claude) cache_read_tokens: Optional[int] = None # 缓存读取 tokens(Claude) cost: Optional[float] = None duration_ms: Optional[int] = None created_at: datetime = field(default_factory=datetime.now) # LLM 响应信息(仅 role="assistant" 时使用) finish_reason: Optional[str] = None # stop, length, tool_calls, content_filter 等 @property def tokens(self) -> int: """动态计算总 tokens(向后兼容,input + output)""" return (self.prompt_tokens or 0) + (self.completion_tokens or 0) @property def all_tokens(self) -> int: """所有 tokens(包括 reasoning)""" return self.tokens + (self.reasoning_tokens or 0) def get_usage(self): """获取 TokenUsage 对象""" TokenUsage = _get_token_usage_class() return TokenUsage( input_tokens=self.prompt_tokens or 0, output_tokens=self.completion_tokens or 0, reasoning_tokens=self.reasoning_tokens or 0, cache_creation_tokens=self.cache_creation_tokens or 0, cache_read_tokens=self.cache_read_tokens or 0, ) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "Message": """从字典创建 Message(处理向后兼容)""" # 过滤掉已删除的字段 filtered_data = {k: v for k, v in data.items() if k not in ["tokens", "available_tools"]} # 解析 datetime if filtered_data.get("created_at") and isinstance(filtered_data["created_at"], str): filtered_data["created_at"] = datetime.fromisoformat(filtered_data["created_at"]) return cls(**filtered_data) @classmethod def create( cls, trace_id: str, role: Literal["system", "user", "assistant", "tool"], sequence: int, goal_id: Optional[str] = None, content: Any = None, tool_call_id: Optional[str] = None, prompt_tokens: Optional[int] = None, completion_tokens: Optional[int] = None, reasoning_tokens: Optional[int] = None, cache_creation_tokens: Optional[int] = None, cache_read_tokens: Optional[int] = None, cost: Optional[float] = None, duration_ms: Optional[int] = None, finish_reason: Optional[str] = None, ) -> "Message": """创建新的 Message,自动生成 description""" description = cls._generate_description(role, content) return cls( message_id=f"{trace_id}-{sequence:04d}", trace_id=trace_id, role=role, sequence=sequence, goal_id=goal_id, content=content, description=description, tool_call_id=tool_call_id, prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, reasoning_tokens=reasoning_tokens, cache_creation_tokens=cache_creation_tokens, cache_read_tokens=cache_read_tokens, cost=cost, duration_ms=duration_ms, finish_reason=finish_reason, ) @staticmethod def _generate_description(role: str, content: Any) -> str: """ 自动生成 description - system: 取 content 前 200 字符 - user: 取 content 前 200 字符 - assistant: 优先取 content,若无 content 则生成 "tool call: XX, XX" - tool: 使用 tool name """ if role == "system": # system 消息:直接截取文本 if isinstance(content, str): return content[:200] + "..." if len(content) > 200 else content return "system prompt" elif role == "user": # user 消息:直接截取文本 if isinstance(content, str): return content[:200] + "..." if len(content) > 200 else content return "user message" elif role == "assistant": # assistant 消息:content 是字典,可能包含 text 和 tool_calls if isinstance(content, dict): # 优先返回文本内容 if content.get("text"): text = content["text"] # 截断过长的文本 return text[:200] + "..." if len(text) > 200 else text # 如果没有文本,检查 tool_calls if content.get("tool_calls"): tool_calls = content["tool_calls"] if isinstance(tool_calls, list): tool_names = [] for tc in tool_calls: if isinstance(tc, dict) and tc.get("function", {}).get("name"): tool_names.append(tc["function"]["name"]) if tool_names: return f"tool call: {', '.join(tool_names)}" # 如果 content 是字符串 if isinstance(content, str): return content[:200] + "..." if len(content) > 200 else content return "assistant message" elif role == "tool": # tool 消息:从 content 中提取 tool name if isinstance(content, dict): if content.get("tool_name"): return content["tool_name"] # 如果是字符串,尝试解析 if isinstance(content, str): return content[:100] + "..." if len(content) > 100 else content return "tool result" return "" def to_dict(self) -> Dict[str, Any]: """转换为字典""" result = { "message_id": self.message_id, "trace_id": self.trace_id, "role": self.role, "sequence": self.sequence, "goal_id": self.goal_id, "tool_call_id": self.tool_call_id, "content": self.content, "description": self.description, "tokens": self.tokens, # 使用 @property 动态计算 "prompt_tokens": self.prompt_tokens, "completion_tokens": self.completion_tokens, "cost": self.cost, "duration_ms": self.duration_ms, "finish_reason": self.finish_reason, "created_at": self.created_at.isoformat() if self.created_at else None, } # 只添加非空的可选字段 if self.reasoning_tokens: result["reasoning_tokens"] = self.reasoning_tokens if self.cache_creation_tokens: result["cache_creation_tokens"] = self.cache_creation_tokens if self.cache_read_tokens: result["cache_read_tokens"] = self.cache_read_tokens return result # ===== 已弃用:Step 模型(保留用于向后兼容)===== # Step 类型 StepType = Literal[ "goal", "thought", "evaluation", "response", "action", "result", "memory_read", "memory_write", ] # Step 状态 StepStatus = Literal[ "planned", "in_progress", "awaiting_approval", "completed", "failed", "skipped", ] @dataclass class Step: """ [已弃用] 执行步骤 - 使用 Message 模型替代 保留用于向后兼容 """ step_id: str trace_id: str step_type: StepType status: StepStatus sequence: int parent_id: Optional[str] = None description: str = "" data: Dict[str, Any] = field(default_factory=dict) summary: Optional[str] = None has_children: bool = False children_count: int = 0 duration_ms: Optional[int] = None tokens: Optional[int] = None cost: Optional[float] = None created_at: datetime = field(default_factory=datetime.now) @classmethod def create( cls, trace_id: str, step_type: StepType, sequence: int, status: StepStatus = "completed", description: str = "", data: Dict[str, Any] = None, parent_id: Optional[str] = None, summary: Optional[str] = None, duration_ms: Optional[int] = None, tokens: Optional[int] = None, cost: Optional[float] = None, ) -> "Step": """创建新的 Step""" return cls( step_id=str(uuid.uuid4()), trace_id=trace_id, step_type=step_type, status=status, sequence=sequence, parent_id=parent_id, description=description, data=data or {}, summary=summary, duration_ms=duration_ms, tokens=tokens, cost=cost, ) def to_dict(self, view: str = "full") -> Dict[str, Any]: """ 转换为字典 Args: view: "compact" - 不返回大字段 "full" - 返回完整数据 """ result = { "step_id": self.step_id, "trace_id": self.trace_id, "step_type": self.step_type, "status": self.status, "sequence": self.sequence, "parent_id": self.parent_id, "description": self.description, "summary": self.summary, "has_children": self.has_children, "children_count": self.children_count, "duration_ms": self.duration_ms, "tokens": self.tokens, "cost": self.cost, "created_at": self.created_at.isoformat() if self.created_at else None, } # 处理 data 字段 if view == "compact": data_copy = self.data.copy() for key in ["output", "content", "full_output", "full_content"]: data_copy.pop(key, None) result["data"] = data_copy else: result["data"] = self.data return result