#!/usr/bin/env python3
"""
入库研究调研结果(case.json + strategy.json)到 knowhub。

输入目录结构:
  ROOT/
  ├── 00/
  │   ├── case.json      ({requirement, cases:[{...}]})
  │   └── strategy.json  ({selected_strategy:{...}, vs_alternatives, uncovered_requirements})
  ├── 01/
  │   ...

入库映射:
  case.json.requirement  → 按完整描述文本精确匹配现有 requirement(跨版本)
                         → 找不到则报错(不创建新 requirement——避免语义重复)
  case.json.cases[]      → resource 实体(多条,URL hash 去重)
  strategy.json          → strategy 实体(1 条 per folder)
  strategy.workflow_outline[].capabilities[].is_new=true
                         → capability 实体(新建,version 隔离)
                         → capability_id 已存在则直接引用

关联:
  requirement_resource   (requirement → all cases in its folder)
  requirement_strategy   (requirement → strategy)
  strategy_resource      (strategy → all cases in its folder — provenance)
  strategy_capability    (strategy → all mentioned capabilities, compose)

去重策略:
  resource id = "resource/research/{platform}/{hash12(source_url)}"
  ON CONFLICT (id) DO UPDATE → 多次调研遇到同一 URL 自动覆盖

使用:
  python knowhub/scripts/ingest_research_output.py ROOT VERSION
  e.g. python knowhub/scripts/ingest_research_output.py /Users/sunlit/Downloads/output tao_dev_1

幂等:反复执行不破坏数据。可选 --purge-first 会先清掉该版本的已有数据。
"""
# NOTE: the module docstring above is passed to argparse as the --help
# description (see main()), so it is user-facing runtime text and is kept in
# its original language.

import argparse
import contextlib
import hashlib
import json
import sys
import time
from pathlib import Path

# Make the repository root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_store import PostgreSQLStore
from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.cascade import purge_version


def _hash12(s: str) -> str:
    """Return the first 12 hex characters of the SHA-256 digest of *s* (UTF-8)."""
    return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]


def _resource_id(platform: str, source_url: str) -> str:
    """Build the resource id from the platform and a hash of the source URL.

    A falsy platform becomes "unknown"; the platform is lower-cased and
    stripped. The same (platform, URL) pair always yields the same id.
    """
    p = (platform or "unknown").lower().strip()
    return f"resource/research/{p}/{_hash12(source_url)}"


def _sort_order(likes):
    """Return the negated like count used as a resource's ``sort_order``.

    Numbers are negated as-is. Integer-looking strings (e.g. ``"123"``) are
    parsed first. Any other value counts as zero likes instead of raising
    ``TypeError`` on unary minus.
    """
    if isinstance(likes, (int, float)):
        return -likes
    if isinstance(likes, str):
        try:
            return -int(likes.strip())
        except ValueError:
            return 0
    return 0


def _format_case_body(case: dict) -> str:
    """Render a case dict as the text body stored on the resource.

    Each present (truthy) field becomes one labelled paragraph; paragraphs
    are joined with a blank line. Returns "" when no field is present.
    """
    labelled_fields = (
        ("user_feedback", "【用户反馈】"),
        ("input_details", "【输入详情】"),
        ("output_details", "【输出详情】"),
    )
    parts = [f"{label}{case[key]}" for key, label in labelled_fields if case.get(key)]
    workflow = case.get("workflow_process", [])
    if workflow:
        parts.append("【工作流】\n" + "\n".join(f"- {s}" for s in workflow))
    return "\n\n".join(parts)


def _format_strategy_body(selected: dict) -> str:
    """Serialise *selected* as pretty-printed, non-ASCII-escaped JSON.

    ingest_folder() passes the whole strategy.json document here, so the
    stored body is the complete document, not only ``selected_strategy``.
    """
    return json.dumps(selected, ensure_ascii=False, indent=2)


def _find_existing_requirement(cursor, description: str):
    """Look up a requirement whose description equals *description* exactly.

    The query does not filter on version. Returns the matching id, or None
    when no row matches.
    """
    cursor.execute("SELECT id FROM requirement WHERE description = %s LIMIT 1", (description,))
    row = cursor.fetchone()
    if not row:
        return None
    # The cursor may yield a mapping-like row (e.g. RealDictRow) or a tuple.
    if isinstance(row, dict) or hasattr(row, "keys"):
        return row["id"]
    return row[0]


def ingest_folder(folder: Path, version: str, stores: dict, stats: dict):
    """Ingest one research folder (``case.json`` + ``strategy.json``).

    Strategy ids and newly created capability ids embed *version*. Resource
    ids are derived from platform + source URL only (``version`` is stored as
    a field). The requirement is never created, only matched.

    Args:
        folder: Directory holding ``case.json`` and ``strategy.json``; its
            name (e.g. "00") is used inside generated ids.
        version: Version label written to every created entity.
        stores: Mapping with the keys "req", "res", "strat" and "cap" bound
            to the corresponding store objects.
        stats: Counter dict with the keys "requirement_matched", "resource",
            "strategy" and "capability_new"; updated in place.

    Raises:
        RuntimeError: ``case.json`` has no usable ``requirement`` text, or no
            existing requirement has exactly that description.
        FileNotFoundError: one of the two JSON files is missing.
    """
    folder_key = folder.name  # "00", "01", ...

    # Both documents are parsed before any database write.
    case_doc = json.loads((folder / "case.json").read_text(encoding="utf-8"))
    strategy_doc = json.loads((folder / "strategy.json").read_text(encoding="utf-8"))

    requirement_text = case_doc.get("requirement") or ""
    searched_at = case_doc.get("searched_at", "")
    cases = case_doc.get("cases") or []  # tolerate an explicit JSON null

    # --- 1. requirement: exact match against an existing row, never created ---
    # An empty text would otherwise be queried as description = '' and could
    # silently attach everything to a blank requirement row.
    if not isinstance(requirement_text, str) or not requirement_text.strip():
        raise RuntimeError(
            f"[{folder_key}] case.json 缺少 requirement 字段或其内容为空,无法匹配现有 requirement。"
        )
    with contextlib.closing(stores["req"]._get_cursor()) as cursor:
        req_id = _find_existing_requirement(cursor, requirement_text)
    if not req_id:
        raise RuntimeError(
            f"[{folder_key}] 找不到匹配的 requirement。case.json.requirement 应与现有 "
            f"requirement.description 精确相等。首 80 字:{requirement_text[:80]!r}"
        )
    stats["requirement_matched"] += 1
    print(f" ✓ requirement: {req_id} (复用已有)", flush=True)

    # --- 2. resources: one per case that has a source_url --------------------
    resource_ids = []
    seen_resource_ids = set()
    for case in cases:
        src_url = case.get("source_url", "")
        if not src_url:
            continue
        platform = case.get("platform", "unknown")
        rid = _resource_id(platform, src_url)
        metrics = case.get("metrics") or {}
        stores["res"].insert_or_update({
            "id": rid,
            "title": case.get("title", ""),
            "body": _format_case_body(case),
            "content_type": "research_case",
            "images": case.get("images", []),
            "metadata": {
                "platform": platform,
                "source_url": src_url,
                "metrics": metrics,
                "user_feedback": case.get("user_feedback") or "",
                "input_details": case.get("input_details") or "",
                "output_details": case.get("output_details") or "",
                "workflow_process": case.get("workflow_process") or [],
                "last_seen": searched_at,
                # Keeps the file-local case id (e.g. case_001) so textual
                # cross-references inside the strategy can be traced back.
                "local_case_id": case.get("id") or "",
            },
            "sort_order": _sort_order(metrics.get("likes") or 0),
            "version": version,
        })
        # The same URL may appear twice in one folder: the later upsert wins,
        # but the id is linked and counted only once.
        if rid not in seen_resource_ids:
            seen_resource_ids.add(rid)
            resource_ids.append(rid)
            stats["resource"] += 1
    print(f" ✓ resource: {len(resource_ids)} 条", flush=True)

    # --- 3. strategy entity ---------------------------------------------------
    selected = strategy_doc.get("selected_strategy") or {}
    strategy_id = f"strategy-{version}-{folder_key}"
    now = int(time.time())  # one timestamp so created_at == updated_at
    stores["strat"].insert_or_update({
        "id": strategy_id,
        "name": selected.get("name", ""),
        "description": (selected.get("reasoning") or "")[:2000],  # truncate long reasoning
        "body": _format_strategy_body(strategy_doc),  # full strategy.json document
        "status": "draft",
        "created_at": now,
        "updated_at": now,
        "version": version,
    })
    stats["strategy"] += 1
    print(f" ✓ strategy: {strategy_id}", flush=True)

    # --- 4. capabilities mentioned by the strategy ---------------------------
    workflow_outline = selected.get("workflow_outline") or []
    capability_ids_linked = []
    new_cap_counter = 0
    for phase_no, phase in enumerate(workflow_outline, start=1):
        for cap_ref in phase.get("capabilities") or []:
            if cap_ref.get("is_new"):
                new_cap_counter += 1
                new_cap_id = f"CAP-{version}-{folder_key}-{new_cap_counter:02d}"
                tools = ", ".join(str(t) for t in (cap_ref.get("suggested_tools") or [])[:3])
                stores["cap"].insert_or_update({
                    "id": new_cap_id,
                    "name": cap_ref.get("capability_name", ""),
                    "description": (
                        f"[测试生成] 来自调研策略 {strategy_id} 阶段 {phase_no}。"
                        f" 建议工具:{tools}。"
                    ),
                    "criterion": "",
                    "version": version,
                })
                capability_ids_linked.append(new_cap_id)
                stats["capability_new"] += 1
            elif cap_ref.get("capability_id"):
                # Reference to a capability that is expected to exist already.
                capability_ids_linked.append(cap_ref["capability_id"])
    # A capability referenced from several phases is linked only once
    # (dict.fromkeys keeps first-seen order).
    capability_ids_linked = list(dict.fromkeys(capability_ids_linked))
    print(f" ✓ capability: new={new_cap_counter}, 总引用={len(capability_ids_linked)}", flush=True)

    # --- 5. junction rows -----------------------------------------------------
    # requirement -> resources
    for rid in resource_ids:
        stores["req"].add_resource(req_id, rid)
    # requirement -> strategy
    stores["req"].add_strategy(req_id, strategy_id)
    # strategy -> resources
    for rid in resource_ids:
        stores["strat"].add_resource(strategy_id, rid)
    # strategy -> capabilities
    for cap_id in capability_ids_linked:
        stores["strat"].add_capability(strategy_id, cap_id, relation_type="compose")
    print(f" ✓ junctions: req_res={len(resource_ids)}, req_strat=1, "
          f"strat_res={len(resource_ids)}, strat_cap={len(capability_ids_linked)}", flush=True)


def main():
    """CLI entry point: ingest every research folder found under ROOT.

    Exits with status 1 (before any database work) when ROOT is not a
    directory, when no sub-directory contains ``case.json``, or when a
    detected sub-directory lacks ``strategy.json``.
    """
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("root", help="包含 00/ 01/ 等子目录的 output 根")
    p.add_argument("version", help="多租户版本标签,如 tao_dev_1")
    p.add_argument("--purge-first", action="store_true",
                   help="先清除该 version 的所有已有数据再入库(迭代测试用)")
    args = p.parse_args()

    root = Path(args.root).expanduser().resolve()
    if not root.is_dir():
        print(f"❌ root 不存在或非目录: {root}", file=sys.stderr)
        sys.exit(1)

    folders = sorted(d for d in root.iterdir() if d.is_dir() and (d / "case.json").exists())
    if not folders:
        print(f"❌ 在 {root} 下未找到任何含 case.json 的子目录", file=sys.stderr)
        sys.exit(1)

    # Fail before purging or writing anything: ingest_folder() needs both
    # files, and a crash half-way would leave a partially ingested version.
    missing = [d.name for d in folders if not (d / "strategy.json").exists()]
    if missing:
        print(f"❌ 以下目录缺少 strategy.json: {missing}", file=sys.stderr)
        sys.exit(1)

    print(f"📂 检测到 {len(folders)} 个调研目录:{[f.name for f in folders]}")
    print(f"🏷 version: {args.version}")

    # Stores are created inside the try block so that a failing constructor
    # cannot leak the connections opened before it.
    # (Original author's note: the stores run in autocommit mode and hold no
    # locks - not verifiable from this file.)
    stores = {}
    try:
        stores["k"] = PostgreSQLStore()
        stores["res"] = PostgreSQLResourceStore()
        stores["req"] = PostgreSQLRequirementStore()
        stores["cap"] = PostgreSQLCapabilityStore()
        stores["strat"] = PostgreSQLStrategyStore()

        # Optional: wipe the existing data of this version first.
        if args.purge_first:
            print(f"\n🧹 purge version={args.version!r}...")
            with contextlib.closing(stores["res"].conn.cursor()) as cur:
                purge_stats = purge_version(cur, args.version)
                print(f" deleted: {purge_stats}")

        stats = {"requirement_matched": 0, "resource": 0, "strategy": 0, "capability_new": 0}
        for folder in folders:
            print(f"\n📁 {folder.name}/ ...")
            ingest_folder(folder, args.version, stores, stats)

        print("\n" + "=" * 60)
        print("入库完成")
        print("=" * 60)
        print(f" requirement (matched): {stats['requirement_matched']} (不创建新的,复用已有 REQ)")
        print(f" resource: {stats['resource']}")
        print(f" strategy: {stats['strategy']}")
        print(f" new capability: {stats['capability_new']}")
    finally:
        for s in stores.values():
            s.close()


if __name__ == "__main__":
    main()