ingest_research_output.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. #!/usr/bin/env python3
  2. """
  3. 入库研究调研结果(case.json + strategy.json)到 knowhub。
  4. 输入目录结构:
  5. <root>/
  6. ├── 00/
  7. │ ├── case.json ({requirement, cases:[{...}]})
  8. │ └── strategy.json ({selected_strategy:{...}, vs_alternatives, uncovered_requirements})
  9. ├── 01/
  10. │ ...
  11. 入库映射:
  12. case.json.requirement → 按完整描述文本精确匹配现有 requirement(跨版本)
  13. → 找不到则报错(不创建新 requirement——避免语义重复)
  14. case.json.cases[] → resource 实体(多条,URL hash 去重)
  15. strategy.json → strategy 实体(1 条 per folder)
  16. strategy.workflow_outline[].capabilities[].is_new=true
  17. → capability 实体(新建,version 隔离)
  18. → capability_id 已存在则直接引用
  19. 关联:
  20. requirement_resource (requirement → all cases in its folder)
  21. requirement_strategy (requirement → strategy)
  22. strategy_resource (strategy → all cases in its folder — provenance)
  23. strategy_capability (strategy → all mentioned capabilities, compose)
  24. 去重策略:
  25. resource id = "resource/research/{platform}/{hash12(source_url)}"
  26. ON CONFLICT (id) DO UPDATE → 多次调研遇到同一 URL 自动覆盖
  27. 使用:
  28. python knowhub/scripts/ingest_research_output.py <output_root> <version>
  29. e.g. python knowhub/scripts/ingest_research_output.py /Users/sunlit/Downloads/output tao_dev_1
  30. 幂等:反复执行不破坏数据。可选 --purge-first 会先清掉该版本的已有数据。
  31. """
  32. import argparse
  33. import hashlib
  34. import json
  35. import sys
  36. import time
  37. from pathlib import Path
  38. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  39. from knowhub.knowhub_db.pg_store import PostgreSQLStore
  40. from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
  41. from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
  42. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  43. from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
  44. from knowhub.knowhub_db.cascade import purge_version
  45. def _hash12(s: str) -> str:
  46. return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]
  47. def _resource_id(platform: str, source_url: str) -> str:
  48. p = (platform or "unknown").lower().strip()
  49. return f"resource/research/{p}/{_hash12(source_url)}"
  50. def _format_case_body(case: dict) -> str:
  51. parts = []
  52. if case.get("user_feedback"):
  53. parts.append(f"【用户反馈】{case['user_feedback']}")
  54. if case.get("input_details"):
  55. parts.append(f"【输入详情】{case['input_details']}")
  56. if case.get("output_details"):
  57. parts.append(f"【输出详情】{case['output_details']}")
  58. workflow = case.get("workflow_process", [])
  59. if workflow:
  60. parts.append("【工作流】\n" + "\n".join(f"- {s}" for s in workflow))
  61. return "\n\n".join(parts)
  62. def _format_strategy_body(selected: dict) -> str:
  63. """strategy.body 存完整 workflow_outline JSON(前端可解析展示)"""
  64. return json.dumps(selected, ensure_ascii=False, indent=2)
  65. def _find_existing_requirement(cursor, description: str):
  66. """按完整描述文本精确匹配现有 requirement(任何版本)。返回 id 或 None。"""
  67. cursor.execute("SELECT id FROM requirement WHERE description = %s LIMIT 1", (description,))
  68. row = cursor.fetchone()
  69. if row:
  70. # Dict-like (RealDictRow) or tuple
  71. return row['id'] if isinstance(row, dict) or hasattr(row, 'keys') else row[0]
  72. return None
  73. def ingest_folder(folder: Path, version: str, stores: dict, stats: dict):
  74. """入库单个 output/{NN}/ 目录。所有 ID 都在 version namespace 内(除 requirement 复用已有的)。"""
  75. folder_key = folder.name # "00", "01", ...
  76. case_path = folder / "case.json"
  77. strategy_path = folder / "strategy.json"
  78. case_doc = json.loads(case_path.read_text(encoding="utf-8"))
  79. strategy_doc = json.loads(strategy_path.read_text(encoding="utf-8"))
  80. requirement_text = case_doc.get("requirement", "")
  81. searched_at = case_doc.get("searched_at", "")
  82. cases = case_doc.get("cases", [])
  83. # ─── 1. requirement:按完整描述精确匹配现有 REQ(不创建新 REQ)─────
  84. cursor = stores["req"]._get_cursor()
  85. try:
  86. req_id = _find_existing_requirement(cursor, requirement_text)
  87. finally:
  88. cursor.close()
  89. if not req_id:
  90. raise RuntimeError(
  91. f"[{folder_key}] 找不到匹配的 requirement。case.json.requirement 应与现有 "
  92. f"requirement.description 精确相等。首 80 字:{requirement_text[:80]!r}"
  93. )
  94. stats["requirement_matched"] += 1
  95. print(f" ✓ requirement: {req_id} (复用已有)", flush=True)
  96. # ─── 2. resource 实体(每个 case 一条,URL hash 去重)─────────────
  97. resource_ids = []
  98. for case in cases:
  99. src_url = case.get("source_url", "")
  100. if not src_url:
  101. continue
  102. platform = case.get("platform", "unknown")
  103. rid = _resource_id(platform, src_url)
  104. metrics = case.get("metrics") or {}
  105. likes = metrics.get("likes") or 0
  106. stores["res"].insert_or_update({
  107. "id": rid,
  108. "title": case.get("title", ""),
  109. "body": _format_case_body(case),
  110. "content_type": "research_case",
  111. "images": case.get("images", []),
  112. "metadata": {
  113. "platform": platform,
  114. "source_url": src_url,
  115. "metrics": metrics,
  116. "user_feedback": case.get("user_feedback") or "",
  117. "input_details": case.get("input_details") or "",
  118. "output_details": case.get("output_details") or "",
  119. "workflow_process": case.get("workflow_process") or [],
  120. "last_seen": searched_at,
  121. "local_case_id": case.get("id") or "", # 保留原 case_001 便于 strategy 里的文字交叉引用追溯
  122. },
  123. "sort_order": -likes,
  124. "version": version,
  125. })
  126. resource_ids.append(rid)
  127. stats["resource"] += 1
  128. print(f" ✓ resource: {len(resource_ids)} 条", flush=True)
  129. # ─── 3. strategy 实体 ────────────────────────────────────────────
  130. selected = strategy_doc.get("selected_strategy", {})
  131. strategy_id = f"strategy-{version}-{folder_key}"
  132. stores["strat"].insert_or_update({
  133. "id": strategy_id,
  134. "name": selected.get("name", ""),
  135. "description": selected.get("reasoning", "")[:2000], # reasoning 太长时截断
  136. "body": _format_strategy_body(strategy_doc), # 完整 JSON 原样入库
  137. "status": "draft",
  138. "created_at": int(time.time()),
  139. "updated_at": int(time.time()),
  140. "version": version,
  141. })
  142. stats["strategy"] += 1
  143. print(f" ✓ strategy: {strategy_id}", flush=True)
  144. # ─── 4. 处理 strategy 提到的 capability ──────────────────────────
  145. # 收集所有 (name, is_new, existing_id) 元组
  146. workflow_outline = selected.get("workflow_outline", [])
  147. capability_ids_linked = []
  148. new_cap_counter = 0
  149. for phase_idx, phase in enumerate(workflow_outline):
  150. for cap_ref in phase.get("capabilities", []):
  151. if cap_ref.get("is_new"):
  152. new_cap_counter += 1
  153. new_cap_id = f"CAP-{version}-{folder_key}-{new_cap_counter:02d}"
  154. stores["cap"].insert_or_update({
  155. "id": new_cap_id,
  156. "name": cap_ref.get("capability_name", ""),
  157. "description": (
  158. f"[测试生成] 来自调研策略 {strategy_id} 阶段 {phase_idx+1}。"
  159. f" 建议工具:{', '.join(cap_ref.get('suggested_tools', [])[:3])}。"
  160. ),
  161. "criterion": "",
  162. "version": version,
  163. })
  164. capability_ids_linked.append(new_cap_id)
  165. stats["capability_new"] += 1
  166. elif cap_ref.get("capability_id"):
  167. # 引用已有 capability(在 v0 或其他版本)
  168. capability_ids_linked.append(cap_ref["capability_id"])
  169. print(f" ✓ capability: new={new_cap_counter}, 总引用={len(capability_ids_linked)}", flush=True)
  170. # ─── 5. 关联关系 ──────────────────────────────────────────────────
  171. # requirement → resources (provenance)
  172. for rid in resource_ids:
  173. stores["req"].add_resource(req_id, rid)
  174. # requirement → strategy (satisfies)
  175. stores["req"].add_strategy(req_id, strategy_id)
  176. # strategy → resources (build-from provenance)
  177. for rid in resource_ids:
  178. stores["strat"].add_resource(strategy_id, rid)
  179. # strategy → capabilities (compose)
  180. for cap_id in capability_ids_linked:
  181. stores["strat"].add_capability(strategy_id, cap_id, relation_type="compose")
  182. print(f" ✓ junctions: req_res={len(resource_ids)}, req_strat=1, "
  183. f"strat_res={len(resource_ids)}, strat_cap={len(capability_ids_linked)}",
  184. flush=True)
  185. def main():
  186. p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
  187. p.add_argument("root", help="包含 00/ 01/ 等子目录的 output 根")
  188. p.add_argument("version", help="多租户版本标签,如 tao_dev_1")
  189. p.add_argument("--purge-first", action="store_true",
  190. help="先清除该 version 的所有已有数据再入库(迭代测试用)")
  191. args = p.parse_args()
  192. root = Path(args.root).expanduser().resolve()
  193. if not root.is_dir():
  194. print(f"❌ root 不存在或非目录: {root}", file=sys.stderr)
  195. sys.exit(1)
  196. folders = sorted([d for d in root.iterdir() if d.is_dir() and (d / "case.json").exists()])
  197. if not folders:
  198. print(f"❌ 在 {root} 下未找到任何含 case.json 的子目录", file=sys.stderr)
  199. sys.exit(1)
  200. print(f"📂 检测到 {len(folders)} 个调研目录:{[f.name for f in folders]}")
  201. print(f"🏷 version: {args.version}")
  202. # 初始化 store(autocommit 模式,不持锁)
  203. stores = {
  204. "k": PostgreSQLStore(),
  205. "res": PostgreSQLResourceStore(),
  206. "req": PostgreSQLRequirementStore(),
  207. "cap": PostgreSQLCapabilityStore(),
  208. "strat": PostgreSQLStrategyStore(),
  209. }
  210. try:
  211. # Optional: 先 purge 该版本的已有数据
  212. if args.purge_first:
  213. print(f"\n🧹 purge version={args.version!r}...")
  214. conn = stores["res"].conn
  215. cur = conn.cursor()
  216. try:
  217. purge_stats = purge_version(cur, args.version)
  218. print(f" deleted: {purge_stats}")
  219. finally:
  220. cur.close()
  221. stats = {"requirement_matched": 0, "resource": 0, "strategy": 0, "capability_new": 0}
  222. for folder in folders:
  223. print(f"\n📁 {folder.name}/ ...")
  224. ingest_folder(folder, args.version, stores, stats)
  225. print("\n" + "=" * 60)
  226. print("入库完成")
  227. print("=" * 60)
  228. print(f" requirement (matched): {stats['requirement_matched']} (不创建新的,复用已有 REQ)")
  229. print(f" resource: {stats['resource']}")
  230. print(f" strategy: {stats['strategy']}")
  231. print(f" new capability: {stats['capability_new']}")
  232. finally:
  233. for s in stores.values():
  234. s.close()
# Run the ingestion only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()