dream.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. """
  2. Dream:记忆反思操作(Phase 3)
  3. 两阶段执行:
  4. per_trace_reflect → 为每个有新消息的 trace 生成反思摘要,写 cognition_log
  5. cross_trace_integrate → 汇总各 trace 的反思摘要 + 当前记忆文件,
  6. 用 dream_prompt 指导 LLM 更新记忆文件
  7. 对外入口:
run_dream(store, llm_call, memory_config, trace_filter=None, reflect_model=..., dream_model=...)
  9. """
  10. from __future__ import annotations
  11. import json
  12. import logging
  13. import re
  14. from dataclasses import dataclass, field
  15. from datetime import datetime
  16. from pathlib import Path
  17. from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
  18. from agent.core.memory import MemoryConfig, load_memory_files, format_memory_injection
  19. from agent.trace.models import Trace
  20. from agent.trace.store import FileSystemTraceStore
  21. logger = logging.getLogger(__name__)
# ===== Default prompts =====

# System prompt for the per-trace reflection step (Phase 1). The LLM is
# expected to reply with a plain first-person reflection paragraph; the reply
# is stored verbatim as a "reflection" cognition-log event.
DEFAULT_REFLECT_PROMPT = """你正在回顾一次 Agent 执行中发生的事情,为你自己(作为长期身份)的记忆做反思。
请综合下面的执行过程和知识使用情况,回答:
1. 这次执行中有什么值得记住的经验?(品味、判断、策略)
2. 哪些知识的评估反映了我的判断需要调整?
3. 用户的反馈(如果有)说明了什么?
用简洁的第一人称段落写,不要逐条列点,不要重复执行细节 —— 你在沉淀"这对未来的我意味着什么"。
只输出反思内容本身,不要任何其它前缀或 markdown 标题。"""

# System prompt for the cross-trace integration step (Phase 2). The LLM must
# answer with a strict JSON plan ({"updates": [...], "reasoning": "..."});
# the reply is parsed by _parse_dream_output.
DEFAULT_DREAM_PROMPT = """你正在整理自己的长期记忆。下面是你最近的反思摘要、以及当前各记忆文件的内容。
请决定哪些文件应该更新、内容怎么改。原则:
- 只更新真正有新见解的文件,没有变化的就不要动
- 在原有内容基础上演进,不是重写;保留仍然有效的旧内容
- 简洁、人类可读的 markdown 格式
- 新增文件必须是 MemoryConfig.files 已声明的路径(否则不会被下次加载)
**严格按以下 JSON 格式输出,不要任何其它文字**:
```json
{
  "updates": [
    {"path": "taste.md", "new_content": "完整的新文件内容"},
    {"path": "strategy.md", "new_content": "..."}
  ],
  "reasoning": "你为什么做这些更新(简短)"
}
```
如果没有任何文件需要更新,输出 `{"updates": [], "reasoning": "..."}`。"""
# ===== Data structures =====


@dataclass
class DreamReport:
    """Result summary of one run_dream() invocation."""
    # {trace_id: reflection summary} produced in Phase 1
    per_trace_summaries: Dict[str, str] = field(default_factory=dict)
    # relative paths of memory files actually written in Phase 2
    updated_files: List[str] = field(default_factory=list)
    # how many reflection events were consumed (marked consumed_at) this run
    consumed_reflection_count: int = 0
    # LLM-provided rationale for the memory-file updates
    reasoning: str = ""
    # trace_ids whose per-trace reflection raised an exception
    skipped_traces: List[str] = field(default_factory=list)


# Async LLM adapter signature: awaited with keyword arguments
# (messages=..., model=..., tools=..., temperature=...) and expected to
# return a dict carrying at least a "content" key.
LLMCall = Callable[..., Awaitable[Dict[str, Any]]]
  56. # ===== Per-trace 反思 =====
  57. async def per_trace_reflect(
  58. store: FileSystemTraceStore,
  59. llm_call: LLMCall,
  60. trace_id: str,
  61. memory_config: MemoryConfig,
  62. model: str = "gpt-4o-mini",
  63. ) -> Optional[str]:
  64. """为单个 trace 生成反思摘要,写入 cognition_log,更新 reflected_at_sequence。
  65. Returns:
  66. 反思摘要字符串;若 trace 没有新消息或 LLM 返回空,返回 None。
  67. """
  68. trace = await store.get_trace(trace_id)
  69. if not trace:
  70. logger.debug(f"[Dream] trace 不存在: {trace_id}")
  71. return None
  72. start_seq = (trace.reflected_at_sequence or 0) + 1
  73. end_seq = trace.last_sequence
  74. if start_seq > end_seq:
  75. logger.debug(f"[Dream] trace {trace_id} 没有新消息({start_seq} > {end_seq})")
  76. return None
  77. all_msgs = await store.get_trace_messages(trace_id)
  78. new_msgs = [m for m in all_msgs if start_seq <= m.sequence <= end_seq]
  79. if not new_msgs:
  80. logger.debug(f"[Dream] trace {trace_id} 范围内无消息")
  81. return None
  82. log = await store.get_cognition_log(trace_id)
  83. events = log.get("events", log.get("entries", []))
  84. relevant_events = [
  85. e for e in events
  86. if e.get("sequence") is not None
  87. and start_seq <= e["sequence"] <= end_seq
  88. and e.get("type") in ("query", "evaluation", "extraction_pending", "extraction_committed")
  89. ]
  90. user_content = _build_reflect_input(new_msgs, relevant_events)
  91. prompt = memory_config.reflect_prompt or DEFAULT_REFLECT_PROMPT
  92. try:
  93. result = await llm_call(
  94. messages=[
  95. {"role": "system", "content": prompt},
  96. {"role": "user", "content": user_content},
  97. ],
  98. model=model,
  99. tools=None,
  100. temperature=0.5,
  101. )
  102. except Exception as e:
  103. logger.error(f"[Dream] per_trace_reflect LLM 调用失败 {trace_id}: {e}")
  104. return None
  105. summary = (result.get("content") or "").strip()
  106. if not summary:
  107. logger.info(f"[Dream] trace {trace_id} 反思 LLM 返回空,视为无值得记录的内容")
  108. # 仍然更新 reflected_at_sequence,避免下次重复扫描
  109. await store.update_trace(trace_id, reflected_at_sequence=end_seq)
  110. return None
  111. await store.append_cognition_event(
  112. trace_id=trace_id,
  113. event={
  114. "type": "reflection",
  115. "sequence_range": [start_seq, end_seq],
  116. "summary": summary,
  117. },
  118. )
  119. await store.update_trace(trace_id, reflected_at_sequence=end_seq)
  120. logger.info(f"[Dream] trace {trace_id} 反思完成,覆盖 sequence {start_seq}-{end_seq}")
  121. return summary
  122. def _build_reflect_input(messages: List[Any], events: List[Dict[str, Any]]) -> str:
  123. """把消息和事件组织为 LLM 可读的反思输入。"""
  124. parts: List[str] = ["## 执行过程"]
  125. for m in messages:
  126. role = getattr(m, "role", "?")
  127. desc = getattr(m, "description", "") or ""
  128. seq = getattr(m, "sequence", "?")
  129. # 截断,防止单条过长
  130. parts.append(f"[{seq}] {role}: {desc[:500]}")
  131. if events:
  132. parts.append("\n## 知识使用与提取情况(来自 cognition_log)")
  133. for e in events:
  134. etype = e.get("type")
  135. if etype == "query":
  136. parts.append(
  137. f"- [{e.get('sequence')}] query: {e.get('query', '')[:100]} → "
  138. f"source_ids={e.get('source_ids', [])}"
  139. )
  140. elif etype == "evaluation":
  141. parts.append(
  142. f"- evaluation: knowledge_id={e.get('knowledge_id')} "
  143. f"result={e.get('eval_result')}"
  144. )
  145. elif etype == "extraction_pending":
  146. payload = e.get("payload", {})
  147. parts.append(
  148. f"- extraction_pending ({e.get('extraction_id')}): "
  149. f"{payload.get('task', '')[:80]}"
  150. )
  151. elif etype == "extraction_committed":
  152. parts.append(
  153. f"- extraction_committed: extraction={e.get('extraction_id')} "
  154. f"→ knowledge_id={e.get('knowledge_id')}"
  155. )
  156. return "\n".join(parts)
  157. # ===== 跨 trace 整合 =====
  158. async def cross_trace_integrate(
  159. store: FileSystemTraceStore,
  160. llm_call: LLMCall,
  161. memory_config: MemoryConfig,
  162. trace_filter: Optional[Callable[[Trace], bool]] = None,
  163. model: str = "gpt-4o",
  164. ) -> Tuple[int, List[str], str]:
  165. """汇总各 trace 未消化的 reflection 事件,用 LLM 更新记忆文件。
  166. Args:
  167. trace_filter: 可选的 trace 过滤函数(例如按 agent_type / owner);
  168. None 表示扫描 TraceStore 下所有 trace。
  169. Returns:
  170. (consumed_reflection_count, updated_file_paths, reasoning)
  171. """
  172. all_traces = await store.list_traces(limit=1000)
  173. if trace_filter:
  174. all_traces = [t for t in all_traces if trace_filter(t)]
  175. # 收集所有未消化的 reflection 事件
  176. reflections: List[Tuple[str, Dict[str, Any]]] = [] # [(trace_id, event)]
  177. for t in all_traces:
  178. log = await store.get_cognition_log(t.trace_id)
  179. events = log.get("events", log.get("entries", []))
  180. for e in events:
  181. if e.get("type") == "reflection" and not e.get("consumed_at"):
  182. reflections.append((t.trace_id, e))
  183. if not reflections:
  184. logger.info("[Dream] 没有未消化的 reflection 事件")
  185. return 0, [], ""
  186. # 读当前记忆文件
  187. existing_files = load_memory_files(memory_config)
  188. existing_by_path = {rel: (purpose, content) for rel, purpose, content in existing_files}
  189. user_content = _build_dream_input(reflections, existing_files, memory_config)
  190. prompt = memory_config.dream_prompt or DEFAULT_DREAM_PROMPT
  191. try:
  192. result = await llm_call(
  193. messages=[
  194. {"role": "system", "content": prompt},
  195. {"role": "user", "content": user_content},
  196. ],
  197. model=model,
  198. tools=None,
  199. temperature=0.3,
  200. )
  201. except Exception as e:
  202. logger.error(f"[Dream] cross_trace_integrate LLM 调用失败: {e}")
  203. return 0, [], ""
  204. raw = (result.get("content") or "").strip()
  205. plan = _parse_dream_output(raw)
  206. if plan is None:
  207. logger.error(f"[Dream] LLM 输出无法解析为 JSON 计划,原文: {raw[:500]}")
  208. return 0, [], ""
  209. updated_paths: List[str] = []
  210. base = Path(memory_config.base_path)
  211. for update in plan.get("updates", []):
  212. rel_path = update.get("path", "")
  213. new_content = update.get("new_content", "")
  214. if not rel_path:
  215. continue
  216. # 安全检查:禁止路径穿越
  217. target = (base / rel_path).resolve()
  218. if not str(target).startswith(str(base.resolve())):
  219. logger.warning(f"[Dream] 拒绝写入 base_path 之外的路径: {rel_path}")
  220. continue
  221. target.parent.mkdir(parents=True, exist_ok=True)
  222. target.write_text(new_content, encoding="utf-8")
  223. updated_paths.append(rel_path)
  224. logger.info(f"[Dream] 已更新记忆文件: {rel_path} ({len(new_content)} chars)")
  225. # 标记所有参与的 reflection 为已消化
  226. consumed_at = datetime.now().isoformat()
  227. for trace_id, event in reflections:
  228. log = await store.get_cognition_log(trace_id)
  229. events = log.get("events", log.get("entries", []))
  230. target_ts = event.get("timestamp")
  231. for e in events:
  232. if (
  233. e.get("type") == "reflection"
  234. and not e.get("consumed_at")
  235. and e.get("timestamp") == target_ts
  236. ):
  237. e["consumed_at"] = consumed_at
  238. log_file = store._get_cognition_log_file(trace_id)
  239. log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8")
  240. reasoning = plan.get("reasoning", "")
  241. return len(reflections), updated_paths, reasoning
  242. def _build_dream_input(
  243. reflections: List[Tuple[str, Dict[str, Any]]],
  244. existing_files: List[Tuple[str, str, str]],
  245. memory_config: MemoryConfig,
  246. ) -> str:
  247. """为 dream prompt 准备输入:反思摘要汇总 + 当前记忆文件 + 允许的文件路径。"""
  248. parts: List[str] = ["## 最近的反思摘要\n"]
  249. for trace_id, e in reflections:
  250. seq_range = e.get("sequence_range", [None, None])
  251. parts.append(
  252. f"### trace {trace_id} (messages {seq_range[0]}-{seq_range[1]})\n"
  253. f"{e.get('summary', '')}\n"
  254. )
  255. parts.append("\n## 当前记忆文件\n")
  256. if existing_files:
  257. parts.append(format_memory_injection(existing_files))
  258. else:
  259. parts.append("(暂无记忆文件)")
  260. if memory_config.files:
  261. parts.append("\n## 允许更新/新增的文件路径\n")
  262. for key, purpose in memory_config.files.items():
  263. parts.append(f"- `{key}`" + (f" — {purpose}" if purpose else ""))
  264. return "\n".join(parts)
  265. def _parse_dream_output(raw: str) -> Optional[Dict[str, Any]]:
  266. """解析 LLM 的 JSON 计划输出。容忍 ```json ... ``` 包裹。"""
  267. stripped = raw.strip()
  268. # 去除 markdown 代码块包裹
  269. m = re.match(r"^```(?:json)?\s*(.*?)\s*```$", stripped, re.DOTALL)
  270. if m:
  271. stripped = m.group(1).strip()
  272. try:
  273. data = json.loads(stripped)
  274. except json.JSONDecodeError:
  275. return None
  276. if not isinstance(data, dict) or "updates" not in data:
  277. return None
  278. return data
  279. # ===== 顶层入口 =====
  280. async def run_dream(
  281. store: FileSystemTraceStore,
  282. llm_call: LLMCall,
  283. memory_config: MemoryConfig,
  284. trace_filter: Optional[Callable[[Trace], bool]] = None,
  285. reflect_model: str = "gpt-4o-mini",
  286. dream_model: str = "gpt-4o",
  287. ) -> DreamReport:
  288. """执行完整的 dream 流程:per_trace_reflect → cross_trace_integrate。
  289. Args:
  290. trace_filter: 筛选需要反思的 trace(例如按 agent_type 或 owner);
  291. None 表示扫描所有 trace
  292. reflect_model: per-trace 反思用的模型(轻量模型即可)
  293. dream_model: 跨 trace 整合用的模型(需要更强推理能力)
  294. """
  295. report = DreamReport()
  296. if not memory_config.base_path:
  297. logger.warning("[Dream] memory_config.base_path 未配置,跳过")
  298. return report
  299. # Phase 1: per-trace reflect
  300. all_traces = await store.list_traces(limit=1000)
  301. if trace_filter:
  302. all_traces = [t for t in all_traces if trace_filter(t)]
  303. for t in all_traces:
  304. if (t.reflected_at_sequence or 0) >= t.last_sequence:
  305. continue
  306. try:
  307. summary = await per_trace_reflect(
  308. store, llm_call, t.trace_id, memory_config, model=reflect_model,
  309. )
  310. if summary:
  311. report.per_trace_summaries[t.trace_id] = summary
  312. except Exception as e:
  313. logger.error(f"[Dream] per_trace_reflect 异常 {t.trace_id}: {e}")
  314. report.skipped_traces.append(t.trace_id)
  315. # Phase 2: cross-trace integrate
  316. try:
  317. consumed, updated, reasoning = await cross_trace_integrate(
  318. store, llm_call, memory_config,
  319. trace_filter=trace_filter, model=dream_model,
  320. )
  321. report.consumed_reflection_count = consumed
  322. report.updated_files = updated
  323. report.reasoning = reasoning
  324. except Exception as e:
  325. logger.error(f"[Dream] cross_trace_integrate 异常: {e}")
  326. return report