""" Dream:记忆反思操作(Phase 3) 两阶段执行: per_trace_reflect → 为每个有新消息的 trace 生成反思摘要,写 cognition_log cross_trace_integrate → 汇总各 trace 的反思摘要 + 当前记忆文件, 用 dream_prompt 指导 LLM 更新记忆文件 对外入口: run_dream(store, llm_call, memory_config, trace_filter=None, model=...) """ from __future__ import annotations import json import logging import re from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple from agent.core.memory import MemoryConfig, load_memory_files, format_memory_injection from agent.trace.models import Trace from agent.trace.store import FileSystemTraceStore logger = logging.getLogger(__name__) # ===== 默认 prompts ===== DEFAULT_REFLECT_PROMPT = """你正在回顾一次 Agent 执行中发生的事情,为你自己(作为长期身份)的记忆做反思。 请综合下面的执行过程和知识使用情况,回答: 1. 这次执行中有什么值得记住的经验?(品味、判断、策略) 2. 哪些知识的评估反映了我的判断需要调整? 3. 用户的反馈(如果有)说明了什么? 用简洁的第一人称段落写,不要逐条列点,不要重复执行细节 —— 你在沉淀"这对未来的我意味着什么"。 只输出反思内容本身,不要任何其它前缀或 markdown 标题。""" DEFAULT_DREAM_PROMPT = """你正在整理自己的长期记忆。下面是你最近的反思摘要、以及当前各记忆文件的内容。 请决定哪些文件应该更新、内容怎么改。原则: - 只更新真正有新见解的文件,没有变化的就不要动 - 在原有内容基础上演进,不是重写;保留仍然有效的旧内容 - 简洁、人类可读的 markdown 格式 - 新增文件必须是 MemoryConfig.files 已声明的路径(否则不会被下次加载) **严格按以下 JSON 格式输出,不要任何其它文字**: ```json { "updates": [ {"path": "taste.md", "new_content": "完整的新文件内容"}, {"path": "strategy.md", "new_content": "..."} ], "reasoning": "你为什么做这些更新(简短)" } ``` 如果没有任何文件需要更新,输出 `{"updates": [], "reasoning": "..."}`。""" # ===== 数据结构 ===== @dataclass class DreamReport: per_trace_summaries: Dict[str, str] = field(default_factory=dict) # {trace_id: summary} updated_files: List[str] = field(default_factory=list) # 实际写入的文件路径 consumed_reflection_count: int = 0 # 本次消化了多少条 reflection reasoning: str = "" skipped_traces: List[str] = field(default_factory=list) LLMCall = Callable[..., Awaitable[Dict[str, Any]]] # ===== Per-trace 反思 ===== async def per_trace_reflect( store: FileSystemTraceStore, llm_call: LLMCall, trace_id: str, memory_config: MemoryConfig, model: str = "gpt-4o-mini", ) -> Optional[str]: """为单个 trace 生成反思摘要,写入 cognition_log,更新 reflected_at_sequence。 Returns: 反思摘要字符串;若 trace 没有新消息或 LLM 返回空,返回 None。 """ trace = await store.get_trace(trace_id) if not trace: logger.debug(f"[Dream] trace 不存在: {trace_id}") return None start_seq = (trace.reflected_at_sequence or 0) + 1 end_seq = trace.last_sequence if start_seq > end_seq: logger.debug(f"[Dream] trace {trace_id} 没有新消息({start_seq} > {end_seq})") return None all_msgs = await store.get_trace_messages(trace_id) new_msgs = [m for m in all_msgs if start_seq <= m.sequence <= end_seq] if not new_msgs: logger.debug(f"[Dream] trace {trace_id} 范围内无消息") return None log = await store.get_cognition_log(trace_id) events = log.get("events", log.get("entries", [])) relevant_events = [ e for e in events if e.get("sequence") is not None and start_seq <= e["sequence"] <= end_seq and e.get("type") in ("query", "evaluation", "extraction_pending", "extraction_committed") ] user_content = _build_reflect_input(new_msgs, relevant_events) prompt = memory_config.reflect_prompt or DEFAULT_REFLECT_PROMPT try: result = await llm_call( messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": user_content}, ], model=model, tools=None, temperature=0.5, ) except Exception as e: logger.error(f"[Dream] per_trace_reflect LLM 调用失败 {trace_id}: {e}") return None summary = (result.get("content") or "").strip() if not summary: logger.info(f"[Dream] trace {trace_id} 反思 LLM 返回空,视为无值得记录的内容") # 仍然更新 reflected_at_sequence,避免下次重复扫描 await store.update_trace(trace_id, reflected_at_sequence=end_seq) return None await store.append_cognition_event( trace_id=trace_id, event={ "type": "reflection", "sequence_range": [start_seq, end_seq], "summary": summary, }, ) await store.update_trace(trace_id, reflected_at_sequence=end_seq) logger.info(f"[Dream] trace {trace_id} 反思完成,覆盖 sequence {start_seq}-{end_seq}") return summary def _build_reflect_input(messages: List[Any], events: List[Dict[str, Any]]) -> str: """把消息和事件组织为 LLM 可读的反思输入。""" parts: List[str] = ["## 执行过程"] for m in messages: role = getattr(m, "role", "?") desc = getattr(m, "description", "") or "" seq = getattr(m, "sequence", "?") # 截断,防止单条过长 parts.append(f"[{seq}] {role}: {desc[:500]}") if events: parts.append("\n## 知识使用与提取情况(来自 cognition_log)") for e in events: etype = e.get("type") if etype == "query": parts.append( f"- [{e.get('sequence')}] query: {e.get('query', '')[:100]} → " f"source_ids={e.get('source_ids', [])}" ) elif etype == "evaluation": parts.append( f"- evaluation: knowledge_id={e.get('knowledge_id')} " f"result={e.get('eval_result')}" ) elif etype == "extraction_pending": payload = e.get("payload", {}) parts.append( f"- extraction_pending ({e.get('extraction_id')}): " f"{payload.get('task', '')[:80]}" ) elif etype == "extraction_committed": parts.append( f"- extraction_committed: extraction={e.get('extraction_id')} " f"→ knowledge_id={e.get('knowledge_id')}" ) return "\n".join(parts) # ===== 跨 trace 整合 ===== async def cross_trace_integrate( store: FileSystemTraceStore, llm_call: LLMCall, memory_config: MemoryConfig, trace_filter: Optional[Callable[[Trace], bool]] = None, model: str = "gpt-4o", ) -> Tuple[int, List[str], str]: """汇总各 trace 未消化的 reflection 事件,用 LLM 更新记忆文件。 Args: trace_filter: 可选的 trace 过滤函数(例如按 agent_type / owner); None 表示扫描 TraceStore 下所有 trace。 Returns: (consumed_reflection_count, updated_file_paths, reasoning) """ all_traces = await store.list_traces(limit=1000) if trace_filter: all_traces = [t for t in all_traces if trace_filter(t)] # 收集所有未消化的 reflection 事件 reflections: List[Tuple[str, Dict[str, Any]]] = [] # [(trace_id, event)] for t in all_traces: log = await store.get_cognition_log(t.trace_id) events = log.get("events", log.get("entries", [])) for e in events: if e.get("type") == "reflection" and not e.get("consumed_at"): reflections.append((t.trace_id, e)) if not reflections: logger.info("[Dream] 没有未消化的 reflection 事件") return 0, [], "" # 读当前记忆文件 existing_files = load_memory_files(memory_config) existing_by_path = {rel: (purpose, content) for rel, purpose, content in existing_files} user_content = _build_dream_input(reflections, existing_files, memory_config) prompt = memory_config.dream_prompt or DEFAULT_DREAM_PROMPT try: result = await llm_call( messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": user_content}, ], model=model, tools=None, temperature=0.3, ) except Exception as e: logger.error(f"[Dream] cross_trace_integrate LLM 调用失败: {e}") return 0, [], "" raw = (result.get("content") or "").strip() plan = _parse_dream_output(raw) if plan is None: logger.error(f"[Dream] LLM 输出无法解析为 JSON 计划,原文: {raw[:500]}") return 0, [], "" updated_paths: List[str] = [] base = Path(memory_config.base_path) for update in plan.get("updates", []): rel_path = update.get("path", "") new_content = update.get("new_content", "") if not rel_path: continue # 安全检查:禁止路径穿越 target = (base / rel_path).resolve() if not str(target).startswith(str(base.resolve())): logger.warning(f"[Dream] 拒绝写入 base_path 之外的路径: {rel_path}") continue target.parent.mkdir(parents=True, exist_ok=True) target.write_text(new_content, encoding="utf-8") updated_paths.append(rel_path) logger.info(f"[Dream] 已更新记忆文件: {rel_path} ({len(new_content)} chars)") # 标记所有参与的 reflection 为已消化 consumed_at = datetime.now().isoformat() for trace_id, event in reflections: log = await store.get_cognition_log(trace_id) events = log.get("events", log.get("entries", [])) target_ts = event.get("timestamp") for e in events: if ( e.get("type") == "reflection" and not e.get("consumed_at") and e.get("timestamp") == target_ts ): e["consumed_at"] = consumed_at log_file = store._get_cognition_log_file(trace_id) log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8") reasoning = plan.get("reasoning", "") return len(reflections), updated_paths, reasoning def _build_dream_input( reflections: List[Tuple[str, Dict[str, Any]]], existing_files: List[Tuple[str, str, str]], memory_config: MemoryConfig, ) -> str: """为 dream prompt 准备输入:反思摘要汇总 + 当前记忆文件 + 允许的文件路径。""" parts: List[str] = ["## 最近的反思摘要\n"] for trace_id, e in reflections: seq_range = e.get("sequence_range", [None, None]) parts.append( f"### trace {trace_id} (messages {seq_range[0]}-{seq_range[1]})\n" f"{e.get('summary', '')}\n" ) parts.append("\n## 当前记忆文件\n") if existing_files: parts.append(format_memory_injection(existing_files)) else: parts.append("(暂无记忆文件)") if memory_config.files: parts.append("\n## 允许更新/新增的文件路径\n") for key, purpose in memory_config.files.items(): parts.append(f"- `{key}`" + (f" — {purpose}" if purpose else "")) return "\n".join(parts) def _parse_dream_output(raw: str) -> Optional[Dict[str, Any]]: """解析 LLM 的 JSON 计划输出。容忍 ```json ... ``` 包裹。""" stripped = raw.strip() # 去除 markdown 代码块包裹 m = re.match(r"^```(?:json)?\s*(.*?)\s*```$", stripped, re.DOTALL) if m: stripped = m.group(1).strip() try: data = json.loads(stripped) except json.JSONDecodeError: return None if not isinstance(data, dict) or "updates" not in data: return None return data # ===== 顶层入口 ===== async def run_dream( store: FileSystemTraceStore, llm_call: LLMCall, memory_config: MemoryConfig, trace_filter: Optional[Callable[[Trace], bool]] = None, reflect_model: str = "gpt-4o-mini", dream_model: str = "gpt-4o", ) -> DreamReport: """执行完整的 dream 流程:per_trace_reflect → cross_trace_integrate。 Args: trace_filter: 筛选需要反思的 trace(例如按 agent_type 或 owner); None 表示扫描所有 trace reflect_model: per-trace 反思用的模型(轻量模型即可) dream_model: 跨 trace 整合用的模型(需要更强推理能力) """ report = DreamReport() if not memory_config.base_path: logger.warning("[Dream] memory_config.base_path 未配置,跳过") return report # Phase 1: per-trace reflect all_traces = await store.list_traces(limit=1000) if trace_filter: all_traces = [t for t in all_traces if trace_filter(t)] for t in all_traces: if (t.reflected_at_sequence or 0) >= t.last_sequence: continue try: summary = await per_trace_reflect( store, llm_call, t.trace_id, memory_config, model=reflect_model, ) if summary: report.per_trace_summaries[t.trace_id] = summary except Exception as e: logger.error(f"[Dream] per_trace_reflect 异常 {t.trace_id}: {e}") report.skipped_traces.append(t.trace_id) # Phase 2: cross-trace integrate try: consumed, updated, reasoning = await cross_trace_integrate( store, llm_call, memory_config, trace_filter=trace_filter, model=dream_model, ) report.consumed_reflection_count = consumed report.updated_files = updated report.reasoning = reasoning except Exception as e: logger.error(f"[Dream] cross_trace_integrate 异常: {e}") return report