| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- """
- Dream:记忆反思操作(Phase 3)
- 两阶段执行:
- per_trace_reflect → 为每个有新消息的 trace 生成反思摘要,写 cognition_log
- cross_trace_integrate → 汇总各 trace 的反思摘要 + 当前记忆文件,
- 用 dream_prompt 指导 LLM 更新记忆文件
- 对外入口:
- run_dream(store, llm_call, memory_config, trace_filter=None, model=...)
- """
- from __future__ import annotations
- import json
- import logging
- import re
- from dataclasses import dataclass, field
- from datetime import datetime
- from pathlib import Path
- from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
- from agent.core.memory import MemoryConfig, load_memory_files, format_memory_injection
- from agent.trace.models import Trace
- from agent.trace.store import FileSystemTraceStore
- logger = logging.getLogger(__name__)
# ===== Default prompts =====
# System prompt for phase 1 (per-trace reflection): asks the LLM to distill
# first-person lessons from one trace's execution and knowledge usage.
# The prompt text itself is runtime data sent to the LLM — do not translate.
DEFAULT_REFLECT_PROMPT = """你正在回顾一次 Agent 执行中发生的事情,为你自己(作为长期身份)的记忆做反思。
请综合下面的执行过程和知识使用情况,回答:
1. 这次执行中有什么值得记住的经验?(品味、判断、策略)
2. 哪些知识的评估反映了我的判断需要调整?
3. 用户的反馈(如果有)说明了什么?
用简洁的第一人称段落写,不要逐条列点,不要重复执行细节 —— 你在沉淀"这对未来的我意味着什么"。
只输出反思内容本身,不要任何其它前缀或 markdown 标题。"""

# System prompt for phase 2 (cross-trace integration): instructs the LLM to
# emit a strict-JSON plan ({"updates": [...], "reasoning": "..."}) describing
# which memory files to rewrite. Parsed by _parse_dream_output.
DEFAULT_DREAM_PROMPT = """你正在整理自己的长期记忆。下面是你最近的反思摘要、以及当前各记忆文件的内容。
请决定哪些文件应该更新、内容怎么改。原则:
- 只更新真正有新见解的文件,没有变化的就不要动
- 在原有内容基础上演进,不是重写;保留仍然有效的旧内容
- 简洁、人类可读的 markdown 格式
- 新增文件必须是 MemoryConfig.files 已声明的路径(否则不会被下次加载)
**严格按以下 JSON 格式输出,不要任何其它文字**:
```json
{
  "updates": [
    {"path": "taste.md", "new_content": "完整的新文件内容"},
    {"path": "strategy.md", "new_content": "..."}
  ],
  "reasoning": "你为什么做这些更新(简短)"
}
```
如果没有任何文件需要更新,输出 `{"updates": [], "reasoning": "..."}`。"""
# ===== Data structures =====
@dataclass
class DreamReport:
    """Aggregate result of one full dream run (both phases)."""

    per_trace_summaries: Dict[str, str] = field(default_factory=dict)  # {trace_id: reflection summary}
    updated_files: List[str] = field(default_factory=list)  # relative paths of memory files actually written
    consumed_reflection_count: int = 0  # how many reflection events were digested this run
    reasoning: str = ""  # LLM-provided rationale for the memory updates
    skipped_traces: List[str] = field(default_factory=list)  # traces whose per-trace reflection raised

# Signature of the async LLM adapter: keyword args in, response dict
# (with a "content" key) out.
LLMCall = Callable[..., Awaitable[Dict[str, Any]]]
# ===== Per-trace reflection =====
async def per_trace_reflect(
    store: FileSystemTraceStore,
    llm_call: LLMCall,
    trace_id: str,
    memory_config: MemoryConfig,
    model: str = "gpt-4o-mini",
) -> Optional[str]:
    """Generate a reflection summary for one trace, append it to the
    cognition_log, and advance the trace's reflected_at_sequence marker.

    Args:
        store: Trace store holding the trace, its messages and cognition log.
        llm_call: Async LLM adapter.
        trace_id: Trace to reflect on.
        memory_config: Supplies an optional custom reflect_prompt.
        model: Model for the reflection call (a lightweight model suffices).

    Returns:
        The reflection summary string; None if the trace is missing, has no
        new messages, or the LLM returned empty content.
    """
    trace = await store.get_trace(trace_id)
    if not trace:
        logger.debug(f"[Dream] trace 不存在: {trace_id}")
        return None
    # Only look at messages appended since the previous reflection.
    start_seq = (trace.reflected_at_sequence or 0) + 1
    end_seq = trace.last_sequence
    if start_seq > end_seq:
        logger.debug(f"[Dream] trace {trace_id} 没有新消息({start_seq} > {end_seq})")
        return None
    all_msgs = await store.get_trace_messages(trace_id)
    new_msgs = [m for m in all_msgs if start_seq <= m.sequence <= end_seq]
    if not new_msgs:
        logger.debug(f"[Dream] trace {trace_id} 范围内无消息")
        return None
    log = await store.get_cognition_log(trace_id)
    # "entries" fallback — presumably a legacy log key; confirm against store.
    events = log.get("events", log.get("entries", []))
    # Keep only knowledge-related events that fall inside the new range.
    relevant_events = [
        e for e in events
        if e.get("sequence") is not None
        and start_seq <= e["sequence"] <= end_seq
        and e.get("type") in ("query", "evaluation", "extraction_pending", "extraction_committed")
    ]
    user_content = _build_reflect_input(new_msgs, relevant_events)
    prompt = memory_config.reflect_prompt or DEFAULT_REFLECT_PROMPT
    try:
        result = await llm_call(
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": user_content},
            ],
            model=model,
            tools=None,
            temperature=0.5,
        )
    except Exception as e:
        logger.error(f"[Dream] per_trace_reflect LLM 调用失败 {trace_id}: {e}")
        return None
    summary = (result.get("content") or "").strip()
    if not summary:
        logger.info(f"[Dream] trace {trace_id} 反思 LLM 返回空,视为无值得记录的内容")
        # Still advance reflected_at_sequence so the next run skips this range.
        await store.update_trace(trace_id, reflected_at_sequence=end_seq)
        return None
    await store.append_cognition_event(
        trace_id=trace_id,
        event={
            "type": "reflection",
            "sequence_range": [start_seq, end_seq],
            "summary": summary,
        },
    )
    await store.update_trace(trace_id, reflected_at_sequence=end_seq)
    logger.info(f"[Dream] trace {trace_id} 反思完成,覆盖 sequence {start_seq}-{end_seq}")
    return summary
- def _build_reflect_input(messages: List[Any], events: List[Dict[str, Any]]) -> str:
- """把消息和事件组织为 LLM 可读的反思输入。"""
- parts: List[str] = ["## 执行过程"]
- for m in messages:
- role = getattr(m, "role", "?")
- desc = getattr(m, "description", "") or ""
- seq = getattr(m, "sequence", "?")
- # 截断,防止单条过长
- parts.append(f"[{seq}] {role}: {desc[:500]}")
- if events:
- parts.append("\n## 知识使用与提取情况(来自 cognition_log)")
- for e in events:
- etype = e.get("type")
- if etype == "query":
- parts.append(
- f"- [{e.get('sequence')}] query: {e.get('query', '')[:100]} → "
- f"source_ids={e.get('source_ids', [])}"
- )
- elif etype == "evaluation":
- parts.append(
- f"- evaluation: knowledge_id={e.get('knowledge_id')} "
- f"result={e.get('eval_result')}"
- )
- elif etype == "extraction_pending":
- payload = e.get("payload", {})
- parts.append(
- f"- extraction_pending ({e.get('extraction_id')}): "
- f"{payload.get('task', '')[:80]}"
- )
- elif etype == "extraction_committed":
- parts.append(
- f"- extraction_committed: extraction={e.get('extraction_id')} "
- f"→ knowledge_id={e.get('knowledge_id')}"
- )
- return "\n".join(parts)
# ===== Cross-trace integration =====
async def cross_trace_integrate(
    store: FileSystemTraceStore,
    llm_call: LLMCall,
    memory_config: MemoryConfig,
    trace_filter: Optional[Callable[[Trace], bool]] = None,
    model: str = "gpt-4o",
) -> Tuple[int, List[str], str]:
    """Digest all unconsumed reflection events across traces and have the LLM
    update the memory files.

    Args:
        store: Trace store whose cognition logs are scanned.
        llm_call: Async LLM adapter.
        memory_config: Memory file locations and the optional dream_prompt.
        trace_filter: Optional predicate over Trace (e.g. by agent_type / owner);
            None scans every trace in the store.
        model: Model for the integration call (needs stronger reasoning).

    Returns:
        (consumed_reflection_count, updated_file_paths, reasoning)
    """
    all_traces = await store.list_traces(limit=1000)
    if trace_filter:
        all_traces = [t for t in all_traces if trace_filter(t)]
    # Collect every reflection event that has not yet been folded into memory.
    reflections: List[Tuple[str, Dict[str, Any]]] = []  # [(trace_id, event)]
    for t in all_traces:
        log = await store.get_cognition_log(t.trace_id)
        events = log.get("events", log.get("entries", []))
        for e in events:
            if e.get("type") == "reflection" and not e.get("consumed_at"):
                reflections.append((t.trace_id, e))
    if not reflections:
        logger.info("[Dream] 没有未消化的 reflection 事件")
        return 0, [], ""
    # Load current memory files so the LLM evolves them instead of rewriting.
    existing_files = load_memory_files(memory_config)
    user_content = _build_dream_input(reflections, existing_files, memory_config)
    prompt = memory_config.dream_prompt or DEFAULT_DREAM_PROMPT
    try:
        result = await llm_call(
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": user_content},
            ],
            model=model,
            tools=None,
            temperature=0.3,
        )
    except Exception as e:
        logger.error(f"[Dream] cross_trace_integrate LLM 调用失败: {e}")
        return 0, [], ""
    raw = (result.get("content") or "").strip()
    plan = _parse_dream_output(raw)
    if plan is None:
        logger.error(f"[Dream] LLM 输出无法解析为 JSON 计划,原文: {raw[:500]}")
        return 0, [], ""
    updated_paths: List[str] = []
    base = Path(memory_config.base_path)
    base_resolved = base.resolve()  # hoisted: resolve once, not per update
    for update in plan.get("updates", []):
        rel_path = update.get("path", "")
        new_content = update.get("new_content", "")
        if not rel_path:
            continue
        # Security: reject path traversal. relative_to() is an exact ancestor
        # check, unlike a string startswith() prefix test, which would wrongly
        # accept e.g. "/base2/x" as being inside "/base".
        target = (base / rel_path).resolve()
        try:
            target.relative_to(base_resolved)
        except ValueError:
            logger.warning(f"[Dream] 拒绝写入 base_path 之外的路径: {rel_path}")
            continue
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(new_content, encoding="utf-8")
        updated_paths.append(rel_path)
        logger.info(f"[Dream] 已更新记忆文件: {rel_path} ({len(new_content)} chars)")
    # Mark every participating reflection as consumed so the next run skips it.
    consumed_at = datetime.now().isoformat()
    for trace_id, event in reflections:
        log = await store.get_cognition_log(trace_id)
        events = log.get("events", log.get("entries", []))
        # Match by timestamp: the re-read log entries are different dict
        # objects from `event`, so identity comparison cannot be used.
        target_ts = event.get("timestamp")
        for e in events:
            if (
                e.get("type") == "reflection"
                and not e.get("consumed_at")
                and e.get("timestamp") == target_ts
            ):
                e["consumed_at"] = consumed_at
        # NOTE(review): reaches into the store's private API to persist the
        # mutated log — consider a public save_cognition_log() on the store.
        log_file = store._get_cognition_log_file(trace_id)
        log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8")
    reasoning = plan.get("reasoning", "")
    return len(reflections), updated_paths, reasoning
- def _build_dream_input(
- reflections: List[Tuple[str, Dict[str, Any]]],
- existing_files: List[Tuple[str, str, str]],
- memory_config: MemoryConfig,
- ) -> str:
- """为 dream prompt 准备输入:反思摘要汇总 + 当前记忆文件 + 允许的文件路径。"""
- parts: List[str] = ["## 最近的反思摘要\n"]
- for trace_id, e in reflections:
- seq_range = e.get("sequence_range", [None, None])
- parts.append(
- f"### trace {trace_id} (messages {seq_range[0]}-{seq_range[1]})\n"
- f"{e.get('summary', '')}\n"
- )
- parts.append("\n## 当前记忆文件\n")
- if existing_files:
- parts.append(format_memory_injection(existing_files))
- else:
- parts.append("(暂无记忆文件)")
- if memory_config.files:
- parts.append("\n## 允许更新/新增的文件路径\n")
- for key, purpose in memory_config.files.items():
- parts.append(f"- `{key}`" + (f" — {purpose}" if purpose else ""))
- return "\n".join(parts)
- def _parse_dream_output(raw: str) -> Optional[Dict[str, Any]]:
- """解析 LLM 的 JSON 计划输出。容忍 ```json ... ``` 包裹。"""
- stripped = raw.strip()
- # 去除 markdown 代码块包裹
- m = re.match(r"^```(?:json)?\s*(.*?)\s*```$", stripped, re.DOTALL)
- if m:
- stripped = m.group(1).strip()
- try:
- data = json.loads(stripped)
- except json.JSONDecodeError:
- return None
- if not isinstance(data, dict) or "updates" not in data:
- return None
- return data
# ===== Top-level entry point =====
async def run_dream(
    store: FileSystemTraceStore,
    llm_call: LLMCall,
    memory_config: MemoryConfig,
    trace_filter: Optional[Callable[[Trace], bool]] = None,
    reflect_model: str = "gpt-4o-mini",
    dream_model: str = "gpt-4o",
) -> DreamReport:
    """Run the full dream pipeline: per_trace_reflect → cross_trace_integrate.

    Args:
        store: Trace store to scan.
        llm_call: Async LLM adapter shared by both phases.
        memory_config: Memory file locations and prompts; base_path required.
        trace_filter: Selects which traces to reflect on (e.g. by agent_type
            or owner); None scans every trace.
        reflect_model: Model for per-trace reflection (lightweight is fine).
        dream_model: Model for cross-trace integration (needs stronger reasoning).

    Returns:
        A DreamReport describing what was summarized, consumed and written.
        Per-trace failures are recorded in skipped_traces; a phase-2 failure
        is logged and leaves the phase-1 results in the report.
    """
    report = DreamReport()
    if not memory_config.base_path:
        logger.warning("[Dream] memory_config.base_path 未配置,跳过")
        return report
    # Phase 1: per-trace reflect
    all_traces = await store.list_traces(limit=1000)
    if trace_filter:
        all_traces = [t for t in all_traces if trace_filter(t)]
    for t in all_traces:
        # Skip traces with no messages newer than the last reflection.
        if (t.reflected_at_sequence or 0) >= t.last_sequence:
            continue
        try:
            summary = await per_trace_reflect(
                store, llm_call, t.trace_id, memory_config, model=reflect_model,
            )
            if summary:
                report.per_trace_summaries[t.trace_id] = summary
        except Exception as e:
            logger.error(f"[Dream] per_trace_reflect 异常 {t.trace_id}: {e}")
            report.skipped_traces.append(t.trace_id)
    # Phase 2: cross-trace integrate
    try:
        consumed, updated, reasoning = await cross_trace_integrate(
            store, llm_call, memory_config,
            trace_filter=trace_filter, model=dream_model,
        )
        report.consumed_reflection_count = consumed
        report.updated_files = updated
        report.reasoning = reasoning
    except Exception as e:
        logger.error(f"[Dream] cross_trace_integrate 异常: {e}")
    return report
|