- """
- Trace 和 Message 数据模型
- Trace: 一次完整的 LLM 交互(单次调用或 Agent 任务)
- Message: Trace 中的 LLM 消息,对应 LLM API 格式
- """
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import Dict, Any, List, Optional, Literal, Union
- import uuid
- # ===== 消息线格式类型别名 =====
- # 轻量 wire-format 类型,用于工具参数和 runner/LLM API 接口。
- # 内部存储使用下方的 Message dataclass。
- ChatMessage = Dict[str, Any] # 单条 OpenAI 格式消息
- Messages = List[ChatMessage] # 消息列表
- MessageContent = Union[str, List[Dict[str, str]]] # content 字段(文本或多模态)
# Lazy accessor for TokenUsage (deferred import avoids a circular dependency).
def _get_token_usage_class():
    """Return the TokenUsage class, imported lazily to break an import cycle."""
    from ..llm.usage import TokenUsage
    return TokenUsage
@dataclass
class Trace:
    """
    Execution trace - one complete LLM interaction.

    Single call: mode="call"
    Agent mode: mode="agent"

    Main traces and sub-traces share the same data structure; a sub-trace
    links to its parent via parent_trace_id and parent_goal_id.
    """
    trace_id: str
    mode: Literal["call", "agent"]
    # Prompt identity (optional)
    prompt_name: Optional[str] = None
    # Agent-mode only
    task: Optional[str] = None
    agent_type: Optional[str] = None
    # Parent/child linkage (sub-traces only)
    parent_trace_id: Optional[str] = None   # parent trace ID
    parent_goal_id: Optional[str] = None    # the goal that spawned this trace
    # State
    status: Literal["running", "completed", "failed", "stopped"] = "running"
    # Aggregated statistics
    total_messages: int = 0   # total message count (renamed from total_steps)
    total_tokens: int = 0     # total tokens (backward compat, = prompt + completion)
    total_prompt_tokens: int = 0       # total input tokens
    total_completion_tokens: int = 0   # total output tokens
    total_reasoning_tokens: int = 0    # total reasoning tokens (o1/o3, DeepSeek R1, Gemini thinking)
    total_cache_creation_tokens: int = 0   # total cache-creation tokens (Claude)
    total_cache_read_tokens: int = 0       # total cache-read tokens (Claude)
    total_cost: float = 0.0
    total_duration_ms: int = 0  # total elapsed time (milliseconds)
    # Progress tracking (head)
    last_sequence: int = 0  # sequence of the newest message
    head_sequence: int = 0  # head of the current main path (used by build_llm_messages)
    last_event_id: int = 0  # newest event ID (for WS resume)
    # Configuration
    uid: Optional[str] = None
    model: Optional[str] = None          # default model
    tools: Optional[List[Dict]] = None   # tool definitions (shared by the whole trace)
    llm_params: Dict[str, Any] = field(default_factory=dict)  # LLM params (temperature, etc.)
    context: Dict[str, Any] = field(default_factory=dict)     # other metadata
    # Currently focused goal
    current_goal_id: Optional[str] = None
    # Results
    result_summary: Optional[str] = None  # execution-result summary
    error_message: Optional[str] = None   # error details
    # Timestamps
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    last_activity_at: datetime = field(default_factory=datetime.now)  # last activity (liveness check)

    @classmethod
    def create(
        cls,
        mode: Literal["call", "agent"],
        **kwargs
    ) -> "Trace":
        """Create a new Trace with a generated UUID."""
        return cls(
            trace_id=str(uuid.uuid4()),
            mode=mode,
            **kwargs
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Trace":
        """Build a Trace from a dict, deserializing datetime fields.

        Fix: the input dict is shallow-copied before datetime parsing, so the
        caller's dict is no longer mutated as a side effect.
        """
        from dateutil import parser

        data = dict(data)  # don't mutate the caller's dict
        for key in ("created_at", "completed_at", "last_activity_at"):
            if isinstance(data.get(key), str):
                data[key] = parser.isoparse(data[key])
        return cls(**data)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (datetimes become ISO-8601 strings)."""
        return {
            "trace_id": self.trace_id,
            "mode": self.mode,
            "prompt_name": self.prompt_name,
            "task": self.task,
            "agent_type": self.agent_type,
            "parent_trace_id": self.parent_trace_id,
            "parent_goal_id": self.parent_goal_id,
            "status": self.status,
            "total_messages": self.total_messages,
            "total_tokens": self.total_tokens,
            "total_prompt_tokens": self.total_prompt_tokens,
            "total_completion_tokens": self.total_completion_tokens,
            "total_reasoning_tokens": self.total_reasoning_tokens,
            "total_cache_creation_tokens": self.total_cache_creation_tokens,
            "total_cache_read_tokens": self.total_cache_read_tokens,
            "total_cost": self.total_cost,
            "total_duration_ms": self.total_duration_ms,
            "last_sequence": self.last_sequence,
            "head_sequence": self.head_sequence,
            "last_event_id": self.last_event_id,
            "uid": self.uid,
            "model": self.model,
            "tools": self.tools,
            "llm_params": self.llm_params,
            "context": self.context,
            "current_goal_id": self.current_goal_id,
            "result_summary": self.result_summary,
            "error_message": self.error_message,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "last_activity_at": self.last_activity_at.isoformat() if self.last_activity_at else None,
        }
@dataclass
class Message:
    """
    Execution message - an LLM message inside a Trace.

    Matches the LLM API message format (system/user/assistant/tool) and links
    to a Goal via goal_id.

    description auto-generation rules (see _generate_description):
    - system: the full content text (fallback: "system prompt")
    - user: the full content text (fallback: "user message")
    - assistant: the content text if present, otherwise "tool call: XX, XX"
    - tool: the tool name
    """
    message_id: str
    trace_id: str
    role: Literal["system", "user", "assistant", "tool"]  # matches the LLM API
    sequence: int  # global ordering
    parent_sequence: Optional[int] = None  # parent message's sequence (forms the message tree)
    status: Literal["active", "abandoned"] = "active"  # [deprecated] superseded by the parent_sequence tree
    goal_id: Optional[str] = None  # linked Goal internal ID (None = no Goal created yet)
    description: str = ""  # message description (auto-generated)
    tool_call_id: Optional[str] = None  # tool message's matching tool_call
    content: Any = None  # message content (same shape as the LLM API)
    # Side-branch markers
    branch_type: Optional[Literal["compression", "reflection"]] = None  # side-branch type (None = main path)
    branch_id: Optional[str] = None  # side-branch ID (shared by messages of the same branch)
    # Metadata
    prompt_tokens: Optional[int] = None      # input tokens
    completion_tokens: Optional[int] = None  # output tokens
    reasoning_tokens: Optional[int] = None   # reasoning tokens (o1/o3, DeepSeek R1, Gemini thinking)
    cache_creation_tokens: Optional[int] = None  # cache-creation tokens (Claude)
    cache_read_tokens: Optional[int] = None      # cache-read tokens (Claude)
    cost: Optional[float] = None
    duration_ms: Optional[int] = None
    created_at: datetime = field(default_factory=datetime.now)
    abandoned_at: Optional[datetime] = None  # [deprecated] superseded by the parent_sequence tree
    # LLM response info (only when role="assistant")
    finish_reason: Optional[str] = None  # stop, length, tool_calls, content_filter, ...

    @property
    def tokens(self) -> int:
        """Total tokens computed on the fly (backward compat, input + output)."""
        return (self.prompt_tokens or 0) + (self.completion_tokens or 0)

    @property
    def all_tokens(self) -> int:
        """All tokens, including reasoning tokens."""
        return self.tokens + (self.reasoning_tokens or 0)

    def get_usage(self):
        """Return a TokenUsage object built from this message's token counts."""
        TokenUsage = _get_token_usage_class()
        return TokenUsage(
            input_tokens=self.prompt_tokens or 0,
            output_tokens=self.completion_tokens or 0,
            reasoning_tokens=self.reasoning_tokens or 0,
            cache_creation_tokens=self.cache_creation_tokens or 0,
            cache_read_tokens=self.cache_read_tokens or 0,
        )

    def to_llm_dict(self) -> Dict[str, Any]:
        """Convert to an OpenAI-SDK-style message dict (for LLM calls)."""
        msg: Dict[str, Any] = {"role": self.role}
        if self.role == "tool":
            # tool message: tool_call_id + name + content
            if self.tool_call_id:
                msg["tool_call_id"] = self.tool_call_id
            msg["name"] = self.description or "unknown"
            if isinstance(self.content, dict):
                result = self.content.get("result", self.content)
                # result may be a list (multimodal content with images) or a string
                msg["content"] = result if isinstance(result, list) else str(result)
            else:
                msg["content"] = str(self.content) if self.content is not None else ""
        elif self.role == "assistant":
            # assistant message: content (text) + tool_calls
            if isinstance(self.content, dict):
                msg["content"] = self.content.get("text", "") or ""
                if self.content.get("tool_calls"):
                    msg["tool_calls"] = self.content["tool_calls"]
            elif isinstance(self.content, str):
                msg["content"] = self.content
            else:
                msg["content"] = ""
        else:
            # system / user message: pass content through as-is
            msg["content"] = self.content
        return msg

    @classmethod
    def from_llm_dict(
        cls,
        d: Dict[str, Any],
        trace_id: str,
        sequence: int,
        goal_id: Optional[str] = None,
        parent_sequence: Optional[int] = None,
    ) -> "Message":
        """Create a Message from an OpenAI-SDK-format dict."""
        role = d["role"]
        if role == "assistant":
            content = {"text": d.get("content", ""), "tool_calls": d.get("tool_calls")}
        elif role == "tool":
            content = {"tool_name": d.get("name", "unknown"), "result": d.get("content", "")}
        else:
            content = d.get("content", "")
        return cls.create(
            trace_id=trace_id,
            role=role,
            sequence=sequence,
            goal_id=goal_id,
            parent_sequence=parent_sequence,
            content=content,
            tool_call_id=d.get("tool_call_id"),
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Message":
        """Create a Message from a dict (handles backward compatibility)."""
        # Drop fields that no longer exist on the dataclass
        filtered_data = {k: v for k, v in data.items() if k not in ("tokens", "available_tools")}
        # Parse datetime fields serialized as ISO strings
        for key in ("created_at", "abandoned_at"):
            if isinstance(filtered_data.get(key), str):
                filtered_data[key] = datetime.fromisoformat(filtered_data[key])
        # Backward compat: older records may be missing these fields
        filtered_data.setdefault("status", "active")
        filtered_data.setdefault("parent_sequence", None)
        filtered_data.setdefault("branch_type", None)
        filtered_data.setdefault("branch_id", None)
        return cls(**filtered_data)

    @classmethod
    def create(
        cls,
        trace_id: str,
        role: Literal["system", "user", "assistant", "tool"],
        sequence: int,
        goal_id: Optional[str] = None,
        content: Any = None,
        tool_call_id: Optional[str] = None,
        parent_sequence: Optional[int] = None,
        branch_type: Optional[Literal["compression", "reflection"]] = None,
        branch_id: Optional[str] = None,
        prompt_tokens: Optional[int] = None,
        completion_tokens: Optional[int] = None,
        reasoning_tokens: Optional[int] = None,
        cache_creation_tokens: Optional[int] = None,
        cache_read_tokens: Optional[int] = None,
        cost: Optional[float] = None,
        duration_ms: Optional[int] = None,
        finish_reason: Optional[str] = None,
    ) -> "Message":
        """Create a new Message with an auto-generated description."""
        description = cls._generate_description(role, content)
        return cls(
            message_id=f"{trace_id}-{sequence:04d}",
            trace_id=trace_id,
            role=role,
            sequence=sequence,
            parent_sequence=parent_sequence,
            goal_id=goal_id,
            content=content,
            description=description,
            tool_call_id=tool_call_id,
            branch_type=branch_type,
            branch_id=branch_id,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            reasoning_tokens=reasoning_tokens,
            cache_creation_tokens=cache_creation_tokens,
            cache_read_tokens=cache_read_tokens,
            cost=cost,
            duration_ms=duration_ms,
            finish_reason=finish_reason,
        )

    @staticmethod
    def _generate_description(role: str, content: Any) -> str:
        """
        Auto-generate a description for a message.

        - system: the full content text (fallback: "system prompt")
        - user: the full content text (fallback: "user message")
        - assistant: the content text if present, otherwise "tool call: XX, XX"
        - tool: the tool name
        """
        # Hoisted out of the tool-call loop (was imported per iteration)
        import json

        if role == "system":
            # system message: return the text directly
            if isinstance(content, str):
                return content
            return "system prompt"
        elif role == "user":
            # user message: return the text directly
            if isinstance(content, str):
                return content
            return "user message"
        elif role == "assistant":
            # assistant message: content is a dict that may hold text and tool_calls
            if isinstance(content, dict):
                # Prefer the text content (returned in full)
                if content.get("text"):
                    return content["text"]
                # No text: summarize the tool_calls instead
                if content.get("tool_calls"):
                    tool_calls = content["tool_calls"]
                    if isinstance(tool_calls, list):
                        tool_descriptions = []
                        for tc in tool_calls:
                            if isinstance(tc, dict) and tc.get("function", {}).get("name"):
                                tool_name = tc["function"]["name"]
                                # Extract the arguments and truncate to 100 chars
                                tool_args = tc["function"].get("arguments", "{}")
                                if isinstance(tool_args, str):
                                    args_str = tool_args
                                else:
                                    args_str = json.dumps(tool_args, ensure_ascii=False)
                                args_display = args_str[:100] + "..." if len(args_str) > 100 else args_str
                                tool_descriptions.append(f"{tool_name}({args_display})")
                        if tool_descriptions:
                            return "tool call: " + ", ".join(tool_descriptions)
            # content may also be a plain string
            if isinstance(content, str):
                return content
            return "assistant message"
        elif role == "tool":
            # tool message: extract the tool name from content
            if isinstance(content, dict):
                if content.get("tool_name"):
                    return content["tool_name"]
            # string content: truncate for display
            if isinstance(content, str):
                return content[:100] + "..." if len(content) > 100 else content
            return "tool result"
        return ""

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict (optional token fields only when set)."""
        result = {
            "message_id": self.message_id,
            "trace_id": self.trace_id,
            "role": self.role,
            "sequence": self.sequence,
            "parent_sequence": self.parent_sequence,
            "status": self.status,
            "goal_id": self.goal_id,
            "tool_call_id": self.tool_call_id,
            "content": self.content,
            "description": self.description,
            "tokens": self.tokens,  # computed via the @property
            "prompt_tokens": self.prompt_tokens,
            "completion_tokens": self.completion_tokens,
            "cost": self.cost,
            "duration_ms": self.duration_ms,
            "finish_reason": self.finish_reason,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
        # Only include the optional fields that are set
        if self.abandoned_at:
            result["abandoned_at"] = self.abandoned_at.isoformat()
        if self.reasoning_tokens is not None:
            result["reasoning_tokens"] = self.reasoning_tokens
        if self.cache_creation_tokens is not None:
            result["cache_creation_tokens"] = self.cache_creation_tokens
        if self.cache_read_tokens is not None:
            result["cache_read_tokens"] = self.cache_read_tokens
        return result
# ===== Deprecated: Step model (kept for backward compatibility) =====

# Step types
StepType = Literal[
    "goal", "thought", "evaluation", "response",
    "action", "result", "memory_read", "memory_write",
]

# Step statuses
StepStatus = Literal[
    "planned", "in_progress", "awaiting_approval",
    "completed", "failed", "skipped",
]


@dataclass
class Step:
    """
    [Deprecated] Execution step - use the Message model instead.
    Kept for backward compatibility.
    """
    step_id: str
    trace_id: str
    step_type: StepType
    status: StepStatus
    sequence: int
    parent_id: Optional[str] = None
    description: str = ""
    data: Dict[str, Any] = field(default_factory=dict)
    summary: Optional[str] = None
    has_children: bool = False
    children_count: int = 0
    duration_ms: Optional[int] = None
    tokens: Optional[int] = None
    cost: Optional[float] = None
    created_at: datetime = field(default_factory=datetime.now)

    @classmethod
    def create(
        cls,
        trace_id: str,
        step_type: StepType,
        sequence: int,
        status: StepStatus = "completed",
        description: str = "",
        data: Optional[Dict[str, Any]] = None,  # fixed annotation: default is None, not a Dict
        parent_id: Optional[str] = None,
        summary: Optional[str] = None,
        duration_ms: Optional[int] = None,
        tokens: Optional[int] = None,
        cost: Optional[float] = None,
    ) -> "Step":
        """Create a new Step with a generated UUID."""
        return cls(
            step_id=str(uuid.uuid4()),
            trace_id=trace_id,
            step_type=step_type,
            status=status,
            sequence=sequence,
            parent_id=parent_id,
            description=description,
            data=data or {},
            summary=summary,
            duration_ms=duration_ms,
            tokens=tokens,
            cost=cost,
        )

    def to_dict(self, view: str = "full") -> Dict[str, Any]:
        """
        Serialize to a dict.

        Args:
            view: "compact" - omit large data fields
                  "full" - return everything
        """
        result = {
            "step_id": self.step_id,
            "trace_id": self.trace_id,
            "step_type": self.step_type,
            "status": self.status,
            "sequence": self.sequence,
            "parent_id": self.parent_id,
            "description": self.description,
            "summary": self.summary,
            "has_children": self.has_children,
            "children_count": self.children_count,
            "duration_ms": self.duration_ms,
            "tokens": self.tokens,
            "cost": self.cost,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
        # Handle the data payload according to the requested view
        if view == "compact":
            data_copy = self.data.copy()
            for key in ["output", "content", "full_output", "full_content"]:
                data_copy.pop(key, None)
            result["data"] = data_copy
        else:
            result["data"] = self.data
        return result