#!/usr/bin/env python3
"""
Dump every cap that backfill_req_cap.py could not match to a canonical
capability, together with folder / is_new / description / implements,
so that a human can review them.
"""
import hashlib
import json
import re
import sys
from collections import defaultdict
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.scripts.merge_capabilities import MERGE_CLUSTERS
from knowhub.scripts.rename_merged_capabilities import RENAMES

OUTPUT_DIR = Path('/Users/sunlit/Downloads/output 2')
RERUN_DIR = Path('/Users/sunlit/Downloads/5')
RERUN_FOLDERS = {'032', '046', '069', '085', '097'}

# Where the markdown report is written, and how many entries are echoed
# to the terminal afterwards.
_REPORT_PATH = Path('/tmp/unresolved_caps.md')
_TOP_N = 30

# Fallback patterns used only when capabilities_extracted.json is not valid
# JSON; compiled once because extract_caps() runs for every folder.
_NAME_RE = re.compile(r'"name"\s*:\s*"([^"]+)"')
_ID_RE = re.compile(r'"id"\s*:\s*(?:"([^"]+)"|null)')


def norm(s):
    """Return *s* stripped and lower-cased; a falsy *s* becomes ``''``."""
    return (s or '').strip().lower()


def build_alias(cur):
    """Build a lookup from normalized capability name to capability id.

    The lookup is seeded from every ``(id, name)`` row of the ``capability``
    table and then overlaid with the new names listed in ``RENAMES``, each
    pointing at the id its key resolves to through ``MERGE_CLUSTERS``.

    Args:
        cur: an open DB cursor whose rows support ``row['id']`` /
            ``row['name']`` access.

    Returns:
        dict mapping ``norm(name)`` to a capability id.
    """
    # member id -> canonical id, inverted from canonical -> members.
    m2c = {}
    for canonical, members in MERGE_CLUSTERS.items():
        for m in members:
            m2c[m] = canonical

    def final(cid, limit=10):
        """Follow member -> canonical links; stop on a cycle or after *limit* hops."""
        seen = set()
        while cid in m2c and cid not in seen and limit > 0:
            seen.add(cid)
            cid = m2c[cid]
            limit -= 1
        return cid

    # Flatten chains so every member points at its last reachable target.
    for m in list(m2c):
        m2c[m] = final(m)

    alias = {}
    cur.execute('SELECT id, name FROM capability')
    for r in cur.fetchall():
        alias[norm(r['name'])] = r['id']
    # Applied after the table rows, so a renamed name overrides a row
    # that normalizes to the same key.
    for cid, (new_name, _) in RENAMES.items():
        alias[norm(new_name)] = final(cid)
    return alias


def extract_caps(folder):
    """Read ``capabilities_extracted.json`` from *folder* and normalize its caps.

    Args:
        folder: ``Path`` of a directory that may contain the JSON file.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``is_new``,
        ``description`` and ``implements``. The list is empty when the file
        is missing or holds no recognizable capability list.
    """
    fp = folder / 'capabilities_extracted.json'
    if not fp.exists():
        return []
    text = fp.read_text(encoding='utf-8')
    try:
        data = json.loads(text)
    except (ValueError, RecursionError):
        # Malformed JSON: salvage names and ids by regex, paired by position.
        # is_new cannot be tied to a cap boundary this way, so it is left None.
        names = _NAME_RE.findall(text)
        ids = _ID_RE.findall(text)
        return [
            {
                # An '"id": null' match yields '', reported here as None.
                'id': (ids[i] or None) if i < len(ids) else None,
                'name': n,
                'is_new': None,
                'description': '',
                'implements': {},
            }
            for i, n in enumerate(names)
        ]

    if isinstance(data, dict):
        ec = data.get('extracted_capabilities', data.get('capabilities', []))
    else:
        # A bare top-level array is taken to be the capability list itself.
        ec = data
    if not isinstance(ec, list):
        # null, scalars and keyed objects carry no cap dicts to iterate.
        ec = []

    out = []
    for c in ec:
        if not isinstance(c, dict):
            continue
        out.append({
            'id': c.get('id') or c.get('cap_id') or c.get('capability_id'),
            'name': c.get('name') or c.get('capability_name', ''),
            'is_new': c.get('is_new'),
            'description': (c.get('description', '')
                            or c.get('why_needed', '')
                            or c.get('relevance_reason', '')),
            'implements': c.get('implements') or c.get('suggested_tools', []),
        })
    return out


def _list_input_folders(output_dir, rerun_dir, rerun_folders):
    """Return the sub-directories of *output_dir* in sorted order, substituting
    the same-named path under *rerun_dir* for names in *rerun_folders*."""
    folders = []
    for d in sorted(output_dir.iterdir()):
        if not d.is_dir():
            continue
        folders.append(rerun_dir / d.name if d.name in rerun_folders else d)
    return folders


def _capability_exists(cur, cid, cache):
    """Return True when a ``capability`` row with id *cid* exists.

    Results for string ids are memoized in *cache* so an id that occurs in
    many folders is only queried once.
    """
    cacheable = isinstance(cid, str)
    if cacheable and cid in cache:
        return cache[cid]
    cur.execute('SELECT 1 FROM capability WHERE id=%s', (cid,))
    found = bool(cur.fetchone())
    if cacheable:
        cache[cid] = found
    return found


def _resolve(cur, alias, cid, name, cache):
    """Return the capability id a cap resolves to, or None when unresolved.

    The cap's own id is tried first, then the alias registered for its
    normalized name; either only counts if the id exists in the table.
    """
    if cid and _capability_exists(cur, cid, cache):
        return cid
    cand = alias.get(norm(name))
    if cand and _capability_exists(cur, cand, cache):
        return cand
    return None


def _collect_unresolved(cur, alias, folders):
    """Group all unresolved caps found in *folders* by normalized name.

    Returns:
        dict mapping ``norm(name)`` to a dict with ``name`` (last spelling
        seen), ``folders`` (one entry per occurrence), ``desc`` / ``impl``
        (first non-empty value, truncated) and ``is_new_votes``.
    """
    by_name = defaultdict(lambda: {'name': '', 'folders': [], 'desc': '',
                                   'impl': '', 'is_new_votes': []})
    exists_cache = {}
    for folder in folders:
        for cap in extract_caps(folder):
            name = cap.get('name', '')
            if not name:
                continue
            if _resolve(cur, alias, cap.get('id'), name, exists_cache) is not None:
                continue
            entry = by_name[norm(name)]
            entry['name'] = name
            entry['folders'].append(folder.name)
            # str() keeps the slice safe when the JSON holds a non-string value.
            if not entry['desc'] and cap.get('description'):
                entry['desc'] = str(cap['description'])[:300]
            if not entry['impl'] and cap.get('implements'):
                entry['impl'] = str(cap['implements'])[:200]
            entry['is_new_votes'].append(cap.get('is_new'))
    return by_name


def _write_report(out_path, sorted_list):
    """Write the markdown review file for *sorted_list* to *out_path*."""
    lines = [f'# Unresolved Caps ({len(sorted_list)} unique)\n\n']
    for _key, v in sorted_list:
        lines.append(f'## {v["name"]}\n')
        lines.append(f'- folders ({len(v["folders"])}): {v["folders"]}\n')
        lines.append(f'- is_new votes: {v["is_new_votes"]}\n')
        if v['desc']:
            lines.append(f'- desc: {v["desc"]}\n')
        if v['impl']:
            lines.append(f'- impl: {v["impl"]}\n')
        lines.append('\n')
    # Explicit UTF-8: the input is read as UTF-8 and may contain non-ASCII
    # text that a locale-default encoding could fail to encode.
    with out_path.open('w', encoding='utf-8') as f:
        f.writelines(lines)


def _print_top(sorted_list, count):
    """Print the first *count* entries of *sorted_list* to the terminal."""
    for i, (_key, v) in enumerate(sorted_list[:count]):
        n_fold = len(v['folders'])
        nm = v['name']
        ds = v['desc'][:120] if v['desc'] else ''
        print(f'{i+1:3d}. [{n_fold}x] {nm}', flush=True)
        if ds:
            print(f' desc: {ds}', flush=True)


def main():
    """Collect every unresolved cap, write the review file and print a summary."""
    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            alias = build_alias(cur)
            folders = _list_input_folders(OUTPUT_DIR, RERUN_DIR, RERUN_FOLDERS)

            # Group unresolved by normalized name (to see duplicates across folders)
            by_name = _collect_unresolved(cur, alias, folders)

            # Sort by frequency (most common first); ties keep first-seen order.
            sorted_list = sorted(by_name.items(), key=lambda x: -len(x[1]['folders']))
            total_occ = sum(len(v['folders']) for v in by_name.values())
            print(f'UNIQUE UNRESOLVED CAPS: {len(sorted_list)}', flush=True)
            print(f'TOTAL OCCURRENCES: {total_occ}', flush=True)
            print()

            # Write to file for easier review
            _write_report(_REPORT_PATH, sorted_list)
            print(f'Written: {_REPORT_PATH}', flush=True)

            _print_top(sorted_list, _TOP_N)
        finally:
            cur.close()
    finally:
        # Separate finally so the store is closed even if cur.close() raises.
        s.close()


if __name__ == '__main__':
    main()