| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275 |
#!/usr/bin/env python3
"""
入库研究调研结果(case.json + strategy.json)到 knowhub。

输入目录结构:
    <root>/
    ├── 00/
    │   ├── case.json       ({requirement, cases:[{...}]})
    │   └── strategy.json   ({selected_strategy:{...}, vs_alternatives, uncovered_requirements})
    ├── 01/
    │   ...

入库映射:
    case.json.requirement  → 按完整描述文本精确匹配现有 requirement(跨版本)
                           → 找不到则报错(不创建新 requirement——避免语义重复)
    case.json.cases[]      → resource 实体(多条,URL hash 去重)
    strategy.json          → strategy 实体(1 条 per folder)
    strategy.json.selected_strategy.workflow_outline[].capabilities[]:
        is_new=true        → capability 实体(新建,version 隔离)
        capability_id 已存在 → 直接引用

关联:
    requirement_resource  (requirement → all cases in its folder)
    requirement_strategy  (requirement → strategy)
    strategy_resource     (strategy → all cases in its folder — provenance)
    strategy_capability   (strategy → all mentioned capabilities, compose)

去重策略:
    resource id = "resource/research/{platform}/{hash12(source_url)}"
    ON CONFLICT (id) DO UPDATE → 多次调研遇到同一 URL 自动覆盖

使用:
    python knowhub/scripts/ingest_research_output.py <output_root> <version>
    e.g. python knowhub/scripts/ingest_research_output.py /Users/sunlit/Downloads/output tao_dev_1

幂等:反复执行不破坏数据。可选 --purge-first 会先清掉该版本的已有数据。
"""
import argparse
import contextlib
import hashlib
import json
import sys
import time
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_store import PostgreSQLStore
from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.knowhub_db.cascade import purge_version
- def _hash12(s: str) -> str:
- return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]
def _resource_id(platform: str, source_url: str) -> str:
    """Build the deduplicating resource id for a research case.

    Format: ``resource/research/{platform}/{hash12(source_url)}``.
    ``platform`` falls back to ``"unknown"`` when falsy, then is lower-cased
    and stripped, so the same URL always maps to the same id.
    """
    normalized_platform = platform if platform else "unknown"
    normalized_platform = normalized_platform.lower().strip()
    url_digest = _hash12(source_url)
    return f"resource/research/{normalized_platform}/{url_digest}"
- def _format_case_body(case: dict) -> str:
- parts = []
- if case.get("user_feedback"):
- parts.append(f"【用户反馈】{case['user_feedback']}")
- if case.get("input_details"):
- parts.append(f"【输入详情】{case['input_details']}")
- if case.get("output_details"):
- parts.append(f"【输出详情】{case['output_details']}")
- workflow = case.get("workflow_process", [])
- if workflow:
- parts.append("【工作流】\n" + "\n".join(f"- {s}" for s in workflow))
- return "\n\n".join(parts)
- def _format_strategy_body(selected: dict) -> str:
- """strategy.body 存完整 workflow_outline JSON(前端可解析展示)"""
- return json.dumps(selected, ensure_ascii=False, indent=2)
- def _find_existing_requirement(cursor, description: str):
- """按完整描述文本精确匹配现有 requirement(任何版本)。返回 id 或 None。"""
- cursor.execute("SELECT id FROM requirement WHERE description = %s LIMIT 1", (description,))
- row = cursor.fetchone()
- if row:
- # Dict-like (RealDictRow) or tuple
- return row['id'] if isinstance(row, dict) or hasattr(row, 'keys') else row[0]
- return None
def _load_folder_docs(folder: Path):
    """Read and parse ``case.json`` and ``strategy.json`` from ``folder``.

    Both files are checked for existence before either is parsed, so a
    half-populated folder is rejected before anything is written.

    Returns:
        ``(case_doc, strategy_doc)`` as parsed JSON.

    Raises:
        RuntimeError: if either file is missing.
    """
    case_path = folder / "case.json"
    strategy_path = folder / "strategy.json"
    missing = [p.name for p in (case_path, strategy_path) if not p.is_file()]
    if missing:
        raise RuntimeError(f"[{folder.name}] 缺少文件: {missing}")
    return (
        json.loads(case_path.read_text(encoding="utf-8")),
        json.loads(strategy_path.read_text(encoding="utf-8")),
    )


def _as_number(value):
    """Coerce a metric value to a number for sorting.

    Returns ``value`` unchanged if it is an int/float, its ``int`` parse if it
    is an integer-looking string, and 0 otherwise (including ``None``).
    Metrics are not guaranteed to be numeric, and negating a string would
    raise ``TypeError``.
    """
    if isinstance(value, (int, float)):
        return value
    try:
        return int(str(value).strip())
    except (TypeError, ValueError):
        return 0


def _upsert_case_resources(cases, version: str, searched_at, res_store, stats: dict) -> list:
    """Upsert one ``research_case`` resource per case that has a ``source_url``.

    Cases without a URL are skipped. A resource id seen more than once in the
    same folder is upserted only once (the first occurrence wins), so the
    junction tables never receive the same pair twice.

    Returns:
        The resource ids written, unique and in input order.
    """
    resource_ids = []
    seen = set()
    for case in cases:
        src_url = case.get("source_url", "")
        if not src_url:
            continue
        platform = case.get("platform", "unknown")
        rid = _resource_id(platform, src_url)
        if rid in seen:
            continue
        seen.add(rid)

        metrics = case.get("metrics") or {}
        likes = metrics.get("likes") if isinstance(metrics, dict) else None
        res_store.insert_or_update({
            "id": rid,
            "title": case.get("title", ""),
            "body": _format_case_body(case),
            "content_type": "research_case",
            "images": case.get("images") or [],
            "metadata": {
                "platform": platform,
                "source_url": src_url,
                "metrics": metrics,
                "user_feedback": case.get("user_feedback") or "",
                "input_details": case.get("input_details") or "",
                "output_details": case.get("output_details") or "",
                "workflow_process": case.get("workflow_process") or [],
                "last_seen": searched_at,
                # Keep the original local case id (e.g. case_001) so textual
                # cross-references inside strategy.json can be traced back.
                "local_case_id": case.get("id") or "",
            },
            # Negated like count; presumably sorted ascending so popular
            # cases come first — confirm against the store/frontend.
            "sort_order": -_as_number(likes),
            "version": version,
        })
        resource_ids.append(rid)
        stats["resource"] += 1
    return resource_ids


def _upsert_new_capabilities(workflow_outline, version: str, folder_key: str,
                             strategy_id: str, cap_store, stats: dict):
    """Create capabilities flagged ``is_new`` and collect every referenced capability id.

    New capabilities get ids ``CAP-{version}-{folder_key}-{NN}``, numbered in
    outline order, so re-running on the same input reuses the same ids.
    Entries carrying only a ``capability_id`` are linked as-is and not created.

    Returns:
        ``(capability_ids, new_count)``. The ids are unique and in first-seen
        order.
    """
    referenced = []
    new_count = 0
    for phase_idx, phase in enumerate(workflow_outline):
        for cap_ref in phase.get("capabilities") or []:
            if cap_ref.get("is_new"):
                new_count += 1
                cap_id = f"CAP-{version}-{folder_key}-{new_count:02d}"
                tools = (cap_ref.get("suggested_tools") or [])[:3]
                cap_store.insert_or_update({
                    "id": cap_id,
                    "name": cap_ref.get("capability_name", ""),
                    "description": (
                        f"[测试生成] 来自调研策略 {strategy_id} 阶段 {phase_idx+1}。"
                        f" 建议工具:{', '.join(str(t) for t in tools)}。"
                    ),
                    "criterion": "",
                    "version": version,
                })
                stats["capability_new"] += 1
                referenced.append(cap_id)
            elif cap_ref.get("capability_id"):
                # Existing capability (v0 or another version): link only.
                referenced.append(cap_ref["capability_id"])
    # The same existing capability may be listed in several phases; link once.
    return list(dict.fromkeys(referenced)), new_count


def ingest_folder(folder: Path, version: str, stores: dict, stats: dict):
    """Ingest a single ``output/{NN}/`` research folder.

    Reads ``case.json`` and ``strategy.json`` and then, in order:
      1. resolves the existing requirement whose description equals
         ``case.json.requirement`` (a requirement is never created);
      2. upserts one resource per case with a ``source_url``;
      3. upserts one strategy ``strategy-{version}-{folder_key}``;
      4. creates capabilities flagged ``is_new`` and collects referenced ones;
      5. writes requirement/strategy ↔ resource/capability junction rows.

    Args:
        folder: directory holding the two JSON files. Its name (e.g. ``"00"``)
            becomes part of the strategy and capability ids.
        version: multi-tenant version tag embedded in ids and stored rows.
        stores: mapping with ``"req"``, ``"res"``, ``"strat"`` and ``"cap"``
            store instances.
        stats: counters updated in place (``requirement_matched``,
            ``resource``, ``strategy``, ``capability_new``).

    Raises:
        RuntimeError: if a JSON file is missing, the requirement text is
            empty, or no existing requirement matches it exactly. All three
            checks happen before anything is written for this folder.
    """
    folder_key = folder.name  # "00", "01", ...
    case_doc, strategy_doc = _load_folder_docs(folder)
    requirement_text = case_doc.get("requirement") or ""
    searched_at = case_doc.get("searched_at", "")
    cases = case_doc.get("cases") or []

    # ─── 1. requirement: exact match on an existing REQ (never created) ───
    if not requirement_text.strip():
        # An empty description would otherwise match a requirement whose
        # description is also empty.
        raise RuntimeError(f"[{folder_key}] case.json.requirement 为空,无法匹配 requirement")
    cursor = stores["req"]._get_cursor()
    try:
        req_id = _find_existing_requirement(cursor, requirement_text)
    finally:
        cursor.close()
    if not req_id:
        raise RuntimeError(
            f"[{folder_key}] 找不到匹配的 requirement。case.json.requirement 应与现有 "
            f"requirement.description 精确相等。首 80 字:{requirement_text[:80]!r}"
        )
    stats["requirement_matched"] += 1
    print(f" ✓ requirement: {req_id} (复用已有)", flush=True)

    # ─── 2. resources: one per case, deduplicated by URL hash ───
    resource_ids = _upsert_case_resources(cases, version, searched_at, stores["res"], stats)
    print(f" ✓ resource: {len(resource_ids)} 条", flush=True)

    # ─── 3. strategy ───
    selected = strategy_doc.get("selected_strategy") or {}
    strategy_id = f"strategy-{version}-{folder_key}"
    now = int(time.time())
    stores["strat"].insert_or_update({
        "id": strategy_id,
        "name": selected.get("name", ""),
        # reasoning can be long; truncate for the description column.
        "description": (selected.get("reasoning") or "")[:2000],
        # The whole strategy.json is stored verbatim as JSON.
        "body": _format_strategy_body(strategy_doc),
        "status": "draft",
        # NOTE(review): created_at is reset on every run; whether the store
        # keeps the first value on conflict is not visible here — confirm.
        "created_at": now,
        "updated_at": now,
        "version": version,
    })
    stats["strategy"] += 1
    print(f" ✓ strategy: {strategy_id}", flush=True)

    # ─── 4. capabilities mentioned by the strategy ───
    capability_ids, new_cap_count = _upsert_new_capabilities(
        selected.get("workflow_outline") or [], version, folder_key,
        strategy_id, stores["cap"], stats,
    )
    print(f" ✓ capability: new={new_cap_count}, 总引用={len(capability_ids)}", flush=True)

    # ─── 5. junctions ───
    req_store, strat_store = stores["req"], stores["strat"]
    # requirement → resources (provenance)
    for rid in resource_ids:
        req_store.add_resource(req_id, rid)
    # requirement → strategy (satisfies)
    req_store.add_strategy(req_id, strategy_id)
    # strategy → resources (build-from provenance)
    for rid in resource_ids:
        strat_store.add_resource(strategy_id, rid)
    # strategy → capabilities (compose)
    for cap_id in capability_ids:
        strat_store.add_capability(strategy_id, cap_id, relation_type="compose")
    print(f" ✓ junctions: req_res={len(resource_ids)}, req_strat=1, "
          f"strat_res={len(resource_ids)}, strat_cap={len(capability_ids)}",
          flush=True)
def main():
    """Command-line entry point: ingest every research folder under ``root``.

    Exits with status 1 and a message on stderr in three cases:
      - ``root`` is not a directory;
      - ``root`` has no sub-folder containing ``case.json``;
      - such a sub-folder lacks ``strategy.json``.

    Every store that was opened is closed on every exit path, including when
    a later store fails to open or an earlier ``close()`` raises.
    """
    p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    p.add_argument("root", help="包含 00/ 01/ 等子目录的 output 根")
    p.add_argument("version", help="多租户版本标签,如 tao_dev_1")
    p.add_argument("--purge-first", action="store_true",
                   help="先清除该 version 的所有已有数据再入库(迭代测试用)")
    args = p.parse_args()

    root = Path(args.root).expanduser().resolve()
    if not root.is_dir():
        print(f"❌ root 不存在或非目录: {root}", file=sys.stderr)
        sys.exit(1)

    folders = sorted(d for d in root.iterdir() if d.is_dir() and (d / "case.json").exists())
    if not folders:
        print(f"❌ 在 {root} 下未找到任何含 case.json 的子目录", file=sys.stderr)
        sys.exit(1)

    # Pre-flight before opening any store: otherwise a folder without
    # strategy.json would abort the run only after earlier folders had
    # already been written.
    missing_strategy = [d.name for d in folders if not (d / "strategy.json").is_file()]
    if missing_strategy:
        print(f"❌ 以下目录缺少 strategy.json: {missing_strategy}", file=sys.stderr)
        sys.exit(1)

    print(f"📂 检测到 {len(folders)} 个调研目录:{[f.name for f in folders]}")
    print(f"🏷 version: {args.version}")

    # The original setup note says the stores run in autocommit mode and hold
    # no locks. ExitStack guarantees every store opened so far is closed, even
    # if a later constructor raises or one close() fails.
    with contextlib.ExitStack() as stack:
        stores = {}
        for key, store_cls in (
            ("k", PostgreSQLStore),
            ("res", PostgreSQLResourceStore),
            ("req", PostgreSQLRequirementStore),
            ("cap", PostgreSQLCapabilityStore),
            ("strat", PostgreSQLStrategyStore),
        ):
            store = store_cls()
            stack.callback(store.close)
            stores[key] = store

        # Optional: purge this version's existing data first.
        if args.purge_first:
            print(f"\n🧹 purge version={args.version!r}...")
            cur = stores["res"].conn.cursor()
            try:
                purge_stats = purge_version(cur, args.version)
                print(f" deleted: {purge_stats}")
            finally:
                cur.close()

        stats = {"requirement_matched": 0, "resource": 0, "strategy": 0, "capability_new": 0}
        for folder in folders:
            print(f"\n📁 {folder.name}/ ...")
            ingest_folder(folder, args.version, stores, stats)

        print("\n" + "=" * 60)
        print("入库完成")
        print("=" * 60)
        print(f" requirement (matched): {stats['requirement_matched']} (不创建新的,复用已有 REQ)")
        print(f" resource: {stats['resource']}")
        print(f" strategy: {stats['strategy']}")
        print(f" new capability: {stats['capability_new']}")


if __name__ == "__main__":
    main()
|