#!/usr/bin/env python3
"""Ingest the raw data of the 99 folders under /Users/sunlit/Downloads/output-new
into the database as version ``tao_dev``.

Principle: no de-duplication. If CAP-001 appears in several folders, every
folder produces its own capability row.

Key details (revised):
- Capabilities the LLM marks ``is_new=True`` often have no id / an empty id.
  They are stored under a synthesized id ``{req_id}::NEW-{idx}`` so that no
  data is dropped.
- ``strategy.workflow_outline[].capabilities[]`` references a capability in
  one of three ways:
    a) it has an id            -> resolved through the per-folder id map
    b) no id but it has a name -> resolved through the per-folder name map
    c) neither                 -> skipped and counted
- ``case_references`` entries may use a fullwidth (Chinese) or an ASCII colon
  after the case id.

ID scheme:
  requirement: {orig_req_id}__td
  capability : {orig_req_id}::{raw_cap_id or 'NEW-<idx>'}
  strategy   : strategy-taodev-{orig_req_id}-{idx}
  resource   : resource/taodev/{orig_req_id}/{platform}/{case_id}

Idempotency: every INSERT carries ON CONFLICT DO NOTHING, so re-running the
script does not create duplicates.

Usage: pass ``--wipe`` to delete all existing ``tao_dev`` rows first.
"""
import json
import re
import sys
import time
from pathlib import Path

import psycopg2.extras

sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

OUTPUT = Path('/Users/sunlit/Downloads/output-new')
VERSION = 'tao_dev'
# "<platform>/<case_id>" followed by an ASCII colon, a fullwidth colon
# (U+FF1A), whitespace, or the end of the string (a bare reference).
CASE_REF_RE = re.compile(r'^([a-z]+)/(case_\w+)(?:[:\uff1a\s]|$)')


def parse_case_ref(ref: str):
    """Split a case reference into ``(platform, case_id)``.

    Accepts strings such as ``"xhs/case_001: title"`` or a bare
    ``"xhs/case_001"``. Returns ``(None, None)`` when *ref* is not a string
    or does not start with a ``platform/case_xxx`` prefix.
    """
    if not isinstance(ref, str):
        return None, None
    m = CASE_REF_RE.match(ref.strip())
    if m is None:
        return None, None
    return m.group(1), m.group(2)


def norm_name(n):
    """Return *n* stripped of surrounding whitespace; falsy values become ''."""
    return (n or '').strip()


def _wipe_version(cur):
    """Delete all VERSION rows: junction tables first, then entity tables."""
    print('⚠ --wipe: 清空现有 tao_dev 数据', flush=True)
    # Junction rows are selected through one parent table filtered by version.
    for j, parent, fk in [
        ('strategy_capability', 'strategy', 'strategy_id'),
        ('strategy_resource', 'strategy', 'strategy_id'),
        ('requirement_strategy', 'strategy', 'strategy_id'),
        ('requirement_capability', 'capability', 'capability_id'),
        ('requirement_resource', 'resource', 'resource_id'),
    ]:
        cur.execute(
            f"""DELETE FROM {j} WHERE {fk} IN
                (SELECT id FROM {parent} WHERE version=%s)""",
            (VERSION,),
        )
        print(f' cleared {j}: {cur.rowcount}', flush=True)
    for t in ['strategy', 'capability', 'resource', 'requirement']:
        cur.execute(f'DELETE FROM {t} WHERE version=%s', (VERSION,))
        print(f' cleared {t}: {cur.rowcount}', flush=True)


def _ingest_capabilities(cur, cd, orig_req, new_req_id, stats):
    """Insert one folder's extracted capabilities; return (by_id, by_name) maps.

    Both maps point at the stored capability id. ``by_id`` is keyed by the
    stripped raw id, ``by_name`` by the normalized name (last one wins).
    """
    by_id = {}
    by_name = {}
    # `or []` also covers an explicit JSON null.
    for idx, c in enumerate(cd.get('extracted_capabilities') or []):
        if not isinstance(c, dict):
            continue
        # Strip the id once and use the stripped form everywhere, so the key
        # stored here matches the stripped id used by the strategy lookup.
        raw_id = norm_name(c.get('id'))
        if raw_id:
            cap_id = f'{orig_req}::{raw_id}'
            by_id[raw_id] = cap_id
        else:
            # No usable id: synthesize one from the position in the list.
            cap_id = f'{orig_req}::NEW-{idx}'
            stats['cap_synth_id'] += 1
        nm = norm_name(c.get('name'))
        if nm:
            by_name[nm] = cap_id

        effects = c.get('effects')
        if effects is None:
            effects_json = None
        else:
            # A bare list is wrapped as {'items': [...]}; other values are
            # stored as they are.
            effects_json = psycopg2.extras.Json(
                {'items': effects} if isinstance(effects, list) else effects
            )
        cur.execute(
            """INSERT INTO capability (id, name, criterion, description, effects, version)
               VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING""",
            (cap_id, nm, c.get('criterion', ''), c.get('description', ''),
             effects_json, VERSION),
        )
        stats['cap'] += cur.rowcount or 0
        cur.execute(
            """INSERT INTO requirement_capability (requirement_id, capability_id)
               VALUES (%s,%s) ON CONFLICT DO NOTHING""",
            (new_req_id, cap_id),
        )
        stats['req_cap'] += cur.rowcount or 0
    return by_id, by_name


def _ingest_resources(cur, folder, orig_req, new_req_id, stats):
    """Insert the cases found in ``<folder>/raw_cases/*.json``.

    Returns a ``{(platform, case_id): resource_id}`` map for the folder.
    """
    res_ids = {}
    rc_dir = folder / 'raw_cases'
    if not rc_dir.is_dir():
        return res_ids
    for cf in sorted(rc_dir.iterdir()):
        if cf.suffix != '.json':
            continue
        try:
            cj = json.loads(cf.read_text(encoding='utf-8'))
        except (OSError, ValueError) as exc:
            # Best effort: one unreadable or invalid file must not abort the
            # run, but it is reported instead of being dropped silently.
            # (JSON and UTF-8 decode errors are ValueError subclasses.)
            print(f' [{folder.name}] skip unreadable {cf.name}: {exc}', flush=True)
            continue
        cases = cj.get('cases') if isinstance(cj, dict) else None
        for case in cases or []:
            if not isinstance(case, dict):
                continue
            platform = case.get('platform', '')
            cid = case.get('id', '')
            if not platform or not cid:
                continue
            res_id = f'resource/taodev/{orig_req}/{platform}/{cid}'
            res_ids[(platform, cid)] = res_id
            metadata = {
                'source_url': case.get('source_url'),
                'metrics': case.get('metrics'),
                'user_feedback': case.get('user_feedback'),
                'input_details': case.get('input_details'),
                'output_details': case.get('output_details'),
                'platform': platform,
                'original_case_id': cid,
            }
            now_ts = int(time.time())
            cur.execute(
                """INSERT INTO resource
                   (id, title, body, secure_body, content_type, metadata, sort_order,
                    submitted_by, created_at, updated_at, images, version)
                   VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                   ON CONFLICT (id) DO NOTHING""",
                (res_id, case.get('title', ''), case.get('workflow_process', ''),
                 None, 'research_case', psycopg2.extras.Json(metadata), 0, None,
                 now_ts, now_ts, psycopg2.extras.Json(case.get('images') or []),
                 VERSION),
            )
            stats['res'] += cur.rowcount or 0
            cur.execute(
                """INSERT INTO requirement_resource (requirement_id, resource_id)
                   VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                (new_req_id, res_id),
            )
            stats['req_res'] += cur.rowcount or 0
    return res_ids


def _link_strategy_refs(cur, st, strat_id, cap_by_id, cap_by_name, res_ids, stats):
    """Link one strategy to the capabilities and cases its outline references."""
    for ph in st.get('workflow_outline') or []:
        if not isinstance(ph, dict):
            continue
        for c in ph.get('capabilities') or []:
            if not isinstance(c, dict):
                continue
            raw_id = norm_name(c.get('id'))
            nm = norm_name(c.get('name'))
            # Resolve by id first, then fall back to the name.
            mapped = cap_by_id.get(raw_id) if raw_id else None
            source_kind = 'by_id'
            if not mapped and nm:
                mapped = cap_by_name.get(nm)
                source_kind = 'by_name'
            if not mapped:
                # NOTE(review): this also skips the capability's
                # case_references, so those strategy/resource links are not
                # created and not counted in strat_res_skip. Confirm intended.
                stats['strat_cap_skip'] += 1
                continue
            cur.execute(
                """INSERT INTO strategy_capability (strategy_id, capability_id)
                   VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                (strat_id, mapped),
            )
            stats[f'strat_cap_{source_kind}'] += cur.rowcount or 0

            for ref in c.get('case_references') or []:
                platform, case_id = parse_case_ref(ref)
                res_id = res_ids.get((platform, case_id)) if platform and case_id else None
                if not res_id:
                    # Unparseable reference, or a case not ingested for this folder.
                    stats['strat_res_skip'] += 1
                    continue
                cur.execute(
                    """INSERT INTO strategy_resource (strategy_id, resource_id)
                       VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                    (strat_id, res_id),
                )
                stats['strat_res'] += cur.rowcount or 0


def _ingest_strategies(cur, sd, orig_req, new_req_id, cap_by_id, cap_by_name,
                       res_ids, stats):
    """Insert one folder's strategies and all of their junction rows."""
    # `or []` also covers an explicit JSON null.
    for idx, st in enumerate(sd.get('strategies') or []):
        if not isinstance(st, dict):
            continue
        strat_id = f'strategy-taodev-{orig_req}-{idx}'
        is_sel = bool(st.get('is_selected'))
        body = {
            'source': st.get('source'),
            'workflow_outline': st.get('workflow_outline', []),
            'original_strategy_name': st.get('name', ''),
        }
        now_ts = int(time.time())
        cur.execute(
            """INSERT INTO strategy
               (id, name, description, body, status, created_at, updated_at, version)
               VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING""",
            # The name is cut to 250 characters; the full name is kept in body.
            (strat_id, (st.get('name', '') or '')[:250], st.get('source', ''),
             psycopg2.extras.Json(body), 'approved', now_ts, now_ts, VERSION),
        )
        stats['strat'] += cur.rowcount or 0
        cur.execute(
            """INSERT INTO requirement_strategy (requirement_id, strategy_id, is_selected)
               VALUES (%s,%s,%s) ON CONFLICT DO NOTHING""",
            (new_req_id, strat_id, is_sel),
        )
        stats['req_strat'] += cur.rowcount or 0
        _link_strategy_refs(cur, st, strat_id, cap_by_id, cap_by_name, res_ids, stats)


def _run(cur, wipe_first):
    """Run the whole ingestion on an open cursor and print the statistics."""
    cur.execute("SET statement_timeout = '300s'")
    # Terminate other sessions of this database that sit idle in a transaction.
    cur.execute(
        """SELECT pid FROM pg_stat_activity
           WHERE state='idle in transaction' AND pid!=pg_backend_pid()
             AND datname=current_database()"""
    )
    for r in cur.fetchall():
        cur.execute('SELECT pg_terminate_backend(%s)', (r['pid'],))

    if wipe_first:
        _wipe_version(cur)

    # Folders are matched to v0 requirements by their exact description text.
    cur.execute(
        'SELECT id, description, source_nodes, status, match_result '
        'FROM requirement WHERE version=%s',
        ('v0',),
    )
    req_map = {r['description']: r for r in cur.fetchall()}
    print(f'v0 req 映射: {len(req_map)}', flush=True)

    folders = sorted(f for f in OUTPUT.iterdir() if f.is_dir())
    print(f'folders: {len(folders)}', flush=True)

    stats = {'req': 0, 'cap': 0, 'cap_synth_id': 0, 'strat': 0, 'res': 0,
             'req_cap': 0, 'req_strat': 0, 'req_res': 0,
             'strat_cap_by_id': 0, 'strat_cap_by_name': 0, 'strat_cap_skip': 0,
             'strat_res': 0, 'strat_res_skip': 0}

    for folder in folders:
        t0 = time.time()
        sd = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
        cd = json.loads((folder / 'capabilities_extracted.json').read_text(encoding='utf-8'))
        req_text = sd.get('requirement') or cd.get('requirement')
        r0 = req_map.get(req_text)
        if not r0:
            print(f' [{folder.name}] 无法匹配 req,跳过', flush=True)
            continue
        orig_req = r0['id']
        new_req_id = f'{orig_req}__td'

        # 1. requirement: a copy of the v0 row under the new id and version.
        source_nodes = r0['source_nodes']
        cur.execute(
            """INSERT INTO requirement
               (id, description, source_nodes, status, match_result, version)
               VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING""",
            (new_req_id, r0['description'],
             psycopg2.extras.Json(source_nodes) if source_nodes is not None else None,
             r0['status'], r0['match_result'], VERSION),
        )
        stats['req'] += cur.rowcount or 0

        # 2. capabilities (including new ones that have no id)
        cap_by_id, cap_by_name = _ingest_capabilities(cur, cd, orig_req, new_req_id, stats)
        # 3. resources (raw_cases)
        res_ids = _ingest_resources(cur, folder, orig_req, new_req_id, stats)
        # 4. strategies
        _ingest_strategies(cur, sd, orig_req, new_req_id, cap_by_id, cap_by_name,
                           res_ids, stats)

        print(f' [{folder.name}] {orig_req}: {time.time()-t0:.1f}s', flush=True)

    print('\n=== 入库统计 ===', flush=True)
    for k, v in stats.items():
        print(f' {k}: {v}', flush=True)

    print('\n=== tao_dev 版本分布 ===', flush=True)
    for tbl in ['requirement', 'capability', 'strategy', 'resource']:
        cur.execute(f'SELECT COUNT(*) c FROM {tbl} WHERE version=%s', (VERSION,))
        print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)


def main():
    """Entry point: ingest every folder under OUTPUT as version VERSION.

    With ``--wipe`` on the command line, existing VERSION rows are deleted
    before ingesting.
    """
    wipe_first = '--wipe' in sys.argv
    s = PostgreSQLCapabilityStore()
    # Nested try/finally so the store is closed even if obtaining or closing
    # the cursor raises.
    try:
        cur = s._get_cursor()
        try:
            # NOTE(review): no explicit commit is issued anywhere in this
            # script; presumably the store's connection is in autocommit
            # mode. Confirm against PostgreSQLCapabilityStore.
            _run(cur, wipe_first)
        finally:
            cur.close()
    finally:
        s.close()


if __name__ == '__main__':
    main()