Talegorithm 19 часов назад
Родитель
Сommit
13024d5c93

+ 267 - 0
agent/cli/extraction_review.py

@@ -0,0 +1,267 @@
+"""
+提取审核交互式 CLI
+
+用途
+----
+反思侧分支产出的知识条目默认写为 cognition_log: type="extraction_pending",
+不会直接上传到 KnowHub。本 CLI 提供人工审核 + 批量提交入口。
+
+两种入口(共享同一核心逻辑,见 agent/trace/extraction_review.py):
+- 独立脚本:python -m agent.cli.extraction_review --trace <TRACE_ID> [--list|--review|--commit]
+- interactive.py 菜单项 8/9(见 agent/cli/interactive.py)
+
+用法示例
+--------
+# 查看当前 trace 的所有未审核条目
+python -m agent.cli.extraction_review --trace abc-123 --list
+
+# 交互式逐条审核
+python -m agent.cli.extraction_review --trace abc-123 --review
+
+# 把已 approved 的条目批量提交到 KnowHub
+python -m agent.cli.extraction_review --trace abc-123 --commit
+
+# 一条龙:review 完直接 commit
+python -m agent.cli.extraction_review --trace abc-123
+"""
+
+from __future__ import annotations
+
+import argparse
+import asyncio
+import json
+import sys
+from pathlib import Path
+from typing import List, Optional
+
+from agent.trace.store import FileSystemTraceStore
+from agent.trace.extraction_review import (
+    PendingExtraction,
+    CommitReport,
+    list_pending,
+    review_one,
+    commit_approved,
+)
+
+
+# ===== 打印工具 =====
+
+_SEP = "─" * 60
+
+
+def _format_payload(payload: dict, max_content: int = 400) -> str:
+    task = payload.get("task", "")
+    content = payload.get("content", "")
+    types = payload.get("types", [])
+    tags = payload.get("tags", {})
+    score = payload.get("score", 0)
+    resource_ids = payload.get("resource_ids", [])
+
+    if len(content) > max_content:
+        content = content[:max_content] + "…(truncated)"
+
+    lines = [
+        f"task:  {task}",
+        f"types: {types}   score: {score}",
+    ]
+    if tags:
+        lines.append(f"tags:  {tags}")
+    if resource_ids:
+        lines.append(f"resources: {resource_ids}")
+    lines.append("")
+    lines.append(content)
+    return "\n".join(lines)
+
+
+def _print_pending(p: PendingExtraction, index: int, total: int) -> None:
+    state = ""
+    if p.committed:
+        state = " [已提交]"
+    elif p.reviewed:
+        state = f" [已审核: {p.decision}]"
+    print()
+    print(f"[{index}/{total}] {p.extraction_id}{state}")
+    print(_SEP)
+    print(_format_payload(p.payload))
+    print(_SEP)
+
+
+def _print_report(report: CommitReport) -> None:
+    print()
+    print("=" * 60)
+    print("提交结果")
+    print("=" * 60)
+    print(f"✅ 成功: {len(report.committed)}")
+    for eid, kid in zip(report.committed, report.knowledge_ids):
+        print(f"   - {eid} → knowledge_id={kid}")
+    if report.failed:
+        print(f"❌ 失败: {len(report.failed)}")
+        for item in report.failed:
+            print(f"   - {item['extraction_id']}: {item['error']}")
+    if report.skipped:
+        print(f"⏭  跳过: {len(report.skipped)}(未 approved 或已提交)")
+    print("=" * 60)
+
+
+# ===== 交互式编辑 =====
+
+def _prompt_edit(payload: dict) -> Optional[dict]:
+    """进入交互式文本编辑模式,返回修改后的 payload(None 表示取消)。
+
+    初版只支持改 task/content/score/tags(最常用字段)。
+    """
+    print("\n编辑模式(空行回车保留原值)")
+    task = input(f"task   [{payload.get('task', '')[:50]}]: ").strip()
+    content_default = payload.get("content", "")
+    print(f"content 当前:\n{content_default}\n")
+    print("输入新 content(单行回车保留原值;多行请在末尾输入 `.` 单独成行结束):")
+    content = _read_multiline_or_keep(content_default)
+    score_raw = input(f"score  [{payload.get('score', 3)}]: ").strip()
+    tags_raw = input(f"tags JSON  [{json.dumps(payload.get('tags', {}), ensure_ascii=False)}]: ").strip()
+
+    new_payload = dict(payload)
+    if task:
+        new_payload["task"] = task
+    if content is not None:
+        new_payload["content"] = content
+    if score_raw:
+        try:
+            new_payload["score"] = int(score_raw)
+        except ValueError:
+            print(f"⚠ score 不是整数,保留原值 {payload.get('score', 3)}")
+    if tags_raw:
+        try:
+            new_payload["tags"] = json.loads(tags_raw)
+        except json.JSONDecodeError as e:
+            print(f"⚠ tags 不是合法 JSON({e}),保留原值")
+
+    confirm = input("\n保存修改?[y/N]: ").strip().lower()
+    if confirm != "y":
+        return None
+    return new_payload
+
+
+def _read_multiline_or_keep(default: str) -> Optional[str]:
+    """单行输入则直接返回(空行表示保留默认);
+    如果输入 `<<` 则进入多行模式,直到 `.` 单独成行结束。"""
+    first = input("> ")
+    if not first.strip():
+        return None
+    if first.strip() != "<<":
+        return first
+    lines = []
+    while True:
+        line = input()
+        if line.strip() == ".":
+            break
+        lines.append(line)
+    return "\n".join(lines)
+
+
+# ===== 三种命令 =====
+
+async def cmd_list(store: FileSystemTraceStore, trace_id: str, show_all: bool) -> int:
+    pendings = await list_pending(store, trace_id, include_reviewed=show_all)
+    if not pendings:
+        msg = "没有" + ("任何提取记录" if show_all else "待审核的提取条目")
+        print(f"trace {trace_id}: {msg}")
+        return 0
+    print(f"trace {trace_id}: 共 {len(pendings)} 条{'' if show_all else '待审核'}")
+    for i, p in enumerate(pendings, 1):
+        _print_pending(p, i, len(pendings))
+    return 0
+
+
+async def cmd_review(store: FileSystemTraceStore, trace_id: str) -> int:
+    pendings = await list_pending(store, trace_id, include_reviewed=False)
+    if not pendings:
+        print(f"trace {trace_id}: 没有待审核的提取条目")
+        return 0
+
+    print(f"trace {trace_id}: 开始审核 {len(pendings)} 条")
+    for i, p in enumerate(pendings, 1):
+        _print_pending(p, i, len(pendings))
+        while True:
+            choice = input("[a]pprove / [e]dit / [d]iscard / [s]kip / [q]uit: ").strip().lower()
+            if choice in ("a", "approve"):
+                await review_one(store, trace_id, p.extraction_id, "approve")
+                print(f"✓ {p.extraction_id} approved")
+                break
+            elif choice in ("d", "discard"):
+                await review_one(store, trace_id, p.extraction_id, "discard")
+                print(f"✗ {p.extraction_id} discarded")
+                break
+            elif choice in ("s", "skip"):
+                print(f"⏭ {p.extraction_id} skipped(保留为 pending)")
+                break
+            elif choice in ("q", "quit"):
+                print("退出审核")
+                return 0
+            elif choice in ("e", "edit"):
+                edited = _prompt_edit(p.payload)
+                if edited is None:
+                    print("取消编辑,请重选")
+                    continue
+                await review_one(store, trace_id, p.extraction_id, "edit", edited_payload=edited)
+                print(f"✎ {p.extraction_id} edited & approved")
+                break
+            else:
+                print("无效选项,请输入 a/e/d/s/q")
+    return 0
+
+
+async def cmd_commit(store: FileSystemTraceStore, trace_id: str) -> int:
+    report = await commit_approved(store, trace_id)
+    _print_report(report)
+    return 0 if not report.failed else 1
+
+
+# ===== argparse 入口 =====
+
+def build_parser() -> argparse.ArgumentParser:
+    p = argparse.ArgumentParser(
+        prog="python -m agent.cli.extraction_review",
+        description="审核并提交反思侧分支暂存的待审核知识条目。",
+    )
+    p.add_argument("--trace", required=True, help="Trace ID")
+    p.add_argument("--base-path", default=".trace", help="TraceStore 根目录(默认 .trace)")
+    group = p.add_mutually_exclusive_group()
+    group.add_argument("--list", action="store_true", help="仅列出未审核条目")
+    group.add_argument("--list-all", action="store_true", help="列出全部条目(含已审核/已提交)")
+    group.add_argument("--review", action="store_true", help="进入交互式审核(不自动 commit)")
+    group.add_argument("--commit", action="store_true", help="仅批量提交已 approved 的条目")
+    return p
+
+
+async def _main_async(args: argparse.Namespace) -> int:
+    if not Path(args.base_path).exists():
+        print(f"❌ TraceStore 根目录不存在: {args.base_path}", file=sys.stderr)
+        return 2
+    store = FileSystemTraceStore(base_path=args.base_path)
+
+    if args.list or args.list_all:
+        return await cmd_list(store, args.trace, show_all=args.list_all)
+    if args.review:
+        return await cmd_review(store, args.trace)
+    if args.commit:
+        return await cmd_commit(store, args.trace)
+
+    # 默认:review 完紧接着 commit
+    rc = await cmd_review(store, args.trace)
+    if rc != 0:
+        return rc
+    print()
+    confirm = input("现在把已 approved 的条目提交到 KnowHub?[Y/n]: ").strip().lower()
+    if confirm in ("", "y", "yes"):
+        return await cmd_commit(store, args.trace)
+    print("未提交。需要时运行 `--commit` 子命令。")
+    return 0
+
+
+def main() -> int:
+    args = build_parser().parse_args()
+    return asyncio.run(_main_async(args))
+
+
+if __name__ == "__main__":
+    sys.exit(main())

+ 15 - 1
agent/cli/interactive.py

@@ -177,10 +177,12 @@ class InteractiveController:
         print("  5. 从指定消息续跑")
         print("  6. 继续执行")
         print("  7. 停止执行")
+        print("  8. 审核待提交知识(review pending extractions)")
+        print("  9. 提交已审核知识到 KnowHub(commit approved)")
         print("=" * 60)
 
         while True:
-            choice = input("请输入选项 (1-7): ").strip()
+            choice = input("请输入选项 (1-9): ").strip()
 
             if choice == "1":
                 # 插入干预消息
@@ -237,6 +239,18 @@ class InteractiveController:
                 print("\n停止执行...")
                 return {"action": "stop"}
 
+            elif choice == "8":
+                # 审核待提交知识(复用 agent/cli/extraction_review.py 的交互式 review)
+                from agent.cli.extraction_review import cmd_review
+                await cmd_review(self.store, trace_id)
+                continue
+
+            elif choice == "9":
+                # 提交已审核知识到 KnowHub
+                from agent.cli.extraction_review import cmd_commit
+                await cmd_commit(self.store, trace_id)
+                continue
+
             else:
                 print("无效选项,请重新输入")
 

+ 393 - 0
agent/core/dream.py

@@ -0,0 +1,393 @@
+"""
+Dream:记忆反思操作(Phase 3)
+
+两阶段执行:
+    per_trace_reflect    → 为每个有新消息的 trace 生成反思摘要,写 cognition_log
+    cross_trace_integrate → 汇总各 trace 的反思摘要 + 当前记忆文件,
+                             用 dream_prompt 指导 LLM 更新记忆文件
+
+对外入口:
+    run_dream(store, llm_call, memory_config, trace_filter=None, model=...)
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import re
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
+
+from agent.core.memory import MemoryConfig, load_memory_files, format_memory_injection
+from agent.trace.models import Trace
+from agent.trace.store import FileSystemTraceStore
+
+logger = logging.getLogger(__name__)
+
+
+# ===== 默认 prompts =====
+
+DEFAULT_REFLECT_PROMPT = """你正在回顾一次 Agent 执行中发生的事情,为你自己(作为长期身份)的记忆做反思。
+
+请综合下面的执行过程和知识使用情况,回答:
+1. 这次执行中有什么值得记住的经验?(品味、判断、策略)
+2. 哪些知识的评估反映了我的判断需要调整?
+3. 用户的反馈(如果有)说明了什么?
+
+用简洁的第一人称段落写,不要逐条列点,不要重复执行细节 —— 你在沉淀"这对未来的我意味着什么"。
+只输出反思内容本身,不要任何其它前缀或 markdown 标题。"""
+
+
+DEFAULT_DREAM_PROMPT = """你正在整理自己的长期记忆。下面是你最近的反思摘要、以及当前各记忆文件的内容。
+
+请决定哪些文件应该更新、内容怎么改。原则:
+- 只更新真正有新见解的文件,没有变化的就不要动
+- 在原有内容基础上演进,不是重写;保留仍然有效的旧内容
+- 简洁、人类可读的 markdown 格式
+- 新增文件必须是 MemoryConfig.files 已声明的路径(否则不会被下次加载)
+
+**严格按以下 JSON 格式输出,不要任何其它文字**:
+
+```json
+{
+  "updates": [
+    {"path": "taste.md", "new_content": "完整的新文件内容"},
+    {"path": "strategy.md", "new_content": "..."}
+  ],
+  "reasoning": "你为什么做这些更新(简短)"
+}
+```
+
+如果没有任何文件需要更新,输出 `{"updates": [], "reasoning": "..."}`。"""
+
+
+# ===== 数据结构 =====
+
+@dataclass
+class DreamReport:
+    per_trace_summaries: Dict[str, str] = field(default_factory=dict)  # {trace_id: summary}
+    updated_files: List[str] = field(default_factory=list)             # 实际写入的文件路径
+    consumed_reflection_count: int = 0                                  # 本次消化了多少条 reflection
+    reasoning: str = ""
+    skipped_traces: List[str] = field(default_factory=list)
+
+
+LLMCall = Callable[..., Awaitable[Dict[str, Any]]]
+
+
+# ===== Per-trace 反思 =====
+
+async def per_trace_reflect(
+    store: FileSystemTraceStore,
+    llm_call: LLMCall,
+    trace_id: str,
+    memory_config: MemoryConfig,
+    model: str = "gpt-4o-mini",
+) -> Optional[str]:
+    """为单个 trace 生成反思摘要,写入 cognition_log,更新 reflected_at_sequence。
+
+    Returns:
+        反思摘要字符串;若 trace 没有新消息或 LLM 返回空,返回 None。
+    """
+    trace = await store.get_trace(trace_id)
+    if not trace:
+        logger.debug(f"[Dream] trace 不存在: {trace_id}")
+        return None
+
+    start_seq = (trace.reflected_at_sequence or 0) + 1
+    end_seq = trace.last_sequence
+    if start_seq > end_seq:
+        logger.debug(f"[Dream] trace {trace_id} 没有新消息({start_seq} > {end_seq})")
+        return None
+
+    all_msgs = await store.get_trace_messages(trace_id)
+    new_msgs = [m for m in all_msgs if start_seq <= m.sequence <= end_seq]
+    if not new_msgs:
+        logger.debug(f"[Dream] trace {trace_id} 范围内无消息")
+        return None
+
+    log = await store.get_cognition_log(trace_id)
+    events = log.get("events", log.get("entries", []))
+    relevant_events = [
+        e for e in events
+        if e.get("sequence") is not None
+        and start_seq <= e["sequence"] <= end_seq
+        and e.get("type") in ("query", "evaluation", "extraction_pending", "extraction_committed")
+    ]
+
+    user_content = _build_reflect_input(new_msgs, relevant_events)
+    prompt = memory_config.reflect_prompt or DEFAULT_REFLECT_PROMPT
+
+    try:
+        result = await llm_call(
+            messages=[
+                {"role": "system", "content": prompt},
+                {"role": "user", "content": user_content},
+            ],
+            model=model,
+            tools=None,
+            temperature=0.5,
+        )
+    except Exception as e:
+        logger.error(f"[Dream] per_trace_reflect LLM 调用失败 {trace_id}: {e}")
+        return None
+
+    summary = (result.get("content") or "").strip()
+    if not summary:
+        logger.info(f"[Dream] trace {trace_id} 反思 LLM 返回空,视为无值得记录的内容")
+        # 仍然更新 reflected_at_sequence,避免下次重复扫描
+        await store.update_trace(trace_id, reflected_at_sequence=end_seq)
+        return None
+
+    await store.append_cognition_event(
+        trace_id=trace_id,
+        event={
+            "type": "reflection",
+            "sequence_range": [start_seq, end_seq],
+            "summary": summary,
+        },
+    )
+    await store.update_trace(trace_id, reflected_at_sequence=end_seq)
+    logger.info(f"[Dream] trace {trace_id} 反思完成,覆盖 sequence {start_seq}-{end_seq}")
+    return summary
+
+
+def _build_reflect_input(messages: List[Any], events: List[Dict[str, Any]]) -> str:
+    """把消息和事件组织为 LLM 可读的反思输入。"""
+    parts: List[str] = ["## 执行过程"]
+    for m in messages:
+        role = getattr(m, "role", "?")
+        desc = getattr(m, "description", "") or ""
+        seq = getattr(m, "sequence", "?")
+        # 截断,防止单条过长
+        parts.append(f"[{seq}] {role}: {desc[:500]}")
+
+    if events:
+        parts.append("\n## 知识使用与提取情况(来自 cognition_log)")
+        for e in events:
+            etype = e.get("type")
+            if etype == "query":
+                parts.append(
+                    f"- [{e.get('sequence')}] query: {e.get('query', '')[:100]} → "
+                    f"source_ids={e.get('source_ids', [])}"
+                )
+            elif etype == "evaluation":
+                parts.append(
+                    f"- evaluation: knowledge_id={e.get('knowledge_id')} "
+                    f"result={e.get('eval_result')}"
+                )
+            elif etype == "extraction_pending":
+                payload = e.get("payload", {})
+                parts.append(
+                    f"- extraction_pending ({e.get('extraction_id')}): "
+                    f"{payload.get('task', '')[:80]}"
+                )
+            elif etype == "extraction_committed":
+                parts.append(
+                    f"- extraction_committed: extraction={e.get('extraction_id')} "
+                    f"→ knowledge_id={e.get('knowledge_id')}"
+                )
+    return "\n".join(parts)
+
+
+# ===== 跨 trace 整合 =====
+
+async def cross_trace_integrate(
+    store: FileSystemTraceStore,
+    llm_call: LLMCall,
+    memory_config: MemoryConfig,
+    trace_filter: Optional[Callable[[Trace], bool]] = None,
+    model: str = "gpt-4o",
+) -> Tuple[int, List[str], str]:
+    """汇总各 trace 未消化的 reflection 事件,用 LLM 更新记忆文件。
+
+    Args:
+        trace_filter: 可选的 trace 过滤函数(例如按 agent_type / owner);
+                      None 表示扫描 TraceStore 下所有 trace。
+
+    Returns:
+        (consumed_reflection_count, updated_file_paths, reasoning)
+    """
+    all_traces = await store.list_traces(limit=1000)
+    if trace_filter:
+        all_traces = [t for t in all_traces if trace_filter(t)]
+
+    # 收集所有未消化的 reflection 事件
+    reflections: List[Tuple[str, Dict[str, Any]]] = []  # [(trace_id, event)]
+    for t in all_traces:
+        log = await store.get_cognition_log(t.trace_id)
+        events = log.get("events", log.get("entries", []))
+        for e in events:
+            if e.get("type") == "reflection" and not e.get("consumed_at"):
+                reflections.append((t.trace_id, e))
+
+    if not reflections:
+        logger.info("[Dream] 没有未消化的 reflection 事件")
+        return 0, [], ""
+
+    # 读当前记忆文件
+    existing_files = load_memory_files(memory_config)
+    existing_by_path = {rel: (purpose, content) for rel, purpose, content in existing_files}
+
+    user_content = _build_dream_input(reflections, existing_files, memory_config)
+    prompt = memory_config.dream_prompt or DEFAULT_DREAM_PROMPT
+
+    try:
+        result = await llm_call(
+            messages=[
+                {"role": "system", "content": prompt},
+                {"role": "user", "content": user_content},
+            ],
+            model=model,
+            tools=None,
+            temperature=0.3,
+        )
+    except Exception as e:
+        logger.error(f"[Dream] cross_trace_integrate LLM 调用失败: {e}")
+        return 0, [], ""
+
+    raw = (result.get("content") or "").strip()
+    plan = _parse_dream_output(raw)
+    if plan is None:
+        logger.error(f"[Dream] LLM 输出无法解析为 JSON 计划,原文: {raw[:500]}")
+        return 0, [], ""
+
+    updated_paths: List[str] = []
+    base = Path(memory_config.base_path)
+
+    for update in plan.get("updates", []):
+        rel_path = update.get("path", "")
+        new_content = update.get("new_content", "")
+        if not rel_path:
+            continue
+        # 安全检查:禁止路径穿越
+        target = (base / rel_path).resolve()
+        if not str(target).startswith(str(base.resolve())):
+            logger.warning(f"[Dream] 拒绝写入 base_path 之外的路径: {rel_path}")
+            continue
+        target.parent.mkdir(parents=True, exist_ok=True)
+        target.write_text(new_content, encoding="utf-8")
+        updated_paths.append(rel_path)
+        logger.info(f"[Dream] 已更新记忆文件: {rel_path} ({len(new_content)} chars)")
+
+    # 标记所有参与的 reflection 为已消化
+    consumed_at = datetime.now().isoformat()
+    for trace_id, event in reflections:
+        log = await store.get_cognition_log(trace_id)
+        events = log.get("events", log.get("entries", []))
+        target_ts = event.get("timestamp")
+        for e in events:
+            if (
+                e.get("type") == "reflection"
+                and not e.get("consumed_at")
+                and e.get("timestamp") == target_ts
+            ):
+                e["consumed_at"] = consumed_at
+        log_file = store._get_cognition_log_file(trace_id)
+        log_file.write_text(json.dumps(log, indent=2, ensure_ascii=False), encoding="utf-8")
+
+    reasoning = plan.get("reasoning", "")
+    return len(reflections), updated_paths, reasoning
+
+
+def _build_dream_input(
+    reflections: List[Tuple[str, Dict[str, Any]]],
+    existing_files: List[Tuple[str, str, str]],
+    memory_config: MemoryConfig,
+) -> str:
+    """为 dream prompt 准备输入:反思摘要汇总 + 当前记忆文件 + 允许的文件路径。"""
+    parts: List[str] = ["## 最近的反思摘要\n"]
+    for trace_id, e in reflections:
+        seq_range = e.get("sequence_range", [None, None])
+        parts.append(
+            f"### trace {trace_id} (messages {seq_range[0]}-{seq_range[1]})\n"
+            f"{e.get('summary', '')}\n"
+        )
+
+    parts.append("\n## 当前记忆文件\n")
+    if existing_files:
+        parts.append(format_memory_injection(existing_files))
+    else:
+        parts.append("(暂无记忆文件)")
+
+    if memory_config.files:
+        parts.append("\n## 允许更新/新增的文件路径\n")
+        for key, purpose in memory_config.files.items():
+            parts.append(f"- `{key}`" + (f" — {purpose}" if purpose else ""))
+
+    return "\n".join(parts)
+
+
+def _parse_dream_output(raw: str) -> Optional[Dict[str, Any]]:
+    """解析 LLM 的 JSON 计划输出。容忍 ```json ... ``` 包裹。"""
+    stripped = raw.strip()
+    # 去除 markdown 代码块包裹
+    m = re.match(r"^```(?:json)?\s*(.*?)\s*```$", stripped, re.DOTALL)
+    if m:
+        stripped = m.group(1).strip()
+    try:
+        data = json.loads(stripped)
+    except json.JSONDecodeError:
+        return None
+    if not isinstance(data, dict) or "updates" not in data:
+        return None
+    return data
+
+
+# ===== 顶层入口 =====
+
+async def run_dream(
+    store: FileSystemTraceStore,
+    llm_call: LLMCall,
+    memory_config: MemoryConfig,
+    trace_filter: Optional[Callable[[Trace], bool]] = None,
+    reflect_model: str = "gpt-4o-mini",
+    dream_model: str = "gpt-4o",
+) -> DreamReport:
+    """执行完整的 dream 流程:per_trace_reflect → cross_trace_integrate。
+
+    Args:
+        trace_filter: 筛选需要反思的 trace(例如按 agent_type 或 owner);
+                      None 表示扫描所有 trace
+        reflect_model: per-trace 反思用的模型(轻量模型即可)
+        dream_model:   跨 trace 整合用的模型(需要更强推理能力)
+    """
+    report = DreamReport()
+
+    if not memory_config.base_path:
+        logger.warning("[Dream] memory_config.base_path 未配置,跳过")
+        return report
+
+    # Phase 1: per-trace reflect
+    all_traces = await store.list_traces(limit=1000)
+    if trace_filter:
+        all_traces = [t for t in all_traces if trace_filter(t)]
+
+    for t in all_traces:
+        if (t.reflected_at_sequence or 0) >= t.last_sequence:
+            continue
+        try:
+            summary = await per_trace_reflect(
+                store, llm_call, t.trace_id, memory_config, model=reflect_model,
+            )
+            if summary:
+                report.per_trace_summaries[t.trace_id] = summary
+        except Exception as e:
+            logger.error(f"[Dream] per_trace_reflect 异常 {t.trace_id}: {e}")
+            report.skipped_traces.append(t.trace_id)
+
+    # Phase 2: cross-trace integrate
+    try:
+        consumed, updated, reasoning = await cross_trace_integrate(
+            store, llm_call, memory_config,
+            trace_filter=trace_filter, model=dream_model,
+        )
+        report.consumed_reflection_count = consumed
+        report.updated_files = updated
+        report.reasoning = reasoning
+    except Exception as e:
+        logger.error(f"[Dream] cross_trace_integrate 异常: {e}")
+
+    return report

+ 100 - 0
agent/core/memory.py

@@ -0,0 +1,100 @@
+"""
+Memory 系统(Phase 2+)
+
+详见 agent/docs/memory-plan.md。核心概念:
+- Memory:Agent 身份私有的主观记忆,Markdown 文件,人类可读写
+- Dream:记忆反思操作(回顾多个 trace 的执行历史,更新记忆文件)
+
+本模块只提供 MemoryConfig 数据类和记忆文件加载逻辑。
+Dream 操作在 agent/core/dream.py(Phase 3)。
+"""
+
+from __future__ import annotations
+
+import glob as _glob
+import logging
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class MemoryConfig:
+    """持久化记忆配置(见 agent/docs/memory-plan.md 第五节)"""
+
+    base_path: str = ""
+    # 记忆文件根目录。所有文件路径相对此目录解析。
+
+    files: Optional[Dict[str, str]] = None
+    # {路径模式: 用途说明}
+    # key 支持两种形式:
+    #   - 直接路径:"core/identity.md"
+    #   - glob 模式:"relationships/*.md"、"journals/2026/**.md"
+    # value 是人类可读的用途说明(注入时作为文件分隔标题的一部分)。
+    # 框架只负责按 key 解析文件内容;组织结构由配置者决定。
+
+    dream_prompt: str = ""
+    # Dream 跨 trace 整合 prompt;空则使用默认(Phase 3 定义)
+
+    reflect_prompt: str = ""
+    # Per-trace 记忆反思 prompt;空则使用默认(Phase 3 定义)
+
+
+def load_memory_files(config: MemoryConfig) -> List[Tuple[str, str, str]]:
+    """按 MemoryConfig.files 的 key 解析磁盘上的记忆文件。
+
+    Returns:
+        List[(relative_path, purpose, content)],按 files 声明顺序扁平化,
+        文件不存在则跳过(记 debug 日志),内容为空也保留(方便人类看到占位)。
+    """
+    if not config.base_path or not config.files:
+        return []
+
+    base = Path(config.base_path)
+    if not base.exists():
+        logger.debug(f"[Memory] base_path 不存在: {base}")
+        return []
+
+    results: List[Tuple[str, str, str]] = []
+    seen: set[str] = set()  # 去重(多个 glob 可能命中同一个文件)
+
+    for key, purpose in config.files.items():
+        # 展开 glob;直接路径也走 glob(无通配符时返回单条或空)
+        pattern = str(base / key)
+        matched_paths = sorted(_glob.glob(pattern, recursive=True))
+
+        if not matched_paths:
+            # 直接路径没命中时给个 debug(可能还没写第一版)
+            logger.debug(f"[Memory] {key} 没有匹配文件(尚未创建)")
+            continue
+
+        for fs_path in matched_paths:
+            rel = str(Path(fs_path).relative_to(base))
+            if rel in seen:
+                continue
+            seen.add(rel)
+            try:
+                content = Path(fs_path).read_text(encoding="utf-8")
+            except Exception as e:
+                logger.warning(f"[Memory] 读取失败 {fs_path}: {e}")
+                continue
+            results.append((rel, purpose, content))
+
+    return results
+
+
+def format_memory_injection(files: List[Tuple[str, str, str]]) -> str:
+    """把加载结果格式化为可注入到上下文的 markdown 段。"""
+    if not files:
+        return ""
+    parts = ["## 你的长期记忆\n\n以下是你作为此 Agent 身份积累的记忆(人类可直接编辑):\n"]
+    for rel, purpose, content in files:
+        header = f"### `{rel}`"
+        if purpose:
+            header += f" — {purpose}"
+        parts.append(header)
+        parts.append(content.rstrip() or "_(空文件,尚未积累内容)_")
+        parts.append("")  # 空行分隔
+    return "\n".join(parts).rstrip() + "\n"

+ 59 - 129
agent/core/prompts/knowledge.py

@@ -5,180 +5,110 @@
 - REFLECT_PROMPT:            压缩时阶段性反思(消息量超阈值,对当前批历史提炼)
 - COMPLETION_REFLECT_PROMPT: 任务完成后全局复盘(对整个任务的全局视角)
 
-两个 prompt 都要求 LLM 直接调用 `upload_knowledge` 工具保存经验,
-而不是输出结构化文本再由 runner 解析。
+两个 prompt 都要求 LLM 直接调用 `knowledge_save_pending` 工具暂存为待审核条目,
+每条知识一次调用,不需要输出结构化文本。
+
+"pending" 语义:条目落到 cognition_log 的 extraction_pending 事件,
+等待人工(或 reflect_auto_commit=True 时由框架自动)review + commit 才进入 KnowHub。
+详见 agent/docs/memory-plan.md 第三节"提取-审核-提交两阶段"。
 """
 
 # ===== 压缩时阶段性反思 =====
 
-REFLECT_PROMPT = """请回顾以上执行过程,将值得沉淀的内容直接用 `upload_knowledge` 工具保存到知识库。
+REFLECT_PROMPT = """请回顾以上执行过程,将值得沉淀的内容通过 `knowledge_save_pending` 工具逐条暂存(每条知识一次调用)。
+
+暂存的条目会进入审核队列(不立即入库),等待人工 review 后才会上传到 KnowHub。
 
 ## 两种保存模式
 
-### 模式 1:经验反思(experience)
+### 模式 1:经验反思(types=["experience"]
 总结执行过程中的经验教训,关注:
 1. 人工干预:用户中途的指令说明了哪里出了问题
 2. 弯路:哪些尝试是不必要的,有没有更直接的方法
 3. 好的决策:哪些判断和选择是正确的,值得记住
 4. 工具使用:哪些工具用法是高效的,哪些可以改进
 
-**格式要求**:
-- `主题`: 「在[什么情境]下,[要完成什么]」
-- `内容`: 「当[条件]时,应该[动作](原因:[一句话])。案例:[具体案例]」
-- `类型`: `["experience"]`
-- `标签`: `{"intent": "任务意图", "state": "环境状态/工具名"}`
-- `评分`: 1-5(只保存最有价值的,宁少勿滥)
-
-### 模式 2:原始知识上传(tool/strategy/case)
-如果执行过程中**调研或发现了新知识**(如工具用法、工作流程、案例),直接上传原始知识:
+**参数格式**:
+- `task`: 「在[什么情境]下,[要完成什么]」
+- `content`: 「当[条件]时,应该[动作](原因:[一句话])。案例:[具体案例]」
+- `types`: `["experience"]`
+- `tags`: `{"intent": "任务意图", "state": "环境状态/工具名"}`
+- `score`: 1-5(只保存最有价值的,宁少勿滥)
 
-**要求**:
-- **完整性**:保留原始信息,不要过度总结
-- **来源清晰**:在 `resource_ids` 中关联来源资源,或在 `标签` 中标注来源
-- **原汁原味**:保持原文档/网页的结构和细节
+### 模式 2:原始知识(types=["tool"] / ["strategy"] / ["case"])
+如果执行过程中**调研或发现了新知识**(如工具用法、工作流程、案例),原汁原味暂存:
 
-**知识类型选择**:
 - `["tool"]`:工具知识(单个工具的功能、参数、用法、限制)
 - `["strategy"]`:工序知识(多步骤流程、方案、最佳实践)
 - `["case"]`:用例知识(真实案例、应用场景、效果数据)
 
-**格式要求**:
-- `主题`: 知识的标题(如「Midjourney 的 --ar 参数用法」)
-- `内容`: 原始知识内容(完整、详细、保留结构)
-- `类型`: `["tool"]` / `["strategy"]` / `["case"]`
-- `标签`: `{"source": "来源网站/文档", "domain": "领域", ...}`
+**参数格式**:
+- `task`: 知识的标题(如「Midjourney 的 --ar 参数用法」)
+- `content`: 原始知识内容(完整、详细、保留结构,不要过度总结
+- `types`: 二选一
+- `tags`: `{"source": "来源网站/文档", "domain": "领域", ...}`
 - `resource_ids`: 关联的资源 ID(如果保存了原始文档)
-- `评分`: 1-5(根据知识的价值和可靠性)
-
-## 参数说明
-
-**每条内容调用一次 `upload_knowledge`**:
-- `data`: 包含 knowledge/resources/tools 的字典
-  - `knowledge`: 知识列表,每个知识包含:
-    - `主题`: 标题或场景描述
-    - `内容`: 知识正文(经验用总结格式,原始知识保持完整)
-    - `类型`: `["experience"]` / `["tool"]` / `["strategy"]` / `["case"]`
-    - `标签`: 键值对标签,便于检索
-    - `评分`: 1-5
-    - `resource_ids`: 关联的资源 ID 列表(可选)
-  - `resources`: 资源列表(可选),每个资源包含:
-    - `id`: 资源 ID(如 `code/{category}/{name}`)
-    - `标题`: 资源标题
-    - `内容`: 资源内容
-    - `类型`: code/credential/cookie 等
-    - `元数据`: 额外信息
-  - `tools`: 工具列表(可选)
-- `finalize`: False(增量上传,不立即入库)
-
-**注意**:
-- 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
+- `score`: 1-5(根据知识的价值和可靠性)
+
+## 其他注意事项
+
+- **一条知识一次 `knowledge_save_pending` 调用**,不要把多条合并
+- 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获
 - 不需要输出任何文字,直接调用工具即可
 - 如果没有值得保存的经验,不调用任何工具
-- **完成经验存后立即停止,不要继续执行原有任务**
+- **完成经验暂存后立即停止,不要继续执行原有任务**
 """
 
 
 # ===== 任务完成后全局复盘 =====
 
-COMPLETION_REFLECT_PROMPT = """请对整个任务进行复盘,将值得沉淀的内容直接用 `upload_knowledge` 工具保存到知识库。
+COMPLETION_REFLECT_PROMPT = """请对整个任务进行复盘,将值得沉淀的内容通过 `knowledge_save_pending` 工具逐条暂存(每条知识一次调用)。
+
+暂存的条目会进入审核队列(不立即入库),等待人工 review 后才会上传到 KnowHub。
 
 ## 两种保存模式
 
-### 模式 1:经验反思(experience)
+### 模式 1:经验反思(types=["experience"])
 任务结束后的全局视角,关注:
 1. 任务整体路径:实际走的路径与最初计划的偏差
 2. 关键决策点:哪些决策显著影响了最终结果
 3. 可复用的模式:哪些做法在类似任务中可以直接复用
 4. 踩过的坑:哪些问题本可提前规避
 
-**格式要求**:
-- `主题`: 「在[什么情境]下,[要完成什么]」
-- `内容`: 「当[条件]时,应该[动作](原因:[一句话])。案例:[具体案例]」
-- `类型`: `["experience"]`
-- `标签`: `{"intent": "任务意图", "state": "环境状态/工具名"}`
-- `评分`: 1-5(只保存最有价值的,宁少勿滥)
+**参数格式**:
+- `task`: 「在[什么情境]下,[要完成什么]」
+- `content`: 「当[条件]时,应该[动作](原因:[一句话])。案例:[具体案例]」
+- `types`: `["experience"]`
+- `tags`: `{"intent": "任务意图", "state": "环境状态/工具名"}`
+- `score`: 1-5(只保存最有价值的,宁少勿滥)
 
-### 模式 2:原始知识上传(tool/strategy/case)
-如果任务过程中**调研或发现了新知识**,直接上传原始知识
+### 模式 2:原始知识(types=["tool"] / ["strategy"] / ["case"]
+如果任务过程中**调研或发现了新知识**,完整保留结构和细节
 
-**要求**:
-- **完整性**:保留原始信息的完整结构和细节,不要过度压缩
-- **来源清晰**:标注信息来源(URL、文档名、API 响应等)
-- **原汁原味**:保持原始数据格式(如 API 参数列表、配置示例、步骤说明等)
-
-**知识类型选择**:
 - `["tool"]`:工具知识(工具的功能、参数、用法、限制、版本信息)
 - `["strategy"]`:工序知识(完整的多步骤流程、方案、最佳实践)
 - `["case"]`:用例知识(真实案例、应用场景、效果数据、对比结果)
 
-**格式要求**:
-- `主题`: 知识的标题
-- `内容`: 原始知识内容(完整详细
-- `类型`: `["tool"]` / `["strategy"]` / `["case"]`
-- `标签`: `{"source": "来源", "domain": "领域", ...}`
+**参数格式**:
+- `task`: 知识的标题
+- `content`: 原始知识内容(完整详细,不要过度压缩
+- `types`: 三选一
+- `tags`: `{"source": "来源", "domain": "领域", ...}`
 - `resource_ids`: 关联的资源 ID
-- `评分`: 1-5
-
-## 参数说明
-
-**每条内容调用一次 `upload_knowledge`**:
-- `data`: 包含 tools/resources/knowledge 的字典
-  - `knowledge`: 知识列表,每个知识包含:
-    - `主题`: 这条经验适用的场景,格式:「在[什么情境]下,[要完成什么]」
-    - `内容`: 具体经验内容,格式:「当[条件]时,应该[动作](原因:[一句话])。案例:[具体案例]」
-    - `类型`: 知识类型,选择以下之一:
-      - `["experience"]`: 执行经验(Agent 反思总结,应该/避免做什么)
-      - `["strategy"]`: 工序知识(多步骤流程、方案)
-      - `["tool"]`: 工具知识(单个工具的功能、用法)
-      - `["case"]`: 用例知识(真实案例、应用场景)
-    - `标签`: 用 `intent`(任务意图)和 `state`(环境状态/相关工具名)标注,便于检索
-    - `评分`: 1-5,根据这条经验的价值评估
-    - `resource_ids`: 关联的资源 ID 列表(可选,如果这条知识引用了某个资源)
-  - `resources`: 资源列表(可选)
-  - `tools`: 工具列表(可选)
-- `finalize`: False(增量上传,不立即入库)
-
-**资源提取指南**:
-如果任务中涉及以下内容,应在 `data` 中包含 `resources` 字段:
-
-1. **复杂代码工具**(逻辑复杂、超过 20 行、可复用):
-   ```python
-   {
-     "id": "code/{category}/{name}",
-     "标题": "...",
-     "内容": "代码内容",
-     "类型": "code",
-     "元数据": {"language": "python"}
-   }
-   ```
-
-2. **账号密码凭证**:
-   ```python
-   {
-     "id": "credentials/{website}",
-     "标题": "...",
-     "内容": "使用说明和凭证",
-     "类型": "credential",
-     "元数据": {"acquired_at": "2026-03-06T10:00:00Z"}
-   }
-   ```
-
-3. **Cookie 和登录态**:
-   ```python
-   {
-     "id": "cookies/{website}",
-     "标题": "...",
-     "内容": "获取方法和cookie内容",
-     "类型": "cookie",
-     "元数据": {"acquired_at": "...", "expires_at": "..."}
-   }
-   ```
-
-**注意**:
-- 只保存最有价值的经验,宁少勿滥;一次就成功或比较简单的经验就不要记录了,记录反复尝试或被用户指导后才成功的经验、或者是调研之后的收获。
+- `score`: 1-5
+
+## 关于资源(resource)
+
+如果过程中产出了可复用的代码/凭证/Cookie 等资源,先用 `resource_save` 工具保存,
+再在 `knowledge_save_pending` 的 `resource_ids` 字段中关联资源 ID。
+
+## 其他注意事项
+
+- **一条知识一次 `knowledge_save_pending` 调用**,不要把多条合并
+- 只保存最有价值的经验,宁少勿滥
 - 不需要输出任何文字,直接调用工具即可
 - 如果没有值得保存的经验,不调用任何工具
-- **完成经验存后立即停止,不要继续执行原有任务**
+- **完成经验暂存后立即停止,不要继续执行原有任务**
 """
 
 

+ 80 - 1
agent/core/runner.py

@@ -37,6 +37,7 @@ from agent.skill.models import Skill
 from agent.skill.skill_loader import load_skills_from_dir
 from agent.tools import ToolRegistry, get_tool_registry
 from agent.tools.builtin.knowledge import KnowledgeConfig
+from agent.core.memory import MemoryConfig
 from agent.core.prompts import (
     DEFAULT_SYSTEM_PREFIX,
     TRUNCATION_HINT,
@@ -141,6 +142,9 @@ class RunConfig:
     enable_research_flow: bool = True  # 是否启用自动研究流程(知识检索→经验检索→调研→计划)
     # --- 知识管理配置 ---
     knowledge: KnowledgeConfig = field(default_factory=KnowledgeConfig)
+    # --- Memory 配置(见 agent/docs/memory-plan.md) ---
+    # None = 默认 Agent(无长期记忆);赋值 MemoryConfig 使该 Agent 成为 memory-bearing Agent
+    memory: Optional["MemoryConfig"] = None
 
 
     # BUILTIN_TOOLS 硬编码列表已移除(2026-04)。
@@ -219,12 +223,45 @@ class AgentRunner:
         # key: 图片内容的 hash, value: {"downscaled": ..., "description": ...}
         self._image_opt_cache: Dict[str, Dict[str, Any]] = {}
 
+        # 当前 run 的 MemoryConfig(由 run() 根据 RunConfig.memory 设置)
+        # dream 工具从 context.runner 读取此字段,判断是否 memory-bearing
+        self._current_memory_config: Optional[MemoryConfig] = None
+
     # ===== 核心公开方法 =====
 
     def get_context_usage(self, trace_id: str) -> Optional[ContextUsage]:
         """获取指定 trace 的 context 使用情况"""
         return self._context_usage.get(trace_id)
 
+    async def dream(
+        self,
+        memory_config: MemoryConfig,
+        trace_filter: Optional[Callable[["Trace"], bool]] = None,
+        reflect_model: str = "gpt-4o-mini",
+        dream_model: str = "gpt-4o",
+    ) -> "DreamReport":
+        """执行 dream(整理长期记忆)——外部调度入口。
+
+        Agent 主动调用走 dream 工具;外部调度(定时器、CLI)走这个方法。
+
+        Args:
+            memory_config: 记忆配置
+            trace_filter: 可选 trace 过滤(按 agent_type/owner 等)
+            reflect_model: per-trace 反思模型
+            dream_model: 跨 trace 整合模型
+        """
+        from agent.core.dream import run_dream
+        if not self.trace_store or not self.llm_call:
+            raise RuntimeError("dream 需要 trace_store 和 llm_call 均已配置")
+        return await run_dream(
+            store=self.trace_store,
+            llm_call=self.llm_call,
+            memory_config=memory_config,
+            trace_filter=trace_filter,
+            reflect_model=reflect_model,
+            dream_model=dream_model,
+        )
+
     async def run(
         self,
         messages: List[Dict],
@@ -253,6 +290,9 @@ class AgentRunner:
         config = config or RunConfig()
         trace = None
 
+        # Memory 模式开关(dream 工具会读取此字段)
+        self._current_memory_config = config.memory
+
         try:
             # Phase 1: PREPARE TRACE
             trace, goal_tree, sequence = await self._prepare_trace(messages, config)
@@ -1172,7 +1212,11 @@ class AgentRunner:
 
                 # 追加侧分支 prompt
                 if branch_type == "reflection":
-                    prompt = config.knowledge.get_reflect_prompt()
+                    # 完成场景用全局复盘 prompt,压缩场景用阶段性反思 prompt
+                    if break_after_side_branch:
+                        prompt = config.knowledge.get_completion_reflect_prompt()
+                    else:
+                        prompt = config.knowledge.get_reflect_prompt()
                 elif branch_type == "knowledge_eval":
                     prompt = await self._build_knowledge_eval_prompt(trace_id, goal_tree)
                 else:  # compression
@@ -1489,6 +1533,27 @@ class AgentRunner:
                     # === 反思侧分支退出(超时 + 正常完成统一处理)===
                     self.log.info("反思侧分支退出")
 
+                    # auto-commit hook:默认 pending 要等人工 review,
+                    # 但 reflect_auto_commit=True 时视作全部 approved,直接批量 upload。
+                    if (
+                        self.trace_store
+                        and getattr(config.knowledge, "reflect_auto_commit", False)
+                    ):
+                        try:
+                            from agent.trace.extraction_review import auto_commit_branch
+                            report = await auto_commit_branch(
+                                self.trace_store,
+                                trace_id,
+                                side_branch_ctx.branch_id,
+                            )
+                            if report.committed or report.failed:
+                                self.log.info(
+                                    f"[auto-commit] committed={len(report.committed)} "
+                                    f"failed={len(report.failed)} skipped={len(report.skipped)}"
+                                )
+                        except Exception as e:
+                            self.log.error(f"[auto-commit] 反思分支自动提交失败: {e}")
+
                     # 恢复主路径
                     if self.trace_store:
                         main_path_messages = await self.trace_store.get_main_path_messages(
@@ -2843,6 +2908,20 @@ class AgentRunner:
             if skills_text:
                 system_prompt += f"\n\n## Skills\n{skills_text}"
 
+        # Memory 注入(memory-bearing Agent)——在 system prompt 末尾追加
+        # 初版选择 system prompt 追加(见 agent/docs/memory-plan.md 待定问题 1)。
+        # 好处:run 启动一次性注入、所有后续轮次都能看到、与 skills 注入方式一致。
+        # 代价:若记忆文件很大会持续占 prompt tokens —— 待观察后决定是否切换方案。
+        if config.memory:
+            try:
+                from agent.core.memory import load_memory_files, format_memory_injection
+                files = load_memory_files(config.memory)
+                memory_text = format_memory_injection(files)
+                if memory_text:
+                    system_prompt += f"\n\n{memory_text}"
+            except Exception as e:
+                self.log.warning(f"[Memory] 加载记忆失败,跳过注入: {e}")
+
         return system_prompt
 
     async def _generate_task_name(self, messages: List[Dict]) -> str:

+ 248 - 44
agent/docs/memory.md

@@ -1,6 +1,8 @@
-# Memory 系统与元思考机制设计
+# Memory 系统与元思考机制
 
-> 状态:设计讨论中,未实现
+> 状态:已实现(2026-04)。本文档同时承担**设计理由**和**使用规范**。
+> 入口、工具、API 清单见文末"十、实现与入口"。
+> 一~九节解释"为什么这么做",改动前请先读懂论证。
 
 ---
 
@@ -119,9 +121,38 @@ trace 结束只意味着 Agent 行动完一个轮次。后续可能发生:
 
 如果 trace 一结束就做记忆反思,这些后续信息会被忽略。记忆反思的价值在于**综合一段时间的经历**,不是记录每次行动的即时感受。
 
-### 但知识提取仍然在压缩/完成时做
+### 但知识提取仍然在压缩/完成时做(采用"提取-审核-提交"两阶段)
 
-这不矛盾。知识提取保存的是**客观知识**(工具用法、调研结果),这些不会因为后续反馈而失效。而且压缩会删除历史,如果不在压缩前提取,知识就永久丢失了。
+知识提取必须在压缩/完成时做,因为压缩会删除历史,不在压缩前提取,知识就永久丢失。
+
+但"立即 upload 到 KnowHub"这一步并不需要立即做。所谓"客观知识"也可能被后续推翻:
+
+- 工具用法可能被后续 trace 发现是错的(例如某个参数其实有副作用)
+- 调研结论可能被用户反馈推翻
+- 一次 trace 的"成功经验"在更长窗口看可能是反模式
+
+如果 reflection 直接 upload 到 KnowHub,错误知识会立刻污染全局检索,影响所有 Agent。
+
+**两阶段方案**:
+
+```
+Step 1: extract(自动,压缩前/任务结束)
+  Reflection 侧分支提取知识 → 写 cognition_log: type="extraction_pending"
+  不调用 upload_knowledge(信息保全已完成)
+
+Step 2: review(人工,CLI 里逐条决策)
+  approve / edit / discard → 写 cognition_log: type="extraction_reviewed"
+
+Step 3: commit(人工触发,批量上传)
+  把 reviewed=approved 的批量 upload_knowledge
+  写 cognition_log: type="extraction_committed"
+```
+
+review 和 commit 分开的理由:review 是逐条语义判断(要不要、内容对不对),commit 是机械批量动作。两者分离允许用户分批 review、最后一次 commit;也允许撤回 review 决策。
+
+**默认行为**:所有 Agent(包括默认 Agent 和 memory-bearing Agent)`reflect_auto_commit` 默认关闭,pending 提取必须人工 review + commit 才会进 KnowHub。如需自动直通(保留旧行为),手动在 `KnowledgeConfig` 里打开 `reflect_auto_commit=True`。
+
+这与"信息保全 vs 全局发布解耦"的原则一致 —— 压缩前必须做的是**保全**(写本地 cognition_log),**发布**到 KnowHub 可以延迟到有人确认时。
 
 ---
 
@@ -267,6 +298,25 @@ reflected_at_sequence: Optional[int] = None    # 上次记忆反思的 sequence
 memory: Optional[MemoryConfig] = None
 ```
 
+**4. KnowledgeConfig 扩展**
+
+`agent/core/runner.py:KnowledgeConfig`(或对应类)新增字段:
+
+```python
+reflect_auto_commit: bool = False
+# False(默认,所有 Agent): reflection 只写 cognition_log: type="extraction_pending"
+#                          人工通过 CLI review + commit 才进 KnowHub
+# True(手动开启)         : reflection 直接 upload_knowledge,保留旧的"提取即上传"行为
+```
+
+**5. Reflection 侧分支行为变更**
+
+当前 reflection 的 prompt 直接指导 LLM 调用 `upload_knowledge`。需要改为:
+- `reflect_auto_commit=False` 时:prompt 指导 LLM 调用新的 `record_pending_extraction` 工具(仅写 cognition_log)
+- `reflect_auto_commit=True` 时:保持当前行为
+
+或者更简洁的实现:reflection 始终调用 `record_pending_extraction`,由侧分支结束后的 hook 根据 `reflect_auto_commit` 决定是否立即调用 `commit_approved`(视为全部 approved)。这避免了 prompt 分叉。
+
 ### 新增的部分
 
 **1. MemoryConfig**
@@ -277,14 +327,17 @@ class MemoryConfig:
     """持久化记忆配置"""
 
     base_path: str = ""                          # 记忆文件目录
-    files: Optional[Dict[str, str]] = None       # {文件名: 用途说明}
+    files: Optional[Dict[str, str]] = None       # {路径: 用途说明}
+    # key 是相对 base_path 的路径,支持嵌套(如 "core/identity.md")或 glob
+    # (如 "relationships/*.md")。框架只负责按 key 读文件内容注入上下文,
+    # 组织结构由配置者决定。
     dream_prompt: str = ""                       # Dream 整合 prompt(空用默认)
     reflect_prompt: str = ""                     # Per-trace 反思 prompt(空用默认)
 ```
 
 **2. Run 启动时记忆加载**
 
-Memory-bearing Agent 的 run 启动时,框架读取 `base_path` 下所有 `files` 中声明的文件,注入上下文
+Memory-bearing Agent 的 run 启动时,框架按 `files` 的 key 依次解析(直接路径或 glob 匹配),读取命中的文件内容以字符串形式注入上下文。Agent 可用 write_file 新增文件;只要新文件的路径匹配某条 key(直接路径或 glob),下次 run 启动时自动加载
 
 **3. Dream 操作**
 
@@ -301,6 +354,61 @@ async def dream() -> ToolResult:
 
 也可以作为 `AgentRunner` 的方法暴露,供外部调度直接调用。
 
+**4. 提取审核 CLI 流程**
+
+为支持"提取-审核-提交"两阶段(见第三节),新增 `agent/cli/extraction_review.py` 模块。**不是 Agent 工具**(Agent 不应自我审核),是 CLI 内部模块 + 独立可执行脚本:
+
+```python
+# agent/cli/extraction_review.py
+
+async def list_pending(trace_id: str) -> list[PendingExtraction]:
+    """读 cognition_log,返回 type=extraction_pending 且未 reviewed 的条目"""
+
+async def review_one(
+    trace_id: str,
+    extraction_id: str,
+    decision: Literal["approve", "edit", "discard"],
+    edited_content: Optional[str] = None,
+) -> None:
+    """写 reviewed 事件到 cognition_log"""
+
+async def commit_approved(trace_id: str) -> CommitReport:
+    """批量上传 approved 条目到 KnowHub,写 committed 事件"""
+```
+
+可独立调用:
+
+```bash
+python -m agent.cli.extraction_review --trace XXX --list
+python -m agent.cli.extraction_review --trace XXX --commit
+```
+
+**集成到现有交互式 CLI**(`agent/cli/interactive.py:174` 的菜单)扩展两项:
+
+```
+  1. 插入干预消息并继续
+  2. 触发经验总结(reflect)         ← 现有
+  ...
+  8. 审核待提交知识(review)        ← 新增
+  9. 提交已审核知识到 KnowHub        ← 新增
+```
+
+`8` 进入交互式 review 循环:
+
+```
+[1/3] tool 经验
+─────────────────────
+nanobanana 工具的 strength 参数 < 0.3 时会丢失原图轮廓...
+─────────────────────
+[a]pprove / [e]dit / [d]iscard / [s]kip / [q]uit:
+```
+
+`9` 显示 approved 列表 + 用户最终确认 → 调 `commit_approved`,输出 commit 报告(成功/失败条数、KnowHub 返回的 ID)。
+
+**实现注意**:
+- 现有 `perform_reflection`(`interactive.py:269`)走 HTTP API(`/api/traces/{trace_id}/reflect`)。新流程同样应该走 API 端点(如 `POST /api/traces/{trace_id}/extractions/{id}/review`、`POST /api/traces/{trace_id}/extractions/commit`),让未来 Web UI 能复用同一套审核流,而不是 CLI 直接读写 cognition_log 文件。
+- "edit" 分支允许用户直接修改 LLM 生成的 markdown 内容;初版只支持改正文文本,后续可扩展到改类型/metadata。
+
 ---
 
 ## 六、完整的元思考数据流
@@ -314,13 +422,19 @@ Agent 执行任务(Trace)
   ├─ 压缩触发 →
   │   队列: [reflection, knowledge_eval, compression]
-  │   reflection: 提取客观知识 → upload → KnowHub + cognition_log: type="extraction"
+  │   reflection: 提取客观知识 → cognition_log: type="extraction_pending"
+  │                            (默认不直接 upload,等人工 review)
   │   knowledge_eval: 评估各 source → cognition_log: type="evaluation"
   │   compression: 压缩上下文
   ├─ 任务完成 →
   │   knowledge_eval(如有 pending)→ cognition_log: type="evaluation"
-  │   reflection → upload → KnowHub + cognition_log: type="extraction"
+  │   reflection → cognition_log: type="extraction_pending"
+  │
+  ├─ 人工审核(CLI 触发,可发生在任意时刻)→
+  │   逐条 approve/edit/discard → cognition_log: type="extraction_reviewed"
+  │   批量 commit → upload_knowledge → KnowHub
+  │                + cognition_log: type="extraction_committed"
   └─ Trace 状态更新(新消息使 reflected_at_sequence 落后)
 
@@ -359,36 +473,40 @@ Trace 结束后:
 
 ## 七、记忆模型全景
 
+Memory 和 Knowledge 是**两条平行的线**,而不是抽象层级。区分维度是"主观 vs 客观"和"私有 vs 共享"。Memory 不会"升级"成 Knowledge,反过来也不会。
+
 ```
-┌─────────────────────────────────────────────────────────────┐
-│ Layer 3: Skills(技能库)                                     │
-│ - Markdown 文件,领域知识和能力描述                            │
-└─────────────────────────────────────────────────────────────┘
-                              ▲
-                              │ 归纳
-┌─────────────────────────────────────────────────────────────┐
-│ Layer 2: Knowledge(知识库)— 全局共享                         │
-│ - KnowHub 数据库,客观知识 + 向量索引                         │
-│ - 来源:reflection 侧分支提取                                │
-│ - 质量信号:knowledge_eval 评估结果                           │
-└─────────────────────────────────────────────────────────────┘
-                              ▲
-                              │ 提取(reflection)/ 评估(knowledge_eval)
-┌─────────────────────────────────────────────────────────────┐
-│ Layer 1.5: Memory(个人记忆)— Agent 身份私有                  │
-│ - Markdown 文件,主观记忆(偏好/策略/反思)                    │
-│ - 来源:dream 操作(per-trace 反思 + 跨 trace 整合)          │
-│ - 人类可直接编辑                                              │
-└─────────────────────────────────────────────────────────────┘
-                              ▲
-                              │ dream 反思
-┌─────────────────────────────────────────────────────────────┐
-│ Layer 1: Trace(任务状态)                                    │
-│ - 当前任务的工作记忆                                          │
-│ - Messages + Goals + cognition_log                           │
-└─────────────────────────────────────────────────────────────┘
+                    ┌─────────────────────────────┐
+                    │ Trace(任务状态 / 工作记忆)  │
+                    │ Messages + Goals             │
+                    │ + cognition_log              │
+                    └──────────┬──────────────────┘
+                               │
+              dream 反思       │      reflection 提取(→ pending)
+              (延迟、可选)    │      knowledge_eval 评估
+                               │      (即时、必做)
+                  ┌────────────┴───────────┐
+                  ▼                        ▼
+        ┌──────────────────┐     ┌──────────────────────┐
+        │ Memory           │     │ Knowledge            │
+        │ Agent 身份私有    │     │ KnowHub 全局共享      │
+        ├──────────────────┤     ├──────────────────────┤
+        │ 主观 / 偏好 / 策略 │     │ 客观 / 工具 / 调研    │
+        │ Markdown 文件     │     │ DB + 向量索引         │
+        │ 人类可直接编辑     │     │ 经 review 才入库      │
+        │ 来源: dream       │     │ 来源: reflection      │
+        └────────┬─────────┘     └──────────┬───────────┘
+                 │                          │
+                 └────注入下次 run──────────┘
 ```
 
+**两条线的交互**(不是层级关系,是同源 + 互相参考):
+
+- 都源自同一个 Trace(cognition_log 是共同的事件流)
+- dream 在生成记忆摘要时可以参考 cognition_log 中的 evaluation 趋势(来自 Knowledge 这条线)
+- reflection 也可以参考 Memory 来判断"这条经验我已经记过了"
+- 但二者的**读者不同**:Memory 只服务于同一身份的未来 run;Knowledge 服务于所有 Agent
+
 ---
 
 ## 八、两类 Agent
@@ -401,16 +519,102 @@ Trace 结束后:
 | Dream | ❌ | ✅ 可调用 dream 工具 |
 | Run 启动加载记忆 | ❌ | ✅ 自动注入 |
 
-默认行为不变。Memory 是 opt-in 的增量能力。
+Memory 是 opt-in 的增量能力。但**知识提取的提交行为变了**:默认 Agent 也不再自动 upload 到 KnowHub,必须通过 CLI 人工 review + commit;如需保留旧的"提取即上传"行为,手动设置 `KnowledgeConfig.reflect_auto_commit=True`。
 
 ---
 
-## 九、待定问题
+## 九、开放与已决问题
+
+`[DECIDED]` 已有落地结论;`[OPEN]` 尚未决定,等真实运行数据再定。
+
+1. `[DECIDED]` **记忆注入方式** → system prompt 末尾追加。见 `runner.py:_build_system_prompt` 里调 `format_memory_injection`。代价:若记忆文件很大,每轮 LLM 调用都带 —— 暂时接受,等实际观察到膨胀再换方案。
+2. `[OPEN]` **并发写冲突**:多个 Agent run 同时写同一个记忆文件怎么办?文件锁?还是 dream 统一写、其他 run 只读?当前没做并发保护,假设 dream 是单一写入方。
+3. `[OPEN]` **记忆膨胀**:记忆文件越来越长怎么办?`DEFAULT_DREAM_PROMPT` 已写"在原有基础上演进不要重写",但是否真能控制住要看实际使用。
+4. `[OPEN]` **Per-trace 反思的成本控制**:很短的 trace 不值得反思。当前 `per_trace_reflect` 无下限阈值,所有 `reflected_at_sequence < last_sequence` 的 trace 都会反思。
+5. `[OPEN]` **Knowledge eval 结果回传 KnowHub**:仍然只存本地 cognition_log。
+6. `[DECIDED]` **Dream 中评估趋势的呈现方式** → LLM 直接读 cognition_log 原始事件。见 `dream.py:_build_reflect_input`,把 query / evaluation / extraction_pending / extraction_committed 事件摘要化后一并塞给 LLM,不做预计算统计。
+7. `[DECIDED]` **Dream 操作的实现形式** → 两者都提供。Agent 主动调用走 `dream` 工具(`agent/tools/builtin/memory.py`,`memory` 组),外部调度走 `AgentRunner.dream()` 方法。
+8. `[OPEN]` **未 review 的 pending 提取何时清理**:目前没有 TTL,pending 无限期累积。等观察积压速度再定(例如 30 天未 review 自动 discard / 归档)。
+9. `[OPEN]` **review 的"edit"分支允许多深**:初版只支持改 markdown 字段(task/content/score/tags)。改 type 或 metadata 目前需 discard 重写。
+10. `[OPEN]` **批量 review 的辅助能力**:当前逐条看。未做批量 approve / 相似条目去重 / LLM 预筛。
+11. `[OPEN]` **Dream 的 JSON 解析脆弱性**:`cross_trace_integrate` 依赖 LLM 严格输出 `{updates:[...]}` JSON(见 `dream.py:_parse_dream_output`)。真实 LLM 可能偶尔加前言、用不同键名。首次线上运行需监控 parse 失败率,必要时加重试 + 更严格 prompt。
+
+---
+
+## 十、实现与入口
+
+2026-04 落地清单,供看代码时快速定位。
+
+### 10.1 数据层
+
+| 改动 | 位置 |
+|---|---|
+| Trace 新字段 `reflected_at_sequence` | `agent/trace/models.py:Trace` |
+| cognition_log 事件 schema(含新增的 extraction_pending/reviewed/committed + reflection) | `agent/trace/store.py:append_cognition_event` docstring |
+
+### 10.2 提取-审核-提交两阶段
+
+| 职责 | 位置 |
+|---|---|
+| LLM 暂存用工具(core 组默认可见) | `agent/tools/builtin/knowledge.py:knowledge_save_pending` |
+| 反思 prompts(已改为调 `knowledge_save_pending`) | `agent/core/prompts/knowledge.py` |
+| Auto-commit 开关(默认 False) | `KnowledgeConfig.reflect_auto_commit` |
+| 反思侧分支退出时的 auto-commit hook | `agent/core/runner.py` 反射分支退出分支内 |
+| 核心逻辑(list_pending / review_one / commit_approved / auto_commit_branch) | `agent/trace/extraction_review.py` |
+| **独立 CLI 入口** | `python -m agent.cli.extraction_review --trace <ID> [--list/--list-all/--review/--commit]` |
+| **交互式菜单入口** | `agent/cli/interactive.py` 菜单项 8(review)/ 9(commit) |
+| **HTTP API 入口** | `GET /api/traces/{tid}/extractions`、`POST .../extractions/{eid}/review`、`POST .../extractions/commit`(见 `agent/trace/run_api.py`) |
+
+三种入口共享同一个核心模块 `agent/trace/extraction_review.py`。
+
+### 10.3 Memory + Dream
+
+| 职责 | 位置 |
+|---|---|
+| MemoryConfig 定义 | `agent/core/memory.py:MemoryConfig` |
+| 记忆文件加载(支持 glob + 去重) | `agent/core/memory.py:load_memory_files` |
+| 记忆注入格式 | `agent/core/memory.py:format_memory_injection` |
+| 注入到 system prompt | `agent/core/runner.py:_build_system_prompt`(memory 段落在 skills 段之后) |
+| Dream per-trace 反思 | `agent/core/dream.py:per_trace_reflect` |
+| Dream 跨 trace 整合 | `agent/core/dream.py:cross_trace_integrate` |
+| Dream 顶层入口 | `agent/core/dream.py:run_dream` |
+| **Agent 工具入口(memory 组)** | `agent/tools/builtin/memory.py:dream` |
+| **外部调度入口** | `AgentRunner.dream(memory_config, trace_filter=..., reflect_model=..., dream_model=...)` |
+| 默认 prompts | `dream.py:DEFAULT_REFLECT_PROMPT` / `DEFAULT_DREAM_PROMPT`(可通过 `MemoryConfig.reflect_prompt`/`dream_prompt` 覆盖) |
+
+### 10.4 启用方式
+
+默认 Agent 不启用 memory,但**提取审核仍然生效**(pending 不自动上传 KnowHub)。
+
+要让某个 example 直接上传(恢复旧行为):
+```python
+RunConfig(knowledge=KnowledgeConfig(reflect_auto_commit=True))
+```
+
+要让一个 Agent 变成 memory-bearing:
+```python
+from agent.core.memory import MemoryConfig
+
+RunConfig(
+    memory=MemoryConfig(
+        base_path="/path/to/agent_memory",
+        files={
+            "taste.md": "品味偏好",
+            "strategy.md": "当前策略",
+            "journals/*.md": "执行日记",
+        },
+    ),
+    tool_groups=["core", "memory"],   # memory 组暴露 dream 工具
+)
+```
+
+然后周期性(或 Agent 主动调用 `dream` 工具)触发:
+```python
+await runner.dream(memory_config=rc.memory)
+```
+
+### 10.5 已知 rough edges
 
-1. **记忆注入方式**:system prompt 追加 vs 首条消息前插入 vs 作为工具结果注入?需要实验对比效果。
-2. **并发写冲突**:多个 Agent run 同时写同一个记忆文件怎么办?文件锁?还是 dream 统一写、其他 run 只读?
-3. **记忆膨胀**:记忆文件越来越长怎么办?dream prompt 应该包含精简逻辑,但需要观察实际效果。
-4. **Per-trace 反思的成本控制**:很短的 trace 不值得反思。阈值由框架设定(消息数/token数)还是让 dream 过程自己判断?
-5. **Knowledge eval 结果回传 KnowHub**:是否应该自动同步?自动回传可能影响其他 Agent 的检索。
-6. **Dream 中 knowledge_log 趋势的呈现方式**:在 dream prompt 中注入预计算的统计 vs 让 LLM 自己读原始 log?
-7. **Dream 操作的实现形式**:作为 Agent 工具(`dream()`)vs AgentRunner 方法 vs 两者都提供?
+- 实施过程发现旧的 `upload_knowledge` 引用是悬空的(仓内无实现),未清理 `examples/*/prompt` 里的残留引用
+- Dream 两次 LLM 调用(reflect + integrate)默认模型写死 `gpt-4o-mini` / `gpt-4o`,未接入 RunConfig 的 utility_llm_call
+- `trace_filter` 没提供按 `agent_type` / `owner` 过滤的便捷函数,调用方传 lambda

+ 6 - 1
agent/tools/builtin/__init__.py

@@ -17,7 +17,9 @@ from agent.tools.builtin.bash import bash_command
 from agent.tools.builtin.skill import skill, list_skills
 from agent.tools.builtin.subagent import agent, evaluate
 # sandbox 工具已废弃(2026-04);search.py / crawler.py 已重构为 content/ 工具族(2026-04)
-from agent.tools.builtin.knowledge import(knowledge_search,knowledge_save,knowledge_list,knowledge_update,knowledge_batch_update,knowledge_slim)
+from agent.tools.builtin.knowledge import(knowledge_search,knowledge_save,knowledge_save_pending,knowledge_list,knowledge_update,knowledge_batch_update,knowledge_slim)
+# Memory / Dream(见 agent/docs/memory-plan.md)
+from agent.tools.builtin.memory import dream
 # 知识上传/查询已统一到 agent 工具:
 #   agent(agent_type="remote_librarian", task=...)         # 查询
 #   agent(agent_type="remote_librarian_ingest", task=...)  # 上传(异步)
@@ -78,4 +80,7 @@ __all__ = [
     "import_content",
     # Goal 管理
     "goal",
+    # Memory & Knowledge 提取审核
+    "knowledge_save_pending",  # 反思侧分支暂存(core 组默认可见)
+    "dream",                    # memory-bearing Agent 整理长期记忆(memory 组)
 ]

+ 115 - 0
agent/tools/builtin/knowledge.py

@@ -8,6 +8,7 @@ import os
 import json
 import logging
 import subprocess
+import uuid
 import httpx
 from dataclasses import dataclass
 from typing import List, Dict, Optional, Any
@@ -34,6 +35,12 @@ class KnowledgeConfig:
     enable_completion_extraction: bool = True      # 是否在运行完成后提取知识
     completion_reflect_prompt: str = ""            # 自定义复盘 prompt;空则使用默认,见 agent/core/prompts/knowledge.py:COMPLETION_REFLECT_PROMPT
 
+    # 提取-审核-提交两阶段开关(见 agent/docs/memory-plan.md 第三节)
+    reflect_auto_commit: bool = False
+    # False(默认): reflection 仅写 cognition_log: type="extraction_pending",
+    #               人工通过 CLI(agent/cli/extraction_review.py)review + commit 才进 KnowHub
+    # True         : reflection 直接 upload_knowledge(旧行为),适合无人值守的 example
+
     # 知识注入(agent切换当前工作的goal时,自动注入相关知识)
     enable_injection: bool = True          # 是否在 focus goal 时自动注入相关知识
 
@@ -274,6 +281,114 @@ async def knowledge_save(
         )
 
 
+@tool(groups=["core"], hidden_params=["context"])
+async def knowledge_save_pending(
+    task: str,
+    content: str,
+    types: List[str],
+    tags: Optional[Dict[str, str]] = None,
+    scopes: Optional[List[str]] = None,
+    owner: Optional[str] = None,
+    resource_ids: Optional[List[str]] = None,
+    source_name: str = "",
+    source_category: str = "exp",
+    urls: Optional[List[str]] = None,
+    agent_id: str = "research_agent",
+    submitted_by: str = "",
+    score: int = 3,
+    capability_ids: Optional[List[str]] = None,
+    tool_ids: Optional[List[str]] = None,
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
+    """
+    暂存一条待审核的知识提取(不直接写入 KnowHub)。
+
+    写入 cognition_log: type="extraction_pending",等待人工通过 CLI
+    (agent/cli/extraction_review.py)review + commit 才会进入 KnowHub。
+    参数与 knowledge_save 对齐,review 通过后字段透传给 knowledge_save。
+
+    Args:
+        task: 任务描述(在什么情景下 + 要完成什么目标)
+        content: 核心内容
+        types: 知识类型 ["experience"] / ["tool"] / ["strategy"] / ["case"]
+        tags: 业务标签
+        scopes: 可见范围(默认 ["org:cybertogether"],commit 时应用)
+        owner: 所有者(commit 时应用)
+        resource_ids: 关联的资源 ID
+        source_name: 来源名称
+        source_category: 来源类别(paper/exp/skill/book)
+        urls: 参考来源链接
+        agent_id: 执行此调研的 agent ID
+        submitted_by: 提交者
+        score: 初始评分 1-5
+        capability_ids: 关联的能力 ID
+        tool_ids: 关联的工具 ID
+
+    Returns:
+        暂存结果(含 extraction_id,用于后续 review/commit)
+    """
+    try:
+        store = context.get("store") if context else None
+        trace_id = context.get("trace_id") if context else None
+        sequence = context.get("sequence") if context else None
+        goal_id = context.get("goal_id") if context else None
+        side_branch = context.get("side_branch") if context else None
+
+        if not store or not trace_id:
+            return ToolResult(
+                title="❌ 暂存失败",
+                output="缺少 store 或 trace_id,无法写入 cognition_log",
+                error="missing trace context"
+            )
+
+        extraction_id = f"pending-{uuid.uuid4().hex[:12]}"
+
+        payload = {
+            "task": task,
+            "content": content,
+            "types": types,
+            "tags": tags or {},
+            "scopes": scopes,
+            "owner": owner,
+            "resource_ids": resource_ids or [],
+            "source_name": source_name,
+            "source_category": source_category,
+            "urls": urls or [],
+            "agent_id": agent_id,
+            "submitted_by": submitted_by,
+            "score": score,
+            "capability_ids": capability_ids or [],
+            "tool_ids": tool_ids or [],
+        }
+
+        await store.append_cognition_event(
+            trace_id=trace_id,
+            event={
+                "type": "extraction_pending",
+                "extraction_id": extraction_id,
+                "sequence": sequence,
+                "goal_id": goal_id,
+                "branch_id": side_branch.get("branch_id") if side_branch else None,
+                "payload": payload,
+            }
+        )
+
+        return ToolResult(
+            title="✅ 已暂存待审核",
+            output=f"Extraction ID: {extraction_id}\n主题: {task[:80]}\n类型: {types}\n评分: {score}\n\n等待人工 review + commit 才会进入 KnowHub。",
+            long_term_memory=f"暂存知识提取: {extraction_id} - {task[:50]}",
+            metadata={"extraction_id": extraction_id}
+        )
+
+    except Exception as e:
+        logger.error(f"暂存待审核知识失败: {e}")
+        return ToolResult(
+            title="❌ 暂存失败",
+            output=f"错误: {str(e)}",
+            error=str(e)
+        )
+
+
 @tool(groups=["knowledge_internal"], hidden_params=["context"])
 async def knowledge_update(
     knowledge_id: str,

+ 96 - 0
agent/tools/builtin/memory.py

@@ -0,0 +1,96 @@
+"""
+Memory 相关工具 —— 目前只包含 dream 操作(见 agent/docs/memory-plan.md 第四节)。
+
+dream 整理 Agent 身份的长期记忆:回顾最近 trace 的执行历史,
+逐个 trace 做反思,再跨 trace 整合写回记忆文件。
+
+设计要点:
+- 需要 config.memory(MemoryConfig)才可用;否则报错。
+- 不是 knowledge_save_pending 那样每 trace 都要用的日常工具 ——
+  所以放在独立 group "memory",通过 tool_groups 显式开启。
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Optional
+
+from agent.tools import tool, ToolResult, ToolContext
+
+logger = logging.getLogger(__name__)
+
+
+@tool(groups=["memory"], hidden_params=["context"])
+async def dream(
+    reflect_model: str = "",
+    dream_model: str = "",
+    context: Optional[ToolContext] = None,
+) -> ToolResult:
+    """整理长期记忆。回顾最近的执行历史,更新记忆文件。
+
+    本工具做两件事:
+        1. per-trace 反思:扫描未反思的 trace,为每个生成反思摘要
+        2. 跨 trace 整合:汇总未消化的反思 + 当前记忆,让 LLM 更新记忆文件
+
+    需要 RunConfig.memory(MemoryConfig)才可调用。
+
+    Args:
+        reflect_model: per-trace 反思用的模型(空则默认 gpt-4o-mini)
+        dream_model:   跨 trace 整合用的模型(空则默认 gpt-4o)
+    """
+    runner = context.get("runner") if context else None
+    if runner is None:
+        return ToolResult(
+            title="❌ dream 不可用",
+            output="缺少 runner(需要从 AgentRunner 上下文调用)",
+            error="runner not in context",
+        )
+
+    memory_config = getattr(runner, "_current_memory_config", None)
+    if memory_config is None:
+        return ToolResult(
+            title="❌ dream 不可用",
+            output="当前 Agent 未配置 MemoryConfig,不是 memory-bearing Agent",
+            error="memory not configured",
+        )
+
+    if not runner.trace_store or not runner.llm_call:
+        return ToolResult(
+            title="❌ dream 不可用",
+            output="runner 缺少 trace_store 或 llm_call",
+            error="runner dependencies missing",
+        )
+
+    from agent.core.dream import run_dream
+    report = await run_dream(
+        store=runner.trace_store,
+        llm_call=runner.llm_call,
+        memory_config=memory_config,
+        reflect_model=reflect_model or "gpt-4o-mini",
+        dream_model=dream_model or "gpt-4o",
+    )
+
+    lines = []
+    lines.append(f"per-trace 反思: {len(report.per_trace_summaries)} 条")
+    if report.skipped_traces:
+        lines.append(f"跳过: {len(report.skipped_traces)} 条 trace(日志详见 logger)")
+    lines.append(f"消化 reflection: {report.consumed_reflection_count} 条")
+    lines.append(f"更新记忆文件: {len(report.updated_files)} 个")
+    for p in report.updated_files:
+        lines.append(f"  - {p}")
+    if report.reasoning:
+        lines.append(f"\n整合理由: {report.reasoning}")
+
+    output = "\n".join(lines)
+    return ToolResult(
+        title="🧠 dream 完成",
+        output=output,
+        long_term_memory=f"dream: reflected={len(report.per_trace_summaries)}, "
+                         f"consumed={report.consumed_reflection_count}, "
+                         f"files_updated={len(report.updated_files)}",
+        metadata={
+            "per_trace_count": len(report.per_trace_summaries),
+            "consumed": report.consumed_reflection_count,
+            "updated_files": report.updated_files,
+        },
+    )

+ 234 - 0
agent/trace/extraction_review.py

@@ -0,0 +1,234 @@
+"""
+提取审核工具库(Phase 1.2+)
+
+共享核心逻辑给三个入口复用:
+- agent/cli/extraction_review.py  —— 独立 CLI 入口
+- agent/cli/interactive.py        —— 交互式会话菜单
+- agent/trace/run_api.py          —— HTTP API 端点
+
+职责划分:
+- 本模块:从 cognition_log 读 pending、生成 review 事件、批量调 knowledge_save 并写 committed 事件
+- 上游(runner):反思侧分支退出时,若 reflect_auto_commit=True 则调 auto_commit_branch
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Literal, Optional
+
+from agent.trace.store import FileSystemTraceStore
+
+ReviewDecision = Literal["approve", "edit", "discard"]
+
+
+@dataclass
+class PendingExtraction:
+    """一条待审核的提取条目(从 cognition_log 还原)"""
+    extraction_id: str
+    sequence: Optional[int]
+    goal_id: Optional[str]
+    branch_id: Optional[str]
+    payload: Dict[str, Any]
+    reviewed: bool = False
+    decision: Optional[ReviewDecision] = None
+    committed: bool = False
+
+
+@dataclass
+class CommitReport:
+    """批量 commit 的结果"""
+    committed: List[str] = field(default_factory=list)     # 成功的 extraction_id 列表
+    knowledge_ids: List[str] = field(default_factory=list) # KnowHub 返回的新增 ID
+    failed: List[Dict[str, str]] = field(default_factory=list)  # [{extraction_id, error}]
+    skipped: List[str] = field(default_factory=list)       # 不是 approved 或已 committed
+
+
+async def list_pending(
+    store: FileSystemTraceStore,
+    trace_id: str,
+    branch_id: Optional[str] = None,
+    include_reviewed: bool = False,
+) -> List[PendingExtraction]:
+    """列出 trace 下的 pending 提取条目。
+
+    Args:
+        branch_id: 若指定,仅返回此反思分支产出的 pending(用于 auto_commit_branch)
+        include_reviewed: 是否包含已 reviewed 的条目(默认只返回未 reviewed)
+
+    Returns:
+        按 pending 出现顺序排列的条目列表
+    """
+    log = await store.get_cognition_log(trace_id)
+    events = log.get("events", log.get("entries", []))
+
+    reviewed_index: Dict[str, ReviewDecision] = {}
+    committed_ids: set[str] = set()
+    for e in events:
+        if e.get("type") == "extraction_reviewed":
+            eid = e.get("extraction_id")
+            if eid:
+                reviewed_index[eid] = e.get("decision")
+        elif e.get("type") == "extraction_committed":
+            eid = e.get("extraction_id")
+            if eid:
+                committed_ids.add(eid)
+
+    pendings: List[PendingExtraction] = []
+    for e in events:
+        if e.get("type") != "extraction_pending":
+            continue
+        eid = e.get("extraction_id")
+        if not eid:
+            continue
+        if branch_id is not None and e.get("branch_id") != branch_id:
+            continue
+        reviewed = eid in reviewed_index
+        if reviewed and not include_reviewed:
+            continue
+        pendings.append(
+            PendingExtraction(
+                extraction_id=eid,
+                sequence=e.get("sequence"),
+                goal_id=e.get("goal_id"),
+                branch_id=e.get("branch_id"),
+                payload=e.get("payload", {}),
+                reviewed=reviewed,
+                decision=reviewed_index.get(eid),
+                committed=eid in committed_ids,
+            )
+        )
+    return pendings
+
+
+async def review_one(
+    store: FileSystemTraceStore,
+    trace_id: str,
+    extraction_id: str,
+    decision: ReviewDecision,
+    edited_payload: Optional[Dict[str, Any]] = None,
+) -> None:
+    """对某条 pending 生成 review 事件。
+
+    - approve: 保留原 payload,标记为可 commit
+    - edit:    用 edited_payload 覆盖原 payload(仅本事件内),标记为可 commit
+    - discard: 丢弃,不会被 commit
+    """
+    event: Dict[str, Any] = {
+        "type": "extraction_reviewed",
+        "extraction_id": extraction_id,
+        "decision": decision,
+    }
+    if decision == "edit" and edited_payload is not None:
+        event["edited_payload"] = edited_payload
+    await store.append_cognition_event(trace_id=trace_id, event=event)
+
+
+def _resolve_effective_payload(
+    pending: PendingExtraction,
+    review_events: List[Dict[str, Any]],
+) -> Dict[str, Any]:
+    """合并原 payload 与最后一次 edit 的 payload。"""
+    for e in reversed(review_events):
+        if (
+            e.get("extraction_id") == pending.extraction_id
+            and e.get("decision") == "edit"
+            and isinstance(e.get("edited_payload"), dict)
+        ):
+            return e["edited_payload"]
+    return pending.payload
+
+
+async def commit_approved(
+    store: FileSystemTraceStore,
+    trace_id: str,
+    branch_id: Optional[str] = None,
+) -> CommitReport:
+    """把已 approved/edited 但未 committed 的条目批量调 knowledge_save 上传。
+
+    Args:
+        branch_id: 若指定,只处理此分支的条目(auto_commit_branch 用)
+    """
+    from agent.tools.builtin.knowledge import knowledge_save
+
+    log = await store.get_cognition_log(trace_id)
+    events = log.get("events", log.get("entries", []))
+    review_events = [e for e in events if e.get("type") == "extraction_reviewed"]
+
+    all_pendings = await list_pending(
+        store, trace_id, branch_id=branch_id, include_reviewed=True
+    )
+
+    report = CommitReport()
+
+    for p in all_pendings:
+        if p.committed:
+            report.skipped.append(p.extraction_id)
+            continue
+        if p.decision not in ("approve", "edit"):
+            report.skipped.append(p.extraction_id)
+            continue
+
+        payload = _resolve_effective_payload(p, review_events)
+
+        try:
+            result = await knowledge_save(
+                task=payload.get("task", ""),
+                content=payload.get("content", ""),
+                types=payload.get("types", []),
+                tags=payload.get("tags"),
+                scopes=payload.get("scopes"),
+                owner=payload.get("owner"),
+                resource_ids=payload.get("resource_ids"),
+                source_name=payload.get("source_name", ""),
+                source_category=payload.get("source_category", "exp"),
+                urls=payload.get("urls"),
+                agent_id=payload.get("agent_id", "research_agent"),
+                submitted_by=payload.get("submitted_by", ""),
+                score=payload.get("score", 3),
+                capability_ids=payload.get("capability_ids"),
+                tool_ids=payload.get("tool_ids"),
+            )
+            knowledge_id = (result.metadata or {}).get("knowledge_id", "unknown")
+            if result.error:
+                raise RuntimeError(result.error)
+
+            await store.append_cognition_event(
+                trace_id=trace_id,
+                event={
+                    "type": "extraction_committed",
+                    "extraction_id": p.extraction_id,
+                    "knowledge_id": knowledge_id,
+                },
+            )
+            report.committed.append(p.extraction_id)
+            report.knowledge_ids.append(knowledge_id)
+
+        except Exception as e:
+            report.failed.append({
+                "extraction_id": p.extraction_id,
+                "error": str(e),
+            })
+
+    return report
+
+
+async def auto_commit_branch(
+    store: FileSystemTraceStore,
+    trace_id: str,
+    branch_id: str,
+) -> CommitReport:
+    """反思侧分支退出时的自动提交(reflect_auto_commit=True 路径)。
+
+    视同全部 approved:对此分支所有未 reviewed 的 pending 先 auto-approve,
+    然后调用 commit_approved。
+    """
+    pendings = await list_pending(store, trace_id, branch_id=branch_id)
+    for p in pendings:
+        if not p.reviewed:
+            await review_one(
+                store=store,
+                trace_id=trace_id,
+                extraction_id=p.extraction_id,
+                decision="approve",
+            )
+    return await commit_approved(store, trace_id, branch_id=branch_id)

+ 6 - 0
agent/trace/models.py

@@ -79,6 +79,11 @@ class Trace:
     # 当前焦点 goal
     current_goal_id: Optional[str] = None
 
+    # Memory 系统 - 记忆反思的进度追踪(见 agent/docs/memory-plan.md 第四节)
+    # dream 操作扫描 reflected_at_sequence < latest_sequence 的 trace 做反思;
+    # None 表示该 trace 从未被记忆反思处理过。
+    reflected_at_sequence: Optional[int] = None
+
     # 结果
     result_summary: Optional[str] = None     # 执行结果摘要
     error_message: Optional[str] = None      # 错误信息
@@ -145,6 +150,7 @@ class Trace:
             "llm_params": self.llm_params,
             "context": self.context,
             "current_goal_id": self.current_goal_id,
+            "reflected_at_sequence": self.reflected_at_sequence,
             "result_summary": self.result_summary,
             "error_message": self.error_message,
             "created_at": self.created_at.isoformat() if self.created_at else None,

+ 127 - 0
agent/trace/run_api.py

@@ -114,6 +114,49 @@ class CompactResponse(BaseModel):
     message: str = ""
 
 
+# ===== 提取审核(见 agent/docs/memory-plan.md 第三节) =====
+
+class PendingExtractionModel(BaseModel):
+    extraction_id: str
+    sequence: Optional[int] = None
+    goal_id: Optional[str] = None
+    branch_id: Optional[str] = None
+    payload: Dict[str, Any]
+    reviewed: bool = False
+    decision: Optional[str] = None
+    committed: bool = False
+
+
+class ListExtractionsResponse(BaseModel):
+    trace_id: str
+    count: int
+    items: List[PendingExtractionModel]
+
+
+class ReviewRequest(BaseModel):
+    decision: str = Field(..., description="approve / edit / discard")
+    edited_payload: Optional[Dict[str, Any]] = Field(
+        None, description="decision=edit 时必填;只对本次 review 生效"
+    )
+
+
+class ReviewResponse(BaseModel):
+    trace_id: str
+    extraction_id: str
+    decision: str
+
+
+class CommitResponse(BaseModel):
+    trace_id: str
+    committed_count: int
+    failed_count: int
+    skipped_count: int
+    committed: List[str]
+    knowledge_ids: List[str]
+    failed: List[Dict[str, str]]
+    skipped: List[str]
+
+
 # ===== 后台执行 =====
 
 _running_tasks: Dict[str, asyncio.Task] = {}
@@ -507,6 +550,90 @@ async def reflect_trace(trace_id: str, req: ReflectRequest):
     )
 
 
+@router.get("/{trace_id}/extractions", response_model=ListExtractionsResponse)
+async def list_extractions(trace_id: str, include_reviewed: bool = False):
+    """列出 trace 的待审核提取条目。"""
+    runner = _get_runner()
+    if not runner.trace_store:
+        raise HTTPException(status_code=503, detail="TraceStore not configured")
+
+    from agent.trace.extraction_review import list_pending
+    pendings = await list_pending(
+        runner.trace_store, trace_id, include_reviewed=include_reviewed
+    )
+    return ListExtractionsResponse(
+        trace_id=trace_id,
+        count=len(pendings),
+        items=[
+            PendingExtractionModel(
+                extraction_id=p.extraction_id,
+                sequence=p.sequence,
+                goal_id=p.goal_id,
+                branch_id=p.branch_id,
+                payload=p.payload,
+                reviewed=p.reviewed,
+                decision=p.decision,
+                committed=p.committed,
+            )
+            for p in pendings
+        ],
+    )
+
+
+@router.post(
+    "/{trace_id}/extractions/{extraction_id}/review",
+    response_model=ReviewResponse,
+)
+async def review_extraction(trace_id: str, extraction_id: str, req: ReviewRequest):
+    """对单条 pending 提交 review 决策(approve/edit/discard)。"""
+    runner = _get_runner()
+    if not runner.trace_store:
+        raise HTTPException(status_code=503, detail="TraceStore not configured")
+
+    if req.decision not in ("approve", "edit", "discard"):
+        raise HTTPException(
+            status_code=400,
+            detail=f"decision must be approve/edit/discard, got {req.decision}",
+        )
+    if req.decision == "edit" and not req.edited_payload:
+        raise HTTPException(
+            status_code=400, detail="decision=edit 必须提供 edited_payload"
+        )
+
+    from agent.trace.extraction_review import review_one
+    await review_one(
+        runner.trace_store,
+        trace_id,
+        extraction_id,
+        req.decision,  # type: ignore[arg-type]
+        edited_payload=req.edited_payload,
+    )
+    return ReviewResponse(
+        trace_id=trace_id, extraction_id=extraction_id, decision=req.decision
+    )
+
+
+@router.post("/{trace_id}/extractions/commit", response_model=CommitResponse)
+async def commit_extractions(trace_id: str):
+    """批量把已 approved/edited 的条目上传到 KnowHub。"""
+    runner = _get_runner()
+    if not runner.trace_store:
+        raise HTTPException(status_code=503, detail="TraceStore not configured")
+
+    from agent.trace.extraction_review import commit_approved
+    report = await commit_approved(runner.trace_store, trace_id)
+    return CommitResponse(
+        trace_id=trace_id,
+        committed_count=len(report.committed),
+        failed_count=len(report.failed),
+        skipped_count=len(report.skipped),
+        committed=report.committed,
+        knowledge_ids=report.knowledge_ids,
+        failed=report.failed,
+        skipped=report.skipped,
+    )
+
+
 @router.post("/{trace_id}/compact", response_model=CompactResponse)
 async def compact_trace(trace_id: str):
     """

+ 31 - 1
agent/trace/store.py

@@ -805,7 +805,37 @@ class FileSystemTraceStore:
         trace_id: str,
         event: Dict[str, Any],
     ) -> None:
-        """追加认知事件(query/evaluation/extraction/reflection)"""
+        """追加认知事件到 cognition_log.json。
+
+        所有事件共有字段:
+            type: str         事件类型(见下表)
+            timestamp: str    ISO 格式时间戳(框架自动写入)
+
+        已定义的事件类型及典型字段:
+
+            type="query" — 知识注入查询(goal focus 时触发)
+                sequence, goal_id, query, response, source_ids, sources
+
+            type="evaluation" — 知识评估(Goal 完成/压缩前/任务结束触发)
+                knowledge_id, eval_result{relevance, utility, notes}, trigger_event
+
+            type="extraction_pending" — 反思侧分支暂存的待审核提取(Phase 1.2+)
+                extraction_id, sequence, goal_id, branch_id, payload
+                (payload 字段与 knowledge_save 参数一一对应)
+
+            type="extraction_reviewed" — 人工审核决策(CLI / HTTP API 写入)
+                extraction_id, decision("approve"/"edit"/"discard"), edited_payload?
+
+            type="extraction_committed" — 已上传到 KnowHub
+                extraction_id, knowledge_id
+
+            type="reflection" — Dream 的 per-trace 反思摘要(Phase 2.4 / 3.1)
+                sequence_range: [start, end]    本次反思覆盖的消息区间
+                summary: str                    LLM 生成的反思摘要
+                consumed_at: 可选, ISO 时间戳   当跨 trace 整合已消化此反思时写入
+
+        其他字段可按需附加,不做强校验(演进友好)。
+        """
         log = await self.get_cognition_log(trace_id)
         if "events" not in log:
             log["events"] = log.pop("entries", [])