  1. #!/usr/bin/env python3
  2. """
  3. 处理 5 个重跑数据 folder:032/046/069/085/097。
  4. 策略(对每个 folder):
  5. 1. 定位 req_id(blueprint.json 解析失败时 fallback strategy.json)
  6. 2. 清理旧数据:
  7. - 删除 folder 标签为 F 的 resource 及其所有 junction
  8. - 删除 req 关联的 strategy + 其 junction
  9. - 删除 requirement_capability (req, *) 条目(后续重建全集)
  10. 3. 重新 ingest:
  11. - resources from raw_cases/(case_bili.json parse fail → 正则 fallback)
  12. - capabilities via alias(不存在则新建 howard_dedup)
  13. - strategy(is_selected 或第一条)
  14. - junctions:
  15. · req_res / strat_res: 按 resource 逐条写
  16. · strat_cap: workflow_outline 的 caps(relation_type='compose')
  17. · req_strat: 1 条
  18. · req_cap: capabilities_extracted.json 的所有 caps(A 方案:研究全集)
  19. 所有操作 autocommit=True;脚本幂等、允许断点重跑。
  20. """
  21. import hashlib
  22. import json
  23. import re
  24. import sys
  25. import time
  26. from pathlib import Path
  27. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  28. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  29. from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
  30. from knowhub.scripts.rename_merged_capabilities import RENAMES
# Root directory holding the five rerun folders listed in FOLDERS.
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
FOLDERS = ['032', '046', '069', '085', '097']
# Version tag stamped onto every row this script inserts/updates.
DEDUP_VERSION = 'howard_dedup'
  34. def norm(s):
  35. return (s or '').strip().lower()
  36. def hash8(text):
  37. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]
  38. def hash12(text):
  39. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:12]
  40. def gen_cap_id(name):
  41. return f'CAP-{hash8(norm(name))}'
  42. def gen_resource_id(platform, url):
  43. p = (platform or 'unknown').lower().strip()
  44. return f'resource/research/{p}/{hash12(url)}'
  45. def gen_strategy_id(req_text, strategy_name):
  46. return f'strategy-{hash8((req_text or "") + "|" + (strategy_name or ""))}'
  47. # ═══════════════════════════════════════════════════════════
  48. def build_alias_map(cur):
  49. """Build norm(name) -> canonical_id alias from current DB + MERGE_CLUSTERS + RENAMES."""
  50. # Step A: member→canonical with transitive closure
  51. member_to_canonical = {}
  52. for canonical, members in MERGE_CLUSTERS.items():
  53. for m in members:
  54. member_to_canonical[m] = canonical
  55. def final(cid, limit=10):
  56. seen = set()
  57. while cid in member_to_canonical and cid not in seen and limit > 0:
  58. seen.add(cid)
  59. cid = member_to_canonical[cid]
  60. limit -= 1
  61. return cid
  62. for m in list(member_to_canonical.keys()):
  63. member_to_canonical[m] = final(m)
  64. alias = {}
  65. # Current DB names
  66. cur.execute('SELECT id, name FROM capability')
  67. for r in cur.fetchall():
  68. alias[norm(r['name'])] = r['id']
  69. # RENAMES new names
  70. for cid, (new_name, _) in RENAMES.items():
  71. alias[norm(new_name)] = final(cid)
  72. return alias
  73. # ═══════════════════════════════════════════════════════════
def load_raw_cases(folder_path):
    """Return list of case dicts; fallback to regex when json.load fails.

    Reads every raw_cases/*.json file under *folder_path*. Each parsed case
    dict gets a 'platform' key defaulted from the file name (case_<platform>.json).
    When JSON parsing fails — or the parsed payload has no list of cases —
    cases are recovered by regex anchored on "source_url" (titles are left blank).
    """
    raw_dir = folder_path / 'raw_cases'
    all_cases = []
    if not raw_dir.exists():
        return all_cases
    for cf in sorted(raw_dir.glob('*.json')):
        # File-name convention: case_<platform>.json -> platform.
        platform = cf.stem.replace('case_', '')
        try:
            data = json.loads(cf.read_text(encoding='utf-8'))
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if isinstance(cases, list):
                for c in cases:
                    if isinstance(c, dict):
                        c.setdefault('platform', platform)
                        all_cases.append(c)
                continue  # parsed cleanly -> next file, skip regex fallback
        except Exception as e:
            print(f' ⚠️ {cf.name} parse fail ({e}); trying regex fallback', flush=True)
        # Reached on parse failure OR when the payload had no list of cases.
        # regex fallback: anchor by source_url (titles too unreliable)
        text = cf.read_text(encoding='utf-8')
        urls = re.findall(r'"source_url"\s*:\s*"([^"]+)"', text)
        ids = re.findall(r'"id"\s*:\s*"(case_[^"]+)"', text)
        recovered_n = 0
        # NOTE(review): pairs urls[i] with ids[i] by position — assumes both
        # regexes matched in the same record order; confirm against raw files.
        for i, url in enumerate(urls):
            case = {
                'id': ids[i] if i < len(ids) else f'{platform}_fallback_{i}',
                'title': '',  # unreliable without proper JSON parse
                'platform': platform,
                'source_url': url,
            }
            all_cases.append(case)
            recovered_n += 1
        print(f' ⇒ recovered {recovered_n} {platform} cases via regex (titles skipped)',
              flush=True)
    return all_cases
  110. # ═══════════════════════════════════════════════════════════
def cleanup_folder_data(cur, req_id, folder_key, stats):
    """Remove old resources with folder tag + related junctions + old strategy for this req + req_cap.

    Deletion order matters: junction rows are removed before their parent
    resource/strategy rows. Records counts in stats['deleted_resources'] and
    stats['deleted_strategies'] (assigned, not accumulated — callers pass a
    fresh stats dict per folder).
    """
    # resources tagged with this folder
    cur.execute("SELECT id FROM resource WHERE metadata::jsonb->>'folder' = %s", (folder_key,))
    old_res = [r['id'] for r in cur.fetchall()]
    for rid in old_res:
        # Junctions first, then the resource row itself.
        cur.execute('DELETE FROM requirement_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM strategy_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM capability_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
    stats['deleted_resources'] = len(old_res)
    # any remaining req_res junctions for this req (untagged orphans)
    cur.execute('DELETE FROM requirement_resource WHERE requirement_id = %s', (req_id,))
    # strategies linked to this req
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id = %s', (req_id,))
    old_strats = [r['strategy_id'] for r in cur.fetchall()]
    for sid in old_strats:
        # All four junction tables for the strategy, then the strategy row.
        cur.execute('DELETE FROM requirement_strategy WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_capability WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_resource WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_knowledge WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy WHERE id = %s', (sid,))
    stats['deleted_strategies'] = len(old_strats)
    # req_cap for this req (will be rebuilt)
    cur.execute('DELETE FROM requirement_capability WHERE requirement_id = %s', (req_id,))
  136. # ═══════════════════════════════════════════════════════════
def ingest_folder(folder_path, cur, alias, stats):
    """Re-ingest one rerun folder: locate requirement, clean old rows, rewrite
    resources / capabilities / strategy and all junction rows.

    Mutates *alias* (new capability names are added) and *stats* (counters).
    Returns None; prints progress and bails out early when no requirement text
    or no matching requirement row can be found.
    """
    folder_key = folder_path.name
    # (1) requirement text — try blueprint, fallback to strategy
    req_text = ''
    try:
        bp = json.loads((folder_path / 'blueprint.json').read_text(encoding='utf-8'))
        req_text = bp.get('requirement', '')
    except Exception as e:
        print(f' ⚠️ blueprint parse fail ({e}); trying strategy.json', flush=True)
    if not req_text:
        try:
            sd = json.loads((folder_path / 'strategy.json').read_text(encoding='utf-8'))
            req_text = sd.get('requirement', '')
        except Exception as e:
            print(f' ❌ no requirement text available ({e})', flush=True)
            return
    # Requirement row is matched by exact description text.
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        print(f' ❌ no matching requirement for {folder_key}', flush=True)
        return
    req_id = row['id']
    print(f' → req_id={req_id}', flush=True)
    # (2) cleanup
    cleanup_folder_data(cur, req_id, folder_key, stats)
    del_r, del_s = stats['deleted_resources'], stats['deleted_strategies']
    print(f' cleaned: del_res={del_r}, del_strat={del_s}', flush=True)
    # (3) resources
    cases = load_raw_cases(folder_path)
    resource_ids = []
    for case in cases:
        url = case.get('source_url') or case.get('url')
        if not url:
            continue  # no URL -> cannot form a stable resource id
        platform = case.get('platform') or 'unknown'
        rid = gen_resource_id(platform, url)
        title = (case.get('title') or '')[:200]
        metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
        likes = (metrics.get('likes') or 0) if metrics else 0
        # Delete-then-insert makes the script idempotent per resource id.
        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
        cur.execute(
            """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
            (rid, title,
             json.dumps(case, ensure_ascii=False)[:8000],
             'research_case',
             json.dumps(case.get('images', []) or [], ensure_ascii=False),
             json.dumps({'platform': platform, 'source_url': url,
                         'metrics': metrics, 'folder': folder_key},
                        ensure_ascii=False),
             # Negative likes so higher-liked resources sort first (ascending sort_order).
             # NOTE(review): int(likes) raises if likes is a non-numeric string — confirm inputs.
             -int(likes), DEDUP_VERSION))
        resource_ids.append(rid)
    stats['resources'] = len(resource_ids)
    # (4) capabilities (from capabilities_extracted.json) — track ALL of them for req_cap superset
    caps_path = folder_path / 'capabilities_extracted.json'
    all_cap_ids_research = set()  # for req_cap (A-plan research superset)
    cap_key_to_id = {}  # source_key -> resolved_id (for strat_cap resolution)
    if caps_path.exists():
        try:
            caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f' ⚠️ capabilities_extracted parse fail: {e}', flush=True)
            caps_data = {'extracted_capabilities': []}
        for cap in caps_data.get('extracted_capabilities', []):
            name = (cap.get('name') or '').strip()
            if not name:
                continue
            src_id = cap.get('id')
            resolved = None
            # (a) source id exists?
            if src_id:
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (src_id,))
                if cur.fetchone():
                    resolved = src_id
            # (b) alias by name
            if not resolved:
                cand = alias.get(norm(name))
                if cand:
                    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cand,))
                    if cur.fetchone():
                        resolved = cand
            # (c) create new
            if not resolved:
                new_id = gen_cap_id(name)
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
                if not cur.fetchone():
                    cur.execute(
                        """INSERT INTO capability (id, name, criterion, description, effects, version)
                           VALUES (%s, %s, %s, %s, %s, %s)""",
                        (new_id, name, cap.get('criterion', '') or '',
                         cap.get('description', '') or '',
                         json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                         DEDUP_VERSION))
                    alias[norm(name)] = new_id
                    stats['cap_new'] += 1
                resolved = new_id
            else:
                # backfill criterion/effects if missing
                cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
                ex = cur.fetchone()
                if ex:
                    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
                        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                                    (cap['criterion'], resolved))
                    cur_eff = ex.get('effects')
                    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
                        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), resolved))
                stats['cap_linked'] += 1
            all_cap_ids_research.add(resolved)
            # Keyed by source id when present, else by raw name.
            cap_key_to_id[src_id or name] = resolved
    # (5) strategy
    strat_path = folder_path / 'strategy.json'
    strat_id = None
    strat_cap_ids = set()
    if strat_path.exists():
        try:
            strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f' ⚠️ strategy parse fail: {e}', flush=True)
            strat_data = {'strategies': []}
        # Prefer the explicitly selected strategy, else the first one.
        selected = next((s for s in strat_data.get('strategies', []) if s.get('is_selected')), None)
        if not selected and strat_data.get('strategies'):
            selected = strat_data['strategies'][0]
        if selected:
            strategy_name = selected.get('name') or f'Strategy-{folder_key}'
            strat_id = gen_strategy_id(req_text, strategy_name)
            now = int(time.time())
            # Delete-then-insert keeps the strategy write idempotent.
            cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
            cur.execute(
                """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
                 json.dumps(selected, ensure_ascii=False, indent=2),
                 'draft', now, now, DEDUP_VERSION))
            stats['strategies'] = 1
            # workflow_outline cap resolution
            wo = selected.get('workflow_outline') or []
            if isinstance(wo, list):
                for phase in wo:
                    if not isinstance(phase, dict):
                        continue
                    caps = phase.get('capabilities') or []
                    if not isinstance(caps, list):
                        continue
                    for cref in caps:
                        if not isinstance(cref, dict):
                            continue
                        key = cref.get('id') or cref.get('name', '')
                        resolved = cap_key_to_id.get(key) or alias.get(norm(cref.get('name', '')))
                        if resolved:
                            strat_cap_ids.add(resolved)
    # (6) wire junctions
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        if strat_id:
            cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    if strat_id:
        cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
        for cid in strat_cap_ids:
            cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                           VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
    # req_cap: research superset (A plan)
    for cid in all_cap_ids_research:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    # also include strat-only caps (in case some are in workflow_outline but not in extracted list)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    stats['req_cap_wired'] = len(all_cap_ids_research | strat_cap_ids)
    stats['strat_cap_wired'] = len(strat_cap_ids)
    rc_n, sc_n = stats['req_cap_wired'], stats['strat_cap_wired']
    s_n = 1 if strat_id else 0
    print(f' ingested: res={len(resource_ids)}, strat={s_n}, req_cap={rc_n}, strat_cap={sc_n}', flush=True)
  315. # ═══════════════════════════════════════════════════════════
def main():
    """Process every folder in FOLDERS: build the alias map once, then ingest
    each folder with per-folder error isolation (a failed folder gets its
    cursor recycled and the loop continues; its stats are not added to totals).
    """
    s = PostgreSQLCapabilityStore()
    cur = s._get_cursor()
    try:
        print('Building alias map...', flush=True)
        alias = build_alias_map(cur)
        print(f' alias entries: {len(alias)}', flush=True)
        totals = {'deleted_resources': 0, 'deleted_strategies': 0,
                  'resources': 0, 'cap_new': 0, 'cap_linked': 0,
                  'strategies': 0, 'req_cap_wired': 0, 'strat_cap_wired': 0}
        for f in FOLDERS:
            print(f'\n=== {f} ===', flush=True)
            # Fresh per-folder stats; cleanup_folder_data assigns into it.
            stats = {'deleted_resources': 0, 'deleted_strategies': 0,
                     'resources': 0, 'cap_new': 0, 'cap_linked': 0,
                     'strategies': 0, 'req_cap_wired': 0, 'strat_cap_wired': 0}
            try:
                ingest_folder(RERUN_DIR / f, cur, alias, stats)
                for k in totals:
                    totals[k] += stats.get(k, 0)
            except Exception as e:
                print(f' ❌ {type(e).__name__}: {e}', flush=True)
                # Cursor may be in a failed state; close best-effort and get a new one.
                try:
                    cur.close()
                except Exception:
                    pass
                cur = s._get_cursor()
        print(f'\n{"="*50}\nTOTALS: {totals}', flush=True)
    finally:
        cur.close()
        s.close()
# Script entry point.
if __name__ == '__main__':
    main()