| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- #!/usr/bin/env python3
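"""
tao_dev version: load all raw data from the 99 folders under
/Users/sunlit/Downloads/output-new into the database.

Principle: no de-duplication. When CAP-001 appears in several folders, each
folder produces its own independent capability row.

Key details (revised):
- Capabilities the LLM emitted with is_new=True often have no id or an empty
  id; they are stored under the synthesized id `{req_id}::NEW-{idx}` so that
  no data is dropped.
- An entry in strategy.workflow_outline[].capabilities[] references a
  capability in one of three ways:
    a) it has an id     -> look the id up in folder_cap_by_id
                           (falling back to the name if the id is unknown)
    b) no id but a name -> look the name up in folder_cap_by_name
    c) neither          -> skip it and count the skip
- case_references may use either a full-width (Chinese) or an ASCII colon as
  the separator.

ID scheme:
  requirement: {orig_req_id}__td
  capability : {orig_req_id}::{raw_cap_id or 'NEW-<idx>'}
  strategy   : strategy-taodev-{orig_req_id}-{idx}
  resource   : resource/taodev/{orig_req_id}/{platform}/{case_id}

Idempotent: every INSERT carries ON CONFLICT DO NOTHING, so re-running the
script does not create duplicates.
"""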
- import json
- import re
- import sys
- import time
- from pathlib import Path
- import psycopg2.extras
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
# Root directory scanned by main(); each sub-directory is one requirement's
# raw output (strategy.json, capabilities_extracted.json, raw_cases/).
OUTPUT = Path('/Users/sunlit/Downloads/output-new')
# Value written to the `version` column of every row this script inserts,
# and the filter used when wiping or counting rows.
VERSION = 'tao_dev'
# `<platform>/<case_id>` followed by an ASCII colon, a full-width colon
# (U+FF1A), whitespace, or the end of the reference.  The end-of-string
# alternative matters because parse_case_ref() strips the reference first,
# so a bare `platform/case_id` has no trailing separator left to match.
CASE_REF_RE = re.compile(r'^([a-z]+)/(case_\w+)(?:[:\uff1a\s]|$)')


def parse_case_ref(ref: str):
    """Split a case reference into its platform and case id.

    A reference looks like ``<platform>/<case_id>`` optionally followed by a
    separator (ASCII colon, full-width colon or whitespace) and free text.
    The platform must be lowercase ASCII letters and the case id must start
    with ``case_``.

    Args:
        ref: The raw reference; surrounding whitespace is ignored.

    Returns:
        ``(platform, case_id)`` when the reference is well formed, otherwise
        ``(None, None)`` (including when ``ref`` is not a string).
    """
    if not isinstance(ref, str):
        return None, None
    m = CASE_REF_RE.match(ref.strip())
    if m is None:
        return None, None
    return m.group(1), m.group(2)
def norm_name(n):
    """Return ``n`` without surrounding whitespace; any falsy value gives ''."""
    if not n:
        return ''
    return n.strip()
def _clean_id(value):
    """Return ``value`` stripped when it is a string, otherwise ''."""
    return value.strip() if isinstance(value, str) else ''


def _wipe_version(cur):
    """Delete every row tagged with VERSION, junction tables first."""
    print('⚠ --wipe: 清空现有 tao_dev 数据', flush=True)
    # Junction tables carry no version column of their own, so their rows
    # are located through the versioned parent table.
    for j, parent, fk in [
        ('strategy_capability', 'strategy', 'strategy_id'),
        ('strategy_resource', 'strategy', 'strategy_id'),
        ('requirement_strategy', 'strategy', 'strategy_id'),
        ('requirement_capability', 'capability', 'capability_id'),
        ('requirement_resource', 'resource', 'resource_id'),
    ]:
        # Table/column names come from the fixed list above, never from
        # user input, so interpolating them is safe here.
        cur.execute(f"""DELETE FROM {j} WHERE {fk} IN
                        (SELECT id FROM {parent} WHERE version=%s)""", (VERSION,))
        print(f' cleared {j}: {cur.rowcount}', flush=True)
    for t in ['strategy', 'capability', 'resource', 'requirement']:
        cur.execute(f'DELETE FROM {t} WHERE version=%s', (VERSION,))
        print(f' cleared {t}: {cur.rowcount}', flush=True)


def _ingest_capabilities(cur, cd, orig_req, new_req_id, stats):
    """Insert one folder's extracted capabilities and link them to the requirement.

    Returns ``(folder_cap_by_id, folder_cap_by_name)``: the raw (stripped)
    capability id and the normalized name, each mapped to the stored id.
    """
    folder_cap_by_id = {}
    folder_cap_by_name = {}
    # `or []` also covers an explicit JSON null.
    for idx, c in enumerate(cd.get('extracted_capabilities') or []):
        if not isinstance(c, dict):
            continue
        raw_id = c.get('id')
        lookup_id = _clean_id(raw_id)
        if lookup_id:
            # The stored id keeps the raw text so ids stay identical to
            # earlier runs; only the lookup key is stripped, matching how
            # strategies reference capabilities.
            cap_id = f'{orig_req}::{raw_id}'
            folder_cap_by_id[lookup_id] = cap_id
        else:
            # Missing, blank or non-string id: synthesize one from the
            # position so the capability is still stored.
            cap_id = f'{orig_req}::NEW-{idx}'
            stats['cap_synth_id'] += 1
        nm = norm_name(c.get('name'))
        if nm:
            folder_cap_by_name[nm] = cap_id

        effects = c.get('effects')
        if effects is None:
            effects_json = None
        else:
            # A bare list is wrapped in an object under the key 'items'.
            effects_json = psycopg2.extras.Json(
                {'items': effects} if isinstance(effects, list) else effects
            )
        cur.execute("""INSERT INTO capability (id, name, criterion, description, effects, version)
                       VALUES (%s,%s,%s,%s,%s,%s)
                       ON CONFLICT (id) DO NOTHING""",
                    (cap_id, nm, c.get('criterion', ''), c.get('description', ''),
                     effects_json, VERSION))
        stats['cap'] += cur.rowcount or 0
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                    (new_req_id, cap_id))
        stats['req_cap'] += cur.rowcount or 0
    return folder_cap_by_id, folder_cap_by_name


def _ingest_resources(cur, folder, orig_req, new_req_id, stats):
    """Insert the cases found in ``folder/raw_cases/*.json`` as resources.

    Returns a dict mapping ``(platform, case_id)`` to the stored resource id.
    """
    folder_res_ids = {}
    rc_dir = folder / 'raw_cases'
    if not rc_dir.exists():
        return folder_res_ids
    for cf in sorted(rc_dir.iterdir()):
        if cf.suffix != '.json':
            continue
        try:
            cj = json.loads(cf.read_text(encoding='utf-8'))
        except (OSError, ValueError) as exc:
            # Still best-effort, but report the file instead of dropping it
            # silently (JSON and Unicode decode errors are ValueErrors).
            print(f' [{folder.name}] skip unreadable case file {cf.name}: {exc}',
                  flush=True)
            continue
        if not isinstance(cj, dict):
            print(f' [{folder.name}] skip case file {cf.name}: top level is not an object',
                  flush=True)
            continue
        for case in cj.get('cases') or []:
            if not isinstance(case, dict):
                continue
            platform = case.get('platform', '')
            cid = case.get('id', '')
            if not platform or not cid:
                continue
            res_id = f'resource/taodev/{orig_req}/{platform}/{cid}'
            folder_res_ids[(platform, cid)] = res_id
            metadata = {
                'source_url': case.get('source_url'),
                'metrics': case.get('metrics'),
                'user_feedback': case.get('user_feedback'),
                'input_details': case.get('input_details'),
                'output_details': case.get('output_details'),
                'platform': platform,
                'original_case_id': cid,
            }
            now_ts = int(time.time())
            cur.execute("""INSERT INTO resource (id, title, body, secure_body, content_type,
                                                 metadata, sort_order, submitted_by, created_at,
                                                 updated_at, images, version)
                           VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                           ON CONFLICT (id) DO NOTHING""",
                        (res_id, case.get('title', ''),
                         case.get('workflow_process', ''),
                         None, 'research_case',
                         psycopg2.extras.Json(metadata), 0, None,
                         now_ts, now_ts,
                         psycopg2.extras.Json(case.get('images') or []),
                         VERSION))
            stats['res'] += cur.rowcount or 0
            cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                           VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                        (new_req_id, res_id))
            stats['req_res'] += cur.rowcount or 0
    return folder_res_ids


def _ingest_strategies(cur, sd, orig_req, new_req_id,
                       folder_cap_by_id, folder_cap_by_name, folder_res_ids, stats):
    """Insert one folder's strategies plus their capability and resource links."""
    for idx, st in enumerate(sd.get('strategies') or []):
        if not isinstance(st, dict):
            continue
        strat_id = f'strategy-taodev-{orig_req}-{idx}'
        is_sel = bool(st.get('is_selected'))
        body = {
            'source': st.get('source'),
            'workflow_outline': st.get('workflow_outline', []),
            'original_strategy_name': st.get('name', ''),
        }
        now_ts = int(time.time())
        cur.execute("""INSERT INTO strategy (id, name, description, body, status,
                                             created_at, updated_at, version)
                       VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
                       ON CONFLICT (id) DO NOTHING""",
                    (strat_id, (st.get('name', '') or '')[:250],
                     st.get('source', ''),
                     psycopg2.extras.Json(body),
                     'approved', now_ts, now_ts, VERSION))
        stats['strat'] += cur.rowcount or 0
        cur.execute("""INSERT INTO requirement_strategy
                       (requirement_id, strategy_id, is_selected)
                       VALUES (%s,%s,%s) ON CONFLICT DO NOTHING""",
                    (new_req_id, strat_id, is_sel))
        stats['req_strat'] += cur.rowcount or 0

        for ph in st.get('workflow_outline', []) or []:
            if not isinstance(ph, dict):
                continue
            for c in ph.get('capabilities', []) or []:
                if not isinstance(c, dict):
                    continue
                raw_id = _clean_id(c.get('id'))
                nm = norm_name(c.get('name'))
                # Resolve by id first, then fall back to the name.
                mapped = folder_cap_by_id.get(raw_id) if raw_id else None
                source_kind = 'by_id'
                if not mapped and nm:
                    mapped = folder_cap_by_name.get(nm)
                    source_kind = 'by_name'
                if not mapped:
                    stats['strat_cap_skip'] += 1
                    continue
                cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id)
                               VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                            (strat_id, mapped))
                stats[f'strat_cap_{source_kind}'] += cur.rowcount or 0

                for ref in c.get('case_references', []) or []:
                    platform, case_id = parse_case_ref(ref)
                    if not platform or not case_id:
                        stats['strat_res_skip'] += 1
                        continue
                    res_id = folder_res_ids.get((platform, case_id))
                    if not res_id:
                        stats['strat_res_skip'] += 1
                        continue
                    cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                                   VALUES (%s,%s) ON CONFLICT DO NOTHING""",
                                (strat_id, res_id))
                    stats['strat_res'] += cur.rowcount or 0


def main():
    """Load every folder under OUTPUT into the database as version VERSION.

    Pass ``--wipe`` on the command line to delete all existing rows of that
    version first.  Each folder is matched to a ``v0`` requirement by its
    exact description text; folders without a match are reported and
    skipped.  A folder lacking ``strategy.json`` or
    ``capabilities_extracted.json`` aborts the run with the underlying
    exception.
    """
    wipe_first = '--wipe' in sys.argv
    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            # NOTE(review): nothing in this function commits explicitly;
            # this presumably relies on the store's connection running in
            # autocommit mode -- confirm against PostgreSQLCapabilityStore.
            cur.execute("SET statement_timeout = '300s'")
            # WARNING: terminates every *other* session in this database
            # that is sitting 'idle in transaction'.
            cur.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction'
                           AND pid!=pg_backend_pid() AND datname=current_database()""")
            for r in cur.fetchall():
                cur.execute('SELECT pg_terminate_backend(%s)', (r['pid'],))

            if wipe_first:
                _wipe_version(cur)

            # v0 requirements are the matching key: folder requirement text
            # must equal a v0 description exactly.
            cur.execute('SELECT id, description, source_nodes, status, match_result FROM requirement WHERE version=%s', ('v0',))
            req_map = {r['description']: r for r in cur.fetchall()}
            print(f'v0 req 映射: {len(req_map)}', flush=True)

            folders = sorted(f for f in OUTPUT.iterdir() if f.is_dir())
            print(f'folders: {len(folders)}', flush=True)

            stats = {'req': 0, 'cap': 0, 'cap_synth_id': 0, 'strat': 0, 'res': 0,
                     'req_cap': 0, 'req_strat': 0, 'req_res': 0,
                     'strat_cap_by_id': 0, 'strat_cap_by_name': 0, 'strat_cap_skip': 0,
                     'strat_res': 0, 'strat_res_skip': 0}

            for folder in folders:
                t0 = time.time()
                sd = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
                cd = json.loads((folder / 'capabilities_extracted.json').read_text(encoding='utf-8'))
                req_text = sd.get('requirement') or cd.get('requirement')
                r0 = req_map.get(req_text)
                if not r0:
                    print(f' [{folder.name}] 无法匹配 req,跳过', flush=True)
                    continue
                orig_req = r0['id']
                new_req_id = f'{orig_req}__td'

                # 1. requirement: a copy of the v0 row under the new id/version.
                source_nodes = r0['source_nodes']
                cur.execute("""INSERT INTO requirement (id, description, source_nodes, status,
                                                        match_result, version)
                               VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO NOTHING""",
                            (new_req_id, r0['description'],
                             psycopg2.extras.Json(source_nodes) if source_nodes is not None else None,
                             r0['status'], r0['match_result'], VERSION))
                stats['req'] += cur.rowcount or 0

                # 2. capabilities (including new ones that carry no id).
                folder_cap_by_id, folder_cap_by_name = _ingest_capabilities(
                    cur, cd, orig_req, new_req_id, stats)

                # 3. resources (raw_cases).
                folder_res_ids = _ingest_resources(
                    cur, folder, orig_req, new_req_id, stats)

                # 4. strategies and their capability/resource links.
                _ingest_strategies(cur, sd, orig_req, new_req_id,
                                   folder_cap_by_id, folder_cap_by_name,
                                   folder_res_ids, stats)

                print(f' [{folder.name}] {orig_req}: {time.time()-t0:.1f}s', flush=True)

            print('\n=== 入库统计 ===', flush=True)
            for k, v in stats.items():
                print(f' {k}: {v}', flush=True)

            print('\n=== tao_dev 版本分布 ===', flush=True)
            for tbl in ['requirement', 'capability', 'strategy', 'resource']:
                cur.execute(f'SELECT COUNT(*) c FROM {tbl} WHERE version=%s', (VERSION,))
                print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)
        finally:
            cur.close()
    finally:
        # Runs even if obtaining or closing the cursor raised.
        s.close()


if __name__ == '__main__':
    main()
|