salvage_malformed_folders.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #!/usr/bin/env python3
  2. """
  3. Salvage folders with non-standard JSON schemas that standard ingest missed.
  4. Target folders (from data problem audit):
  5. 004, 031, 053, 066, 070 — non-standard strategy.json structure (the LLM improvised its own layout)
  6. 044 — capabilities_extracted uses old schema (capability_id/capability_name)
  7. Strategy: for each folder, manually locate the relevant fields and wire the junctions.
  8. All insertions use version='howard_dedup'.
  9. Usage:
  10. python knowhub/scripts/salvage_malformed_folders.py --execute
  11. """
  12. import argparse
  13. import hashlib
  14. import json
  15. import sys
  16. import time
  17. from pathlib import Path
  18. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  19. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  20. from knowhub.scripts.rebuild_howard_dedup import (
  21. parse_snapshot, build_alias_map, norm, gen_cap_id, gen_strategy_id,
  22. gen_resource_id, SNAPSHOT_PATH, OUTPUT_DIR, DEDUP_VERSION,
  23. )
  24. DEDUP = DEDUP_VERSION
  25. def resolve_cap(cur, cap_name_or_id, alias, current_caps, create_if_missing=False,
  26. cap_desc='', cap_criterion=''):
  27. """Resolve a capability name/id → DB id. Optionally create if missing."""
  28. if not cap_name_or_id:
  29. return None
  30. # If it looks like a cap ID and exists in DB, use it
  31. if cap_name_or_id.startswith('CAP-') and cap_name_or_id in current_caps:
  32. return cap_name_or_id
  33. # Try alias
  34. cand = alias.get(norm(cap_name_or_id))
  35. if cand and cand in current_caps:
  36. return cand
  37. # Try checking if name matches DB
  38. cur.execute('SELECT id FROM capability WHERE name = %s LIMIT 1', (cap_name_or_id,))
  39. row = cur.fetchone()
  40. if row:
  41. return row['id']
  42. if create_if_missing:
  43. new_id = gen_cap_id(cap_name_or_id)
  44. cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
  45. if not cur.fetchone():
  46. cur.execute(
  47. """INSERT INTO capability (id, name, criterion, description, effects, version)
  48. VALUES (%s, %s, %s, %s, %s, %s)""",
  49. (new_id, cap_name_or_id, cap_criterion, cap_desc, '[]', DEDUP))
  50. current_caps[new_id] = {'name': cap_name_or_id}
  51. alias[norm(cap_name_or_id)] = new_id
  52. return new_id
  53. return None
  54. def insert_strategy(cur, folder_key, req_id, req_text, strategy_name, strategy_body,
  55. reasoning, resource_ids, cap_ids):
  56. strat_id = gen_strategy_id(req_text, strategy_name)
  57. now = int(time.time())
  58. cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
  59. cur.execute(
  60. """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
  61. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
  62. (strat_id, strategy_name, (reasoning or '')[:2000],
  63. json.dumps(strategy_body, ensure_ascii=False, indent=2),
  64. 'draft', now, now, DEDUP))
  65. # requirement → strategy
  66. cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
  67. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
  68. # strategy → resources
  69. for rid in resource_ids:
  70. cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
  71. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
  72. # strategy → capabilities (+ req → cap)
  73. for cid in cap_ids:
  74. cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
  75. VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
  76. cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
  77. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
  78. return strat_id
  79. def ensure_resources(cur, folder, req_id):
  80. """Read raw_cases/, insert resources, return list of resource IDs."""
  81. raw_dir = folder / 'raw_cases'
  82. resource_ids = []
  83. if not raw_dir.exists():
  84. return resource_ids
  85. for cf in sorted(raw_dir.glob('*.json')):
  86. try:
  87. data = json.loads(cf.read_text(encoding='utf-8'))
  88. except Exception:
  89. continue
  90. cases = data.get('cases', []) if isinstance(data, dict) else data
  91. if not isinstance(cases, list):
  92. continue
  93. for case in cases:
  94. if not isinstance(case, dict):
  95. continue
  96. url = case.get('source_url') or case.get('url')
  97. if not url:
  98. continue
  99. platform = case.get('platform') or cf.stem.replace('case_', '')
  100. rid = gen_resource_id(platform, url)
  101. title = (case.get('title') or '')[:200]
  102. metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
  103. likes = (metrics.get('likes') or 0) if metrics else 0
  104. cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
  105. cur.execute(
  106. """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
  107. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
  108. (rid, title, json.dumps(case, ensure_ascii=False)[:8000],
  109. 'research_case',
  110. json.dumps(case.get('images', []) or [], ensure_ascii=False),
  111. json.dumps({'platform': platform, 'source_url': url,
  112. 'metrics': metrics, 'folder': folder.name},
  113. ensure_ascii=False),
  114. -int(likes), DEDUP))
  115. resource_ids.append(rid)
  116. cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
  117. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
  118. return resource_ids
  119. # ═══════════════════════════════════════════════════════════
  120. # Per-folder handlers
  121. def salvage_004(cur, alias, current_caps):
  122. """004: strategy.strategy.phases + capability_mapping (capability=name)."""
  123. folder = OUTPUT_DIR / '004'
  124. req_id = 'REQ_004'
  125. d = json.loads((folder / 'strategy.json').read_text())
  126. req_text = d.get('requirement', '')
  127. strat_dict = d.get('strategy', {})
  128. overview = strat_dict.get('overview', '')
  129. name = f'Strategy-004'
  130. # capability_mapping: [{capability: name, role, used_in_phases, key_tools}]
  131. cap_ids = set()
  132. for entry in d.get('capability_mapping', []) or []:
  133. if not isinstance(entry, dict):
  134. continue
  135. cap_name = entry.get('capability') or entry.get('capability_name')
  136. cid = resolve_cap(cur, cap_name, alias, current_caps, create_if_missing=True)
  137. if cid: cap_ids.add(cid)
  138. resource_ids = ensure_resources(cur, folder, req_id)
  139. body = {'strategy': strat_dict, 'capability_mapping': d.get('capability_mapping', [])}
  140. insert_strategy(cur, '004', req_id, req_text, name, body, overview,
  141. resource_ids, cap_ids)
  142. return len(cap_ids), len(resource_ids)
  143. def salvage_031(cur, alias, current_caps):
  144. """031: strategy.strategy.phases + capability_mapping (capability_id/name)."""
  145. folder = OUTPUT_DIR / '031'
  146. req_id = 'REQ_031'
  147. d = json.loads((folder / 'strategy.json').read_text())
  148. req_text = d.get('requirement', '')
  149. strat_dict = d.get('strategy', {})
  150. overview = strat_dict.get('overview', '')
  151. name = f'Strategy-031'
  152. cap_ids = set()
  153. for entry in d.get('capability_mapping', []) or []:
  154. if not isinstance(entry, dict):
  155. continue
  156. cid_src = entry.get('capability_id')
  157. cname = entry.get('capability_name')
  158. cid = None
  159. if cid_src:
  160. cid = resolve_cap(cur, cid_src, alias, current_caps)
  161. if not cid and cname:
  162. cid = resolve_cap(cur, cname, alias, current_caps, create_if_missing=True)
  163. if cid: cap_ids.add(cid)
  164. resource_ids = ensure_resources(cur, folder, req_id)
  165. body = {'strategy': strat_dict, 'capability_mapping': d.get('capability_mapping', [])}
  166. insert_strategy(cur, '031', req_id, req_text, name, body, overview,
  167. resource_ids, cap_ids)
  168. return len(cap_ids), len(resource_ids)
  169. def salvage_053(cur, alias, current_caps):
  170. """053: phases[] (phase/description/capabilities) + core_workflow (text)."""
  171. folder = OUTPUT_DIR / '053'
  172. req_id = 'REQ_053'
  173. d = json.loads((folder / 'strategy.json').read_text())
  174. req_text = d.get('requirement', '')
  175. name = f'Strategy-053'
  176. cap_ids = set()
  177. for phase in d.get('phases', []) or []:
  178. if not isinstance(phase, dict):
  179. continue
  180. for cap_ref in phase.get('capabilities', []) or []:
  181. if isinstance(cap_ref, str):
  182. # string name or id
  183. cid = resolve_cap(cur, cap_ref, alias, current_caps, create_if_missing=True)
  184. if cid: cap_ids.add(cid)
  185. elif isinstance(cap_ref, dict):
  186. src = cap_ref.get('id') or cap_ref.get('capability_id')
  187. nm = cap_ref.get('name') or cap_ref.get('capability_name')
  188. cid = None
  189. if src: cid = resolve_cap(cur, src, alias, current_caps)
  190. if not cid and nm:
  191. cid = resolve_cap(cur, nm, alias, current_caps, create_if_missing=True)
  192. if cid: cap_ids.add(cid)
  193. resource_ids = ensure_resources(cur, folder, req_id)
  194. body = {'phases': d.get('phases', []), 'core_workflow': d.get('core_workflow', '')}
  195. insert_strategy(cur, '053', req_id, req_text, name, body,
  196. d.get('core_workflow', '')[:500], resource_ids, cap_ids)
  197. return len(cap_ids), len(resource_ids)
  198. def salvage_066(cur, alias, current_caps):
  199. """066: execution_phases + key_capabilities (id/name/role)."""
  200. folder = OUTPUT_DIR / '066'
  201. req_id = 'REQ_066'
  202. d = json.loads((folder / 'strategy.json').read_text())
  203. req_text = d.get('requirement', '')
  204. name = f'Strategy-066'
  205. cap_ids = set()
  206. # key_capabilities is primary
  207. for cap in d.get('key_capabilities', []) or []:
  208. if not isinstance(cap, dict): continue
  209. cid = None
  210. if cap.get('id'):
  211. cid = resolve_cap(cur, cap['id'], alias, current_caps)
  212. if not cid and cap.get('name'):
  213. cid = resolve_cap(cur, cap['name'], alias, current_caps, create_if_missing=True)
  214. if cid: cap_ids.add(cid)
  215. # execution_phases may reference more
  216. for phase in d.get('execution_phases', []) or []:
  217. if not isinstance(phase, dict): continue
  218. for c in phase.get('capabilities_used', []) or []:
  219. if isinstance(c, str):
  220. cid = resolve_cap(cur, c, alias, current_caps, create_if_missing=True)
  221. if cid: cap_ids.add(cid)
  222. elif isinstance(c, dict):
  223. cid = None
  224. if c.get('id'): cid = resolve_cap(cur, c['id'], alias, current_caps)
  225. if not cid and c.get('name'):
  226. cid = resolve_cap(cur, c['name'], alias, current_caps, create_if_missing=True)
  227. if cid: cap_ids.add(cid)
  228. resource_ids = ensure_resources(cur, folder, req_id)
  229. body = {'execution_phases': d.get('execution_phases', []),
  230. 'key_capabilities': d.get('key_capabilities', []),
  231. 'recommended_tools': d.get('recommended_tools', [])}
  232. insert_strategy(cur, '066', req_id, req_text, name, body, '', resource_ids, cap_ids)
  233. return len(cap_ids), len(resource_ids)
  234. def salvage_070(cur, alias, current_caps):
  235. """070: capabilities_mapping (id/name/role_in_strategy/is_new) + tool_chain."""
  236. folder = OUTPUT_DIR / '070'
  237. req_id = 'REQ_070'
  238. d = json.loads((folder / 'strategy.json').read_text())
  239. req_text = d.get('requirement', '')
  240. name = f'Strategy-070'
  241. cap_ids = set()
  242. for cap in d.get('capabilities_mapping', []) or []:
  243. if not isinstance(cap, dict): continue
  244. cid = None
  245. if cap.get('id'): cid = resolve_cap(cur, cap['id'], alias, current_caps)
  246. if not cid and cap.get('name'):
  247. cid = resolve_cap(cur, cap['name'], alias, current_caps, create_if_missing=True)
  248. if cid: cap_ids.add(cid)
  249. resource_ids = ensure_resources(cur, folder, req_id)
  250. body = {'capabilities_mapping': d.get('capabilities_mapping', []),
  251. 'tool_chain': d.get('tool_chain', []),
  252. 'selected_blueprint': d.get('selected_blueprint', '')}
  253. insert_strategy(cur, '070', req_id, req_text, name, body, '', resource_ids, cap_ids)
  254. return len(cap_ids), len(resource_ids)
  255. def salvage_044(cur, alias, current_caps):
  256. """044: capabilities_extracted uses capability_id/capability_name schema.
  257. Just link these caps to existing REQ_044's strategy (already created)."""
  258. folder = OUTPUT_DIR / '044'
  259. req_id = 'REQ_044'
  260. # find existing strategy
  261. cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id=%s LIMIT 1', (req_id,))
  262. row = cur.fetchone()
  263. if not row:
  264. print(' ⚠️ 044 has no strategy yet', flush=True)
  265. return 0, 0
  266. strat_id = row['strategy_id']
  267. d = json.loads((folder / 'capabilities_extracted.json').read_text())
  268. cap_ids = set()
  269. for cap in d.get('extracted_capabilities', []) or []:
  270. if not isinstance(cap, dict): continue
  271. cid_src = cap.get('capability_id')
  272. cname = cap.get('capability_name')
  273. cid = None
  274. if cid_src:
  275. cid = resolve_cap(cur, cid_src, alias, current_caps)
  276. if not cid and cname:
  277. cid = resolve_cap(cur, cname, alias, current_caps, create_if_missing=True)
  278. if cid: cap_ids.add(cid)
  279. for cid in cap_ids:
  280. cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
  281. VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
  282. cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
  283. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
  284. return len(cap_ids), 0
  285. # ═══════════════════════════════════════════════════════════
  286. def main():
  287. ap = argparse.ArgumentParser()
  288. ap.add_argument('--execute', action='store_true', required=True)
  289. args = ap.parse_args()
  290. s = PostgreSQLCapabilityStore()
  291. cur = s._get_cursor()
  292. cur.execute("SELECT id, name FROM capability WHERE version = %s", (DEDUP,))
  293. current_caps = {r['id']: {'name': r['name']} for r in cur.fetchall()}
  294. snapshot = parse_snapshot(SNAPSHOT_PATH)
  295. alias = build_alias_map(snapshot, current_caps)
  296. print(f'alias: {len(alias)}, caps: {len(current_caps)}', flush=True)
  297. handlers = [
  298. ('004', salvage_004), ('031', salvage_031),
  299. ('044', salvage_044), ('053', salvage_053),
  300. ('066', salvage_066), ('070', salvage_070),
  301. ]
  302. for key, fn in handlers:
  303. print(f'\n📁 salvaging {key}/ ...', flush=True)
  304. try:
  305. n_cap, n_res = fn(cur, alias, current_caps)
  306. print(f' ✓ {key}: {n_cap} caps, {n_res} resources', flush=True)
  307. except Exception as e:
  308. print(f' ❌ {key}: {type(e).__name__}: {e}', flush=True)
  309. import traceback; traceback.print_exc()
  310. # Final check
  311. print(f'\n{"="*50}\nFinal requirement coverage:', flush=True)
  312. for req in ['REQ_004','REQ_031','REQ_044','REQ_053','REQ_066','REQ_070']:
  313. cur.execute('SELECT COUNT(*) AS c FROM requirement_strategy WHERE requirement_id=%s', (req,))
  314. strat = cur.fetchone()['c']
  315. cur.execute('SELECT COUNT(*) AS c FROM requirement_capability WHERE requirement_id=%s', (req,))
  316. cap = cur.fetchone()['c']
  317. cur.execute('SELECT COUNT(*) AS c FROM requirement_resource WHERE requirement_id=%s', (req,))
  318. res = cur.fetchone()['c']
  319. print(f' {req}: strat={strat} cap={cap} res={res}', flush=True)
  320. cur.close()
  321. s.close()
  322. if __name__ == '__main__':
  323. main()