# extraction_review.py
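"""
Extraction review library (Phase 1.2+).

Core logic shared by three entry points:
- agent/cli/extraction_review.py -- standalone CLI entry point
- agent/cli/interactive.py -- interactive session menu
- agent/trace/run_api.py -- HTTP API endpoint

Division of responsibilities:
- This module: reads pending items from cognition_log, generates review
  events, calls knowledge_save in batch and writes committed events.
- Upstream (runner): when a reflection side branch exits, calls
  auto_commit_branch if reflect_auto_commit=True.
"""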
  11. from __future__ import annotations
  12. from dataclasses import dataclass, field
  13. from typing import Any, Dict, List, Literal, Optional
  14. from agent.trace.store import FileSystemTraceStore
# Decision values carried by an "extraction_reviewed" event.
ReviewDecision = Literal["approve", "edit", "discard"]
  16. @dataclass
  17. class PendingExtraction:
  18. """一条待审核的提取条目(从 cognition_log 还原)"""
  19. extraction_id: str
  20. sequence: Optional[int]
  21. goal_id: Optional[str]
  22. branch_id: Optional[str]
  23. payload: Dict[str, Any]
  24. reviewed: bool = False
  25. decision: Optional[ReviewDecision] = None
  26. committed: bool = False
  27. @dataclass
  28. class CommitReport:
  29. """批量 commit 的结果"""
  30. committed: List[str] = field(default_factory=list) # 成功的 extraction_id 列表
  31. knowledge_ids: List[str] = field(default_factory=list) # KnowHub 返回的新增 ID
  32. failed: List[Dict[str, str]] = field(default_factory=list) # [{extraction_id, error}]
  33. skipped: List[str] = field(default_factory=list) # 不是 approved 或已 committed
  34. async def list_pending(
  35. store: FileSystemTraceStore,
  36. trace_id: str,
  37. branch_id: Optional[str] = None,
  38. include_reviewed: bool = False,
  39. ) -> List[PendingExtraction]:
  40. """列出 trace 下的 pending 提取条目。
  41. Args:
  42. branch_id: 若指定,仅返回此反思分支产出的 pending(用于 auto_commit_branch)
  43. include_reviewed: 是否包含已 reviewed 的条目(默认只返回未 reviewed)
  44. Returns:
  45. 按 pending 出现顺序排列的条目列表
  46. """
  47. log = await store.get_cognition_log(trace_id)
  48. events = log.get("events", log.get("entries", []))
  49. reviewed_index: Dict[str, ReviewDecision] = {}
  50. committed_ids: set[str] = set()
  51. for e in events:
  52. if e.get("type") == "extraction_reviewed":
  53. eid = e.get("extraction_id")
  54. if eid:
  55. reviewed_index[eid] = e.get("decision")
  56. elif e.get("type") == "extraction_committed":
  57. eid = e.get("extraction_id")
  58. if eid:
  59. committed_ids.add(eid)
  60. pendings: List[PendingExtraction] = []
  61. for e in events:
  62. if e.get("type") != "extraction_pending":
  63. continue
  64. eid = e.get("extraction_id")
  65. if not eid:
  66. continue
  67. if branch_id is not None and e.get("branch_id") != branch_id:
  68. continue
  69. reviewed = eid in reviewed_index
  70. if reviewed and not include_reviewed:
  71. continue
  72. pendings.append(
  73. PendingExtraction(
  74. extraction_id=eid,
  75. sequence=e.get("sequence"),
  76. goal_id=e.get("goal_id"),
  77. branch_id=e.get("branch_id"),
  78. payload=e.get("payload", {}),
  79. reviewed=reviewed,
  80. decision=reviewed_index.get(eid),
  81. committed=eid in committed_ids,
  82. )
  83. )
  84. return pendings
  85. async def review_one(
  86. store: FileSystemTraceStore,
  87. trace_id: str,
  88. extraction_id: str,
  89. decision: ReviewDecision,
  90. edited_payload: Optional[Dict[str, Any]] = None,
  91. ) -> None:
  92. """对某条 pending 生成 review 事件。
  93. - approve: 保留原 payload,标记为可 commit
  94. - edit: 用 edited_payload 覆盖原 payload(仅本事件内),标记为可 commit
  95. - discard: 丢弃,不会被 commit
  96. """
  97. event: Dict[str, Any] = {
  98. "type": "extraction_reviewed",
  99. "extraction_id": extraction_id,
  100. "decision": decision,
  101. }
  102. if decision == "edit" and edited_payload is not None:
  103. event["edited_payload"] = edited_payload
  104. await store.append_cognition_event(trace_id=trace_id, event=event)
  105. def _resolve_effective_payload(
  106. pending: PendingExtraction,
  107. review_events: List[Dict[str, Any]],
  108. ) -> Dict[str, Any]:
  109. """合并原 payload 与最后一次 edit 的 payload。"""
  110. for e in reversed(review_events):
  111. if (
  112. e.get("extraction_id") == pending.extraction_id
  113. and e.get("decision") == "edit"
  114. and isinstance(e.get("edited_payload"), dict)
  115. ):
  116. return e["edited_payload"]
  117. return pending.payload
  118. async def commit_approved(
  119. store: FileSystemTraceStore,
  120. trace_id: str,
  121. branch_id: Optional[str] = None,
  122. ) -> CommitReport:
  123. """把已 approved/edited 但未 committed 的条目批量调 knowledge_save 上传。
  124. Args:
  125. branch_id: 若指定,只处理此分支的条目(auto_commit_branch 用)
  126. """
  127. from agent.tools.builtin.knowledge import knowledge_save
  128. log = await store.get_cognition_log(trace_id)
  129. events = log.get("events", log.get("entries", []))
  130. review_events = [e for e in events if e.get("type") == "extraction_reviewed"]
  131. all_pendings = await list_pending(
  132. store, trace_id, branch_id=branch_id, include_reviewed=True
  133. )
  134. report = CommitReport()
  135. for p in all_pendings:
  136. if p.committed:
  137. report.skipped.append(p.extraction_id)
  138. continue
  139. if p.decision not in ("approve", "edit"):
  140. report.skipped.append(p.extraction_id)
  141. continue
  142. payload = _resolve_effective_payload(p, review_events)
  143. try:
  144. result = await knowledge_save(
  145. task=payload.get("task", ""),
  146. content=payload.get("content", ""),
  147. types=payload.get("types", []),
  148. tags=payload.get("tags"),
  149. scopes=payload.get("scopes"),
  150. owner=payload.get("owner"),
  151. resource_ids=payload.get("resource_ids"),
  152. source_name=payload.get("source_name", ""),
  153. source_category=payload.get("source_category", "exp"),
  154. urls=payload.get("urls"),
  155. agent_id=payload.get("agent_id", "research_agent"),
  156. submitted_by=payload.get("submitted_by", ""),
  157. score=payload.get("score", 3),
  158. capability_ids=payload.get("capability_ids"),
  159. tool_ids=payload.get("tool_ids"),
  160. )
  161. knowledge_id = (result.metadata or {}).get("knowledge_id", "unknown")
  162. if result.error:
  163. raise RuntimeError(result.error)
  164. await store.append_cognition_event(
  165. trace_id=trace_id,
  166. event={
  167. "type": "extraction_committed",
  168. "extraction_id": p.extraction_id,
  169. "knowledge_id": knowledge_id,
  170. },
  171. )
  172. report.committed.append(p.extraction_id)
  173. report.knowledge_ids.append(knowledge_id)
  174. except Exception as e:
  175. report.failed.append({
  176. "extraction_id": p.extraction_id,
  177. "error": str(e),
  178. })
  179. return report
  180. async def auto_commit_branch(
  181. store: FileSystemTraceStore,
  182. trace_id: str,
  183. branch_id: str,
  184. ) -> CommitReport:
  185. """反思侧分支退出时的自动提交(reflect_auto_commit=True 路径)。
  186. 视同全部 approved:对此分支所有未 reviewed 的 pending 先 auto-approve,
  187. 然后调用 commit_approved。
  188. """
  189. pendings = await list_pending(store, trace_id, branch_id=branch_id)
  190. for p in pendings:
  191. if not p.reviewed:
  192. await review_one(
  193. store=store,
  194. trace_id=trace_id,
  195. extraction_id=p.extraction_id,
  196. decision="approve",
  197. )
  198. return await commit_approved(store, trace_id, branch_id=branch_id)