#!/usr/bin/env python3
"""One-off repair script: rebuild the howard_dedup version of capability / strategy / resource.

Background: a colleague's runaway agent polluted the tao_dev_1 data.

This script:
1. Backs up the current DB state to /tmp/knowhub_backup_<date>/
2. Builds an alias table from /tmp/capabilities_all.md (the 465-cap snapshot) + MERGE_CLUSTERS
3. Purges the current capability + strategy + resource tables + junctions (all versions)
4. Re-ingests the 99 folders under "output 2/" with version='howard_dedup'
5. Applies RENAMES so the renaming work is preserved

Untouched tables: requirement, knowledge and their junctions.

Usage:
    python knowhub/scripts/rebuild_howard_dedup.py --backup-only
    python knowhub/scripts/rebuild_howard_dedup.py --dry-run
    python knowhub/scripts/rebuild_howard_dedup.py --execute
"""
import argparse
import hashlib
import json
import os
import re
import sys
import time
from datetime import date
from pathlib import Path

# Make the repo root importable when the script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.knowhub_db.pg_resource_store import PostgreSQLResourceStore
from knowhub.knowhub_db.pg_requirement_store import PostgreSQLRequirementStore
from knowhub.knowhub_db.pg_strategy_store import PostgreSQLStrategyStore
from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
from knowhub.scripts.rename_merged_capabilities import RENAMES

OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
SNAPSHOT_PATH = Path('/tmp/capabilities_all.md')
BACKUP_DIR = Path(f'/tmp/knowhub_backup_{date.today().isoformat()}')
DEDUP_VERSION = 'howard_dedup'

# CAP-006 was lost during the colleague's run; reconstructed from the conversation cache.
# (Field values are runtime data — kept verbatim.)
CAP_006 = {
    'id': 'CAP-006',
    'name': '图像细节增强与高清放大',
    'description': '对已生成的图像进行分辨率提升和细节增强,在放大的同时补充高频细节(后处理路径,区别于生成阶段直接高清输出的 CAP-016)',
    'criterion': '',
}


def norm(s):
    """Normalize a capability name for alias-map lookups (strip + lowercase; '' for falsy)."""
    return s.strip().lower() if s else ''


def hash8(text):
    """First 8 hex chars of SHA-256 of *text* — stable short id fragment."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]


def hash12(text):
    """First 12 hex chars of SHA-256 of *text*."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[:12]


def gen_cap_id(name):
    """Deterministic capability id derived from the normalized name."""
    return f'CAP-{hash8(norm(name))}'


def gen_resource_id(platform, url):
    """Deterministic resource id from platform + url.

    Fix: strip/lowercase first, THEN fall back to 'unknown'. The original
    `(platform or 'unknown').lower().strip()` let a whitespace-only platform
    strip down to '' and yield malformed ids like 'resource/research//<hash>'.
    """
    p = (platform or '').lower().strip() or 'unknown'
    return f'resource/research/{p}/{hash12(url)}'
def gen_strategy_id(req_text, strategy_name):
    """Deterministic strategy id from requirement text + strategy name."""
    return f'strategy-{hash8((req_text or "") + "|" + (strategy_name or ""))}'


# ═══════════════════════════════════════════════════════════

def parse_snapshot(path):
    """Parse /tmp/capabilities_all.md -> {id: name}.

    The snapshot format is '## <name>' followed by '**CAP-xxx**' on the next
    line. Returns {} (with a warning) when the snapshot file is absent.
    """
    if not path.exists():
        print(f'⚠️ snapshot file missing: {path}', flush=True)
        return {}
    content = path.read_text(encoding='utf-8')
    pat = re.compile(r'^## (.+?)\n\*\*(CAP-[\w\-]+)\*\*', re.MULTILINE)
    return {cid: name.strip() for name, cid in pat.findall(content)}


def build_alias_map(snapshot, current_caps):
    """Build normalized_name -> final canonical_id (transitively resolved)."""
    # Step A: build member→canonical map; resolve transitively (A→B→C ⇒ A,B→C)
    member_to_canonical = {}
    for canonical, members in MERGE_CLUSTERS.items():
        for m in members:
            member_to_canonical[m] = canonical

    def final(cid, limit=10):
        # Follow merge chains; `seen` + `limit` guard against cycles/runaway chains.
        seen = set()
        while cid in member_to_canonical and cid not in seen and limit > 0:
            seen.add(cid)
            cid = member_to_canonical[cid]
            limit -= 1
        return cid

    # Resolve: every member maps to its final canonical
    for m in list(member_to_canonical.keys()):
        member_to_canonical[m] = final(m)

    alias = {}
    # 1. All snapshot names → final canonical (member) or self (non-member)
    for cid, name in snapshot.items():
        alias[norm(name)] = member_to_canonical.get(cid, cid)
    # 2. Canonical names from current DB (post-rename) → final canonical
    for canonical in MERGE_CLUSTERS.keys():
        final_id = final(canonical)
        if canonical in current_caps:
            alias[norm(current_caps[canonical]['name'])] = final_id
    # 3. RENAMES: new name → same canonical (final-resolved)
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)
    # 4. v0 foundation CAP-001..021 + CAP-006 reconstruction → self
    for cid, cap in current_caps.items():
        if cid.startswith('CAP-') and len(cid) == 7 and cid[4:].isdigit():
            alias[norm(cap['name'])] = cid
    alias[norm(CAP_006['name'])] = CAP_006['id']
    # 5. Any remaining current DB capability (non-VCAP) → self
    #    setdefault so earlier, more authoritative mappings win.
    for cid, cap in current_caps.items():
        if cid.startswith('CAP-tao_dev_'):
            continue
        alias.setdefault(norm(cap['name']), cid)
    return alias


# ═══════════════════════════════════════════════════════════
# PHASE 0: BACKUP

def backup(stores):
    """Dump every relevant table to JSON files under BACKUP_DIR (best-effort per table)."""
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    print(f'📦 Backing up to {BACKUP_DIR}/', flush=True)
    tables = [
        'capability', 'strategy', 'resource', 'requirement', 'knowledge',
        'requirement_capability', 'capability_tool', 'capability_knowledge',
        'capability_resource', 'strategy_capability', 'strategy_resource',
        'strategy_knowledge', 'requirement_strategy', 'requirement_resource',
    ]
    cur = stores['cap']._get_cursor()
    try:
        for t in tables:
            try:
                cur.execute(f'SELECT * FROM {t}')
                rows = [dict(r) for r in cur.fetchall()]
                # strip embedding (too big, not essential for restore)
                for r in rows:
                    r.pop('embedding', None)
                (BACKUP_DIR / f'{t}.json').write_text(
                    json.dumps(rows, default=str, ensure_ascii=False))
                print(f' ✓ {t}: {len(rows)} rows', flush=True)
            except Exception as e:
                # A failed table must not abort the rest of the backup.
                print(f' ❌ {t}: {e}', flush=True)
    finally:
        cur.close()


# PHASE 1: PURGE

def purge(stores):
    """Delete all rows (every version) from capability/strategy/resource and their junctions."""
    print('\n🧹 Purging capability / strategy / resource (all versions) + junctions...', flush=True)
    cur = stores['cap']._get_cursor()
    try:
        # junctions first (no FK but keep clean)
        for t in ['requirement_capability', 'capability_tool', 'capability_knowledge',
                  'capability_resource', 'strategy_capability', 'strategy_resource',
                  'strategy_knowledge', 'requirement_strategy', 'requirement_resource']:
            cur.execute(f'DELETE FROM {t}')
            print(f' ✓ {t} cleared', flush=True)
        for t in ['capability', 'strategy', 'resource']:
            cur.execute(f'DELETE FROM {t}')
            print(f' ✓ {t} cleared', flush=True)
    finally:
        cur.close()


# PHASE 2: SEED

def seed(stores, current_caps, snapshot):
    """Insert:
    - 21 v0 foundation caps (CAP-001..021) in howard_dedup
    - CAP-006 reconstructed if missing
    - All surviving non-VCAP tao_dev_1 canonicals (preserves R1/R2/C renames)
    - Missing canonicals (in MERGE_CLUSTERS.keys but gone) recovered from snapshot
    """
    print('\n🌱 Seeding howard_dedup...', flush=True)
    cur = stores['cap']._get_cursor()
    try:
        inserted = 0
        # 1. Insert from current_caps: everything non-VCAP gets version=howard_dedup
        for cap_id, cap in current_caps.items():
            if cap_id.startswith('CAP-tao_dev_'):
                continue  # skip VCAP
            cur.execute(
                """INSERT INTO capability (id, name, criterion, description, effects, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (cap_id, cap.get('name', ''), cap.get('criterion', '') or '',
                 cap.get('description', '') or '',
                 json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                 DEDUP_VERSION))
            inserted += 1
        # 2. CAP-006 reconstruct if missing
        if 'CAP-006' not in current_caps:
            cur.execute(
                """INSERT INTO capability (id, name, criterion, description, effects, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (CAP_006['id'], CAP_006['name'], CAP_006['criterion'],
                 CAP_006['description'], '[]', DEDUP_VERSION))
            inserted += 1
            print(' ✓ CAP-006 reconstructed', flush=True)
        # 3. Missing canonicals (in MERGE_CLUSTERS but not in DB) recovered from snapshot
        merged_members = set()
        for members in MERGE_CLUSTERS.values():
            merged_members.update(members)
        recovered = 0
        for cid in MERGE_CLUSTERS.keys():
            if cid in current_caps or cid in merged_members:
                continue
            # Not in DB, not a member — need to recover
            name = snapshot.get(cid)
            if not name:
                continue
            # Use RENAMES version if available
            if cid in RENAMES:
                name, desc = RENAMES[cid]
            else:
                desc = ''
            cur.execute(
                """INSERT INTO capability (id, name, criterion, description, effects, version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (cid, name, '', desc, '[]', DEDUP_VERSION))
            recovered += 1
        print(f' seeded: {inserted} existing + {recovered} recovered from snapshot', flush=True)
    finally:
        cur.close()


# PHASE 3: INGEST from output 2/

def ingest_folder(folder, stores, alias, cur, stats):
    """Ingest one 'output 2/<NNN>' folder: resources, capabilities, selected strategy, junctions.

    Mutates *stats* in place; appends to stats['bad_folders'] / ['missing_req']
    and returns early on any unrecoverable per-folder problem.
    """
    folder_key = folder.name
    # blueprint → requirement match
    bp_path = folder / 'blueprint.json'
    if not bp_path.exists():
        stats['bad_folders'].append(folder_key + ':no_blueprint')
        return
    try:
        bp = json.loads(bp_path.read_text(encoding='utf-8'))
    except Exception as e:
        stats['bad_folders'].append(folder_key + f':bp_parse_err({e})')
        return
    req_text = bp.get('requirement', '')
    if not req_text:
        stats['bad_folders'].append(folder_key + ':empty_req')
        return
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        # Fuzzier fallback: first-80-chars prefix match. Escape ALL LIKE
        # metacharacters — the original escaped only '%', leaving '_' (and
        # backslash) as live wildcards that could match the wrong requirement.
        prefix = (req_text[:80]
                  .replace('\\', '\\\\')
                  .replace('%', '\\%')
                  .replace('_', '\\_'))
        cur.execute('SELECT id FROM requirement WHERE description LIKE %s LIMIT 1',
                    (prefix + '%',))
        row = cur.fetchone()
    if not row:
        print(f' ⚠️ no matching requirement: {req_text[:60]}', flush=True)
        stats['missing_req'].append(folder_key)
        return
    req_id = row['id']

    # resources from raw_cases/
    raw_dir = folder / 'raw_cases'
    resource_ids = []
    if raw_dir.exists():
        for cf in sorted(raw_dir.glob('*.json')):
            try:
                data = json.loads(cf.read_text(encoding='utf-8'))
            except Exception:
                continue  # skip unparseable case files
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if not isinstance(cases, list):
                continue
            for case in cases:
                if not isinstance(case, dict):
                    continue
                url = case.get('source_url') or case.get('url')
                if not url:
                    continue
                platform = case.get('platform') or cf.stem.replace('case_', '')
                rid = gen_resource_id(platform, url)
                title = (case.get('title') or '')[:200]
                metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
                likes = (metrics.get('likes') or 0) if metrics else 0
                # delete-then-insert makes the ingest idempotent per resource id
                cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
                cur.execute(
                    """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    (rid, title, json.dumps(case, ensure_ascii=False)[:8000], 'research_case',
                     json.dumps(case.get('images', []) or [], ensure_ascii=False),
                     json.dumps({'platform': platform, 'source_url': url,
                                 'metrics': metrics, 'folder': folder_key},
                                ensure_ascii=False),
                     -int(likes), DEDUP_VERSION))  # negative likes → most-liked sorts first
                resource_ids.append(rid)
                stats['resource'] += 1

    # capabilities
    caps_path = folder / 'capabilities_extracted.json'
    cap_resolved = {}  # source_key -> resolved_id
    if caps_path.exists():
        try:
            caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
        except Exception as e:
            stats['bad_folders'].append(folder_key + f':caps_parse_err({e})')
            caps_data = {'extracted_capabilities': []}
        for cap in caps_data.get('extracted_capabilities', []):
            name = (cap.get('name') or '').strip()
            if not name:
                continue
            src_id = cap.get('id')
            resolved = None
            # (1) source id exists in DB?
            if src_id:
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (src_id,))
                if cur.fetchone():
                    resolved = src_id
            # (2) alias by name?
            if not resolved:
                cand = alias.get(norm(name))
                if cand:
                    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cand,))
                    if cur.fetchone():
                        resolved = cand
            # (3) create new with hash ID
            if not resolved:
                new_id = gen_cap_id(name)
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
                if not cur.fetchone():
                    cur.execute(
                        """INSERT INTO capability (id, name, criterion, description, effects, version)
                           VALUES (%s, %s, %s, %s, %s, %s)""",
                        (new_id, name, cap.get('criterion', '') or '',
                         cap.get('description', '') or '',
                         json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                         DEDUP_VERSION))
                alias[norm(name)] = new_id
                stats['capability_new'] += 1
                resolved = new_id
            else:
                # backfill criterion/effects if missing on the existing row
                cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
                ex = cur.fetchone()
                if ex:
                    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
                        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                                    (cap['criterion'], resolved))
                        stats['criterion_backfilled'] += 1
                    cur_eff = ex.get('effects')
                    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
                        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                                    (json.dumps(cap['effects'], ensure_ascii=False, default=str),
                                     resolved))
                        stats['effects_backfilled'] += 1
                stats['capability_linked'] += 1
            cap_resolved[src_id or name] = resolved

    # strategy (is_selected only)
    strat_path = folder / 'strategy.json'
    if not strat_path.exists():
        return
    try:
        strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
    except Exception as e:
        stats['bad_folders'].append(folder_key + f':strat_parse_err({e})')
        return
    selected = next((s for s in strat_data.get('strategies', []) if s.get('is_selected')), None)
    if not selected:
        # no explicit selection — fall back to the first strategy, if any
        sel_list = strat_data.get('strategies', [])
        selected = sel_list[0] if sel_list else None
    if not selected:
        return
    strategy_name = selected.get('name') or f'Strategy-{folder_key}'
    strat_id = gen_strategy_id(req_text, strategy_name)
    now = int(time.time())
    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))  # idempotent re-ingest
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
         json.dumps(selected, ensure_ascii=False, indent=2), 'draft', now, now, DEDUP_VERSION))
    stats['strategy'] += 1

    # wire junctions
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                   VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))

    # capabilities referenced by the selected strategy's workflow outline
    strat_cap_ids = set()
    wo = selected.get('workflow_outline') or []
    if isinstance(wo, list):
        for phase in wo:
            if not isinstance(phase, dict):
                continue
            caps = phase.get('capabilities') or []
            if not isinstance(caps, list):
                continue
            for c_ref in caps:
                if not isinstance(c_ref, dict):
                    continue
                key = c_ref.get('id') or c_ref.get('name', '')
                resolved = cap_resolved.get(key) or alias.get(norm(c_ref.get('name', '')))
                if resolved:
                    strat_cap_ids.add(resolved)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                       VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    # per-folder counters, read (and reset) by main()'s progress loop
    stats['folder_cap_count'] = len(strat_cap_ids)
    stats['folder_res_count'] = len(resource_ids)


# PHASE 4: apply renames (defensive — some may have already been applied during seeding)

def apply_renames(stores):
    """Re-apply RENAMES name/description updates; count rows actually touched."""
    print('\n📝 Applying RENAMES...', flush=True)
    cur = stores['cap']._get_cursor()
    try:
        applied = 0
        for cid, (name, desc) in RENAMES.items():
            cur.execute('UPDATE capability SET name = %s, description = %s WHERE id = %s',
                        (name, desc, cid))
            if (cur.rowcount or 0) > 0:
                applied += 1
        print(f' applied {applied}/{len(RENAMES)}', flush=True)
    finally:
        cur.close()


# ═══════════════════════════════════════════════════════════

def main():
    """CLI entry point: backup → (dry-run | purge+seed+ingest+rename+verify)."""
    ap = argparse.ArgumentParser()
    g = ap.add_mutually_exclusive_group(required=True)
    g.add_argument('--backup-only', action='store_true')
    g.add_argument('--dry-run', action='store_true')
    g.add_argument('--execute', action='store_true')
    ap.add_argument('--skip-backup', action='store_true', help='Skip backup (if already done)')
    args = ap.parse_args()

    print(f'\n{"="*60}', flush=True)
    print(f'{"BACKUP" if args.backup_only else "DRY RUN" if args.dry_run else "EXECUTE"} '
          f'— rebuild howard_dedup', flush=True)
    print(f'{"="*60}', flush=True)

    stores = {
        'cap': PostgreSQLCapabilityStore(),
        'res': PostgreSQLResourceStore(),
        'req': PostgreSQLRequirementStore(),
        'strat': PostgreSQLStrategyStore(),
    }
    try:
        if not args.skip_backup:
            backup(stores)
        if args.backup_only:
            print('\n✅ Backup only. Exit.', flush=True)
            return

        # dump current caps (used for seed)
        cur = stores['cap']._get_cursor()
        cur.execute('SELECT id, name, criterion, description, effects, version FROM capability')
        current_caps = {r['id']: dict(r) for r in cur.fetchall()}
        cur.close()

        snapshot = parse_snapshot(SNAPSHOT_PATH)
        alias = build_alias_map(snapshot, current_caps)
        (BACKUP_DIR / 'alias_map.json').write_text(
            json.dumps(alias, ensure_ascii=False, indent=2))
        print(f'\n🔗 Alias map: {len(alias)} entries (snapshot={len(snapshot)}, '
              f'current_caps={len(current_caps)})', flush=True)

        if args.dry_run:
            print('\n[DRY-RUN] Simulating folder 001 ingest...', flush=True)
            f1 = OUTPUT_DIR / '001'
            caps_file = f1 / 'capabilities_extracted.json'
            # Guard the file too (the original only checked the folder and
            # would crash on a missing file) and read explicitly as UTF-8
            # like every other read in this script.
            if f1.exists() and caps_file.exists():
                caps = json.loads(caps_file.read_text(encoding='utf-8'))
                new = linked = 0
                for c in caps.get('extracted_capabilities', [])[:15]:
                    name = (c.get('name') or '').strip()
                    src = c.get('id')
                    cand = alias.get(norm(name))
                    status = 'LINK' if cand else 'NEW'
                    if cand:
                        linked += 1
                    else:
                        new += 1
                    print(f' [{status}] src_id={src!r} → {cand!r} | {name[:50]}', flush=True)
                print(f'\n folder 001 sample (first 15): link={linked} new={new}', flush=True)
            print('\n[DRY-RUN] Skipping purge + ingest. Use --execute to run.', flush=True)
            return

        # Skip purge+seed if howard_dedup already has canonical seeds
        # (enables resume after connection drop)
        cur = stores['cap']._get_cursor()
        cur.execute("SELECT COUNT(*) AS c FROM capability WHERE version = %s", (DEDUP_VERSION,))
        hd_count = cur.fetchone()['c']
        cur.close()
        if hd_count > 100:
            print(f'\n⚠️ howard_dedup already has {hd_count} caps — skipping purge+seed (resume mode)', flush=True)
        else:
            purge(stores)
            seed(stores, current_caps, snapshot)

        # Ingest
        print('\n📂 Ingesting output 2/ ...', flush=True)
        stats = {'folder_processed': 0, 'bad_folders': [], 'missing_req': [],
                 'resource': 0, 'capability_new': 0, 'capability_linked': 0,
                 'strategy': 0, 'criterion_backfilled': 0, 'effects_backfilled': 0,
                 'folder_cap_count': 0, 'folder_res_count': 0}
        folders = sorted([d for d in OUTPUT_DIR.iterdir() if d.is_dir()])
        cur = stores['cap']._get_cursor()
        try:
            for idx, folder in enumerate(folders, 1):
                before_cap_new = stats['capability_new']
                before_cap_link = stats['capability_linked']
                before_res = stats['resource']
                before_strat = stats['strategy']
                stats['folder_cap_count'] = 0
                stats['folder_res_count'] = 0
                try:
                    ingest_folder(folder, stores, alias, cur, stats)
                    stats['folder_processed'] += 1
                    d_new = stats['capability_new'] - before_cap_new
                    d_link = stats['capability_linked'] - before_cap_link
                    d_res = stats['resource'] - before_res
                    d_strat = stats['strategy'] - before_strat
                    print(f"[{idx:3d}/{len(folders)}] 📁 {folder.name}/ "
                          f"cap:link={d_link} new={d_new} res={d_res} strat={d_strat} "
                          f"strat_caps={stats['folder_cap_count']}", flush=True)
                except Exception as e:
                    print(f'[{idx:3d}/{len(folders)}] 📁 {folder.name}/ ❌ {type(e).__name__}: {e}', flush=True)
                    stats['bad_folders'].append(folder.name + f':{type(e).__name__}')
                    # reconnect cursor if connection dropped
                    try:
                        cur.close()
                    except Exception:
                        pass
                    cur = stores['cap']._get_cursor()
        finally:
            try:
                cur.close()
            except Exception:
                pass

        apply_renames(stores)

        # Verify
        print(f'\n{"="*60}\n验证:', flush=True)
        cur = stores['cap']._get_cursor()
        for tbl in ['capability', 'strategy', 'resource', 'requirement']:
            cur.execute(f"SELECT version, COUNT(*) AS c FROM {tbl} GROUP BY version")
            for r in cur.fetchall():
                print(f" {tbl} / {r['version']}: {r['c']}", flush=True)
        cur.close()

        print(f'\n📊 Stats:', flush=True)
        # NOTE(review): folder_cap_count / folder_res_count here hold only the
        # LAST folder's values — per-folder figures are in the progress lines.
        for k, v in stats.items():
            if isinstance(v, list):
                print(f' {k}: {len(v)} {v[:5]}{"..." if len(v)>5 else ""}', flush=True)
            else:
                print(f' {k}: {v}', flush=True)
        print(f'{"="*60}', flush=True)
    finally:
        for s in stores.values():
            s.close()


if __name__ == '__main__':
    main()