| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
#!/usr/bin/env python3
"""
Process the 5 re-run data folders: 032/046/069/085/097.

Strategy (per folder):
1. Locate req_id (fall back to strategy.json when blueprint.json fails to parse)
2. Clean up old data:
   - delete resources tagged with this folder plus all their junctions
   - delete the requirement's linked strategies plus their junctions
   - delete requirement_capability (req, *) rows (the full set is rebuilt later)
3. Re-ingest:
   - resources from raw_cases/ (regex fallback when case_bili.json fails to parse)
   - capabilities via alias (created with version 'howard_dedup' when missing)
   - strategy (the is_selected one, or the first)
   - junctions:
     - req_res / strat_res: one row per resource
     - strat_cap: caps from workflow_outline (relation_type='compose')
     - req_strat: 1 row
     - req_cap: all caps from capabilities_extracted.json (plan A: research superset)
All operations run with autocommit=True; the script is idempotent and safe to re-run.
"""
- import hashlib
- import json
- import re
- import sys
- import time
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
- from knowhub.scripts.rename_merged_capabilities import RENAMES
# Root directory that holds the five re-run folders.
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
# Folder names (one per requirement) to process, in order.
FOLDERS = ['032', '046', '069', '085', '097']
# Version tag stamped onto every row this script creates.
DEDUP_VERSION = 'howard_dedup'
def norm(s):
    """Normalize a name for alias lookup: None-safe, trimmed, lowercased."""
    cleaned = s or ''
    return cleaned.strip().lower()
def hash8(text):
    """Return the first 8 hex chars of the SHA-256 of *text* (UTF-8)."""
    digest = hashlib.sha256(text.encode('utf-8')).hexdigest()
    return digest[:8]
def hash12(text):
    """Return the first 12 hex chars of the SHA-256 of *text* (UTF-8)."""
    digest = hashlib.sha256(text.encode('utf-8')).hexdigest()
    return digest[:12]
def gen_cap_id(name):
    """Deterministic capability id derived from the normalized name."""
    return 'CAP-' + hash8(norm(name))
def gen_resource_id(platform, url):
    """Deterministic resource id: resource/research/<platform>/<hash12(url)>."""
    plat = (platform or 'unknown').lower().strip()
    return 'resource/research/' + plat + '/' + hash12(url)
def gen_strategy_id(req_text, strategy_name):
    """Deterministic strategy id from requirement text + strategy name."""
    key = '|'.join([req_text or '', strategy_name or ''])
    return f'strategy-{hash8(key)}'
- # ═══════════════════════════════════════════════════════════
def build_alias_map(cur):
    """Build norm(name) -> canonical_id alias from current DB + MERGE_CLUSTERS + RENAMES.

    Merge clusters are flattened with a transitive closure (cycle-guarded,
    max 10 hops) so every member points at its final canonical id.
    """
    # member -> immediate canonical target, from every merge cluster
    redirect = {member: target
                for target, members in MERGE_CLUSTERS.items()
                for member in members}

    def resolve(cap_id, hops=10):
        # Follow redirects until a fixed point, a cycle, or the hop cap.
        visited = set()
        while cap_id in redirect and cap_id not in visited and hops > 0:
            visited.add(cap_id)
            cap_id = redirect[cap_id]
            hops -= 1
        return cap_id

    # Flatten the redirect table in place.
    for member in list(redirect):
        redirect[member] = resolve(member)

    lookup = {}
    # Names currently present in the DB.
    cur.execute('SELECT id, name FROM capability')
    for row in cur.fetchall():
        lookup[norm(row['name'])] = row['id']
    # Names introduced by RENAMES map to the (flattened) canonical id.
    for cap_id, (new_name, _) in RENAMES.items():
        lookup[norm(new_name)] = resolve(cap_id)
    return lookup
- # ═══════════════════════════════════════════════════════════
def load_raw_cases(folder_path):
    """Load every case dict from <folder_path>/raw_cases/*.json.

    Each file is named case_<platform>.json; the platform (file stem minus
    the 'case_' prefix) is stamped onto every case that lacks one. Files
    that do not parse as JSON — or parse but hold no list of cases — fall
    back to a regex scan anchored on "source_url" (titles are too
    unreliable to recover that way).

    Returns a (possibly empty) list of case dicts; never raises for a
    single broken file.
    """
    raw_dir = folder_path / 'raw_cases'
    all_cases = []
    if not raw_dir.exists():
        return all_cases
    for cf in sorted(raw_dir.glob('*.json')):
        platform = cf.stem.replace('case_', '')
        try:
            data = json.loads(cf.read_text(encoding='utf-8'))
            cases = data.get('cases', []) if isinstance(data, dict) else data
            if isinstance(cases, list):
                for c in cases:
                    if isinstance(c, dict):
                        c.setdefault('platform', platform)
                        all_cases.append(c)
                continue  # parsed cleanly — no fallback needed
        except Exception as e:
            print(f' ⚠️ {cf.name} parse fail ({e}); trying regex fallback', flush=True)
        all_cases.extend(_regex_recover_cases(cf, platform))
    return all_cases


def _regex_recover_cases(cf, platform):
    """Best-effort recovery of cases from a broken JSON file via regex.

    Anchors on "source_url" values (titles too unreliable); pairs them with
    "id" values positionally when available. Returns [] if the file cannot
    even be read, instead of crashing the whole folder ingest.
    """
    try:
        text = cf.read_text(encoding='utf-8')
    except Exception as e:
        print(f' ⚠️ {cf.name} unreadable ({e}); skipping regex fallback', flush=True)
        return []
    urls = re.findall(r'"source_url"\s*:\s*"([^"]+)"', text)
    ids = re.findall(r'"id"\s*:\s*"(case_[^"]+)"', text)
    recovered = []
    for i, url in enumerate(urls):
        recovered.append({
            'id': ids[i] if i < len(ids) else f'{platform}_fallback_{i}',
            'title': '',  # unreliable without a proper JSON parse
            'platform': platform,
            'source_url': url,
        })
    print(f' ⇒ recovered {len(recovered)} {platform} cases via regex (titles skipped)',
          flush=True)
    return recovered
- # ═══════════════════════════════════════════════════════════
def cleanup_folder_data(cur, req_id, folder_key, stats):
    """Remove old resources with folder tag + related junctions + old strategy for this req + req_cap.

    Writes 'deleted_resources' and 'deleted_strategies' counts into *stats*.
    """
    # Resources stamped with this folder tag in their metadata JSON.
    cur.execute("SELECT id FROM resource WHERE metadata::jsonb->>'folder' = %s", (folder_key,))
    stale_resources = [row['id'] for row in cur.fetchall()]
    for rid in stale_resources:
        for stmt in (
            'DELETE FROM requirement_resource WHERE resource_id = %s',
            'DELETE FROM strategy_resource WHERE resource_id = %s',
            'DELETE FROM capability_resource WHERE resource_id = %s',
            'DELETE FROM resource WHERE id = %s',
        ):
            cur.execute(stmt, (rid,))
    stats['deleted_resources'] = len(stale_resources)
    # Sweep any remaining req_res junctions for this req (untagged orphans).
    cur.execute('DELETE FROM requirement_resource WHERE requirement_id = %s', (req_id,))
    # Strategies linked to this requirement, plus all of their junctions.
    cur.execute('SELECT strategy_id FROM requirement_strategy WHERE requirement_id = %s', (req_id,))
    stale_strategies = [row['strategy_id'] for row in cur.fetchall()]
    for sid in stale_strategies:
        for stmt in (
            'DELETE FROM requirement_strategy WHERE strategy_id = %s',
            'DELETE FROM strategy_capability WHERE strategy_id = %s',
            'DELETE FROM strategy_resource WHERE strategy_id = %s',
            'DELETE FROM strategy_knowledge WHERE strategy_id = %s',
            'DELETE FROM strategy WHERE id = %s',
        ):
            cur.execute(stmt, (sid,))
    stats['deleted_strategies'] = len(stale_strategies)
    # req_cap rows for this requirement are rebuilt from scratch afterwards.
    cur.execute('DELETE FROM requirement_capability WHERE requirement_id = %s', (req_id,))
- # ═══════════════════════════════════════════════════════════
def ingest_folder(folder_path: Path, cur, alias: dict, stats: dict) -> None:
    """Re-ingest one re-run folder end to end.

    Locates the requirement row by its description text, wipes this
    folder's stale data, then rebuilds resources, capabilities, the
    selected strategy and every junction row.

    Mutates *alias* (newly created capability names are added) and
    *stats* (per-folder counters). Returns early with a printed marker
    when no requirement text / matching requirement row can be found.
    """
    folder_key = folder_path.name
    # (1) requirement text — try blueprint, fallback to strategy
    req_text = ''
    try:
        bp = json.loads((folder_path / 'blueprint.json').read_text(encoding='utf-8'))
        req_text = bp.get('requirement', '')
    except Exception as e:
        print(f' ⚠️ blueprint parse fail ({e}); trying strategy.json', flush=True)
    if not req_text:
        try:
            sd = json.loads((folder_path / 'strategy.json').read_text(encoding='utf-8'))
            req_text = sd.get('requirement', '')
        except Exception as e:
            print(f' ❌ no requirement text available ({e})', flush=True)
            return
    # The requirement row is matched by exact description text.
    cur.execute('SELECT id FROM requirement WHERE description = %s LIMIT 1', (req_text,))
    row = cur.fetchone()
    if not row:
        print(f' ❌ no matching requirement for {folder_key}', flush=True)
        return
    req_id = row['id']
    print(f' → req_id={req_id}', flush=True)
    # (2) cleanup
    cleanup_folder_data(cur, req_id, folder_key, stats)
    del_r, del_s = stats['deleted_resources'], stats['deleted_strategies']
    print(f' cleaned: del_res={del_r}, del_strat={del_s}', flush=True)
    # (3) resources — delete-then-insert makes re-runs idempotent
    cases = load_raw_cases(folder_path)
    resource_ids = []
    for case in cases:
        url = case.get('source_url') or case.get('url')
        if not url:
            continue  # no URL means no deterministic id — skip
        platform = case.get('platform') or 'unknown'
        rid = gen_resource_id(platform, url)
        title = (case.get('title') or '')[:200]
        metrics = case.get('metrics') if isinstance(case.get('metrics'), dict) else {}
        likes = (metrics.get('likes') or 0) if metrics else 0
        cur.execute('DELETE FROM resource WHERE id = %s', (rid,))
        # sort_order = -likes so higher-liked cases sort first ascending.
        cur.execute(
            """INSERT INTO resource (id, title, body, content_type, images, metadata, sort_order, version)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
            (rid, title,
             json.dumps(case, ensure_ascii=False)[:8000],
             'research_case',
             json.dumps(case.get('images', []) or [], ensure_ascii=False),
             json.dumps({'platform': platform, 'source_url': url,
                         'metrics': metrics, 'folder': folder_key},
                        ensure_ascii=False),
             -int(likes), DEDUP_VERSION))
        resource_ids.append(rid)
    stats['resources'] = len(resource_ids)
    # (4) capabilities (from capabilities_extracted.json) — track ALL of them for req_cap superset
    caps_path = folder_path / 'capabilities_extracted.json'
    all_cap_ids_research = set()  # for req_cap (A-plan research superset)
    cap_key_to_id = {}  # source_key -> resolved_id (for strat_cap resolution)
    if caps_path.exists():
        try:
            caps_data = json.loads(caps_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f' ⚠️ capabilities_extracted parse fail: {e}', flush=True)
            caps_data = {'extracted_capabilities': []}
        for cap in caps_data.get('extracted_capabilities', []):
            name = (cap.get('name') or '').strip()
            if not name:
                continue
            src_id = cap.get('id')
            resolved = None
            # (a) source id exists?
            if src_id:
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (src_id,))
                if cur.fetchone():
                    resolved = src_id
            # (b) alias by name
            if not resolved:
                cand = alias.get(norm(name))
                if cand:
                    cur.execute('SELECT 1 FROM capability WHERE id = %s', (cand,))
                    if cur.fetchone():
                        resolved = cand
            # (c) create new
            if not resolved:
                new_id = gen_cap_id(name)
                cur.execute('SELECT 1 FROM capability WHERE id = %s', (new_id,))
                if not cur.fetchone():
                    cur.execute(
                        """INSERT INTO capability (id, name, criterion, description, effects, version)
                           VALUES (%s, %s, %s, %s, %s, %s)""",
                        (new_id, name, cap.get('criterion', '') or '',
                         cap.get('description', '') or '',
                         json.dumps(cap.get('effects', []) or [], ensure_ascii=False, default=str),
                         DEDUP_VERSION))
                    # Register the new name so later folders resolve it via alias.
                    alias[norm(name)] = new_id
                    stats['cap_new'] += 1
                resolved = new_id
            else:
                # backfill criterion/effects if missing
                cur.execute('SELECT criterion, effects FROM capability WHERE id = %s', (resolved,))
                ex = cur.fetchone()
                if ex:
                    if (not (ex.get('criterion') or '').strip()) and cap.get('criterion'):
                        cur.execute('UPDATE capability SET criterion = %s WHERE id = %s',
                                    (cap['criterion'], resolved))
                    cur_eff = ex.get('effects')
                    if (not cur_eff or cur_eff in ([], '[]')) and cap.get('effects'):
                        cur.execute('UPDATE capability SET effects = %s WHERE id = %s',
                                    (json.dumps(cap['effects'], ensure_ascii=False, default=str), resolved))
                stats['cap_linked'] += 1
            all_cap_ids_research.add(resolved)
            cap_key_to_id[src_id or name] = resolved
    # (5) strategy — pick is_selected, else the first one
    strat_path = folder_path / 'strategy.json'
    strat_id = None
    strat_cap_ids = set()
    if strat_path.exists():
        try:
            strat_data = json.loads(strat_path.read_text(encoding='utf-8'))
        except Exception as e:
            print(f' ⚠️ strategy parse fail: {e}', flush=True)
            strat_data = {'strategies': []}
        selected = next((s for s in strat_data.get('strategies', []) if s.get('is_selected')), None)
        if not selected and strat_data.get('strategies'):
            selected = strat_data['strategies'][0]
        if selected:
            strategy_name = selected.get('name') or f'Strategy-{folder_key}'
            strat_id = gen_strategy_id(req_text, strategy_name)
            now = int(time.time())
            # Delete-then-insert keeps the id stable across re-runs.
            cur.execute('DELETE FROM strategy WHERE id = %s', (strat_id,))
            cur.execute(
                """INSERT INTO strategy (id, name, description, body, status, created_at, updated_at, version)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                (strat_id, strategy_name, (selected.get('reasoning') or '')[:2000],
                 json.dumps(selected, ensure_ascii=False, indent=2),
                 'draft', now, now, DEDUP_VERSION))
            stats['strategies'] = 1
            # workflow_outline cap resolution
            wo = selected.get('workflow_outline') or []
            if isinstance(wo, list):
                for phase in wo:
                    if not isinstance(phase, dict):
                        continue
                    caps = phase.get('capabilities') or []
                    if not isinstance(caps, list):
                        continue
                    for cref in caps:
                        if not isinstance(cref, dict):
                            continue
                        # Resolve by extracted-cap key first, then by alias name.
                        key = cref.get('id') or cref.get('name', '')
                        resolved = cap_key_to_id.get(key) or alias.get(norm(cref.get('name', '')))
                        if resolved:
                            strat_cap_ids.add(resolved)
    # (6) wire junctions
    for rid in resource_ids:
        cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, rid))
        if strat_id:
            cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (strat_id, rid))
    if strat_id:
        cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, strat_id))
        for cid in strat_cap_ids:
            cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id, relation_type)
                           VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (strat_id, cid))
    # req_cap: research superset (A plan)
    for cid in all_cap_ids_research:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    # also include strat-only caps (in case some are in workflow_outline but not in extracted list)
    for cid in strat_cap_ids:
        cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
                       VALUES (%s, %s) ON CONFLICT DO NOTHING""", (req_id, cid))
    stats['req_cap_wired'] = len(all_cap_ids_research | strat_cap_ids)
    stats['strat_cap_wired'] = len(strat_cap_ids)
    rc_n, sc_n = stats['req_cap_wired'], stats['strat_cap_wired']
    s_n = 1 if strat_id else 0
    print(f' ingested: res={len(resource_ids)}, strat={s_n}, req_cap={rc_n}, strat_cap={sc_n}', flush=True)
- # ═══════════════════════════════════════════════════════════
def main():
    """Entry point: build the alias map once, then process each folder,
    recycling the cursor after any per-folder failure so one bad folder
    cannot poison the rest of the run."""
    store = PostgreSQLCapabilityStore()
    cursor = store._get_cursor()
    try:
        print('Building alias map...', flush=True)
        alias = build_alias_map(cursor)
        print(f' alias entries: {len(alias)}', flush=True)
        stat_keys = ('deleted_resources', 'deleted_strategies',
                     'resources', 'cap_new', 'cap_linked',
                     'strategies', 'req_cap_wired', 'strat_cap_wired')
        totals = dict.fromkeys(stat_keys, 0)
        for f in FOLDERS:
            print(f'\n=== {f} ===', flush=True)
            stats = dict.fromkeys(stat_keys, 0)
            try:
                ingest_folder(RERUN_DIR / f, cursor, alias, stats)
                for key in totals:
                    totals[key] += stats.get(key, 0)
            except Exception as e:
                print(f' ❌ {type(e).__name__}: {e}', flush=True)
                # A DB error can leave the cursor unusable; swap in a fresh one.
                try:
                    cursor.close()
                except Exception:
                    pass
                cursor = store._get_cursor()
        print(f'\n{"="*50}\nTOTALS: {totals}', flush=True)
    finally:
        cursor.close()
        store.close()


if __name__ == '__main__':
    main()
|