#!/usr/bin/env python3
"""
Patch: build capability_tool junction rows for tao_dev capabilities from the
``implements`` field of the source JSON files.

Strategy (kept consistent with the existing dev_abstract data):
- Write each ``implements`` key verbatim into tool_id (it may be a path such as
  "tools/workflow/comfyui", an underscore name such as "ji_meng_add_task", or a
  human-readable name such as "ComfyUI").
- Write the value (a description string) into capability_tool.description.
- Do not apply any canonical mapping and do not modify the tool table.

For every folder the capability id is rebuilt as
{orig_req_id}::{raw_cap_id} or, when the raw id is empty,
{orig_req_id}::NEW-{index}.
"""
import json
import sys
import time
from pathlib import Path

import psycopg2.extras

# Make the repository root importable before pulling in the project package.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

OUTPUT = Path('/Users/sunlit/Downloads/output-new')


def main():
    """Insert capability_tool rows for every tao_dev capability found under OUTPUT.

    For each sub-directory of ``OUTPUT`` the function reads
    ``capabilities_extracted.json``, matches its ``requirement`` text against
    the v0 requirement descriptions, rebuilds each capability id, and inserts
    one ``capability_tool`` row per ``implements`` entry (``ON CONFLICT DO
    NOTHING``). Progress and summary statistics are printed to stdout.

    Directories without ``capabilities_extracted.json`` and files whose
    requirement text cannot be matched are skipped with a message.

    Raises:
        json.JSONDecodeError: if a ``capabilities_extracted.json`` is not
            valid JSON (deliberately not swallowed).
    """
    # NOTE(review): no explicit commit is issued in this script; presumably the
    # store's connection is in autocommit mode or commits elsewhere — confirm.
    store = PostgreSQLCapabilityStore()
    cur = store._get_cursor()
    try:
        cur.execute("SET statement_timeout = '120s'")

        # Terminate other sessions on this database that sit idle inside a
        # transaction (rows are accessed by column name below).
        cur.execute(
            "SELECT pid FROM pg_stat_activity WHERE state='idle in transaction' "
            "AND pid!=pg_backend_pid() AND datname=current_database()"
        )
        for row in cur.fetchall():
            cur.execute('SELECT pg_terminate_backend(%s)', (row['pid'],))

        # v0 requirement text -> original requirement id.
        cur.execute("SELECT id, description FROM requirement WHERE version='v0'")
        req_map = {row['description']: row['id'] for row in cur.fetchall()}
        print(f'v0 req 映射: {len(req_map)}', flush=True)

        # Set of existing tao_dev capability ids, for fast membership checks.
        cur.execute("SELECT id FROM capability WHERE version='tao_dev'")
        valid_caps = {row['id'] for row in cur.fetchall()}
        print(f'tao_dev capability 现有: {len(valid_caps)}', flush=True)

        folders = sorted(f for f in OUTPUT.iterdir() if f.is_dir())
        stats = {'inserted': 0, 'skipped_no_cap': 0, 'total_implements': 0}
        insert_sql = (
            "INSERT INTO capability_tool (capability_id, tool_id, description) "
            "VALUES (%s,%s,%s) ON CONFLICT DO NOTHING"
        )

        for folder in folders:
            t0 = time.time()
            json_path = folder / 'capabilities_extracted.json'
            if not json_path.is_file():
                # A stray directory must not abort the whole run.
                print(f' [{folder.name}] 缺少 capabilities_extracted.json', flush=True)
                continue
            cd = json.loads(json_path.read_text(encoding='utf-8'))

            req_text = cd.get('requirement')
            orig_req = req_map.get(req_text)
            if not orig_req:
                print(f' [{folder.name}] 无法匹配 req', flush=True)
                continue

            # `or []` also covers an explicit JSON null for the key.
            capabilities = cd.get('extracted_capabilities') or []
            for idx, cap in enumerate(capabilities):
                if not isinstance(cap, dict):
                    continue
                raw_id = (cap.get('id') or '').strip()
                if raw_id:
                    cap_id = f'{orig_req}::{raw_id}'
                else:
                    cap_id = f'{orig_req}::NEW-{idx}'
                if cap_id not in valid_caps:
                    stats['skipped_no_cap'] += 1
                    continue

                implements = cap.get('implements') or {}
                if not isinstance(implements, dict):
                    continue
                for tool_key, desc in implements.items():
                    stats['total_implements'] += 1
                    description = str(desc) if desc is not None else ''
                    cur.execute(insert_sql, (cap_id, tool_key, description))
                    # rowcount is 0 when ON CONFLICT suppressed the insert.
                    stats['inserted'] += cur.rowcount or 0
            print(f' [{folder.name}] {orig_req}: {time.time()-t0:.1f}s', flush=True)

        print('\n=== cap_tool 补丁统计 ===', flush=True)
        for key, value in stats.items():
            print(f' {key}: {value}', flush=True)

        # Total capability_tool rows attached to tao_dev capabilities.
        cur.execute(
            "SELECT COUNT(*) c FROM capability_tool ct "
            "JOIN capability c ON c.id=ct.capability_id WHERE c.version='tao_dev'"
        )
        print(f' tao_dev cap_tool 总计: {cur.fetchone()["c"]}', flush=True)

        # Number of distinct tool keys referenced by tao_dev capabilities.
        cur.execute(
            "SELECT COUNT(DISTINCT ct.tool_id) c FROM capability_tool ct "
            "JOIN capability c ON c.id=ct.capability_id WHERE c.version='tao_dev'"
        )
        print(f' tao_dev 独特 tool_key: {cur.fetchone()["c"]}', flush=True)
    finally:
        cur.close()
        store.close()


if __name__ == '__main__':
    main()