| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- """
- 提取审核工具库(Phase 1.2+)
- 共享核心逻辑给三个入口复用:
- - agent/cli/extraction_review.py —— 独立 CLI 入口
- - agent/cli/interactive.py —— 交互式会话菜单
- - agent/trace/run_api.py —— HTTP API 端点
- 职责划分:
- - 本模块:从 cognition_log 读 pending、生成 review 事件、批量调 knowledge_save 并写 committed 事件
- - 上游(runner):反思侧分支退出时,若 reflect_auto_commit=True 则调 auto_commit_branch
- """
- from __future__ import annotations
- from dataclasses import dataclass, field
- from typing import Any, Dict, List, Literal, Optional
- from agent.trace.store import FileSystemTraceStore
# The decisions a reviewer can record for a pending extraction (see review_one):
# "approve" keeps the original payload, "edit" supplies a replacement payload,
# "discard" drops the entry so it is never committed.
ReviewDecision = Literal["approve", "edit", "discard"]
@dataclass
class PendingExtraction:
    """A single extraction awaiting review, rebuilt from the cognition_log.

    The identifying fields are copied from the ``extraction_pending`` event;
    the trailing review-state fields default to "not reviewed, no decision,
    not committed" and are overridden when later log events say otherwise.
    """

    extraction_id: str
    sequence: Optional[int]
    goal_id: Optional[str]
    branch_id: Optional[str]
    payload: Dict[str, Any]
    # Review state derived from later log events.
    reviewed: bool = field(default=False)
    decision: Optional[ReviewDecision] = field(default=None)
    committed: bool = field(default=False)
@dataclass
class CommitReport:
    """Outcome of one batch commit run; every list starts out empty."""

    # extraction_ids that were saved and recorded as committed.
    committed: List[str] = field(default_factory=list)
    # Knowledge IDs returned for the committed entries, appended in step with ``committed``.
    knowledge_ids: List[str] = field(default_factory=list)
    # One {"extraction_id": ..., "error": ...} dict per entry that failed.
    failed: List[Dict[str, str]] = field(default_factory=list)
    # extraction_ids not approved/edited, or already committed.
    skipped: List[str] = field(default_factory=list)
async def list_pending(
    store: FileSystemTraceStore,
    trace_id: str,
    branch_id: Optional[str] = None,
    include_reviewed: bool = False,
) -> List[PendingExtraction]:
    """Return the pending extraction entries recorded for a trace.

    The cognition log is read from ``store``; its events are taken from the
    ``events`` key, falling back to ``entries``. One PendingExtraction is built
    per ``extraction_pending`` event that carries an extraction_id, annotated
    with the most recent ``extraction_reviewed`` decision and whether an
    ``extraction_committed`` event exists for it.

    Args:
        store: trace store holding the cognition log.
        trace_id: trace to inspect.
        branch_id: when given, keep only pendings produced by this reflection
            branch (used by auto_commit_branch).
        include_reviewed: when False (default), entries that already have a
            review event are left out.

    Returns:
        Entries in the order their pending events appear in the log.
    """
    log = await store.get_cognition_log(trace_id)
    events = log.get("events", log.get("entries", []))

    # Pass 1: latest review decision and committed flag per extraction_id.
    decisions: Dict[str, ReviewDecision] = {}
    done: set[str] = set()
    for ev in events:
        kind = ev.get("type")
        if kind not in ("extraction_reviewed", "extraction_committed"):
            continue
        ext_id = ev.get("extraction_id")
        if not ext_id:
            continue
        if kind == "extraction_reviewed":
            # A later review overrides an earlier one.
            decisions[ext_id] = ev.get("decision")
        else:
            done.add(ext_id)

    # Pass 2: materialize the pending events that survive the filters.
    result: List[PendingExtraction] = []
    for ev in events:
        if ev.get("type") != "extraction_pending":
            continue
        ext_id = ev.get("extraction_id")
        if not ext_id:
            continue
        if branch_id is not None and ev.get("branch_id") != branch_id:
            continue
        already_reviewed = ext_id in decisions
        if already_reviewed and not include_reviewed:
            continue
        result.append(
            PendingExtraction(
                extraction_id=ext_id,
                sequence=ev.get("sequence"),
                goal_id=ev.get("goal_id"),
                branch_id=ev.get("branch_id"),
                payload=ev.get("payload", {}),
                reviewed=already_reviewed,
                decision=decisions.get(ext_id),
                committed=ext_id in done,
            )
        )
    return result
async def review_one(
    store: FileSystemTraceStore,
    trace_id: str,
    extraction_id: str,
    decision: ReviewDecision,
    edited_payload: Optional[Dict[str, Any]] = None,
) -> None:
    """Append an ``extraction_reviewed`` event for one pending entry.

    Decisions:
        - approve: keep the original payload; the entry becomes committable.
        - edit: store ``edited_payload`` on this event (only when it is not
          None); commit_approved uses it in place of the original payload
          when it is a dict. The entry becomes committable.
        - discard: the entry will not be committed. Any ``edited_payload``
          is ignored for approve/discard.

    Raises:
        ValueError: if ``decision`` is not "approve", "edit" or "discard".
            The Literal type is not enforced at runtime, and an unknown value
            would otherwise be persisted to the log: list_pending would then
            treat the entry as reviewed (hiding it) while commit_approved
            skips it, silently losing the extraction.
    """
    if decision not in ("approve", "edit", "discard"):
        raise ValueError(
            f"invalid review decision {decision!r}; "
            "expected 'approve', 'edit' or 'discard'"
        )
    event: Dict[str, Any] = {
        "type": "extraction_reviewed",
        "extraction_id": extraction_id,
        "decision": decision,
    }
    if decision == "edit" and edited_payload is not None:
        event["edited_payload"] = edited_payload
    await store.append_cognition_event(trace_id=trace_id, event=event)
- def _resolve_effective_payload(
- pending: PendingExtraction,
- review_events: List[Dict[str, Any]],
- ) -> Dict[str, Any]:
- """合并原 payload 与最后一次 edit 的 payload。"""
- for e in reversed(review_events):
- if (
- e.get("extraction_id") == pending.extraction_id
- and e.get("decision") == "edit"
- and isinstance(e.get("edited_payload"), dict)
- ):
- return e["edited_payload"]
- return pending.payload
async def commit_approved(
    store: FileSystemTraceStore,
    trace_id: str,
    branch_id: Optional[str] = None,
) -> CommitReport:
    """Upload every approved/edited, not-yet-committed entry via knowledge_save.

    For each eligible entry the effective payload is resolved (latest dict
    ``edited_payload`` wins over the original), passed to knowledge_save, and
    on success an ``extraction_committed`` event is appended to the cognition
    log so later runs skip the entry.

    Args:
        store: trace store holding the cognition log.
        trace_id: trace whose pending extractions are processed.
        branch_id: if given, only entries produced by this branch are
            processed (used by auto_commit_branch).

    Returns:
        A CommitReport. Per-entry errors are collected in ``report.failed``
        instead of being raised, so one bad entry does not abort the batch.
        If knowledge_save succeeded but recording the commit event failed, the
        failure entry also carries the ``knowledge_id`` so the operator can
        reconcile it (a retry would otherwise save the knowledge a second time).
    """
    from agent.tools.builtin.knowledge import knowledge_save

    log = await store.get_cognition_log(trace_id)
    events = log.get("events", log.get("entries", []))

    # Group review events by extraction_id once, so resolving each entry's
    # payload does not rescan the whole log (previously O(pendings * reviews)).
    reviews_by_id: Dict[Any, List[Dict[str, Any]]] = {}
    for e in events:
        if e.get("type") == "extraction_reviewed":
            reviews_by_id.setdefault(e.get("extraction_id"), []).append(e)

    all_pendings = await list_pending(
        store, trace_id, branch_id=branch_id, include_reviewed=True
    )

    report = CommitReport()
    # Guards against duplicate pending events for the same extraction_id,
    # which would otherwise be saved to KnowHub more than once in one run.
    handled: set[str] = set()
    for p in all_pendings:
        if (
            p.committed
            or p.decision not in ("approve", "edit")
            or p.extraction_id in handled
        ):
            report.skipped.append(p.extraction_id)
            continue
        handled.add(p.extraction_id)

        payload = _resolve_effective_payload(
            p, reviews_by_id.get(p.extraction_id, [])
        )
        if not isinstance(payload, dict):
            report.failed.append({
                "extraction_id": p.extraction_id,
                "error": f"payload is not a dict: {type(payload).__name__}",
            })
            continue

        # Step 1: save to KnowHub. Nothing has been persisted if this fails.
        try:
            result = await knowledge_save(
                task=payload.get("task", ""),
                content=payload.get("content", ""),
                types=payload.get("types", []),
                tags=payload.get("tags"),
                scopes=payload.get("scopes"),
                owner=payload.get("owner"),
                resource_ids=payload.get("resource_ids"),
                source_name=payload.get("source_name", ""),
                source_category=payload.get("source_category", "exp"),
                urls=payload.get("urls"),
                agent_id=payload.get("agent_id", "research_agent"),
                submitted_by=payload.get("submitted_by", ""),
                score=payload.get("score", 3),
                capability_ids=payload.get("capability_ids"),
                tool_ids=payload.get("tool_ids"),
            )
            if result.error:
                raise RuntimeError(result.error)
        except Exception as exc:
            report.failed.append({
                "extraction_id": p.extraction_id,
                "error": str(exc),
            })
            continue

        knowledge_id = (result.metadata or {}).get("knowledge_id", "unknown")

        # Step 2: record the commit. If this fails the knowledge already
        # exists remotely, so report it distinctly rather than as a plain
        # failure that a retry would silently duplicate.
        try:
            await store.append_cognition_event(
                trace_id=trace_id,
                event={
                    "type": "extraction_committed",
                    "extraction_id": p.extraction_id,
                    "knowledge_id": knowledge_id,
                },
            )
        except Exception as exc:
            report.failed.append({
                "extraction_id": p.extraction_id,
                "error": (
                    f"saved as knowledge {knowledge_id} but failed to record "
                    f"extraction_committed: {exc}"
                ),
                "knowledge_id": knowledge_id,
            })
            continue

        report.committed.append(p.extraction_id)
        report.knowledge_ids.append(knowledge_id)
    return report
async def auto_commit_branch(
    store: FileSystemTraceStore,
    trace_id: str,
    branch_id: str,
) -> CommitReport:
    """Auto-commit a reflection side branch on exit (reflect_auto_commit=True).

    Everything the branch produced is treated as approved: each pending entry
    of this branch that has no review yet gets an "approve" review event, and
    then commit_approved runs restricted to the branch.
    """
    branch_pendings = await list_pending(store, trace_id, branch_id=branch_id)
    to_approve = [item for item in branch_pendings if not item.reviewed]
    for item in to_approve:
        await review_one(
            store=store,
            trace_id=trace_id,
            extraction_id=item.extraction_id,
            decision="approve",
        )
    return await commit_approved(store, trace_id, branch_id=branch_id)
|