""" 提取审核工具库(Phase 1.2+) 共享核心逻辑给三个入口复用: - agent/cli/extraction_review.py —— 独立 CLI 入口 - agent/cli/interactive.py —— 交互式会话菜单 - agent/trace/run_api.py —— HTTP API 端点 职责划分: - 本模块:从 cognition_log 读 pending、生成 review 事件、批量调 knowledge_save 并写 committed 事件 - 上游(runner):反思侧分支退出时,若 reflect_auto_commit=True 则调 auto_commit_branch """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Literal, Optional from agent.trace.store import FileSystemTraceStore ReviewDecision = Literal["approve", "edit", "discard"] @dataclass class PendingExtraction: """一条待审核的提取条目(从 cognition_log 还原)""" extraction_id: str sequence: Optional[int] goal_id: Optional[str] branch_id: Optional[str] payload: Dict[str, Any] reviewed: bool = False decision: Optional[ReviewDecision] = None committed: bool = False @dataclass class CommitReport: """批量 commit 的结果""" committed: List[str] = field(default_factory=list) # 成功的 extraction_id 列表 knowledge_ids: List[str] = field(default_factory=list) # KnowHub 返回的新增 ID failed: List[Dict[str, str]] = field(default_factory=list) # [{extraction_id, error}] skipped: List[str] = field(default_factory=list) # 不是 approved 或已 committed async def list_pending( store: FileSystemTraceStore, trace_id: str, branch_id: Optional[str] = None, include_reviewed: bool = False, ) -> List[PendingExtraction]: """列出 trace 下的 pending 提取条目。 Args: branch_id: 若指定,仅返回此反思分支产出的 pending(用于 auto_commit_branch) include_reviewed: 是否包含已 reviewed 的条目(默认只返回未 reviewed) Returns: 按 pending 出现顺序排列的条目列表 """ log = await store.get_cognition_log(trace_id) events = log.get("events", log.get("entries", [])) reviewed_index: Dict[str, ReviewDecision] = {} committed_ids: set[str] = set() for e in events: if e.get("type") == "extraction_reviewed": eid = e.get("extraction_id") if eid: reviewed_index[eid] = e.get("decision") elif e.get("type") == "extraction_committed": eid = e.get("extraction_id") if eid: committed_ids.add(eid) pendings: List[PendingExtraction] = [] for e in events: if e.get("type") != "extraction_pending": continue eid = e.get("extraction_id") if not eid: continue if branch_id is not None and e.get("branch_id") != branch_id: continue reviewed = eid in reviewed_index if reviewed and not include_reviewed: continue pendings.append( PendingExtraction( extraction_id=eid, sequence=e.get("sequence"), goal_id=e.get("goal_id"), branch_id=e.get("branch_id"), payload=e.get("payload", {}), reviewed=reviewed, decision=reviewed_index.get(eid), committed=eid in committed_ids, ) ) return pendings async def review_one( store: FileSystemTraceStore, trace_id: str, extraction_id: str, decision: ReviewDecision, edited_payload: Optional[Dict[str, Any]] = None, ) -> None: """对某条 pending 生成 review 事件。 - approve: 保留原 payload,标记为可 commit - edit: 用 edited_payload 覆盖原 payload(仅本事件内),标记为可 commit - discard: 丢弃,不会被 commit """ event: Dict[str, Any] = { "type": "extraction_reviewed", "extraction_id": extraction_id, "decision": decision, } if decision == "edit" and edited_payload is not None: event["edited_payload"] = edited_payload await store.append_cognition_event(trace_id=trace_id, event=event) def _resolve_effective_payload( pending: PendingExtraction, review_events: List[Dict[str, Any]], ) -> Dict[str, Any]: """合并原 payload 与最后一次 edit 的 payload。""" for e in reversed(review_events): if ( e.get("extraction_id") == pending.extraction_id and e.get("decision") == "edit" and isinstance(e.get("edited_payload"), dict) ): return e["edited_payload"] return pending.payload async def commit_approved( store: FileSystemTraceStore, trace_id: str, branch_id: Optional[str] = None, ) -> CommitReport: """把已 approved/edited 但未 committed 的条目批量调 knowledge_save 上传。 Args: branch_id: 若指定,只处理此分支的条目(auto_commit_branch 用) """ from agent.tools.builtin.knowledge import knowledge_save log = await store.get_cognition_log(trace_id) events = log.get("events", log.get("entries", [])) review_events = [e for e in events if e.get("type") == "extraction_reviewed"] all_pendings = await list_pending( store, trace_id, branch_id=branch_id, include_reviewed=True ) report = CommitReport() for p in all_pendings: if p.committed: report.skipped.append(p.extraction_id) continue if p.decision not in ("approve", "edit"): report.skipped.append(p.extraction_id) continue payload = _resolve_effective_payload(p, review_events) try: result = await knowledge_save( task=payload.get("task", ""), content=payload.get("content", ""), types=payload.get("types", []), tags=payload.get("tags"), scopes=payload.get("scopes"), owner=payload.get("owner"), resource_ids=payload.get("resource_ids"), source_name=payload.get("source_name", ""), source_category=payload.get("source_category", "exp"), urls=payload.get("urls"), agent_id=payload.get("agent_id", "research_agent"), submitted_by=payload.get("submitted_by", ""), score=payload.get("score", 3), capability_ids=payload.get("capability_ids"), tool_ids=payload.get("tool_ids"), ) knowledge_id = (result.metadata or {}).get("knowledge_id", "unknown") if result.error: raise RuntimeError(result.error) await store.append_cognition_event( trace_id=trace_id, event={ "type": "extraction_committed", "extraction_id": p.extraction_id, "knowledge_id": knowledge_id, }, ) report.committed.append(p.extraction_id) report.knowledge_ids.append(knowledge_id) except Exception as e: report.failed.append({ "extraction_id": p.extraction_id, "error": str(e), }) return report async def auto_commit_branch( store: FileSystemTraceStore, trace_id: str, branch_id: str, ) -> CommitReport: """反思侧分支退出时的自动提交(reflect_auto_commit=True 路径)。 视同全部 approved:对此分支所有未 reviewed 的 pending 先 auto-approve, 然后调用 commit_approved。 """ pendings = await list_pending(store, trace_id, branch_id=branch_id) for p in pendings: if not p.reviewed: await review_one( store=store, trace_id=trace_id, extraction_id=p.extraction_id, decision="approve", ) return await commit_approved(store, trace_id, branch_id=branch_id)