""" 提取审核交互式 CLI 用途 ---- 反思侧分支产出的知识条目默认写为 cognition_log: type="extraction_pending", 不会直接上传到 KnowHub。本 CLI 提供人工审核 + 批量提交入口。 两种入口(共享同一核心逻辑,见 agent/trace/extraction_review.py): - 独立脚本:python -m agent.cli.extraction_review --trace [--list|--review|--commit] - interactive.py 菜单项 8/9(见 agent/cli/interactive.py) 用法示例 -------- # 查看当前 trace 的所有未审核条目 python -m agent.cli.extraction_review --trace abc-123 --list # 交互式逐条审核 python -m agent.cli.extraction_review --trace abc-123 --review # 把已 approved 的条目批量提交到 KnowHub python -m agent.cli.extraction_review --trace abc-123 --commit # 一条龙:review 完直接 commit python -m agent.cli.extraction_review --trace abc-123 """ from __future__ import annotations import argparse import asyncio import json import sys from pathlib import Path from typing import List, Optional from agent.trace.store import FileSystemTraceStore from agent.trace.extraction_review import ( PendingExtraction, CommitReport, list_pending, review_one, commit_approved, ) # ===== 打印工具 ===== _SEP = "─" * 60 def _format_payload(payload: dict, max_content: int = 400) -> str: task = payload.get("task", "") content = payload.get("content", "") types = payload.get("types", []) tags = payload.get("tags", {}) score = payload.get("score", 0) resource_ids = payload.get("resource_ids", []) if len(content) > max_content: content = content[:max_content] + "…(truncated)" lines = [ f"task: {task}", f"types: {types} score: {score}", ] if tags: lines.append(f"tags: {tags}") if resource_ids: lines.append(f"resources: {resource_ids}") lines.append("") lines.append(content) return "\n".join(lines) def _print_pending(p: PendingExtraction, index: int, total: int) -> None: state = "" if p.committed: state = " [已提交]" elif p.reviewed: state = f" [已审核: {p.decision}]" print() print(f"[{index}/{total}] {p.extraction_id}{state}") print(_SEP) print(_format_payload(p.payload)) print(_SEP) def _print_report(report: CommitReport) -> None: print() print("=" * 60) print("提交结果") print("=" * 60) print(f"✅ 成功: {len(report.committed)}") for eid, kid in zip(report.committed, report.knowledge_ids): print(f" - {eid} → knowledge_id={kid}") if report.failed: print(f"❌ 失败: {len(report.failed)}") for item in report.failed: print(f" - {item['extraction_id']}: {item['error']}") if report.skipped: print(f"⏭ 跳过: {len(report.skipped)}(未 approved 或已提交)") print("=" * 60) # ===== 交互式编辑 ===== def _prompt_edit(payload: dict) -> Optional[dict]: """进入交互式文本编辑模式,返回修改后的 payload(None 表示取消)。 初版只支持改 task/content/score/tags(最常用字段)。 """ print("\n编辑模式(空行回车保留原值)") task = input(f"task [{payload.get('task', '')[:50]}]: ").strip() content_default = payload.get("content", "") print(f"content 当前:\n{content_default}\n") print("输入新 content(单行回车保留原值;多行请在末尾输入 `.` 单独成行结束):") content = _read_multiline_or_keep(content_default) score_raw = input(f"score [{payload.get('score', 3)}]: ").strip() tags_raw = input(f"tags JSON [{json.dumps(payload.get('tags', {}), ensure_ascii=False)}]: ").strip() new_payload = dict(payload) if task: new_payload["task"] = task if content is not None: new_payload["content"] = content if score_raw: try: new_payload["score"] = int(score_raw) except ValueError: print(f"⚠ score 不是整数,保留原值 {payload.get('score', 3)}") if tags_raw: try: new_payload["tags"] = json.loads(tags_raw) except json.JSONDecodeError as e: print(f"⚠ tags 不是合法 JSON({e}),保留原值") confirm = input("\n保存修改?[y/N]: ").strip().lower() if confirm != "y": return None return new_payload def _read_multiline_or_keep(default: str) -> Optional[str]: """单行输入则直接返回(空行表示保留默认); 如果输入 `<<` 则进入多行模式,直到 `.` 单独成行结束。""" first = input("> ") if not first.strip(): return None if first.strip() != "<<": return first lines = [] while True: line = input() if line.strip() == ".": break lines.append(line) return "\n".join(lines) # ===== 三种命令 ===== async def cmd_list(store: FileSystemTraceStore, trace_id: str, show_all: bool) -> int: pendings = await list_pending(store, trace_id, include_reviewed=show_all) if not pendings: msg = "没有" + ("任何提取记录" if show_all else "待审核的提取条目") print(f"trace {trace_id}: {msg}") return 0 print(f"trace {trace_id}: 共 {len(pendings)} 条{'' if show_all else '待审核'}") for i, p in enumerate(pendings, 1): _print_pending(p, i, len(pendings)) return 0 async def cmd_review(store: FileSystemTraceStore, trace_id: str) -> int: pendings = await list_pending(store, trace_id, include_reviewed=False) if not pendings: print(f"trace {trace_id}: 没有待审核的提取条目") return 0 print(f"trace {trace_id}: 开始审核 {len(pendings)} 条") for i, p in enumerate(pendings, 1): _print_pending(p, i, len(pendings)) while True: choice = input("[a]pprove / [e]dit / [d]iscard / [s]kip / [q]uit: ").strip().lower() if choice in ("a", "approve"): await review_one(store, trace_id, p.extraction_id, "approve") print(f"✓ {p.extraction_id} approved") break elif choice in ("d", "discard"): await review_one(store, trace_id, p.extraction_id, "discard") print(f"✗ {p.extraction_id} discarded") break elif choice in ("s", "skip"): print(f"⏭ {p.extraction_id} skipped(保留为 pending)") break elif choice in ("q", "quit"): print("退出审核") return 0 elif choice in ("e", "edit"): edited = _prompt_edit(p.payload) if edited is None: print("取消编辑,请重选") continue await review_one(store, trace_id, p.extraction_id, "edit", edited_payload=edited) print(f"✎ {p.extraction_id} edited & approved") break else: print("无效选项,请输入 a/e/d/s/q") return 0 async def cmd_commit(store: FileSystemTraceStore, trace_id: str) -> int: report = await commit_approved(store, trace_id) _print_report(report) return 0 if not report.failed else 1 # ===== argparse 入口 ===== def build_parser() -> argparse.ArgumentParser: p = argparse.ArgumentParser( prog="python -m agent.cli.extraction_review", description="审核并提交反思侧分支暂存的待审核知识条目。", ) p.add_argument("--trace", required=True, help="Trace ID") p.add_argument("--base-path", default=".trace", help="TraceStore 根目录(默认 .trace)") group = p.add_mutually_exclusive_group() group.add_argument("--list", action="store_true", help="仅列出未审核条目") group.add_argument("--list-all", action="store_true", help="列出全部条目(含已审核/已提交)") group.add_argument("--review", action="store_true", help="进入交互式审核(不自动 commit)") group.add_argument("--commit", action="store_true", help="仅批量提交已 approved 的条目") return p async def _main_async(args: argparse.Namespace) -> int: if not Path(args.base_path).exists(): print(f"❌ TraceStore 根目录不存在: {args.base_path}", file=sys.stderr) return 2 store = FileSystemTraceStore(base_path=args.base_path) if args.list or args.list_all: return await cmd_list(store, args.trace, show_all=args.list_all) if args.review: return await cmd_review(store, args.trace) if args.commit: return await cmd_commit(store, args.trace) # 默认:review 完紧接着 commit rc = await cmd_review(store, args.trace) if rc != 0: return rc print() confirm = input("现在把已 approved 的条目提交到 KnowHub?[Y/n]: ").strip().lower() if confirm in ("", "y", "yes"): return await cmd_commit(store, args.trace) print("未提交。需要时运行 `--commit` 子命令。") return 0 def main() -> int: args = build_parser().parse_args() return asyncio.run(_main_async(args)) if __name__ == "__main__": sys.exit(main())