|
|
@@ -0,0 +1,395 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+处理 5 个重跑数据 folder:032/046/069/085/097。
|
|
|
+
|
|
|
+策略(对每个 folder):
|
|
|
+ 1. 定位 req_id(blueprint.json 解析失败时 fallback strategy.json)
|
|
|
+ 2. 清理旧数据:
|
|
|
+ - 删除 folder 标签为 F 的 resource 及其所有 junction
|
|
|
+ - 删除 req 关联的 strategy + 其 junction
|
|
|
+ - 删除 requirement_capability (req, *) 条目(后续重建全集)
|
|
|
+ 3. 重新 ingest:
|
|
|
+ - resources from raw_cases/(case_bili.json parse fail → 正则 fallback)
|
|
|
+ - capabilities via alias(不存在则新建 howard_dedup)
|
|
|
+ - strategy(is_selected 或第一条)
|
|
|
+ - junctions:
|
|
|
+ · req_res / strat_res: 按 resource 逐条写
|
|
|
+ · strat_cap: workflow_outline 的 caps(relation_type='compose')
|
|
|
+ · req_strat: 1 条
|
|
|
+ · req_cap: capabilities_extracted.json 的所有 caps(A 方案:研究全集)
|
|
|
+
|
|
|
+所有操作 autocommit=True;脚本幂等、允许断点重跑。
|
|
|
+"""
|
|
|
+import hashlib
|
|
|
+import json
|
|
|
+import re
|
|
|
+import sys
|
|
|
+import time
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
+from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
|
|
|
+from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
|
|
|
+from knowhub.scripts.rename_merged_capabilities import RENAMES
|
|
|
+
|
|
|
# Root directory holding one sub-folder per re-run data set.
RERUN_DIR = Path('/Users/sunlit/Downloads/5')

# Sub-folder names under RERUN_DIR, processed in this order by main().
FOLDERS = [
    '032',
    '046',
    '069',
    '085',
    '097',
]

# Value written to the ``version`` column of every row this script inserts.
DEDUP_VERSION = 'howard_dedup'
|
|
|
+
|
|
|
+
|
|
|
def norm(s):
    """Return *s* with surrounding whitespace removed and lower-cased.

    Any falsy input (``None``, ``''``) yields the empty string.
    """
    if not s:
        return ''
    return s.strip().lower()
|
|
|
+
|
|
|
+
|
|
|
def hash8(text):
    """Return the first 8 hex characters of the SHA-256 digest of *text* (UTF-8 encoded)."""
    encoded = text.encode('utf-8')
    digest = hashlib.sha256(encoded).hexdigest()
    return digest[:8]
|
|
|
+
|
|
|
+
|
|
|
def hash12(text):
    """Return the first 12 hex characters of the SHA-256 digest of *text* (UTF-8 encoded)."""
    encoded = text.encode('utf-8')
    digest = hashlib.sha256(encoded).hexdigest()
    return digest[:12]
|
|
|
+
|
|
|
+
|
|
|
def gen_cap_id(name):
    """Build a capability id of the form ``CAP-<8 hex chars>`` from *name*.

    The name is passed through ``norm`` before hashing, so names that differ
    only in case or surrounding whitespace produce the same id.
    """
    normalized = norm(name)
    digest = hash8(normalized)
    return f'CAP-{digest}'
|
|
|
+
|
|
|
+
|
|
|
def gen_resource_id(platform, url):
    """Build a resource id ``resource/research/<platform>/<12 hex chars>``.

    *platform* is lower-cased and stripped; a falsy platform becomes
    ``'unknown'``. The hex part is ``hash12`` of *url* exactly as given.
    """
    platform_slug = (platform or 'unknown').lower().strip()
    url_digest = hash12(url)
    return f'resource/research/{platform_slug}/{url_digest}'
|
|
|
+
|
|
|
+
|
|
|
def gen_strategy_id(req_text, strategy_name):
    """Build a strategy id ``strategy-<8 hex chars>``.

    The hashed seed is ``<req_text>|<strategy_name>``; a falsy part is
    treated as the empty string.
    """
    seed = '|'.join((req_text or '', strategy_name or ''))
    digest = hash8(seed)
    return f'strategy-{digest}'
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def build_alias_map(cur):
    """Return a dict mapping normalised capability names to capability ids.

    Entries are added in this order, later ones overwriting earlier ones:

    1. every ``(id, name)`` row currently in the ``capability`` table;
    2. every new name listed in ``RENAMES``, pointed at whatever its key
       resolves to after following the ``MERGE_CLUSTERS`` member -> canonical
       links to the end.

    ``cur`` must return rows that can be indexed by column name.
    """
    # Flatten the cluster table into member -> canonical.
    merged_into = {
        member: canonical
        for canonical, members in MERGE_CLUSTERS.items()
        for member in members
    }

    def follow(cid, limit=10):
        """Follow merge links from *cid*; stop on a revisit or after *limit* hops."""
        visited = set()
        for _ in range(limit):
            if cid not in merged_into or cid in visited:
                break
            visited.add(cid)
            cid = merged_into[cid]
        return cid

    # Point every member straight at the end of its chain.
    for member in list(merged_into):
        merged_into[member] = follow(member)

    # Names as they currently stand in the database.
    cur.execute('SELECT id, name FROM capability')
    alias = {norm(row['name']): row['id'] for row in cur.fetchall()}

    # New names introduced by RENAMES.
    alias.update(
        (norm(new_name), follow(cid))
        for cid, (new_name, _) in RENAMES.items()
    )
    return alias
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
_SOURCE_URL_RE = re.compile(r'"source_url"\s*:\s*"([^"]+)"')
_CASE_ID_RE = re.compile(r'"id"\s*:\s*"(case_[^"]+)"')


def _json_unescape(raw):
    """Decode JSON string escapes in a regex-captured value; return *raw* if it is not decodable."""
    try:
        return json.loads(f'"{raw}"')
    except ValueError:
        return raw


def load_raw_cases(folder_path):
    """Collect the case dicts stored in ``<folder_path>/raw_cases/*.json``.

    Files are visited in sorted order. For each file the platform name is the
    file stem with ``case_`` removed.

    * When the file parses as JSON, cases are taken from its ``cases`` key
      (top-level object) or from the top-level value itself. Every dict entry
      gets ``platform`` filled in if absent; non-dict entries are dropped.
    * When decoding or parsing fails, a regex fallback recovers one minimal
      case per ``"source_url"`` occurrence. Ids are paired with URLs by
      position, which is only a best effort, and titles are left empty.

    Args:
        folder_path: ``pathlib.Path`` of the folder that contains ``raw_cases``.

    Returns:
        A list of case dicts; empty when ``raw_cases`` does not exist.

    Raises:
        OSError: if a file under ``raw_cases`` cannot be read.
    """
    raw_dir = folder_path / 'raw_cases'
    all_cases = []
    if not raw_dir.exists():
        return all_cases

    for cf in sorted(raw_dir.glob('*.json')):
        platform = cf.stem.replace('case_', '')
        # Read the bytes once: the fallback below must still be able to look at
        # the content when strict UTF-8 decoding is what failed.
        payload = cf.read_bytes()
        try:
            # utf-8-sig accepts plain UTF-8 and also strips a leading BOM,
            # which json.loads would otherwise reject.
            data = json.loads(payload.decode('utf-8-sig'))
        except (ValueError, RecursionError) as e:
            print(f' ⚠️ {cf.name} parse fail ({e}); trying regex fallback', flush=True)
        else:
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if isinstance(cases, list):
                for c in cases:
                    if isinstance(c, dict):
                        c.setdefault('platform', platform)
                        all_cases.append(c)
            continue

        # Regex fallback: anchor on source_url (titles are too unreliable).
        text = payload.decode('utf-8', errors='replace')
        # Undo JSON escaping so a recovered URL matches the one a clean parse
        # would have produced (and therefore hashes to the same resource id).
        urls = [_json_unescape(u) for u in _SOURCE_URL_RE.findall(text)]
        ids = _CASE_ID_RE.findall(text)
        recovered = [
            {
                'id': ids[i] if i < len(ids) else f'{platform}_fallback_{i}',
                'title': '',  # unreliable without a proper JSON parse
                'platform': platform,
                'source_url': url,
            }
            for i, url in enumerate(urls)
        ]
        all_cases.extend(recovered)
        print(f' ⇒ recovered {len(recovered)} {platform} cases via regex (titles skipped)',
              flush=True)
    return all_cases
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def cleanup_folder_data(cur, req_id, folder_key, stats):
    """Delete the rows left by an earlier ingest of one folder / requirement.

    Removes, in order:

    * every resource whose metadata ``folder`` equals *folder_key*, together
      with its requirement/strategy/capability junction rows;
    * any remaining ``requirement_resource`` rows for *req_id*;
    * every strategy linked to *req_id*, together with its junction rows;
    * every ``requirement_capability`` row for *req_id*.

    Writes ``deleted_resources`` and ``deleted_strategies`` into *stats*.
    """
    resource_junctions = ('requirement_resource', 'strategy_resource', 'capability_resource')
    strategy_junctions = ('requirement_strategy', 'strategy_capability',
                          'strategy_resource', 'strategy_knowledge')

    # Resources tagged with this folder.
    cur.execute("SELECT id FROM resource WHERE metadata::jsonb->>'folder' = %s", (folder_key,))
    stale_resource_ids = [row['id'] for row in cur.fetchall()]
    for resource_id in stale_resource_ids:
        for table in resource_junctions:
            cur.execute(f'DELETE FROM {table} WHERE resource_id = %s', (resource_id,))
        cur.execute('DELETE FROM resource WHERE id = %s', (resource_id,))
    stats['deleted_resources'] = len(stale_resource_ids)

    # Requirement -> resource links that point at untagged resources.
    cur.execute('DELETE FROM requirement_resource WHERE requirement_id = %s', (req_id,))

    # Strategies linked to this requirement.
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id = %s', (req_id,))
    stale_strategy_ids = [row['strategy_id'] for row in cur.fetchall()]
    for strategy_id in stale_strategy_ids:
        for table in strategy_junctions:
            cur.execute(f'DELETE FROM {table} WHERE strategy_id = %s', (strategy_id,))
        cur.execute('DELETE FROM strategy WHERE id = %s', (strategy_id,))
    stats['deleted_strategies'] = len(stale_strategy_ids)

    # Requirement -> capability links are rebuilt from scratch by the caller.
    cur.execute('DELETE FROM requirement_capability WHERE requirement_id = %s', (req_id,))
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def _read_requirement_text(folder_path):
    """Return the ``requirement`` text from blueprint.json, else strategy.json, else ''."""
    req_text = ''
    try:
        bp = json.loads((folder_path / 'blueprint.json').read_text(encoding='utf-8'))
        req_text = bp.get('requirement', '')
    except (OSError, ValueError, AttributeError) as e:
        print(f' ⚠️ blueprint parse fail ({e}); trying strategy.json', flush=True)
    if req_text:
        return req_text
    try:
        sd = json.loads((folder_path / 'strategy.json').read_text(encoding='utf-8'))
        req_text = sd.get('requirement', '')
    except (OSError, ValueError, AttributeError) as e:
        print(f' ❌ no requirement text available ({e})', flush=True)
        return ''
    if not req_text:
        print(' ❌ no requirement text available (empty in blueprint.json and strategy.json)',
              flush=True)
        return ''
    return req_text


def _to_int(value):
    """Convert *value* to int, returning 0 when it is not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        pass
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return 0


def _capability_exists(cur, cap_id):
    """Return True when a ``capability`` row with id *cap_id* exists."""
    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cap_id,))
    return bool(cur.fetchone())


def _ingest_resources(folder_path, cur, folder_key):
    """(Re)insert one ``resource`` row per raw case that has a URL; return the distinct ids."""
    resource_ids = []
    for case in load_raw_cases(folder_path):
        url = case.get('source_url') or case.get('url')
        if not url or not isinstance(url, str):
            continue
        platform = case.get('platform') or 'unknown'
        rid = gen_resource_id(platform, url)
        title = (case.get('title') or '')[:200]
        metrics = case.get('metrics')
        if not isinstance(metrics, dict):
            metrics = {}
        # Scraped like counts are not always plain numbers; a value that cannot
        # be converted must not abort the folder after cleanup has already run.
        likes = _to_int(metrics.get('likes') or 0)

        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
        cur.execute(
            """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
            (rid, title,
             json.dumps(case, ensure_ascii=False)[:8000],
             'research_case',
             json.dumps(case.get('images', []) or [], ensure_ascii=False),
             json.dumps({'platform': platform, 'source_url': url,
                         'metrics': metrics, 'folder': folder_key},
                        ensure_ascii=False),
             -likes, DEDUP_VERSION))
        resource_ids.append(rid)
    # The same URL can occur more than once; the last insert wins in the
    # database, and each id is counted and wired only once.
    return list(dict.fromkeys(resource_ids))


def _backfill_capability(cur, cap_id, cap):
    """Fill an existing capability's empty ``criterion`` / ``effects`` from *cap*."""
    cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (cap_id,))
    existing = cur.fetchone()
    if not existing:
        return
    if not (existing.get('criterion') or '').strip() and cap.get('criterion'):
        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                    (cap['criterion'], cap_id))
    current_effects = existing.get('effects')
    if (not current_effects or current_effects in ([], '[]')) and cap.get('effects'):
        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), cap_id))


def _ingest_capabilities(folder_path, cur, alias, stats):
    """Resolve or create every capability in capabilities_extracted.json.

    Returns ``(capability_ids, source_key_to_id)`` where the source key is the
    capability's ``id`` from the file, or its name when it has no id.
    """
    all_cap_ids = set()
    cap_key_to_id = {}
    caps_path = folder_path / 'capabilities_extracted.json'
    if not caps_path.exists():
        return all_cap_ids, cap_key_to_id
    try:
        caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
    except Exception as e:
        print(f' ⚠️ capabilities_extracted parse fail: {e}', flush=True)
        caps_data = {'extracted_capabilities': []}

    extracted = caps_data.get('extracted_capabilities') if isinstance(caps_data, dict) else None
    if not isinstance(extracted, list):
        extracted = []

    for cap in extracted:
        if not isinstance(cap, dict):
            continue
        name = (cap.get('name') or '').strip()
        if not name:
            continue
        src_id = cap.get('id')
        resolved = None
        # (a) the id from the file, when that row exists
        if src_id and _capability_exists(cur, src_id):
            resolved = src_id
        # (b) alias lookup by normalised name, when the target row exists
        if not resolved:
            cand = alias.get(norm(name))
            if cand and _capability_exists(cur, cand):
                resolved = cand

        if resolved:
            _backfill_capability(cur, resolved, cap)
            stats['cap_linked'] += 1
        else:
            # (c) create a new capability under a name-derived id
            resolved = gen_cap_id(name)
            if not _capability_exists(cur, resolved):
                cur.execute(
                    """INSERT INTO capability (id, name, criterion, description, effects, version)
                       VALUES (%s, %s, %s, %s, %s, %s)""",
                    (resolved, name, cap.get('criterion', '') or '',
                     cap.get('description', '') or '',
                     json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                     DEDUP_VERSION))
                alias[norm(name)] = resolved
                stats['cap_new'] += 1

        all_cap_ids.add(resolved)
        cap_key_to_id[src_id or name] = resolved
    return all_cap_ids, cap_key_to_id


def _ingest_strategy(folder_path, cur, alias, stats, req_text, cap_key_to_id):
    """Insert the selected (or first) strategy from strategy.json.

    Returns ``(strategy_id, capability_ids)``; ``strategy_id`` is ``None`` when
    no strategy was inserted. The capability ids are those referenced by the
    strategy's ``workflow_outline`` that could be resolved.
    """
    strat_cap_ids = set()
    strat_path = folder_path / 'strategy.json'
    if not strat_path.exists():
        return None, strat_cap_ids
    try:
        strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
    except Exception as e:
        print(f' ⚠️ strategy parse fail: {e}', flush=True)
        strat_data = {'strategies': []}

    strategies = strat_data.get('strategies') if isinstance(strat_data, dict) else None
    if not isinstance(strategies, list):
        strategies = []
    strategies = [s for s in strategies if isinstance(s, dict)]

    selected = next((s for s in strategies if s.get('is_selected')), None)
    if not selected and strategies:
        selected = strategies[0]
    if not selected:
        return None, strat_cap_ids

    strategy_name = selected.get('name') or f'Strategy-{folder_path.name}'
    strat_id = gen_strategy_id(req_text, strategy_name)
    now = int(time.time())
    cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
    cur.execute(
        """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
        (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
         json.dumps(selected, ensure_ascii=False, indent=2),
         'draft', now, now, DEDUP_VERSION))
    stats['strategies'] = 1

    # Resolve the capabilities referenced by each workflow phase.
    outline = selected.get('workflow_outline') or []
    if not isinstance(outline, list):
        outline = []
    for phase in outline:
        if not isinstance(phase, dict):
            continue
        caps = phase.get('capabilities') or []
        if not isinstance(caps, list):
            continue
        for cref in caps:
            if not isinstance(cref, dict):
                continue
            key = cref.get('id') or cref.get('name', '')
            resolved = cap_key_to_id.get(key)
            if not resolved:
                cand = alias.get(norm(cref.get('name', '')))
                # An alias target is not guaranteed to be a live row; apply the
                # same existence check the capability step uses before linking.
                if cand and _capability_exists(cur, cand):
                    resolved = cand
            if resolved:
                strat_cap_ids.add(resolved)
    return strat_id, strat_cap_ids


def ingest_folder(folder_path, cur, alias, stats):
    """Replace the stored data of one re-run folder with what is on disk.

    Steps: find the requirement whose description equals the folder's
    requirement text, delete its old data, then re-insert resources,
    capabilities and the strategy, and finally write the junction rows.
    Returns early, without deleting anything, when no requirement text or no
    matching requirement is found.

    Args:
        folder_path: ``pathlib.Path`` of the folder to ingest.
        cur: database cursor; rows must be indexable by column name.
        alias: normalised capability name -> capability id; new capabilities
            created here are added to it.
        stats: counter dict updated in place.
    """
    folder_key = folder_path.name

    # (1) requirement lookup. An empty text must never be used as a lookup
    # key: it could match an unrelated requirement and wipe that one's data.
    req_text = _read_requirement_text(folder_path)
    if not req_text:
        return
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        print(f' ❌ no matching requirement for {folder_key}', flush=True)
        return
    req_id = row['id']
    print(f' → req_id={req_id}', flush=True)

    # (2) cleanup
    cleanup_folder_data(cur, req_id, folder_key, stats)
    del_r, del_s = stats['deleted_resources'], stats['deleted_strategies']
    print(f' cleaned: del_res={del_r}, del_strat={del_s}', flush=True)

    # (3) resources
    resource_ids = _ingest_resources(folder_path, cur, folder_key)
    stats['resources'] = len(resource_ids)

    # (4) capabilities: every extracted capability is kept for the req_cap set
    all_cap_ids_research, cap_key_to_id = _ingest_capabilities(folder_path, cur, alias, stats)

    # (5) strategy
    strat_id, strat_cap_ids = _ingest_strategy(
        folder_path, cur, alias, stats, req_text, cap_key_to_id)

    # (6) junction rows
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        if strat_id:
            cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    if strat_id:
        cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
        for cid in strat_cap_ids:
            cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                           VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))

    # req_cap: all extracted capabilities plus any referenced only by the strategy
    req_cap_ids = all_cap_ids_research | strat_cap_ids
    for cid in req_cap_ids:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))

    stats['req_cap_wired'] = len(req_cap_ids)
    stats['strat_cap_wired'] = len(strat_cap_ids)
    rc_n, sc_n = stats['req_cap_wired'], stats['strat_cap_wired']
    s_n = 1 if strat_id else 0
    print(f' ingested: res={len(resource_ids)}, strat={s_n}, req_cap={rc_n}, strat_cap={sc_n}', flush=True)
|
|
|
+
|
|
|
+
|
|
|
+# ═══════════════════════════════════════════════════════════
|
|
|
def main():
    """Ingest every folder in ``FOLDERS`` and print per-run totals.

    A folder that raises is reported, the cursor is replaced, and processing
    continues with the next folder; its stats are not added to the totals.
    The cursor and the store are closed on the way out, including when
    acquiring or closing the cursor itself fails.
    """
    stat_keys = ('deleted_resources', 'deleted_strategies',
                 'resources', 'cap_new', 'cap_linked',
                 'strategies', 'req_cap_wired', 'strat_cap_wired')

    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            print('Building alias map...', flush=True)
            alias = build_alias_map(cur)
            print(f' alias entries: {len(alias)}', flush=True)

            totals = dict.fromkeys(stat_keys, 0)
            failed = []

            for f in FOLDERS:
                print(f'\n=== {f} ===', flush=True)
                stats = dict.fromkeys(stat_keys, 0)
                try:
                    ingest_folder(RERUN_DIR / f, cur, alias, stats)
                except Exception as e:
                    print(f' ❌ {type(e).__name__}: {e}', flush=True)
                    failed.append(f)
                    # Best effort: the cursor may already be unusable after the
                    # failure, so a close error here is deliberately ignored.
                    try:
                        cur.close()
                    except Exception:
                        pass
                    cur = s._get_cursor()
                else:
                    for k in totals:
                        totals[k] += stats.get(k, 0)

            print(f'\n{"="*50}\nTOTALS: {totals}', flush=True)
            if failed:
                print(f'FAILED: {failed}', flush=True)
        finally:
            cur.close()
    finally:
        # Runs even when cursor acquisition or cur.close() raised.
        s.close()
|
|
|
+
|
|
|
+
|
|
|
# Script entry point: run the ingest only when executed directly.
if __name__ == '__main__':
    raise SystemExit(main())
|