  1. #!/usr/bin/env python3
  2. """
  3. 一次性修复脚本:重建 howard_dedup 版本的 capability / strategy / resource
  4. 背景:同事的 agent 失控污染了 tao_dev_1 数据。
  5. 这个脚本:
  6. 1. 备份当前 DB 状态到 /tmp/knowhub_backup_<date>/
  7. 2. 用 /tmp/capabilities_all.md(465 快照)+ MERGE_CLUSTERS 构建别名表
  8. 3. Purge 当前 capability + strategy + resource + junctions(所有版本)
  9. 4. 从 output 2/ 的 99 folder 重新 ingest,version='howard_dedup'
  10. 5. 应用 RENAMES 保留改名工作
  11. 不动的表:requirement、knowledge 及其 junction。
  12. 用法:
  13. python knowhub/scripts/rebuild_howard_dedup.py --backup-only
  14. python knowhub/scripts/rebuild_howard_dedup.py --dry-run
  15. python knowhub/scripts/rebuild_howard_dedup.py --execute
  16. """
  17. import argparse
  18. import hashlib
  19. import json
  20. import os
  21. import re
  22. import sys
  23. import time
  24. from datetime import date
  25. from pathlib import Path
  26. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  27. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  28. from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
  29. from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
  30. from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
  31. from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
  32. from knowhub.scripts.rename_merged_capabilities import RENAMES
# Root of the research-pipeline output (one numbered sub-folder per requirement).
OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
# Markdown snapshot of the 465 capabilities, used to rebuild the name→id alias table.
SNAPSHOT_PATH = Path('/tmp/capabilities_all.md')
# Per-day backup directory; re-running on the same day reuses (overwrites) it.
BACKUP_DIR = Path(f'/tmp/knowhub_backup_{date.today().isoformat()}')
# Version tag stamped onto every row this script inserts.
DEDUP_VERSION = 'howard_dedup'
# CAP-006 was lost after the colleague's agent ran amok; reconstructed from conversation cache.
CAP_006 = {
    'id': 'CAP-006',
    'name': '图像细节增强与高清放大',
    'description': '对已生成的图像进行分辨率提升和细节增强,在放大的同时补充高频细节(后处理路径,区别于生成阶段直接高清输出的 CAP-016)',
    'criterion': '',
}
  44. def norm(s):
  45. return s.strip().lower() if s else ''
  46. def hash8(text):
  47. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]
  48. def hash12(text):
  49. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:12]
  50. def gen_cap_id(name):
  51. return f'CAP-{hash8(norm(name))}'
  52. def gen_resource_id(platform, url):
  53. p = (platform or 'unknown').lower().strip()
  54. return f'resource/research/{p}/{hash12(url)}'
  55. def gen_strategy_id(req_text, strategy_name):
  56. return f'strategy-{hash8((req_text or "") + "|" + (strategy_name or ""))}'
  57. # ═══════════════════════════════════════════════════════════
  58. def parse_snapshot(path):
  59. """Parse /tmp/capabilities_all.md -> {id: name}."""
  60. if not path.exists():
  61. print(f'⚠️ snapshot file missing: {path}', flush=True)
  62. return {}
  63. content = path.read_text(encoding='utf-8')
  64. pat = re.compile(r'^## (.+?)\n\*\*(CAP-[\w\-]+)\*\*', re.MULTILINE)
  65. return {cid: name.strip() for name, cid in pat.findall(content)}
def build_alias_map(snapshot, current_caps):
    """Build normalized_name -> final canonical_id (transitively resolved).

    NOTE: pass order matters — later passes overwrite earlier entries, except
    pass 5 which uses setdefault. Precedence (lowest to highest):
    snapshot names < canonical DB names < RENAMES < v0 foundation caps.

    snapshot: {cap_id: name} from parse_snapshot().
    current_caps: {cap_id: row-dict} dumped from the capability table.
    """
    # Step A: build member→canonical map; resolve transitively (A→B→C → A,B→C)
    member_to_canonical = {}
    for canonical, members in MERGE_CLUSTERS.items():
        for m in members:
            member_to_canonical[m] = canonical
    def final(cid, limit=10):
        # Follow the member→canonical chain; `seen` + `limit` guard against
        # cycles or pathological chains in MERGE_CLUSTERS.
        seen = set()
        while cid in member_to_canonical and cid not in seen and limit > 0:
            seen.add(cid)
            cid = member_to_canonical[cid]
            limit -= 1
        return cid
    # Resolve: every member maps to final canonical
    for m in list(member_to_canonical.keys()):
        member_to_canonical[m] = final(m)
    alias = {}
    # 1. All snapshot names → final canonical (member) or self (non-member)
    for cid, name in snapshot.items():
        alias[norm(name)] = member_to_canonical.get(cid, cid)
    # 2. Canonical names from current DB (post-rename) → final canonical
    for canonical in MERGE_CLUSTERS.keys():
        final_id = final(canonical)
        if canonical in current_caps:
            alias[norm(current_caps[canonical]['name'])] = final_id
    # 3. RENAMES: new name → same canonical (final-resolved)
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)
    # 4. v0 foundation CAP-001..021 + CAP-006 reconstruction → self
    #    (matches ids shaped exactly 'CAP-' + 3 digits)
    for cid, cap in current_caps.items():
        if cid.startswith('CAP-') and len(cid) == 7 and cid[4:].isdigit():
            alias[norm(cap['name'])] = cid
    alias[norm(CAP_006['name'])] = CAP_006['id']
    # 5. Any remaining current DB capability (non-VCAP) → self
    #    setdefault: does NOT override mappings established by passes 1-4.
    for cid, cap in current_caps.items():
        if cid.startswith('CAP-tao_dev_'):
            continue
        alias.setdefault(norm(cap['name']), cid)
    return alias
  106. # ═══════════════════════════════════════════════════════════
  107. # PHASE 0: BACKUP
  108. def backup(stores):
  109. BACKUP_DIR.mkdir(parents=True, exist_ok=True)
  110. print(f'📦 Backing up to {BACKUP_DIR}/', flush=True)
  111. tables = [
  112. 'capability', 'strategy', 'resource', 'requirement', 'knowledge',
  113. 'requirement_capability', 'capability_tool', 'capability_knowledge',
  114. 'capability_resource', 'strategy_capability', 'strategy_resource',
  115. 'strategy_knowledge', 'requirement_strategy', 'requirement_resource',
  116. ]
  117. cur = stores['cap']._get_cursor()
  118. try:
  119. for t in tables:
  120. try:
  121. cur.execute(f'SELECT * FROM {t}')
  122. rows = [dict(r) for r in cur.fetchall()]
  123. # strip embedding (too big, not essential for restore)
  124. for r in rows:
  125. r.pop('embedding', None)
  126. (BACKUP_DIR / f'{t}.json').write_text(
  127. json.dumps(rows, default=str, ensure_ascii=False))
  128. print(f' ✓ {t}: {len(rows)} rows', flush=True)
  129. except Exception as e:
  130. print(f' ❌ {t}: {e}', flush=True)
  131. finally:
  132. cur.close()
  133. # PHASE 1: PURGE
  134. def purge(stores):
  135. print('\n🧹 Purging capability / strategy / resource (all versions) + junctions...', flush=True)
  136. cur = stores['cap']._get_cursor()
  137. try:
  138. # junctions first (no FK but keep clean)
  139. for t in ['requirement_capability', 'capability_tool', 'capability_knowledge',
  140. 'capability_resource', 'strategy_capability', 'strategy_resource',
  141. 'strategy_knowledge', 'requirement_strategy', 'requirement_resource']:
  142. cur.execute(f'DELETE FROM {t}')
  143. print(f' ✓ {t} cleared', flush=True)
  144. for t in ['capability', 'strategy', 'resource']:
  145. cur.execute(f'DELETE FROM {t}')
  146. print(f' ✓ {t} cleared', flush=True)
  147. finally:
  148. cur.close()
  149. # PHASE 2: SEED
  150. def seed(stores, current_caps, snapshot):
  151. """Insert:
  152. - 21 v0 foundation caps (CAP-001..021) in howard_dedup
  153. - CAP-006 reconstructed if missing
  154. - All surviving non-VCAP tao_dev_1 canonicals (preserves R1/R2/C renames)
  155. - Missing canonicals (in MERGE_CLUSTERS.keys but gone) recovered from snapshot
  156. """
  157. print('\n🌱 Seeding howard_dedup...', flush=True)
  158. cur = stores['cap']._get_cursor()
  159. try:
  160. inserted = 0
  161. # 1. Insert from current_caps: everything non-VCAP gets version=howard_dedup
  162. for cap_id, cap in current_caps.items():
  163. if cap_id.startswith('CAP-tao_dev_'):
  164. continue # skip VCAP
  165. cur.execute(
  166. """INSERT INTO capability (id, name, criterion, description, effects, version)
  167. VALUES (%s, %s, %s, %s, %s, %s)""",
  168. (cap_id, cap.get('name', ''), cap.get('criterion', '') or '',
  169. cap.get('description', '') or '',
  170. json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
  171. DEDUP_VERSION))
  172. inserted += 1
  173. # 2. CAP-006 reconstruct if missing
  174. if 'CAP-006' not in current_caps:
  175. cur.execute(
  176. """INSERT INTO capability (id, name, criterion, description, effects, version)
  177. VALUES (%s, %s, %s, %s, %s, %s)""",
  178. (CAP_006['id'], CAP_006['name'], CAP_006['criterion'],
  179. CAP_006['description'], '[]', DEDUP_VERSION))
  180. inserted += 1
  181. print(f' ✓ CAP-006 reconstructed', flush=True)
  182. # 3. Missing canonicals (in MERGE_CLUSTERS but not in DB) recovered from snapshot
  183. merged_members = set()
  184. for members in MERGE_CLUSTERS.values():
  185. merged_members.update(members)
  186. recovered = 0
  187. for cid in MERGE_CLUSTERS.keys():
  188. if cid in current_caps or cid in merged_members:
  189. continue
  190. # Not in DB, not a member — need to recover
  191. name = snapshot.get(cid)
  192. if not name:
  193. continue
  194. # Use RENAMES version if available
  195. if cid in RENAMES:
  196. name, desc = RENAMES[cid]
  197. else:
  198. desc = ''
  199. cur.execute(
  200. """INSERT INTO capability (id, name, criterion, description, effects, version)
  201. VALUES (%s, %s, %s, %s, %s, %s)""",
  202. (cid, name, '', desc, '[]', DEDUP_VERSION))
  203. recovered += 1
  204. print(f' seeded: {inserted} existing + {recovered} recovered from snapshot', flush=True)
  205. finally:
  206. cur.close()
  207. # PHASE 3: INGEST from output 2/
def ingest_folder(folder, stores, alias, cur, stats):
    """Ingest one numbered output folder: resources, capabilities, selected strategy, junctions.

    Expected folder layout (files are optional except blueprint.json):
      blueprint.json              — contains the 'requirement' text used to match a requirement row
      raw_cases/*.json            — research cases → resource rows
      capabilities_extracted.json — capability candidates to resolve/create
      strategy.json               — strategies; only the selected one is ingested

    folder: Path to the folder; stores: store dict (unused directly here);
    alias: mutable name→id map (new caps are added to it); cur: live DB cursor;
    stats: mutable counters/lists updated in place.

    Early-returns record the reason in stats['bad_folders'] / stats['missing_req'].
    """
    folder_key = folder.name
    # blueprint → requirement match
    bp_path = folder / 'blueprint.json'
    if not bp_path.exists():
        stats['bad_folders'].append(folder_key + ':no_blueprint')
        return
    try:
        bp = json.loads(bp_path.read_text(encoding='utf-8'))
    except Exception as e:
        stats['bad_folders'].append(folder_key + f':bp_parse_err({e})')
        return
    req_text = bp.get('requirement', '')
    if not req_text:
        stats['bad_folders'].append(folder_key + ':empty_req')
        return
    # Exact description match first, then a prefix LIKE fallback.
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        # try fuzzier: first 80 chars prefix (only '%' is escaped; '_' wildcard is not)
        cur.execute('SELECT id FROM requirement WHERE description LIKE %s LIMIT 1',
                    (req_text[:80].replace('%', '\\%') + '%',))
        row = cur.fetchone()
    if not row:
        print(f' ⚠️ no matching requirement: {req_text[:60]}', flush=True)
        stats['missing_req'].append(folder_key)
        return
    req_id = row['id']
    # resources from raw_cases/
    raw_dir = folder / 'raw_cases'
    resource_ids = []
    if raw_dir.exists():
        for cf in sorted(raw_dir.glob('*.json')):
            try:
                data = json.loads(cf.read_text(encoding='utf-8'))
            except Exception:
                # unparseable case file: skip silently
                continue
            # file may be {'cases': [...]} or a bare list
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if not isinstance(cases, list):
                continue
            for case in cases:
                if not isinstance(case, dict):
                    continue
                url = case.get('source_url') or case.get('url')
                if not url:
                    continue
                # fall back to the file stem (e.g. 'case_xhs' → 'xhs') for the platform
                platform = case.get('platform') or cf.stem.replace('case_', '')
                rid = gen_resource_id(platform, url)
                title = (case.get('title') or '')[:200]
                metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
                likes = (metrics.get('likes') or 0) if metrics else 0
                # delete-then-insert makes the resource write idempotent on re-runs
                cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
                cur.execute(
                    """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    (rid, title,
                     json.dumps(case, ensure_ascii=False)[:8000],
                     'research_case',
                     json.dumps(case.get('images', []) or [], ensure_ascii=False),
                     json.dumps({'platform': platform, 'source_url': url,
                                 'metrics': metrics, 'folder': folder_key},
                                ensure_ascii=False),
                     -int(likes), DEDUP_VERSION))  # negative likes → most-liked sorts first ascending
                resource_ids.append(rid)
                stats['resource'] += 1
    # capabilities
    caps_path = folder / 'capabilities_extracted.json'
    cap_resolved = {}  # source_key -> resolved_id
    if caps_path.exists():
        try:
            caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
        except Exception as e:
            stats['bad_folders'].append(folder_key + f':caps_parse_err({e})')
            caps_data = {'extracted_capabilities': []}
        for cap in caps_data.get('extracted_capabilities', []):
            name = (cap.get('name') or '').strip()
            if not name:
                continue
            src_id = cap.get('id')
            resolved = None
            # Resolution order: (1) source id in DB, (2) alias by name, (3) new hash id.
            # (1) source id exists in DB?
            if src_id:
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (src_id,))
                if cur.fetchone():
                    resolved = src_id
            # (2) alias by name?
            if not resolved:
                cand = alias.get(norm(name))
                if cand:
                    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cand,))
                    if cur.fetchone():
                        resolved = cand
            # (3) create new with hash ID
            if not resolved:
                new_id = gen_cap_id(name)
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
                if not cur.fetchone():
                    cur.execute(
                        """INSERT INTO capability (id, name, criterion, description, effects, version)
                        VALUES (%s, %s, %s, %s, %s, %s)""",
                        (new_id, name, cap.get('criterion', '') or '',
                         cap.get('description', '') or '',
                         json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                         DEDUP_VERSION))
                    # register the new cap so later folders link instead of re-creating
                    alias[norm(name)] = new_id
                    stats['capability_new'] += 1
                resolved = new_id
            else:
                # backfill criterion/effects if missing on the already-existing row
                cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
                ex = cur.fetchone()
                if ex:
                    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
                        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                                    (cap['criterion'], resolved))
                        stats['criterion_backfilled'] += 1
                    cur_eff = ex.get('effects')
                    # effects may come back as a list or as the literal '[]' string
                    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
                        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), resolved))
                        stats['effects_backfilled'] += 1
                stats['capability_linked'] += 1
            cap_resolved[src_id or name] = resolved
    # strategy (is_selected only)
    strat_path = folder / 'strategy.json'
    if not strat_path.exists():
        return
    try:
        strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
    except Exception as e:
        stats['bad_folders'].append(folder_key + f':strat_parse_err({e})')
        return
    selected = next((s for s in strat_data.get('strategies', []) if s.get('is_selected')), None)
    if not selected:
        # no strategy flagged is_selected: fall back to the first one, if any
        sel_list = strat_data.get('strategies', [])
        selected = sel_list[0] if sel_list else None
    if not selected:
        return
    strategy_name = selected.get('name') or f'Strategy-{folder_key}'
    strat_id = gen_strategy_id(req_text, strategy_name)
    now = int(time.time())
    # delete-then-insert keeps the strategy write idempotent on re-runs
    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
         json.dumps(selected, ensure_ascii=False, indent=2),
         'draft', now, now, DEDUP_VERSION))
    stats['strategy'] += 1
    # wire junctions
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
            VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
            VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
        VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
    # collect capability ids referenced by the selected strategy's workflow outline
    strat_cap_ids = set()
    wo = selected.get('workflow_outline') or []
    if isinstance(wo, list):
        for phase in wo:
            if not isinstance(phase, dict):
                continue
            caps = phase.get('capabilities') or []
            if not isinstance(caps, list):
                continue
            for c_ref in caps:
                if not isinstance(c_ref, dict):
                    continue
                key = c_ref.get('id') or c_ref.get('name', '')
                resolved = cap_resolved.get(key) or alias.get(norm(c_ref.get('name', '')))
                if resolved:
                    strat_cap_ids.add(resolved)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
            VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
            (strat_id, cid))
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
            VALUES (%s, %s) ON CONFLICT DO NOTHING""",
            (req_id, cid))
    # per-folder counters; caller resets these before each call and reads them after
    stats['folder_cap_count'] = len(strat_cap_ids)
    stats['folder_res_count'] = len(resource_ids)
  390. # PHASE 4: apply renames (defensive — some may have already been applied during seeding)
  391. def apply_renames(stores):
  392. print('\n📝 Applying RENAMES...', flush=True)
  393. cur = stores['cap']._get_cursor()
  394. try:
  395. applied = 0
  396. for cid, (name, desc) in RENAMES.items():
  397. cur.execute('UPDATE capability SET name = %s, description = %s WHERE id = %s',
  398. (name, desc, cid))
  399. if (cur.rowcount or 0) > 0:
  400. applied += 1
  401. print(f' applied {applied}/{len(RENAMES)}', flush=True)
  402. finally:
  403. cur.close()
  404. # ═══════════════════════════════════════════════════════════
def main():
    """CLI entry point: backup → (dry-run preview | purge+seed+ingest+rename+verify).

    Modes (mutually exclusive, one required):
      --backup-only  dump tables to BACKUP_DIR and exit
      --dry-run      build the alias map and preview folder 001 resolution, no writes
      --execute      full rebuild
    --skip-backup skips the backup phase (e.g. when resuming).
    """
    ap = argparse.ArgumentParser()
    g = ap.add_mutually_exclusive_group(required=True)
    g.add_argument('--backup-only', action='store_true')
    g.add_argument('--dry-run', action='store_true')
    g.add_argument('--execute', action='store_true')
    ap.add_argument('--skip-backup', action='store_true', help='Skip backup (if already done)')
    args = ap.parse_args()
    print(f'\n{"="*60}', flush=True)
    print(f'{"BACKUP" if args.backup_only else "DRY RUN" if args.dry_run else "EXECUTE"} '
          f'— rebuild howard_dedup', flush=True)
    print(f'{"="*60}', flush=True)
    stores = {
        'cap': PostgreSQLCapabilityStore(),
        'res': PostgreSQLResourceStore(),
        'req': PostgreSQLRequirementStore(),
        'strat': PostgreSQLStrategyStore(),
    }
    try:
        if not args.skip_backup:
            backup(stores)
        if args.backup_only:
            print('\n✅ Backup only. Exit.', flush=True)
            return
        # dump current caps (used for seed)
        cur = stores['cap']._get_cursor()
        cur.execute('SELECT id, name, criterion, description, effects, version FROM capability')
        current_caps = {r['id']: dict(r) for r in cur.fetchall()}
        cur.close()
        snapshot = parse_snapshot(SNAPSHOT_PATH)
        alias = build_alias_map(snapshot, current_caps)
        # persist the alias map next to the backup for post-hoc auditing
        (BACKUP_DIR / 'alias_map.json').write_text(
            json.dumps(alias, ensure_ascii=False, indent=2))
        print(f'\n🔗 Alias map: {len(alias)} entries (snapshot={len(snapshot)}, '
              f'current_caps={len(current_caps)})', flush=True)
        if args.dry_run:
            # Preview: resolve the first 15 capabilities of folder 001, no DB writes.
            print('\n[DRY-RUN] Simulating folder 001 ingest...', flush=True)
            f1 = OUTPUT_DIR / '001'
            if f1.exists():
                caps = json.loads((f1 / 'capabilities_extracted.json').read_text())
                new = linked = 0
                for c in caps.get('extracted_capabilities', [])[:15]:
                    name = (c.get('name') or '').strip()
                    src = c.get('id')
                    cand = alias.get(norm(name))
                    status = 'LINK' if cand else 'NEW'
                    if cand: linked += 1
                    else: new += 1
                    print(f' [{status}] src_id={src!r} → {cand!r} | {name[:50]}', flush=True)
                print(f'\n folder 001 sample (first 15): link={linked} new={new}', flush=True)
            print('\n[DRY-RUN] Skipping purge + ingest. Use --execute to run.', flush=True)
            return
        # Skip purge+seed if howard_dedup already has canonical seeds
        # (enables resume after connection drop)
        cur = stores['cap']._get_cursor()
        cur.execute("SELECT COUNT(*) AS c FROM capability WHERE version = %s",
                    (DEDUP_VERSION,))
        hd_count = cur.fetchone()['c']
        cur.close()
        if hd_count > 100:
            print(f'\n⚠️ howard_dedup already has {hd_count} caps — skipping purge+seed (resume mode)',
                  flush=True)
        else:
            purge(stores)
            seed(stores, current_caps, snapshot)
        # Ingest
        print('\n📂 Ingesting output 2/ ...', flush=True)
        stats = {'folder_processed': 0, 'bad_folders': [], 'missing_req': [],
                 'resource': 0, 'capability_new': 0, 'capability_linked': 0,
                 'strategy': 0, 'criterion_backfilled': 0, 'effects_backfilled': 0,
                 'folder_cap_count': 0, 'folder_res_count': 0}
        folders = sorted([d for d in OUTPUT_DIR.iterdir() if d.is_dir()])
        cur = stores['cap']._get_cursor()
        try:
            for idx, folder in enumerate(folders, 1):
                # snapshot the cumulative counters so per-folder deltas can be printed
                before_cap_new = stats['capability_new']
                before_cap_link = stats['capability_linked']
                before_res = stats['resource']
                before_strat = stats['strategy']
                stats['folder_cap_count'] = 0
                stats['folder_res_count'] = 0
                try:
                    ingest_folder(folder, stores, alias, cur, stats)
                    stats['folder_processed'] += 1
                    d_new = stats['capability_new'] - before_cap_new
                    d_link = stats['capability_linked'] - before_cap_link
                    d_res = stats['resource'] - before_res
                    d_strat = stats['strategy'] - before_strat
                    print(f"[{idx:3d}/{len(folders)}] 📁 {folder.name}/ "
                          f"cap:link={d_link} new={d_new} res={d_res} strat={d_strat} "
                          f"strat_caps={stats['folder_cap_count']}",
                          flush=True)
                except Exception as e:
                    print(f'[{idx:3d}/{len(folders)}] 📁 {folder.name}/ ❌ {type(e).__name__}: {e}',
                          flush=True)
                    stats['bad_folders'].append(folder.name + f':{type(e).__name__}')
                    # reconnect cursor if connection dropped
                    try:
                        cur.close()
                    except Exception:
                        pass
                    cur = stores['cap']._get_cursor()
        finally:
            try:
                cur.close()
            except Exception:
                pass
        apply_renames(stores)
        # Verify
        print(f'\n{"="*60}\n验证:', flush=True)
        cur = stores['cap']._get_cursor()
        for tbl in ['capability', 'strategy', 'resource', 'requirement']:
            cur.execute(f"SELECT version, COUNT(*) AS c FROM {tbl} GROUP BY version")
            for r in cur.fetchall():
                print(f" {tbl} / {r['version']}: {r['c']}", flush=True)
        cur.close()
        print(f'\n📊 Stats:', flush=True)
        for k, v in stats.items():
            if isinstance(v, list):
                # long lists: show count + first 5 entries
                print(f' {k}: {len(v)} {v[:5]}{"..." if len(v)>5 else ""}', flush=True)
            else:
                print(f' {k}: {v}', flush=True)
        print(f'{"="*60}', flush=True)
    finally:
        for s in stores.values():
            s.close()
# Script entry point.
if __name__ == '__main__':
    main()