| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- #!/usr/bin/env python3
- """
- Salvage folders with non-standard JSON schemas that standard ingest missed.
- Target folders (from data problem audit):
- 004, 031, 053, 066, 070 — non-standard strategy.json structure (the LLM improvised its own schema)
- 044 — capabilities_extracted uses old schema (capability_id/capability_name)
- Strategy: for each folder, manually locate the relevant fields and wire the junctions.
- All insertions use version='howard_dedup'.
- Usage:
- python knowhub/scripts/salvage_malformed_folders.py --execute
- """
- import argparse
- import hashlib
- import json
- import sys
- import time
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.scripts.rebuild_howard_dedup import (
- parse_snapshot, build_alias_map, norm, gen_cap_id, gen_strategy_id,
- gen_resource_id, SNAPSHOT_PATH, OUTPUT_DIR, DEDUP_VERSION,
- )
# Short alias for the imported version tag; it is passed as the `version`
# value in the capability, strategy and resource INSERT statements below.
DEDUP = DEDUP_VERSION
def resolve_cap(cur, cap_name_or_id, alias, current_caps, create_if_missing=False,
                cap_desc='', cap_criterion=''):
    """Resolve a capability name or id to a capability id, optionally creating it.

    Lookup order:
      1. ``cap_name_or_id`` itself, when it starts with ``CAP-`` and is a key
         of ``current_caps``.
      2. ``alias[norm(cap_name_or_id)]``, when that id is a key of
         ``current_caps``.
      3. An exact ``capability.name`` match in the database.
      4. Only when ``create_if_missing`` is true: the id produced by
         ``gen_cap_id``; the row is inserted unless that id already exists.

    Args:
        cur: DB cursor whose fetched rows support ``row['id']`` access.
        cap_name_or_id: Capability reference read from a salvaged JSON file.
        alias: Mapping of normalised name -> capability id. Updated in place
            when a capability is created.
        current_caps: Mapping of capability id -> ``{'name': ...}``. Updated
            in place when a capability is created.
        create_if_missing: Insert a new capability when nothing matches.
        cap_desc: Description stored on a newly created capability.
        cap_criterion: Criterion stored on a newly created capability.

    Returns:
        The capability id, or ``None`` when the reference is empty, is not a
        string, or cannot be resolved and creation was not requested.
    """
    # The salvaged files are free-form, so a dict/list/number may sit where a
    # name is expected. Treat anything that is not a non-blank string as
    # "no reference" instead of failing on .startswith() and aborting the
    # whole folder.
    if not isinstance(cap_name_or_id, str) or not cap_name_or_id.strip():
        return None

    # If it looks like a cap ID and is already known, use it as is.
    if cap_name_or_id.startswith('CAP-') and cap_name_or_id in current_caps:
        return cap_name_or_id

    # Try the alias map.
    cand = alias.get(norm(cap_name_or_id))
    if cand and cand in current_caps:
        return cand

    # Try an exact name match in the DB.
    cur.execute('SELECT id FROM capability WHERE name = %s LIMIT 1', (cap_name_or_id,))
    row = cur.fetchone()
    if row:
        return row['id']

    if not create_if_missing:
        return None

    new_id = gen_cap_id(cap_name_or_id)
    cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
    if not cur.fetchone():
        cur.execute(
            """INSERT INTO capability (id, name, criterion, description, effects, version)
               VALUES (%s, %s, %s, %s, %s, %s)""",
            (new_id, cap_name_or_id, cap_criterion, cap_desc, '[]', DEDUP))
        # Register the new row so later references in this run resolve
        # through the in-memory maps.
        current_caps[new_id] = {'name': cap_name_or_id}
        alias[norm(cap_name_or_id)] = new_id
    return new_id
def insert_strategy(cur, folder_key, req_id, req_text, strategy_name, strategy_body,
                    reasoning, resource_ids, cap_ids):
    """Replace one strategy row and wire it to its requirement, resources and capabilities.

    The strategy id comes from ``gen_strategy_id(req_text, strategy_name)``.
    Any existing row with that id is deleted before the new one is inserted
    with status ``'draft'`` and the dedup version tag. Junction rows are
    inserted with ``ON CONFLICT DO NOTHING``.

    Args:
        cur: DB cursor.
        folder_key: Folder label supplied by the caller; not used here.
        req_id: Requirement id to link the strategy to.
        req_text: Requirement text, used only to derive the strategy id.
        strategy_name: Name stored on the strategy row.
        strategy_body: JSON-serialisable object stored as the strategy body.
        reasoning: Free text stored as the description (first 2000 chars).
        resource_ids: Iterable of resource ids to link.
        cap_ids: Iterable of capability ids to link to both the strategy and
            the requirement.

    Returns:
        The strategy id.
    """
    strat_id = gen_strategy_id(req_text, strategy_name)
    timestamp = int(time.time())
    description = reasoning[:2000] if reasoning else ''
    serialized_body = json.dumps(strategy_body, ensure_ascii=False, indent=2)

    # Drop any previous row with this id, then insert the fresh one.
    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
    strategy_row = (strat_id, strategy_name, description, serialized_body,
                    'draft', timestamp, timestamp, DEDUP)
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        strategy_row)

    # requirement -> strategy
    cur.execute(
        """INSERT INTO requirement_strategy (requirement_id, strategy_id)
           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
        (req_id, strat_id))

    # strategy -> resources
    for resource_id in resource_ids:
        cur.execute(
            """INSERT INTO strategy_resource (strategy_id, resource_id)
               VALUES (%s, %s) ON CONFLICT DO NOTHING""",
            (strat_id, resource_id))

    # strategy -> capabilities, and requirement -> capabilities
    for capability_id in cap_ids:
        cur.execute(
            """INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
               VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
            (strat_id, capability_id))
        cur.execute(
            """INSERT INTO requirement_capability (requirement_id, capability_id)
               VALUES (%s, %s) ON CONFLICT DO NOTHING""",
            (req_id, capability_id))

    return strat_id
def ensure_resources(cur, folder, req_id):
    """Insert every case found under ``folder/raw_cases`` as a resource row.

    Each ``*.json`` file may hold either ``{"cases": [...]}`` or a bare list
    of case dicts. Cases without ``source_url``/``url`` are skipped. For each
    remaining case the resource row is deleted and re-inserted, then linked
    to ``req_id`` with ``ON CONFLICT DO NOTHING``.

    Args:
        cur: DB cursor.
        folder: ``pathlib.Path`` of the salvaged folder.
        req_id: Requirement id to link the resources to.

    Returns:
        Resource ids in first-seen order, without duplicates. Empty when
        ``raw_cases`` does not exist.
    """
    raw_dir = folder / 'raw_cases'
    resource_ids = []
    if not raw_dir.exists():
        return resource_ids

    seen = set()
    for cf in sorted(raw_dir.glob('*.json')):
        try:
            data = json.loads(cf.read_text(encoding='utf-8'))
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # Still best-effort, but say which file was skipped.
            print(f'  ⚠️ skipping unreadable case file {cf.name}: '
                  f'{type(exc).__name__}: {exc}', flush=True)
            continue

        cases = data.get('cases', []) if isinstance(data, dict) else data
        if not isinstance(cases, list):
            continue

        for case in cases:
            if not isinstance(case, dict):
                continue
            url = case.get('source_url') or case.get('url')
            if not url:
                continue

            # Fall back to the file name (minus the 'case_' marker) as platform.
            platform = case.get('platform') or cf.stem.replace('case_', '')
            rid = gen_resource_id(platform, url)
            # str() keeps a non-string title from breaking the slice.
            title = str(case.get('title') or '')[:200]

            raw_metrics = case.get('metrics')
            metrics = raw_metrics if isinstance(raw_metrics, dict) else {}
            # Scraped counters are not always numeric (e.g. "1.2k"); treat
            # anything int() cannot convert as 0 likes.
            try:
                likes = int(metrics.get('likes') or 0)
            except (TypeError, ValueError, OverflowError):
                likes = 0

            cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
            cur.execute(
                """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (rid, title, json.dumps(case, ensure_ascii=False)[:8000],
                 'research_case',
                 json.dumps(case.get('images', []) or [], ensure_ascii=False),
                 json.dumps({'platform': platform, 'source_url': url,
                             'metrics': metrics, 'folder': folder.name},
                            ensure_ascii=False),
                 # Negated so a higher like count gives a smaller sort_order.
                 -likes, DEDUP))

            # The same URL can appear in several case files; report it once.
            if rid not in seen:
                seen.add(rid)
                resource_ids.append(rid)

            cur.execute(
                """INSERT INTO requirement_resource (requirement_id, resource_id)
                   VALUES (%s, %s) ON CONFLICT DO NOTHING""",
                (req_id, rid))
    return resource_ids
- # ═══════════════════════════════════════════════════════════
- # Per-folder handlers
def salvage_004(cur, alias, current_caps):
    """Salvage folder 004.

    Reads ``strategy.json``, which holds a nested ``strategy`` object and a
    ``capability_mapping`` list whose entries name the capability under
    ``capability`` (or ``capability_name``). Unresolved names are created.

    Args:
        cur: DB cursor.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, number of resources)``.
    """
    folder = OUTPUT_DIR / '004'
    req_id = 'REQ_004'
    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    req_text = d.get('requirement', '')
    strat_dict = d.get('strategy', {})
    overview = strat_dict.get('overview', '')
    name = 'Strategy-004'

    # capability_mapping: [{capability: name, role, used_in_phases, key_tools}]
    cap_ids = set()
    for entry in d.get('capability_mapping', []) or []:
        if not isinstance(entry, dict):
            continue
        cap_name = entry.get('capability') or entry.get('capability_name')
        cid = resolve_cap(cur, cap_name, alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'strategy': strat_dict, 'capability_mapping': d.get('capability_mapping', [])}
    insert_strategy(cur, '004', req_id, req_text, name, body, overview,
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)
def salvage_031(cur, alias, current_caps):
    """Salvage folder 031.

    Reads ``strategy.json``, which holds a nested ``strategy`` object and a
    ``capability_mapping`` list whose entries carry ``capability_id`` and/or
    ``capability_name``. The id is tried first without creation; the name is
    the fallback and is created when unresolved.

    Args:
        cur: DB cursor.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, number of resources)``.
    """
    folder = OUTPUT_DIR / '031'
    req_id = 'REQ_031'
    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    req_text = d.get('requirement', '')
    strat_dict = d.get('strategy', {})
    overview = strat_dict.get('overview', '')
    name = 'Strategy-031'

    cap_ids = set()
    for entry in d.get('capability_mapping', []) or []:
        if not isinstance(entry, dict):
            continue
        cid_src = entry.get('capability_id')
        cname = entry.get('capability_name')
        cid = None
        if cid_src:
            cid = resolve_cap(cur, cid_src, alias, current_caps)
        if not cid and cname:
            cid = resolve_cap(cur, cname, alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'strategy': strat_dict, 'capability_mapping': d.get('capability_mapping', [])}
    insert_strategy(cur, '031', req_id, req_text, name, body, overview,
                    resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)
def salvage_053(cur, alias, current_caps):
    """Salvage folder 053.

    Reads ``strategy.json``, which holds a top-level ``phases`` list (each
    phase lists ``capabilities`` as strings or dicts) and a ``core_workflow``
    text. The first 500 characters of ``core_workflow`` become the strategy
    description.

    Args:
        cur: DB cursor.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, number of resources)``.
    """
    folder = OUTPUT_DIR / '053'
    req_id = 'REQ_053'
    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    req_text = d.get('requirement', '')
    name = 'Strategy-053'

    cap_ids = set()
    for phase in d.get('phases', []) or []:
        if not isinstance(phase, dict):
            continue
        for cap_ref in phase.get('capabilities', []) or []:
            if isinstance(cap_ref, str):
                # A bare string may be either a name or an id.
                cid = resolve_cap(cur, cap_ref, alias, current_caps, create_if_missing=True)
                if cid:
                    cap_ids.add(cid)
            elif isinstance(cap_ref, dict):
                src = cap_ref.get('id') or cap_ref.get('capability_id')
                nm = cap_ref.get('name') or cap_ref.get('capability_name')
                cid = None
                if src:
                    cid = resolve_cap(cur, src, alias, current_caps)
                if not cid and nm:
                    cid = resolve_cap(cur, nm, alias, current_caps, create_if_missing=True)
                if cid:
                    cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'phases': d.get('phases', []), 'core_workflow': d.get('core_workflow', '')}

    # A present-but-null (or non-text) core_workflow must not break the
    # slice; fall back to an empty description in that case.
    core_workflow = d.get('core_workflow')
    summary = core_workflow[:500] if isinstance(core_workflow, str) else ''
    insert_strategy(cur, '053', req_id, req_text, name, body,
                    summary, resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)
def salvage_066(cur, alias, current_caps):
    """Salvage folder 066.

    Reads ``strategy.json``, which holds ``key_capabilities`` (dicts with
    ``id``/``name``) and ``execution_phases`` (each with ``capabilities_used``
    as strings or dicts). Capabilities from both lists are linked. The
    strategy is stored with an empty description.

    Args:
        cur: DB cursor.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, number of resources)``.
    """
    folder = OUTPUT_DIR / '066'
    req_id = 'REQ_066'
    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    req_text = d.get('requirement', '')
    name = 'Strategy-066'

    cap_ids = set()
    # key_capabilities is the primary source.
    for cap in d.get('key_capabilities', []) or []:
        if not isinstance(cap, dict):
            continue
        cid = None
        if cap.get('id'):
            cid = resolve_cap(cur, cap['id'], alias, current_caps)
        if not cid and cap.get('name'):
            cid = resolve_cap(cur, cap['name'], alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    # execution_phases may reference further capabilities.
    for phase in d.get('execution_phases', []) or []:
        if not isinstance(phase, dict):
            continue
        for c in phase.get('capabilities_used', []) or []:
            if isinstance(c, str):
                cid = resolve_cap(cur, c, alias, current_caps, create_if_missing=True)
                if cid:
                    cap_ids.add(cid)
            elif isinstance(c, dict):
                cid = None
                if c.get('id'):
                    cid = resolve_cap(cur, c['id'], alias, current_caps)
                if not cid and c.get('name'):
                    cid = resolve_cap(cur, c['name'], alias, current_caps, create_if_missing=True)
                if cid:
                    cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'execution_phases': d.get('execution_phases', []),
            'key_capabilities': d.get('key_capabilities', []),
            'recommended_tools': d.get('recommended_tools', [])}
    insert_strategy(cur, '066', req_id, req_text, name, body, '', resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)
def salvage_070(cur, alias, current_caps):
    """Salvage folder 070.

    Reads ``strategy.json``, which holds ``capabilities_mapping`` (dicts with
    ``id``/``name``) plus ``tool_chain`` and ``selected_blueprint``. The id is
    tried first without creation; the name is the fallback and is created
    when unresolved. The strategy is stored with an empty description.

    Args:
        cur: DB cursor.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, number of resources)``.
    """
    folder = OUTPUT_DIR / '070'
    req_id = 'REQ_070'
    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'strategy.json').read_text(encoding='utf-8'))
    req_text = d.get('requirement', '')
    name = 'Strategy-070'

    cap_ids = set()
    for cap in d.get('capabilities_mapping', []) or []:
        if not isinstance(cap, dict):
            continue
        cid = None
        if cap.get('id'):
            cid = resolve_cap(cur, cap['id'], alias, current_caps)
        if not cid and cap.get('name'):
            cid = resolve_cap(cur, cap['name'], alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    resource_ids = ensure_resources(cur, folder, req_id)
    body = {'capabilities_mapping': d.get('capabilities_mapping', []),
            'tool_chain': d.get('tool_chain', []),
            'selected_blueprint': d.get('selected_blueprint', '')}
    insert_strategy(cur, '070', req_id, req_text, name, body, '', resource_ids, cap_ids)
    return len(cap_ids), len(resource_ids)
def salvage_044(cur, alias, current_caps):
    """Salvage folder 044.

    ``capabilities_extracted.json`` uses the ``capability_id`` /
    ``capability_name`` schema. No strategy is created here: the capabilities
    are linked to a strategy already attached to ``REQ_044``, and to the
    requirement itself.

    Args:
        cur: DB cursor whose fetched rows support ``row['strategy_id']`` access.
        alias: Normalised-name -> capability-id map (may be extended).
        current_caps: Known capabilities by id (may be extended).

    Returns:
        ``(number of linked capabilities, 0)``, or ``(0, 0)`` when the
        requirement has no strategy yet.
    """
    folder = OUTPUT_DIR / '044'
    req_id = 'REQ_044'

    # Find the existing strategy.
    # NOTE(review): LIMIT 1 without ORDER BY picks an arbitrary row if the
    # requirement has several strategies; confirm there is only one.
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id=%s LIMIT 1', (req_id,))
    row = cur.fetchone()
    if not row:
        print(' ⚠️ 044 has no strategy yet', flush=True)
        return 0, 0
    strat_id = row['strategy_id']

    # Explicit UTF-8: the default encoding is locale-dependent, and the raw
    # case files in this script are already read as UTF-8.
    d = json.loads((folder / 'capabilities_extracted.json').read_text(encoding='utf-8'))

    cap_ids = set()
    for cap in d.get('extracted_capabilities', []) or []:
        if not isinstance(cap, dict):
            continue
        cid_src = cap.get('capability_id')
        cname = cap.get('capability_name')
        cid = None
        if cid_src:
            cid = resolve_cap(cur, cid_src, alias, current_caps)
        if not cid and cname:
            cid = resolve_cap(cur, cname, alias, current_caps, create_if_missing=True)
        if cid:
            cap_ids.add(cid)

    for cid in cap_ids:
        cur.execute(
            """INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
               VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
            (strat_id, cid))
        cur.execute(
            """INSERT INTO requirement_capability (requirement_id, capability_id)
               VALUES (%s, %s) ON CONFLICT DO NOTHING""",
            (req_id, cid))
    return len(cap_ids), 0
- # ═══════════════════════════════════════════════════════════
def main():
    """Run every per-folder salvage handler, then print requirement coverage.

    ``--execute`` is a required flag, so the script refuses to run without
    it. A failure in one handler is reported with a traceback and does not
    stop the remaining handlers. The cursor and the store are closed even
    when setup or the final report raises.
    """
    import traceback

    ap = argparse.ArgumentParser()
    ap.add_argument('--execute', action='store_true', required=True)
    # Parsed only to enforce the required flag; the value is not read.
    ap.parse_args()

    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            # NOTE(review): nothing here commits explicitly. Presumably the
            # store's connection autocommits; confirm, otherwise the inserts
            # are lost on close and one failed statement would poison the
            # handlers that follow.
            cur.execute("SELECT id, name FROM capability WHERE version = %s", (DEDUP,))
            current_caps = {r['id']: {'name': r['name']} for r in cur.fetchall()}
            snapshot = parse_snapshot(SNAPSHOT_PATH)
            alias = build_alias_map(snapshot, current_caps)
            print(f'alias: {len(alias)}, caps: {len(current_caps)}', flush=True)

            handlers = [
                ('004', salvage_004), ('031', salvage_031),
                ('044', salvage_044), ('053', salvage_053),
                ('066', salvage_066), ('070', salvage_070),
            ]
            for key, fn in handlers:
                print(f'\n📁 salvaging {key}/ ...', flush=True)
                try:
                    n_cap, n_res = fn(cur, alias, current_caps)
                    print(f' ✓ {key}: {n_cap} caps, {n_res} resources', flush=True)
                except Exception as e:
                    # Top-level boundary: report and carry on with the next folder.
                    print(f' ❌ {key}: {type(e).__name__}: {e}', flush=True)
                    traceback.print_exc()

            # Final check
            print(f'\n{"="*50}\nFinal requirement coverage:', flush=True)
            for req in ['REQ_004', 'REQ_031', 'REQ_044', 'REQ_053', 'REQ_066', 'REQ_070']:
                cur.execute('SELECT COUNT(*) AS c FROM requirement_strategy WHERE requirement_id=%s', (req,))
                strat = cur.fetchone()['c']
                cur.execute('SELECT COUNT(*) AS c FROM requirement_capability WHERE requirement_id=%s', (req,))
                cap = cur.fetchone()['c']
                cur.execute('SELECT COUNT(*) AS c FROM requirement_resource WHERE requirement_id=%s', (req,))
                res = cur.fetchone()['c']
                print(f' {req}: strat={strat} cap={cap} res={res}', flush=True)
        finally:
            cur.close()
    finally:
        s.close()
# Run the salvage only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|