#!/usr/bin/env python3
"""Re-ingest 5 rerun-data folders: 032/046/069/085/097.

Per-folder strategy:
1. Locate req_id (fall back to strategy.json when blueprint.json fails to parse).
2. Clean old data:
   - delete resources tagged with this folder and all their junction rows
   - delete the strategy linked to the req + its junction rows
   - delete requirement_capability (req, *) rows (the full set is rebuilt later)
3. Re-ingest:
   - resources from raw_cases/ (case_bili.json parse fail -> regex fallback)
   - capabilities via alias map (create with howard_dedup version when missing)
   - strategy (the is_selected one, else the first)
   - junctions:
     - req_res / strat_res: one row per resource
     - strat_cap: caps from workflow_outline (relation_type='compose')
     - req_strat: 1 row
     - req_cap: ALL caps from capabilities_extracted.json (plan A: research superset)

All operations assume autocommit=True; the script is idempotent and safe to
re-run after an interruption (every INSERT is preceded by a DELETE or guarded
with ON CONFLICT DO NOTHING).
"""
import hashlib
import json
import re
import sys
import time
from pathlib import Path

# Make the project root importable when run as a standalone script.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
from knowhub.scripts.rename_merged_capabilities import RENAMES

# Local directory holding the 5 rerun folders (one subdir per FOLDERS entry).
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
FOLDERS = ['032', '046', '069', '085', '097']
# Version tag written on every row this script creates.
DEDUP_VERSION = 'howard_dedup'


def norm(s) -> str:
    """Normalize a name for alias lookup: None-safe, trimmed, lowercased."""
    return (s or '').strip().lower()


def hash8(text: str) -> str:
    """First 8 hex chars of the SHA-256 of *text* (stable short id)."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]


def hash12(text: str) -> str:
    """First 12 hex chars of the SHA-256 of *text* (stable short id)."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[:12]


def gen_cap_id(name: str) -> str:
    """Deterministic capability id derived from the normalized name."""
    return f'CAP-{hash8(norm(name))}'


def gen_resource_id(platform, url: str) -> str:
    """Deterministic resource id: platform-scoped path + hash of the URL."""
    p = (platform or 'unknown').lower().strip()
    return f'resource/research/{p}/{hash12(url)}'


def gen_strategy_id(req_text, strategy_name) -> str:
    """Deterministic strategy id from (requirement text, strategy name)."""
    return f'strategy-{hash8((req_text or "") + "|" + (strategy_name or ""))}'


# ═══════════════════════════════════════════════════════════
def build_alias_map(cur) -> dict:
    """Build norm(name) -> canonical_id alias from current DB + MERGE_CLUSTERS + RENAMES.

    NOTE(review): assumes *cur* yields dict-like rows (e.g. RealDictCursor) —
    rows are indexed by column name below. Confirm against the store class.
    """
    # Step A: member -> canonical, then collapse chains so every member maps
    # straight to its final canonical id (transitive closure).
    member_to_canonical = {}
    for canonical, members in MERGE_CLUSTERS.items():
        for m in members:
            member_to_canonical[m] = canonical

    def final(cid, limit=10):
        # Follow the mapping chain; `seen` guards against cycles and `limit`
        # bounds pathological depth.
        seen = set()
        while cid in member_to_canonical and cid not in seen and limit > 0:
            seen.add(cid)
            cid = member_to_canonical[cid]
            limit -= 1
        return cid

    for m in list(member_to_canonical.keys()):
        member_to_canonical[m] = final(m)

    alias = {}
    # Current DB names
    cur.execute('SELECT id, name FROM capability')
    for r in cur.fetchall():
        alias[norm(r['name'])] = r['id']
    # RENAMES new names — map each renamed label to its canonical id.
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)
    return alias


# ═══════════════════════════════════════════════════════════
def load_raw_cases(folder_path: Path) -> list:
    """Return list of case dicts; fallback to regex when json.load fails.

    NOTE(review): the regex fallback also runs when the JSON parses but its
    `cases` payload is not a list (the `continue` is only reached for lists) —
    presumably intentional best-effort recovery, but worth confirming.
    """
    raw_dir = folder_path / 'raw_cases'
    all_cases = []
    if not raw_dir.exists():
        return all_cases
    for cf in sorted(raw_dir.glob('*.json')):
        # File naming convention: case_<platform>.json
        platform = cf.stem.replace('case_', '')
        try:
            data = json.loads(cf.read_text(encoding='utf-8'))
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if isinstance(cases, list):
                for c in cases:
                    if isinstance(c, dict):
                        c.setdefault('platform', platform)
                        all_cases.append(c)
                continue
        except Exception as e:
            print(f'  ⚠️ {cf.name} parse fail ({e}); trying regex fallback', flush=True)
        # regex fallback: anchor by source_url (titles too unreliable)
        text = cf.read_text(encoding='utf-8')
        urls = re.findall(r'"source_url"\s*:\s*"([^"]+)"', text)
        ids = re.findall(r'"id"\s*:\s*"(case_[^"]+)"', text)
        recovered_n = 0
        for i, url in enumerate(urls):
            # Pair urls with ids positionally; synthesize an id when the two
            # lists are unequal in length.
            case = {
                'id': ids[i] if i < len(ids) else f'{platform}_fallback_{i}',
                'title': '',  # unreliable without proper JSON parse
                'platform': platform,
                'source_url': url,
            }
            all_cases.append(case)
            recovered_n += 1
        print(f'  ⇒ recovered {recovered_n} {platform} cases via regex (titles skipped)', flush=True)
    return all_cases


# ═══════════════════════════════════════════════════════════
def cleanup_folder_data(cur, req_id, folder_key: str, stats: dict) -> None:
    """Remove old resources with folder tag + related junctions + old strategy for this req + req_cap.

    Writes 'deleted_resources' and 'deleted_strategies' counts into *stats*.
    Deletion order matters: junction rows first, then the parent row.
    """
    # resources tagged with this folder
    cur.execute("SELECT id FROM resource WHERE metadata::jsonb->>'folder' = %s", (folder_key,))
    old_res = [r['id'] for r in cur.fetchall()]
    for rid in old_res:
        cur.execute('DELETE FROM requirement_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM strategy_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM capability_resource WHERE resource_id = %s', (rid,))
        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
    stats['deleted_resources'] = len(old_res)
    # any remaining req_res junctions for this req (untagged orphans)
    cur.execute('DELETE FROM requirement_resource WHERE requirement_id = %s', (req_id,))
    # strategies linked to this req
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id = %s', (req_id,))
    old_strats = [r['strategy_id'] for r in cur.fetchall()]
    for sid in old_strats:
        cur.execute('DELETE FROM requirement_strategy WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_capability WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_resource WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy_knowledge WHERE strategy_id = %s', (sid,))
        cur.execute('DELETE FROM strategy WHERE id = %s', (sid,))
    stats['deleted_strategies'] = len(old_strats)
    # req_cap for this req (will be rebuilt)
    cur.execute('DELETE FROM requirement_capability WHERE requirement_id = %s', (req_id,))


# ═══════════════════════════════════════════════════════════
def ingest_folder(folder_path: Path, cur, alias: dict, stats: dict) -> None:
    """Clean and re-ingest one rerun folder: resources, capabilities, strategy, junctions.

    Mutates *alias* (newly created capabilities are registered) and *stats*
    (per-folder counters). Returns early — without raising — when no
    requirement text or no matching requirement row can be found.
    """
    folder_key = folder_path.name
    # (1) requirement text — try blueprint, fallback to strategy
    req_text = ''
    try:
        bp = json.loads((folder_path / 'blueprint.json').read_text(encoding='utf-8'))
        req_text = bp.get('requirement', '')
    except Exception as e:
        print(f'  ⚠️ blueprint parse fail ({e}); trying strategy.json', flush=True)
    if not req_text:
        try:
            sd = json.loads((folder_path / 'strategy.json').read_text(encoding='utf-8'))
            req_text = sd.get('requirement', '')
        except Exception as e:
            print(f'  ❌ no requirement text available ({e})', flush=True)
            return
    # Match the existing requirement row by exact description text.
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        print(f'  ❌ no matching requirement for {folder_key}', flush=True)
        return
    req_id = row['id']
    print(f'  → req_id={req_id}', flush=True)

    # (2) cleanup
    cleanup_folder_data(cur, req_id, folder_key, stats)
    del_r, del_s = stats['deleted_resources'], stats['deleted_strategies']
    print(f'  cleaned: del_res={del_r}, del_strat={del_s}', flush=True)

    # (3) resources
    cases = load_raw_cases(folder_path)
    resource_ids = []
    for case in cases:
        url = case.get('source_url') or case.get('url')
        if not url:
            continue  # a resource id cannot be derived without a URL
        platform = case.get('platform') or 'unknown'
        rid = gen_resource_id(platform, url)
        title = (case.get('title') or '')[:200]
        metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
        likes = (metrics.get('likes') or 0) if metrics else 0
        # DELETE-then-INSERT keeps the step idempotent under reruns.
        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
        cur.execute(
            """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
            (rid, title,
             # body holds the raw case JSON, truncated to 8000 chars
             # (NOTE(review): truncation can cut a JSON string mid-token —
             # body is treated as opaque text, not re-parsed).
             json.dumps(case, ensure_ascii=False)[:8000],
             'research_case',
             json.dumps(case.get('images', []) or [], ensure_ascii=False),
             json.dumps({'platform': platform, 'source_url': url, 'metrics': metrics, 'folder': folder_key}, ensure_ascii=False),
             # negative likes so ascending sort_order puts popular items first
             -int(likes), DEDUP_VERSION))
        resource_ids.append(rid)
    stats['resources'] = len(resource_ids)

    # (4) capabilities (from capabilities_extracted.json) — track ALL of them for req_cap superset
    caps_path = folder_path / 'capabilities_extracted.json'
    all_cap_ids_research = set()  # for req_cap (A-plan research superset)
    cap_key_to_id = {}  # source_key -> resolved_id (for strat_cap resolution)
    if caps_path.exists():
        try:
            caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f'  ⚠️ capabilities_extracted parse fail: {e}', flush=True)
            caps_data = {'extracted_capabilities': []}
        for cap in caps_data.get('extracted_capabilities', []):
            name = (cap.get('name') or '').strip()
            if not name:
                continue
            src_id = cap.get('id')
            resolved = None
            # Resolution order: (a) source id exists in DB, (b) alias by
            # normalized name, (c) create a new capability row.
            # (a) source id exists?
            if src_id:
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (src_id,))
                if cur.fetchone():
                    resolved = src_id
            # (b) alias by name
            if not resolved:
                cand = alias.get(norm(name))
                if cand:
                    # Alias entries may point at since-deleted rows; verify.
                    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cand,))
                    if cur.fetchone():
                        resolved = cand
            # (c) create new
            if not resolved:
                new_id = gen_cap_id(name)
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
                if not cur.fetchone():
                    cur.execute(
                        """INSERT INTO capability (id, name, criterion, description, effects, version)
                           VALUES (%s, %s, %s, %s, %s, %s)""",
                        (new_id, name, cap.get('criterion', '') or '', cap.get('description', '') or '',
                         json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                         DEDUP_VERSION))
                # Register in the alias map so later folders reuse this id.
                alias[norm(name)] = new_id
                stats['cap_new'] += 1
                resolved = new_id
            else:
                # backfill criterion/effects if missing on the existing row
                cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
                ex = cur.fetchone()
                if ex:
                    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
                        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s', (cap['criterion'], resolved))
                    cur_eff = ex.get('effects')
                    # Treat NULL, empty list, and the literal '[]' string as "missing".
                    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
                        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), resolved))
                stats['cap_linked'] += 1
            all_cap_ids_research.add(resolved)
            cap_key_to_id[src_id or name] = resolved

    # (5) strategy
    strat_path = folder_path / 'strategy.json'
    strat_id = None
    strat_cap_ids = set()
    if strat_path.exists():
        try:
            strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f'  ⚠️ strategy parse fail: {e}', flush=True)
            strat_data = {'strategies': []}
        # Prefer the strategy flagged is_selected; else fall back to the first.
        selected = next((s for s in strat_data.get('strategies', []) if s.get('is_selected')), None)
        if not selected and strat_data.get('strategies'):
            selected = strat_data['strategies'][0]
        if selected:
            strategy_name = selected.get('name') or f'Strategy-{folder_key}'
            strat_id = gen_strategy_id(req_text, strategy_name)
            now = int(time.time())
            # DELETE-then-INSERT for idempotency (deterministic strat_id).
            cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
            cur.execute(
                """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
                 json.dumps(selected, ensure_ascii=False, indent=2), 'draft', now, now, DEDUP_VERSION))
            stats['strategies'] = 1
            # workflow_outline cap resolution — collect capability ids for
            # strat_cap wiring; unresolvable refs are silently skipped.
            wo = selected.get('workflow_outline') or []
            if isinstance(wo, list):
                for phase in wo:
                    if not isinstance(phase, dict):
                        continue
                    caps = phase.get('capabilities') or []
                    if not isinstance(caps, list):
                        continue
                    for cref in caps:
                        if not isinstance(cref, dict):
                            continue
                        key = cref.get('id') or cref.get('name', '')
                        resolved = cap_key_to_id.get(key) or alias.get(norm(cref.get('name', '')))
                        if resolved:
                            strat_cap_ids.add(resolved)

    # (6) wire junctions — all inserts use ON CONFLICT DO NOTHING, so reruns
    # and overlapping sets are safe.
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        if strat_id:
            cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    if strat_id:
        cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
        for cid in strat_cap_ids:
            cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                           VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
    # req_cap: research superset (A plan)
    for cid in all_cap_ids_research:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    # also include strat-only caps (in case some are in workflow_outline but not in extracted list)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    stats['req_cap_wired'] = len(all_cap_ids_research | strat_cap_ids)
    stats['strat_cap_wired'] = len(strat_cap_ids)
    rc_n, sc_n = stats['req_cap_wired'], stats['strat_cap_wired']
    s_n = 1 if strat_id else 0
    print(f'  ingested: res={len(resource_ids)}, strat={s_n}, req_cap={rc_n}, strat_cap={sc_n}', flush=True)


# ═══════════════════════════════════════════════════════════
def main() -> None:
    """Drive the rerun: build the alias map once, then process each folder.

    A failure in one folder does not abort the run: the cursor is discarded
    and re-acquired (needed after a PostgreSQL error aborts the session state)
    and the loop continues with the next folder.
    """
    s = PostgreSQLCapabilityStore()
    cur = s._get_cursor()
    try:
        print('Building alias map...', flush=True)
        alias = build_alias_map(cur)
        print(f'  alias entries: {len(alias)}', flush=True)
        totals = {'deleted_resources': 0, 'deleted_strategies': 0, 'resources': 0,
                  'cap_new': 0, 'cap_linked': 0, 'strategies': 0,
                  'req_cap_wired': 0, 'strat_cap_wired': 0}
        for f in FOLDERS:
            print(f'\n=== {f} ===', flush=True)
            stats = {'deleted_resources': 0, 'deleted_strategies': 0, 'resources': 0,
                     'cap_new': 0, 'cap_linked': 0, 'strategies': 0,
                     'req_cap_wired': 0, 'strat_cap_wired': 0}
            try:
                ingest_folder(RERUN_DIR / f, cur, alias, stats)
                # Only fold stats into totals when the folder fully succeeded.
                for k in totals:
                    totals[k] += stats.get(k, 0)
            except Exception as e:
                print(f'  ❌ {type(e).__name__}: {e}', flush=True)
                # Recover a usable cursor for the next folder; close errors
                # are deliberately ignored (best-effort cleanup).
                try:
                    cur.close()
                except Exception:
                    pass
                cur = s._get_cursor()
        print(f'\n{"="*50}\nTOTALS: {totals}', flush=True)
    finally:
        cur.close()
        s.close()


if __name__ == '__main__':
    main()