#!/usr/bin/env python3
"""
Salvage folders with non-standard JSON schemas that standard ingest missed.

Target folders (from data problem audit):
  004, 031, 053, 066, 070 — non-standard strategy.json structure
                            (the LLM improvised its own layout)
  044 — capabilities_extracted uses old schema (capability_id/capability_name)

Strategy: for each folder, manually locate the relevant fields and wire the
junctions. All insertions are tagged with version=DEDUP_VERSION (described as
'howard_dedup' in the original header).

Usage:
    python knowhub/scripts/salvage_malformed_folders.py --execute
"""

import argparse
import hashlib
import json
import sys
import time
import traceback
from pathlib import Path

# Make the repository root importable when the script is run by path.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.scripts.rebuild_howard_dedup import (
    parse_snapshot, build_alias_map, norm, gen_cap_id, gen_strategy_id, gen_resource_id,
    SNAPSHOT_PATH, OUTPUT_DIR, DEDUP_VERSION,
)

DEDUP = DEDUP_VERSION


# ═══════════════════════════════════════════════════════════
# Shared helpers

def _load_json(path):
    """Parse the JSON file at *path*, always decoding it as UTF-8.

    An explicit encoding keeps the result independent of the platform's
    default locale encoding.
    """
    return json.loads(path.read_text(encoding='utf-8'))


def _as_int(value, default=0):
    """Coerce *value* with int(); return *default* when it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def resolve_cap(cur, cap_name_or_id, alias, current_caps, create_if_missing=False,
                cap_desc='', cap_criterion=''):
    """Resolve a capability name/id → DB id. Optionally create if missing.

    Lookup order:
      1. a 'CAP-'-prefixed value that is already a key of ``current_caps``;
      2. ``alias[norm(value)]`` when that target is in ``current_caps``;
      3. a ``capability`` row whose ``name`` equals the value;
      4. if ``create_if_missing``: the id from ``gen_cap_id(value)``; the row
         is inserted unless that id already exists.

    When step 4 is taken, ``current_caps`` and ``alias`` are updated in place
    so later lookups in the same run hit steps 1/2.

    Returns the capability id, or None when nothing matched (or when the
    reference is empty or not a string).
    """
    # Refs come straight from hand-written JSON; a non-string cannot be an
    # id or a name, so treat it as unresolvable instead of raising.
    if not cap_name_or_id or not isinstance(cap_name_or_id, str):
        return None

    # If it looks like a cap ID and is already known, use it as-is.
    if cap_name_or_id.startswith('CAP-') and cap_name_or_id in current_caps:
        return cap_name_or_id

    # Try the alias map.
    cand = alias.get(norm(cap_name_or_id))
    if cand and cand in current_caps:
        return cand

    # Try an exact name match in the DB.
    cur.execute('SELECT id FROM capability WHERE name = %s LIMIT 1', (cap_name_or_id,))
    row = cur.fetchone()
    if row:
        return row['id']

    if not create_if_missing:
        return None

    new_id = gen_cap_id(cap_name_or_id)
    cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
    if not cur.fetchone():
        cur.execute(
            """INSERT INTO capability (id, name, criterion, description, effects, version)
               VALUES (%s, %s, %s, %s, %s, %s)""",
            (new_id, cap_name_or_id, cap_criterion, cap_desc, '[]', DEDUP))
    current_caps[new_id] = {'name': cap_name_or_id}
    alias[norm(cap_name_or_id)] = new_id
    return new_id


def _resolve_id_or_name(cur, cap_id, cap_name, alias, current_caps):
    """Resolve by explicit id first (lookup only), then by name (creating if missing)."""
    cid = resolve_cap(cur, cap_id, alias, current_caps) if cap_id else None
    if not cid and cap_name:
        cid = resolve_cap(cur, cap_name, alias, current_caps, create_if_missing=True)
    return cid


def _first_truthy(mapping, keys):
    """Return the first truthy ``mapping.get(key)`` over *keys*, else None."""
    for key in keys:
        value = mapping.get(key)
        if value:
            return value
    return None


def _caps_from_dicts(cur, entries, alias, current_caps, id_key, name_key):
    """Resolve every dict in *entries* via its id/name keys; non-dicts are skipped."""
    cap_ids = set()
    for entry in entries or []:
        if not isinstance(entry, dict):
            continue
        cid = _resolve_id_or_name(cur, entry.get(id_key), entry.get(name_key),
                                  alias, current_caps)
        if cid:
            cap_ids.add(cid)
    return cap_ids


def _caps_from_refs(cur, refs, alias, current_caps, id_keys=('id',), name_keys=('name',)):
    """Resolve a mixed list of capability refs (plain strings or dicts).

    A string is treated as a name-or-id and created if missing; a dict goes
    through the id-then-name path; any other type is ignored.
    """
    cap_ids = set()
    for ref in refs or []:
        if isinstance(ref, str):
            cid = resolve_cap(cur, ref, alias, current_caps, create_if_missing=True)
        elif isinstance(ref, dict):
            cid = _resolve_id_or_name(cur, _first_truthy(ref, id_keys),
                                      _first_truthy(ref, name_keys),
                                      alias, current_caps)
        else:
            cid = None
        if cid:
            cap_ids.add(cid)
    return cap_ids


def _link_capabilities(cur, strat_id, req_id, cap_ids):
    """Insert strategy→capability ('compose') and requirement→capability junction rows."""
    for cid in cap_ids:
        cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                       VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
                    (strat_id, cid))
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                    (req_id, cid))


def insert_strategy(cur, folder_key, req_id, req_text, strategy_name, strategy_body,
                    reasoning, resource_ids, cap_ids):
    """Replace the strategy row and wire it to its requirement, resources and caps.

    The strategy id comes from ``gen_strategy_id(req_text, strategy_name)``;
    any existing row with that id is deleted first. ``strategy_body`` is
    stored as pretty-printed JSON, ``reasoning`` (truncated to 2000 chars) as
    the description, and status is 'draft'. ``folder_key`` is accepted for
    call-site symmetry but is not used.

    Returns the strategy id.
    """
    strat_id = gen_strategy_id(req_text, strategy_name)
    now = int(time.time())

    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status,
                                 created_at, updated_at, version)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (strat_id, strategy_name, (reasoning or '')[:2000],
         json.dumps(strategy_body, ensure_ascii=False, indent=2),
         'draft', now, now, DEDUP))

    # requirement → strategy
    cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                   VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                (req_id, strat_id))

    # strategy → resources
    for rid in resource_ids:
        cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                    (strat_id, rid))

    # strategy → capabilities (+ req → cap)
    _link_capabilities(cur, strat_id, req_id, cap_ids)
    return strat_id


def ensure_resources(cur, folder, req_id):
    """Read raw_cases/, insert resources, return list of resource IDs.

    Every ``raw_cases/*.json`` file may be a list of cases or a dict with a
    'cases' list. Cases that are not dicts or have no 'source_url'/'url' are
    skipped. Each remaining case replaces the resource row with its generated
    id and is linked to ``req_id``. Files that cannot be read or parsed are
    reported and skipped.
    """
    raw_dir = folder / 'raw_cases'
    resource_ids = []
    if not raw_dir.exists():
        return resource_ids

    for cf in sorted(raw_dir.glob('*.json')):
        try:
            data = _load_json(cf)
        except (OSError, ValueError) as exc:
            # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            # Stay best-effort, but say which file was dropped.
            print(f' ⚠️ skipping unreadable {cf.name}: {type(exc).__name__}: {exc}',
                  flush=True)
            continue

        cases = data.get('cases', []) if isinstance(data, dict) else data
        if not isinstance(cases, list):
            continue

        for case in cases:
            if not isinstance(case, dict):
                continue
            url = case.get('source_url') or case.get('url')
            if not url:
                continue

            # Fall back to the file name (minus the 'case_' prefix) as platform.
            platform = case.get('platform') or cf.stem.replace('case_', '')
            rid = gen_resource_id(platform, url)
            title = (case.get('title') or '')[:200]
            raw_metrics = case.get('metrics')
            metrics = raw_metrics if isinstance(raw_metrics, dict) else {}
            # Scraped like counts may be non-numeric text; fall back to 0
            # rather than aborting the whole folder.
            likes = _as_int(metrics.get('likes') or 0)

            cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
            cur.execute(
                """INSERT INTO resource (id, title, body, content_type, images,
                                         metadata, sort_order, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (rid, title,
                 json.dumps(case, ensure_ascii=False)[:8000],
                 'research_case',
                 json.dumps(case.get('images', []) or [], ensure_ascii=False),
                 json.dumps({'platform': platform, 'source_url': url,
                             'metrics': metrics, 'folder': folder.name},
                            ensure_ascii=False),
                 -likes,  # negated like count is stored as sort_order
                 DEDUP))
            resource_ids.append(rid)
            cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                        (req_id, rid))
    return resource_ids


# ═══════════════════════════════════════════════════════════
# Per-folder handlers
#
# Each handler takes (cur, alias, current_caps) and returns
# (number of linked capabilities, number of resources).

def salvage_004(cur, alias, current_caps):
    """004: strategy.strategy.phases + capability_mapping (capability=name)."""
    folder = OUTPUT_DIR / '004'
    req_id = 'REQ_004'
    d = _load_json(folder / 'strategy.json')
    req_text = d.get('requirement', '')
    strat_dict = d.get('strategy', {})
    overview = strat_dict.get('overview', '')
    name = 'Strategy-004'

    # capability_mapping: [{capability: name, role, used_in_phases, key_tools}]
    # Entries carry only a name, so every one is resolved with create-if-missing.
    cap_ids = set()
    for entry in d.get('capability_mapping', []) or []:
        if not isinstance(entry, dict):
            continue
        cap_name = entry.get('capability') or entry.get('capability_name')
        cid = resolve_cap(cur, cap_name, alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'strategy': strat_dict,
            'capability_mapping': d.get('capability_mapping', [])}
    insert_strategy(cur, '004', req_id, req_text, name, body, overview,
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)


def salvage_031(cur, alias, current_caps):
    """031: strategy.strategy.phases + capability_mapping (capability_id/name)."""
    folder = OUTPUT_DIR / '031'
    req_id = 'REQ_031'
    d = _load_json(folder / 'strategy.json')
    req_text = d.get('requirement', '')
    strat_dict = d.get('strategy', {})
    overview = strat_dict.get('overview', '')
    name = 'Strategy-031'

    cap_ids = _caps_from_dicts(cur, d.get('capability_mapping', []), alias, current_caps,
                               'capability_id', 'capability_name')

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'strategy': strat_dict,
            'capability_mapping': d.get('capability_mapping', [])}
    insert_strategy(cur, '031', req_id, req_text, name, body, overview,
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)


def salvage_053(cur, alias, current_caps):
    """053: phases[] (phase/description/capabilities) + core_workflow (text)."""
    folder = OUTPUT_DIR / '053'
    req_id = 'REQ_053'
    d = _load_json(folder / 'strategy.json')
    req_text = d.get('requirement', '')
    name = 'Strategy-053'

    cap_ids = set()
    for phase in d.get('phases', []) or []:
        if not isinstance(phase, dict):
            continue
        # Each capability ref is either a string (name or id) or a dict.
        cap_ids |= _caps_from_refs(
            cur, phase.get('capabilities', []), alias, current_caps,
            id_keys=('id', 'capability_id'),
            name_keys=('name', 'capability_name'))

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'phases': d.get('phases', []),
            'core_workflow': d.get('core_workflow', '')}
    # core_workflow doubles as the description; guard against null/non-text
    # values so the slice cannot raise.
    workflow = d.get('core_workflow') or ''
    summary = workflow[:500] if isinstance(workflow, str) else ''
    insert_strategy(cur, '053', req_id, req_text, name, body, summary,
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)


def salvage_066(cur, alias, current_caps):
    """066: execution_phases + key_capabilities (id/name/role)."""
    folder = OUTPUT_DIR / '066'
    req_id = 'REQ_066'
    d = _load_json(folder / 'strategy.json')
    req_text = d.get('requirement', '')
    name = 'Strategy-066'

    # key_capabilities is primary
    cap_ids = _caps_from_dicts(cur, d.get('key_capabilities', []), alias, current_caps,
                               'id', 'name')

    # execution_phases may reference more
    for phase in d.get('execution_phases', []) or []:
        if not isinstance(phase, dict):
            continue
        cap_ids |= _caps_from_refs(cur, phase.get('capabilities_used', []),
                                   alias, current_caps)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'execution_phases': d.get('execution_phases', []),
            'key_capabilities': d.get('key_capabilities', []),
            'recommended_tools': d.get('recommended_tools', [])}
    insert_strategy(cur, '066', req_id, req_text, name, body, '',
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)


def salvage_070(cur, alias, current_caps):
    """070: capabilities_mapping (id/name/role_in_strategy/is_new) + tool_chain."""
    folder = OUTPUT_DIR / '070'
    req_id = 'REQ_070'
    d = _load_json(folder / 'strategy.json')
    req_text = d.get('requirement', '')
    name = 'Strategy-070'

    cap_ids = _caps_from_dicts(cur, d.get('capabilities_mapping', []), alias, current_caps,
                               'id', 'name')

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'capabilities_mapping': d.get('capabilities_mapping', []),
            'tool_chain': d.get('tool_chain', []),
            'selected_blueprint': d.get('selected_blueprint', '')}
    insert_strategy(cur, '070', req_id, req_text, name, body, '',
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)


def salvage_044(cur, alias, current_caps):
    """044: capabilities_extracted uses capability_id/capability_name schema.

    Just link these caps to existing REQ_044's strategy (already created).
    Returns (0, 0) without touching anything when REQ_044 has no strategy.
    """
    folder = OUTPUT_DIR / '044'
    req_id = 'REQ_044'

    # find existing strategy
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id=%s LIMIT 1',
                (req_id,))
    row = cur.fetchone()
    if not row:
        print(' ⚠️ 044 has no strategy yet', flush=True)
        return 0, 0
    strat_id = row['strategy_id']

    d = _load_json(folder / 'capabilities_extracted.json')
    cap_ids = _caps_from_dicts(cur, d.get('extracted_capabilities', []), alias, current_caps,
                               'capability_id', 'capability_name')

    _link_capabilities(cur, strat_id, req_id, cap_ids)
    return len(cap_ids), 0


# ═══════════════════════════════════════════════════════════

# (folder key, handler) pairs, in the order they are processed.
_HANDLERS = (
    ('004', salvage_004),
    ('031', salvage_031),
    ('044', salvage_044),
    ('053', salvage_053),
    ('066', salvage_066),
    ('070', salvage_070),
)


def _run_handlers(cur):
    """Load capability/alias state, then run every folder handler.

    A handler that raises is reported with a traceback and does not stop the
    remaining handlers.
    """
    cur.execute("SELECT id, name FROM capability WHERE version = %s", (DEDUP,))
    current_caps = {r['id']: {'name': r['name']} for r in cur.fetchall()}
    snapshot = parse_snapshot(SNAPSHOT_PATH)
    alias = build_alias_map(snapshot, current_caps)
    print(f'alias: {len(alias)}, caps: {len(current_caps)}', flush=True)

    for key, fn in _HANDLERS:
        print(f'\n📁 salvaging {key}/ ...', flush=True)
        try:
            n_cap, n_res = fn(cur, alias, current_caps)
            print(f' ✓ {key}: {n_cap} caps, {n_res} resources', flush=True)
        except Exception as e:
            print(f' ❌ {key}: {type(e).__name__}: {e}', flush=True)
            traceback.print_exc()


def _print_coverage(cur):
    """Print junction-row counts (strategy/capability/resource) per salvaged requirement."""
    print(f'\n{"="*50}\nFinal requirement coverage:', flush=True)
    for key, _ in _HANDLERS:
        req = f'REQ_{key}'
        cur.execute('SELECT COUNT(*) AS c FROM requirement_strategy WHERE requirement_id=%s',
                    (req,))
        strat = cur.fetchone()['c']
        cur.execute('SELECT COUNT(*) AS c FROM requirement_capability WHERE requirement_id=%s',
                    (req,))
        cap = cur.fetchone()['c']
        cur.execute('SELECT COUNT(*) AS c FROM requirement_resource WHERE requirement_id=%s',
                    (req,))
        res = cur.fetchone()['c']
        print(f' {req}: strat={strat} cap={cap} res={res}', flush=True)


def main():
    """CLI entry point: salvage every target folder, then print coverage counts."""
    ap = argparse.ArgumentParser()
    # The flag is required, so the script refuses to run without --execute.
    ap.add_argument('--execute', action='store_true', required=True)
    ap.parse_args()

    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            # NOTE(review): no explicit commit is issued anywhere in this
            # script; presumably the store's connection autocommits — confirm.
            _run_handlers(cur)
            # Final check
            _print_coverage(cur)
        finally:
            cur.close()
    finally:
        # Release the store even when setup or a query raised.
        s.close()


if __name__ == '__main__':
    main()