|
|
@@ -0,0 +1,600 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
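One-off repair script: rebuild the capability / strategy / resource data
for the howard_dedup version.

Background: a colleague's agent ran out of control and polluted the
tao_dev_1 data.
This script:
  1. Backs up the current DB state to /tmp/knowhub_backup_<date>/
  2. Builds an alias table from /tmp/capabilities_all.md (the 465 snapshot)
     plus MERGE_CLUSTERS
  3. Purges the current capability + strategy + resource rows and the
     junction tables (all versions)
  4. Re-ingests the 99 folders under "output 2/" with version='howard_dedup'
  5. Applies RENAMES so the earlier renaming work is preserved

Tables whose rows are left untouched: requirement and knowledge.
Note: the junction tables that link them to capability / strategy /
resource (e.g. requirement_capability, capability_knowledge) ARE cleared
in step 3; only the requirement_* links are rebuilt during step 4.

Usage:
  python knowhub/scripts/rebuild_howard_dedup.py --backup-only
  python knowhub/scripts/rebuild_howard_dedup.py --dry-run
  python knowhub/scripts/rebuild_howard_dedup.py --execute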
|
|
|
+"""
|
|
|
+import argparse
|
|
|
+import hashlib
|
|
|
+import json
|
|
|
+import os
|
|
|
+import re
|
|
|
+import sys
|
|
|
+import time
|
|
|
+from datetime import date
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
+from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
|
|
|
+from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
|
|
|
+from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
|
|
|
+from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
|
|
|
+from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
|
|
|
+from knowhub.scripts.rename_merged_capabilities import RENAMES
|
|
|
+
|
|
|
# Folder holding the per-requirement research output that gets re-ingested.
# Defaults to the original author's local path; set KNOWHUB_OUTPUT_DIR to
# run the script on another machine without editing the file.
OUTPUT_DIR = Path(os.environ.get('KNOWHUB_OUTPUT_DIR', '/Users/sunlit/Downloads/output 2'))
# Markdown snapshot of capabilities used to build the alias table
# (override with KNOWHUB_SNAPSHOT_PATH).
SNAPSHOT_PATH = Path(os.environ.get('KNOWHUB_SNAPSHOT_PATH', '/tmp/capabilities_all.md'))
# One backup directory per calendar day; evaluated once at import time.
BACKUP_DIR = Path(f'/tmp/knowhub_backup_{date.today().isoformat()}')
# Version tag written on every row this script inserts.
DEDUP_VERSION = 'howard_dedup'

# CAP-006 was lost after the colleague's operation; it is rebuilt here from
# the conversation cache.
CAP_006 = {
    'id': 'CAP-006',
    'name': '图像细节增强与高清放大',
    'description': '对已生成的图像进行分辨率提升和细节增强,在放大的同时补充高频细节(后处理路径,区别于生成阶段直接高清输出的 CAP-016)',
    'criterion': '',
}
|
|
|
+
|
|
|
+
|
|
|
def norm(s):
    """Return ``s`` without surrounding whitespace and in lower case.

    Falsy input (``None`` or ``''``) yields the empty string.
    """
    if not s:
        return ''
    trimmed = s.strip()
    return trimmed.lower()
|
|
|
+
|
|
|
+
|
|
|
def hash8(text):
    """Return the first 8 hex characters of the SHA-256 of ``text`` (UTF-8 encoded)."""
    digest = hashlib.sha256(text.encode('utf-8'))
    hex_digest = digest.hexdigest()
    return hex_digest[:8]
|
|
|
+
|
|
|
+
|
|
|
def hash12(text):
    """Return the first 12 hex characters of the SHA-256 of ``text`` (UTF-8 encoded)."""
    digest = hashlib.sha256(text.encode('utf-8'))
    hex_digest = digest.hexdigest()
    return hex_digest[:12]
|
|
|
+
|
|
|
+
|
|
|
def gen_cap_id(name):
    """Derive a capability id of the form ``CAP-<8 hex>`` from a capability name.

    The name is normalised with ``norm`` first, so names that differ only in
    case or surrounding whitespace produce the same id.
    """
    normalised = norm(name)
    suffix = hash8(normalised)
    return 'CAP-' + suffix
|
|
|
+
|
|
|
+
|
|
|
def gen_resource_id(platform, url):
    """Build the resource id ``resource/research/<platform>/<12 hex of url>``.

    A falsy ``platform`` is replaced by ``'unknown'``; the platform segment
    is lower-cased and stripped. The hash is taken over ``url`` as given.
    """
    platform_name = platform if platform else 'unknown'
    segment = platform_name.lower().strip()
    url_hash = hash12(url)
    return 'resource/research/' + segment + '/' + url_hash
|
|
|
+
|
|
|
+
|
|
|
def gen_strategy_id(req_text, strategy_name):
    """Build the strategy id ``strategy-<8 hex>``.

    The hash input is ``"<req_text>|<strategy_name>"``; a falsy part is
    treated as the empty string.
    """
    requirement_part = req_text if req_text else ""
    name_part = strategy_name if strategy_name else ""
    digest = hash8(f'{requirement_part}|{name_part}')
    return 'strategy-' + digest
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def parse_snapshot(path):
    """Read the capability snapshot markdown and return ``{capability_id: name}``.

    An entry is a ``## <name>`` heading line immediately followed by a line
    that begins with ``**CAP-...**``. Names are stripped of surrounding
    whitespace. When ``path`` does not exist a warning is printed and an
    empty dict is returned.
    """
    if not path.exists():
        print(f'⚠️ snapshot file missing: {path}', flush=True)
        return {}

    text = path.read_text(encoding='utf-8')
    entry_re = re.compile(r'^## (.+?)\n\*\*(CAP-[\w\-]+)\*\*', re.MULTILINE)

    id_to_name = {}
    for match in entry_re.finditer(text):
        heading, cap_id = match.group(1), match.group(2)
        # A later entry with the same id overrides an earlier one.
        id_to_name[cap_id] = heading.strip()
    return id_to_name
|
|
|
+
|
|
|
+
|
|
|
def build_alias_map(snapshot, current_caps):
    """Map normalised capability names to the capability id they resolve to.

    Args:
        snapshot: ``{capability_id: name}`` as returned by ``parse_snapshot``.
        current_caps: ``{capability_id: row}`` for the capabilities currently
            in the DB; each row is read through its ``'name'`` key.

    Returns:
        dict: ``norm(name) -> capability_id``. The steps below run in order
        and a later step overrides an earlier one for the same name, except
        the last step which only fills names that are still unmapped.
    """
    # Step A: member -> canonical, taken from MERGE_CLUSTERS.
    merged_into = {
        member: canonical
        for canonical, members in MERGE_CLUSTERS.items()
        for member in members
    }

    def final(cid, limit=10):
        # Follow member -> canonical links (A -> B -> C) until an id that is
        # not itself a member; stops on a revisit or after ``limit`` hops.
        visited = set()
        hops_left = limit
        while hops_left > 0 and cid in merged_into and cid not in visited:
            visited.add(cid)
            cid = merged_into[cid]
            hops_left -= 1
        return cid

    def is_foundation_id(cid):
        # Matches ids shaped like CAP-001: 'CAP-' followed by three digits.
        return cid.startswith('CAP-') and len(cid) == 7 and cid[4:].isdigit()

    # Flatten the chains so every member points at its final canonical.
    for member in list(merged_into):
        merged_into[member] = final(member)

    # 1. Snapshot names -> final canonical (members) or themselves (others).
    alias = {
        norm(name): merged_into.get(cid, cid)
        for cid, name in snapshot.items()
    }

    # 2. Current DB names of cluster canonicals (post-rename) -> final canonical.
    for canonical in MERGE_CLUSTERS:
        if canonical not in current_caps:
            continue
        alias[norm(current_caps[canonical]['name'])] = final(canonical)

    # 3. RENAMES: the new name resolves to the same (final) canonical.
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)

    # 4. Foundation capabilities (CAP-NNN) and the reconstructed CAP-006 -> self.
    for cid, cap in current_caps.items():
        if is_foundation_id(cid):
            alias[norm(cap['name'])] = cid
    alias[norm(CAP_006['name'])] = CAP_006['id']

    # 5. Every other current capability except CAP-tao_dev_* ids -> self,
    #    without overriding a mapping made above.
    for cid, cap in current_caps.items():
        if not cid.startswith('CAP-tao_dev_'):
            alias.setdefault(norm(cap['name']), cid)

    return alias
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
+# PHASE 0: BACKUP
|
|
|
def backup(stores):
    """Dump every table touched by this script to ``BACKUP_DIR/<table>.json``.

    Each table is read with ``SELECT *``; the ``embedding`` column is dropped
    from every row before the rows are written as UTF-8 JSON. A failure on
    one table is reported and does not stop the remaining tables.

    Args:
        stores: mapping with a ``'cap'`` store exposing ``_get_cursor()``.

    Returns:
        list[str]: names of the tables that could not be backed up (empty
        when every dump succeeded), so callers can refuse to continue.

    NOTE(review): BACKUP_DIR is per-day, so a second run on the same day
    overwrites the earlier dump — including a run made after a purge.
    NOTE(review): if the cursor runs inside a transaction, a failed SELECT
    may make the following SELECTs fail as well — confirm the stores use
    autocommit.
    """
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    print(f'📦 Backing up to {BACKUP_DIR}/', flush=True)
    tables = [
        'capability', 'strategy', 'resource', 'requirement', 'knowledge',
        'requirement_capability', 'capability_tool', 'capability_knowledge',
        'capability_resource', 'strategy_capability', 'strategy_resource',
        'strategy_knowledge', 'requirement_strategy', 'requirement_resource',
    ]
    failed = []
    cur = stores['cap']._get_cursor()
    try:
        for t in tables:
            try:
                cur.execute(f'SELECT * FROM {t}')
                rows = [dict(r) for r in cur.fetchall()]
                # strip embedding (too big, not essential for restore)
                for r in rows:
                    r.pop('embedding', None)
                # Explicit UTF-8: the payload keeps non-ASCII text
                # (ensure_ascii=False), so the locale default is not safe.
                (BACKUP_DIR / f'{t}.json').write_text(
                    json.dumps(rows, default=str, ensure_ascii=False),
                    encoding='utf-8')
                print(f' ✓ {t}: {len(rows)} rows', flush=True)
            except Exception as e:
                # Best effort per table, but remember the failure for the caller.
                failed.append(t)
                print(f' ❌ {t}: {e}', flush=True)
    finally:
        cur.close()
    return failed
|
|
|
+
|
|
|
+
|
|
|
+# PHASE 1: PURGE
|
|
|
def purge(stores):
    """Delete every row from the junction tables and then from the entity tables.

    Junction tables are emptied first, followed by ``capability``,
    ``strategy`` and ``resource``. The deletes are unconditional (all
    versions). The cursor obtained from ``stores['cap']`` is always closed.
    """
    print('\n🧹 Purging capability / strategy / resource (all versions) + junctions...', flush=True)
    junction_tables = (
        'requirement_capability', 'capability_tool', 'capability_knowledge',
        'capability_resource', 'strategy_capability', 'strategy_resource',
        'strategy_knowledge', 'requirement_strategy', 'requirement_resource',
    )
    entity_tables = ('capability', 'strategy', 'resource')

    cursor = stores['cap']._get_cursor()
    try:
        # junctions first (no FK but keep clean), entities afterwards
        for table in junction_tables + entity_tables:
            cursor.execute(f'DELETE FROM {table}')
            print(f' ✓ {table} cleared', flush=True)
    finally:
        cursor.close()
|
|
|
+
|
|
|
+
|
|
|
+# PHASE 2: SEED
|
|
|
def _seed_effects_json(effects):
    """Return ``effects`` as JSON text without double-encoding.

    A value read back from the DB may already be JSON text; passing that
    through ``json.dumps`` again would store a quoted string instead of the
    list. Such text is returned unchanged; anything else is serialised.
    """
    if isinstance(effects, str):
        text = effects.strip()
        if not text:
            return '[]'
        try:
            json.loads(text)
        except ValueError:
            # Not JSON: keep the previous behaviour and encode it as a string.
            return json.dumps(effects, ensure_ascii=False, default=str)
        return text
    return json.dumps(effects or [], ensure_ascii=False, default=str)


def _seed_insert_capability(cur, cap_id, name, criterion, description, effects_json):
    """Insert one capability row tagged with DEDUP_VERSION."""
    cur.execute(
        """INSERT INTO capability (id, name, criterion, description, effects, version)
           VALUES (%s, %s, %s, %s, %s, %s)""",
        (cap_id, name, criterion, description, effects_json, DEDUP_VERSION))


def seed(stores, current_caps, snapshot):
    """Insert the base set of capabilities for the howard_dedup version.

    Inserted, in this order:
      1. every capability in ``current_caps`` whose id does not start with
         ``CAP-tao_dev_`` (name / criterion / description / effects kept);
      2. ``CAP_006`` when ``'CAP-006'`` is absent from ``current_caps``;
      3. every MERGE_CLUSTERS key that is neither in ``current_caps`` nor a
         member of any cluster and that has a name in ``snapshot``; the
         name/description from RENAMES are used when the id is listed there.

    Args:
        stores: mapping with a ``'cap'`` store exposing ``_get_cursor()``.
        current_caps: ``{capability_id: row dict}`` captured before the purge.
        snapshot: ``{capability_id: name}`` from ``parse_snapshot``.
    """
    print('\n🌱 Seeding howard_dedup...', flush=True)
    cur = stores['cap']._get_cursor()
    try:
        inserted = 0
        # 1. Insert from current_caps: everything non-VCAP gets version=howard_dedup
        for cap_id, cap in current_caps.items():
            if cap_id.startswith('CAP-tao_dev_'):
                continue  # skip VCAP
            _seed_insert_capability(
                cur, cap_id,
                cap.get('name', ''),
                cap.get('criterion', '') or '',
                cap.get('description', '') or '',
                _seed_effects_json(cap.get('effects')))
            inserted += 1

        # 2. CAP-006 reconstruct if missing
        if 'CAP-006' not in current_caps:
            _seed_insert_capability(
                cur, CAP_006['id'], CAP_006['name'], CAP_006['criterion'],
                CAP_006['description'], '[]')
            inserted += 1
            print(' ✓ CAP-006 reconstructed', flush=True)

        # 3. Missing canonicals (in MERGE_CLUSTERS but not in DB) recovered from snapshot
        merged_members = {m for members in MERGE_CLUSTERS.values() for m in members}
        recovered = 0
        for cid in MERGE_CLUSTERS:
            if cid in current_caps or cid in merged_members:
                continue
            # Not in DB, not a member — need to recover
            name = snapshot.get(cid)
            if not name:
                continue
            # Use RENAMES version if available
            if cid in RENAMES:
                name, desc = RENAMES[cid]
            else:
                desc = ''
            _seed_insert_capability(cur, cid, name, '', desc, '[]')
            recovered += 1
        print(f' seeded: {inserted} existing + {recovered} recovered from snapshot', flush=True)
    finally:
        cur.close()
|
|
|
+
|
|
|
+
|
|
|
+# PHASE 3: INGEST from output 2/
|
|
|
def _ingest_like_prefix(text, length=80):
    """Build a LIKE pattern matching values that start with ``text[:length]``.

    Backslash, ``%`` and ``_`` are escaped with a backslash so the prefix is
    matched literally; backslash is handled first so the escapes added for
    the other two are not escaped again.
    """
    prefix = text[:length]
    for special in ('\\', '%', '_'):
        prefix = prefix.replace(special, '\\' + special)
    return prefix + '%'


def _ingest_find_requirement(cur, req_text):
    """Return the id of the requirement matching ``req_text`` or ``None``.

    Tries an exact match on ``description`` first, then a prefix match on
    the first 80 characters.
    """
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        cur.execute('SELECT id FROM requirement WHERE description LIKE %s LIMIT 1',
                    (_ingest_like_prefix(req_text),))
        row = cur.fetchone()
    return row['id'] if row else None


def _ingest_capability_exists(cur, cap_id):
    """Return True when a capability row with id ``cap_id`` exists."""
    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cap_id,))
    return bool(cur.fetchone())


def _ingest_likes(metrics):
    """Return ``metrics['likes']`` as an int; 0 when missing or not numeric."""
    try:
        return int(metrics.get('likes') or 0)
    except (TypeError, ValueError):
        return 0


def _ingest_resources(folder, folder_key, cur, stats):
    """Insert one resource per case found in ``raw_cases/*.json``; return their ids."""
    resource_ids = []
    raw_dir = folder / 'raw_cases'
    if not raw_dir.exists():
        return resource_ids
    for cf in sorted(raw_dir.glob('*.json')):
        try:
            data = json.loads(cf.read_text(encoding='utf-8'))
        except (OSError, ValueError):
            # Record the unreadable file instead of dropping it silently.
            stats['bad_folders'].append(f'{folder_key}:case_parse_err({cf.name})')
            continue
        cases = data.get('cases', []) if isinstance(data, dict) else data
        if not isinstance(cases, list):
            continue
        for case in cases:
            if not isinstance(case, dict):
                continue
            url = case.get('source_url') or case.get('url')
            if not url:
                continue
            # Fall back to the file name (without the 'case_' part) as platform.
            platform = case.get('platform') or cf.stem.replace('case_', '')
            rid = gen_resource_id(platform, url)
            title = (case.get('title') or '')[:200]
            metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
            likes = _ingest_likes(metrics)

            # Delete-then-insert keeps a re-run from failing on an existing id.
            cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
            cur.execute(
                """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (rid, title,
                 json.dumps(case, ensure_ascii=False)[:8000],
                 'research_case',
                 json.dumps(case.get('images', []) or [], ensure_ascii=False),
                 json.dumps({'platform': platform, 'source_url': url,
                             'metrics': metrics, 'folder': folder_key},
                            ensure_ascii=False),
                 -likes, DEDUP_VERSION))
            resource_ids.append(rid)
            stats['resource'] += 1
    return resource_ids


def _ingest_backfill(cur, cap, resolved, stats):
    """Fill an empty criterion / effects on an existing capability from ``cap``."""
    cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
    ex = cur.fetchone()
    if not ex:
        return
    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                    (cap['criterion'], resolved))
        stats['criterion_backfilled'] += 1
    cur_eff = ex.get('effects')
    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), resolved))
        stats['effects_backfilled'] += 1


def _ingest_capabilities(folder, folder_key, alias, cur, stats):
    """Resolve or create every capability in ``capabilities_extracted.json``.

    Returns ``{source id or name: resolved capability id}``. ``alias`` is
    updated in place when a new capability id is assigned to a name.
    """
    cap_resolved = {}
    caps_path = folder / 'capabilities_extracted.json'
    if not caps_path.exists():
        return cap_resolved
    try:
        caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        stats['bad_folders'].append(folder_key + f':caps_parse_err({e})')
        return cap_resolved

    extracted = caps_data.get('extracted_capabilities', []) if isinstance(caps_data, dict) else []
    for cap in extracted:
        if not isinstance(cap, dict):
            continue
        name = (cap.get('name') or '').strip()
        if not name:
            continue
        src_id = cap.get('id')
        resolved = None

        # (1) source id exists in DB?
        if src_id and _ingest_capability_exists(cur, src_id):
            resolved = src_id
        # (2) alias by name?
        if not resolved:
            cand = alias.get(norm(name))
            if cand and _ingest_capability_exists(cur, cand):
                resolved = cand

        if resolved:
            # backfill criterion/effects if missing
            _ingest_backfill(cur, cap, resolved, stats)
            stats['capability_linked'] += 1
        else:
            # (3) create new with hash ID
            new_id = gen_cap_id(name)
            if not _ingest_capability_exists(cur, new_id):
                cur.execute(
                    """INSERT INTO capability (id, name, criterion, description, effects, version)
                       VALUES (%s, %s, %s, %s, %s, %s)""",
                    (new_id, name, cap.get('criterion', '') or '',
                     cap.get('description', '') or '',
                     json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                     DEDUP_VERSION))
                stats['capability_new'] += 1
            # Remember the mapping so later folders resolve this name directly.
            alias[norm(name)] = new_id
            resolved = new_id
        cap_resolved[src_id or name] = resolved
    return cap_resolved


def _ingest_select_strategy(folder, folder_key, stats):
    """Return the strategy flagged ``is_selected`` (else the first one) or ``None``."""
    strat_path = folder / 'strategy.json'
    if not strat_path.exists():
        return None
    try:
        strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        stats['bad_folders'].append(folder_key + f':strat_parse_err({e})')
        return None
    strategies = strat_data.get('strategies', [])
    selected = next((s for s in strategies if s.get('is_selected')), None)
    if not selected:
        selected = strategies[0] if strategies else None
    return selected or None


def _ingest_strategy_capability_ids(selected, cap_resolved, alias, cur):
    """Collect the capability ids referenced by the strategy's workflow outline."""
    strat_cap_ids = set()
    wo = selected.get('workflow_outline') or []
    if not isinstance(wo, list):
        return strat_cap_ids
    for phase in wo:
        if not isinstance(phase, dict):
            continue
        caps = phase.get('capabilities') or []
        if not isinstance(caps, list):
            continue
        for c_ref in caps:
            if not isinstance(c_ref, dict):
                continue
            key = c_ref.get('id') or c_ref.get('name', '')
            resolved = cap_resolved.get(key)
            if not resolved:
                # Alias targets are not guaranteed to exist; only link ids
                # that are really present so no dangling junction is written.
                cand = alias.get(norm(c_ref.get('name', '')))
                if cand and _ingest_capability_exists(cur, cand):
                    resolved = cand
            if resolved:
                strat_cap_ids.add(resolved)
    return strat_cap_ids


def ingest_folder(folder, stores, alias, cur, stats):
    """Ingest one output folder: resources, capabilities, strategy and links.

    Steps:
      1. read ``blueprint.json`` and match its ``requirement`` text to a row
         of the ``requirement`` table (exact, then 80-char prefix);
      2. insert a resource for every case in ``raw_cases/*.json``;
      3. resolve each capability in ``capabilities_extracted.json`` to an
         existing id (source id, then alias by name) or create a new one;
      4. insert the selected strategy from ``strategy.json`` and write the
         requirement / strategy / resource / capability junction rows.

    The function returns early (recording the reason in ``stats``) when the
    blueprint is missing, unreadable, has no requirement text, or no
    requirement row matches. When there is no usable strategy, steps 2–3
    have already run but no junction rows are written.

    Args:
        folder: ``pathlib.Path`` of the folder to ingest.
        stores: unused here; kept for the existing call signature.
        alias: ``norm(name) -> capability id`` map; updated in place.
        cur: DB cursor used for every statement.
        stats: counters / lists updated in place.
    """
    folder_key = folder.name
    # blueprint → requirement match
    bp_path = folder / 'blueprint.json'
    if not bp_path.exists():
        stats['bad_folders'].append(folder_key + ':no_blueprint')
        return
    try:
        bp = json.loads(bp_path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as e:
        stats['bad_folders'].append(folder_key + f':bp_parse_err({e})')
        return
    req_text = bp.get('requirement', '') if isinstance(bp, dict) else ''
    if not req_text:
        stats['bad_folders'].append(folder_key + ':empty_req')
        return

    req_id = _ingest_find_requirement(cur, req_text)
    if req_id is None:
        print(f' ⚠️ no matching requirement: {req_text[:60]}', flush=True)
        stats['missing_req'].append(folder_key)
        return

    resource_ids = _ingest_resources(folder, folder_key, cur, stats)
    cap_resolved = _ingest_capabilities(folder, folder_key, alias, cur, stats)

    # strategy (is_selected only, first one as fallback)
    selected = _ingest_select_strategy(folder, folder_key, stats)
    if not selected:
        return

    strategy_name = selected.get('name') or f'Strategy-{folder_key}'
    strat_id = gen_strategy_id(req_text, strategy_name)
    now = int(time.time())
    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
         json.dumps(selected, ensure_ascii=False, indent=2),
         'draft', now, now, DEDUP_VERSION))
    stats['strategy'] += 1

    # wire junctions
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                   VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))

    strat_cap_ids = _ingest_strategy_capability_ids(selected, cap_resolved, alias, cur)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                       VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
                    (strat_id, cid))
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                    (req_id, cid))
    stats['folder_cap_count'] = len(strat_cap_ids)
    stats['folder_res_count'] = len(resource_ids)
|
|
|
+
|
|
|
+
|
|
|
+# PHASE 4: apply renames (defensive — some may have already been applied during seeding)
|
|
|
def apply_renames(stores):
    """Set name and description of each capability listed in RENAMES.

    One UPDATE is issued per RENAMES entry; an entry counts as applied when
    the cursor reports a positive ``rowcount``. The number applied out of
    the total is printed. The cursor from ``stores['cap']`` is always closed.
    """
    print('\n📝 Applying RENAMES...', flush=True)
    update_sql = 'UPDATE capability SET name = %s, description = %s WHERE id = %s'
    cursor = stores['cap']._get_cursor()
    try:
        hits = 0
        for cap_id, (new_name, new_desc) in RENAMES.items():
            cursor.execute(update_sql, (new_name, new_desc, cap_id))
            affected = cursor.rowcount or 0
            if affected > 0:
                hits += 1
        total = len(RENAMES)
        print(f' applied {hits}/{total}', flush=True)
    finally:
        cursor.close()
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def main():
    """Command-line entry point: backup, then (optionally) purge, seed and re-ingest.

    Exactly one of ``--backup-only``, ``--dry-run`` or ``--execute`` is
    required; ``--skip-backup`` suppresses the backup phase.

    * ``--backup-only``: run the backup and stop.
    * ``--dry-run``: build the alias map, write it to ``alias_map.json`` and
      show how the first 15 capabilities of folder ``001`` would resolve.
    * ``--execute``: purge + seed (skipped when more than 100 howard_dedup
      capabilities already exist, to allow resuming), ingest every folder of
      OUTPUT_DIR, apply RENAMES and print per-version row counts.

    Safety checks run before anything is deleted: ``--execute`` stops with
    exit status 1 when the backup reports failed tables or when OUTPUT_DIR
    is not a directory.

    NOTE(review): nothing here commits explicitly; presumably the store
    cursors run in autocommit mode — confirm.
    """
    ap = argparse.ArgumentParser()
    g = ap.add_mutually_exclusive_group(required=True)
    g.add_argument('--backup-only', action='store_true')
    g.add_argument('--dry-run', action='store_true')
    g.add_argument('--execute', action='store_true')
    ap.add_argument('--skip-backup', action='store_true', help='Skip backup (if already done)')
    args = ap.parse_args()

    mode = 'BACKUP' if args.backup_only else 'DRY RUN' if args.dry_run else 'EXECUTE'
    print('\n' + '=' * 60, flush=True)
    print(f'{mode} — rebuild howard_dedup', flush=True)
    print('=' * 60, flush=True)

    stores = {
        'cap': PostgreSQLCapabilityStore(),
        'res': PostgreSQLResourceStore(),
        'req': PostgreSQLRequirementStore(),
        'strat': PostgreSQLStrategyStore(),
    }

    try:
        failed_tables = []
        if not args.skip_backup:
            # backup() may return the list of tables it could not dump.
            failed_tables = backup(stores) or []
        if args.backup_only:
            print('\n✅ Backup only. Exit.', flush=True)
            return
        if args.execute and failed_tables:
            print(f'\n❌ Backup failed for {failed_tables}; refusing to purge. '
                  f'Fix the backup or rerun with --skip-backup.', flush=True)
            sys.exit(1)

        # dump current caps (used for seed)
        cur = stores['cap']._get_cursor()
        try:
            cur.execute('SELECT id, name, criterion, description, effects, version FROM capability')
            current_caps = {r['id']: dict(r) for r in cur.fetchall()}
        finally:
            cur.close()

        snapshot = parse_snapshot(SNAPSHOT_PATH)
        alias = build_alias_map(snapshot, current_caps)
        # With --skip-backup the directory may not exist yet.
        BACKUP_DIR.mkdir(parents=True, exist_ok=True)
        (BACKUP_DIR / 'alias_map.json').write_text(
            json.dumps(alias, ensure_ascii=False, indent=2), encoding='utf-8')
        print(f'\n🔗 Alias map: {len(alias)} entries (snapshot={len(snapshot)}, '
              f'current_caps={len(current_caps)})', flush=True)

        if args.dry_run:
            print('\n[DRY-RUN] Simulating folder 001 ingest...', flush=True)
            sample_file = OUTPUT_DIR / '001' / 'capabilities_extracted.json'
            if sample_file.exists():
                caps = json.loads(sample_file.read_text(encoding='utf-8'))
                new = linked = 0
                for c in caps.get('extracted_capabilities', [])[:15]:
                    name = (c.get('name') or '').strip()
                    src = c.get('id')
                    cand = alias.get(norm(name))
                    status = 'LINK' if cand else 'NEW'
                    if cand:
                        linked += 1
                    else:
                        new += 1
                    print(f' [{status}] src_id={src!r} → {cand!r} | {name[:50]}', flush=True)
                print(f'\n folder 001 sample (first 15): link={linked} new={new}', flush=True)
            print('\n[DRY-RUN] Skipping purge + ingest. Use --execute to run.', flush=True)
            return

        # Check the ingest source BEFORE deleting anything: purging and then
        # failing to find the folders would leave the tables empty.
        if not OUTPUT_DIR.is_dir():
            print(f'\n❌ Output directory not found: {OUTPUT_DIR} — nothing purged.', flush=True)
            sys.exit(1)

        # Skip purge+seed if howard_dedup already has canonical seeds
        # (enables resume after connection drop)
        cur = stores['cap']._get_cursor()
        try:
            cur.execute("SELECT COUNT(*) AS c FROM capability WHERE version = %s",
                        (DEDUP_VERSION,))
            hd_count = cur.fetchone()['c']
        finally:
            cur.close()
        if hd_count > 100:
            print(f'\n⚠️ howard_dedup already has {hd_count} caps — skipping purge+seed (resume mode)',
                  flush=True)
        else:
            purge(stores)
            seed(stores, current_caps, snapshot)

        # Ingest
        print('\n📂 Ingesting output 2/ ...', flush=True)
        stats = {'folder_processed': 0, 'bad_folders': [], 'missing_req': [],
                 'resource': 0, 'capability_new': 0, 'capability_linked': 0,
                 'strategy': 0, 'criterion_backfilled': 0, 'effects_backfilled': 0,
                 'folder_cap_count': 0, 'folder_res_count': 0}
        folders = sorted(d for d in OUTPUT_DIR.iterdir() if d.is_dir())
        cur = stores['cap']._get_cursor()
        try:
            for idx, folder in enumerate(folders, 1):
                # Snapshot the running totals to report per-folder deltas.
                before_cap_new = stats['capability_new']
                before_cap_link = stats['capability_linked']
                before_res = stats['resource']
                before_strat = stats['strategy']
                stats['folder_cap_count'] = 0
                stats['folder_res_count'] = 0
                try:
                    ingest_folder(folder, stores, alias, cur, stats)
                    stats['folder_processed'] += 1
                    d_new = stats['capability_new'] - before_cap_new
                    d_link = stats['capability_linked'] - before_cap_link
                    d_res = stats['resource'] - before_res
                    d_strat = stats['strategy'] - before_strat
                    print(f"[{idx:3d}/{len(folders)}] 📁 {folder.name}/ "
                          f"cap:link={d_link} new={d_new} res={d_res} strat={d_strat} "
                          f"strat_caps={stats['folder_cap_count']}",
                          flush=True)
                except Exception as e:
                    # One bad folder must not stop the run; note it and go on.
                    print(f'[{idx:3d}/{len(folders)}] 📁 {folder.name}/ ❌ {type(e).__name__}: {e}',
                          flush=True)
                    stats['bad_folders'].append(folder.name + f':{type(e).__name__}')
                    # reconnect cursor if connection dropped
                    try:
                        cur.close()
                    except Exception:
                        pass
                    cur = stores['cap']._get_cursor()
        finally:
            try:
                cur.close()
            except Exception:
                pass

        apply_renames(stores)

        # Verify
        print('\n' + '=' * 60 + '\n验证:', flush=True)
        cur = stores['cap']._get_cursor()
        try:
            for tbl in ['capability', 'strategy', 'resource', 'requirement']:
                cur.execute(f"SELECT version, COUNT(*) AS c FROM {tbl} GROUP BY version")
                for r in cur.fetchall():
                    print(f" {tbl} / {r['version']}: {r['c']}", flush=True)
        finally:
            cur.close()
        print('\n📊 Stats:', flush=True)
        for k, v in stats.items():
            if isinstance(v, list):
                print(f' {k}: {len(v)} {v[:5]}{"..." if len(v) > 5 else ""}', flush=True)
            else:
                print(f' {k}: {v}', flush=True)
        print('=' * 60, flush=True)
    finally:
        for s in stores.values():
            s.close()
|
|
|
+
|
|
|
+
|
|
|
# Run the rebuild only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|