| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/env python3
- """
- 补丁:为 tao_dev 的 capability 从源 JSON 的 implements 字段构造 capability_tool junction。
- 策略(与 dev_abstract 现有数据一致):
- - 把 implements 的 key 原样写入 tool_id(可能是路径 "tools/workflow/comfyui"、
- 下划线名 "ji_meng_add_task"、或人类可读名 "ComfyUI")
- - value(描述字符串)写入 capability_tool.description
- - 不做任何 canonical 映射,不改 tool 表
- 每 folder 的 cap 用 {orig_req_id}::{raw_cap_id or 'NEW-<idx>'} 重建 mapping。
- """
- import json, sys, time
- from pathlib import Path
- import psycopg2.extras
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- OUTPUT = Path('/Users/sunlit/Downloads/output-new')
def _load_capability_doc(folder):
    """Load and parse ``capabilities_extracted.json`` from *folder*.

    Args:
        folder: ``pathlib.Path`` of one sub-directory of ``OUTPUT``.

    Returns:
        The parsed JSON document when it is a dict; ``None`` (after printing
        the reason) when the file is missing, unreadable, not valid JSON, or
        its top-level value is not an object. Returning ``None`` instead of
        raising lets the caller skip one bad folder without aborting the run.
    """
    path = folder / 'capabilities_extracted.json'
    try:
        doc = json.loads(path.read_text(encoding='utf-8'))
    except (OSError, ValueError) as exc:
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        print(f' [{folder.name}] 无法读取 {path.name}: {exc}', flush=True)
        return None
    if not isinstance(doc, dict):
        print(f' [{folder.name}] {path.name} 顶层不是 JSON 对象', flush=True)
        return None
    return doc


def main():
    """Rebuild ``capability_tool`` rows for ``tao_dev`` capabilities.

    For every sub-directory of ``OUTPUT`` the function reads
    ``capabilities_extracted.json``, resolves its ``requirement`` text to a
    ``v0`` requirement id, rebuilds each capability id as
    ``{orig_req_id}::{raw_cap_id}`` (or ``{orig_req_id}::NEW-<idx>`` when the
    entry has no id), and inserts one ``capability_tool`` row per key/value
    pair of the entry's ``implements`` mapping: the key is stored verbatim as
    ``tool_id`` and the value as ``description``. Existing rows are left
    untouched (``ON CONFLICT DO NOTHING``). Summary counters are printed at
    the end.

    Folders whose JSON file is missing or malformed, and folders whose
    requirement text has no ``v0`` match, are reported and skipped.

    NOTE(review): no explicit commit is issued here; presumably the store's
    connection runs in autocommit mode -- confirm against
    ``PostgreSQLCapabilityStore``.
    """
    s = PostgreSQLCapabilityStore()
    cur = None
    try:
        cur = s._get_cursor()
        cur.execute("SET statement_timeout = '120s'")

        # Terminate every *other* backend on this database that is sitting
        # idle inside an open transaction (this affects other sessions).
        cur.execute(
            "SELECT pid FROM pg_stat_activity WHERE state='idle in transaction' "
            "AND pid!=pg_backend_pid() AND datname=current_database()"
        )
        for r in cur.fetchall():
            cur.execute('SELECT pg_terminate_backend(%s)', (r['pid'],))

        # v0 requirement text -> original requirement id.
        cur.execute("SELECT id, description FROM requirement WHERE version='v0'")
        req_map = {r['description']: r['id'] for r in cur.fetchall()}
        print(f'v0 req 映射: {len(req_map)}', flush=True)

        # Ids of the existing tao_dev capabilities, used as a fast membership
        # check before inserting.
        cur.execute("SELECT id FROM capability WHERE version='tao_dev'")
        valid_caps = {r['id'] for r in cur.fetchall()}
        print(f'tao_dev capability 现有: {len(valid_caps)}', flush=True)

        folders = sorted(f for f in OUTPUT.iterdir() if f.is_dir())
        stats = {'inserted': 0, 'skipped_no_cap': 0, 'total_implements': 0}

        for folder in folders:
            t0 = time.time()
            cd = _load_capability_doc(folder)
            if cd is None:
                continue

            orig_req = req_map.get(cd.get('requirement'))
            if not orig_req:
                print(f' [{folder.name}] 无法匹配 req', flush=True)
                continue

            # `or []` also covers an explicit JSON null for this key.
            for idx, c in enumerate(cd.get('extracted_capabilities') or []):
                if not isinstance(c, dict):
                    continue
                # str() guards against a numeric id in the source JSON.
                raw_id = str(c.get('id') or '').strip()
                cap_id = f'{orig_req}::{raw_id}' if raw_id else f'{orig_req}::NEW-{idx}'
                if cap_id not in valid_caps:
                    stats['skipped_no_cap'] += 1
                    continue

                implements = c.get('implements') or {}
                if not isinstance(implements, dict):
                    continue
                for tool_key, desc in implements.items():
                    stats['total_implements'] += 1
                    cur.execute(
                        "INSERT INTO capability_tool (capability_id, tool_id, description) "
                        "VALUES (%s,%s,%s) ON CONFLICT DO NOTHING",
                        (cap_id, tool_key, str(desc) if desc is not None else ''),
                    )
                    # rowcount is 0 when the row already existed.
                    stats['inserted'] += cur.rowcount or 0

            print(f' [{folder.name}] {orig_req}: {time.time()-t0:.1f}s', flush=True)

        print('\n=== cap_tool 补丁统计 ===', flush=True)
        for k, v in stats.items():
            print(f' {k}: {v}', flush=True)

        cur.execute(
            "SELECT COUNT(*) c FROM capability_tool ct "
            "JOIN capability c ON c.id=ct.capability_id "
            "WHERE c.version='tao_dev'"
        )
        print(f' tao_dev cap_tool 总计: {cur.fetchone()["c"]}', flush=True)

        # Number of distinct tool keys referenced by tao_dev capabilities.
        cur.execute(
            "SELECT COUNT(DISTINCT ct.tool_id) c FROM capability_tool ct "
            "JOIN capability c ON c.id=ct.capability_id "
            "WHERE c.version='tao_dev'"
        )
        print(f' tao_dev 独特 tool_key: {cur.fetchone()["c"]}', flush=True)
    finally:
        # Nested so the store is closed even if closing the cursor fails or
        # the cursor was never obtained.
        try:
            if cur is not None:
                cur.close()
        finally:
            s.close()
# Run the patch only when this file is executed as a script, so importing
# the module never touches the database.
if __name__ == '__main__':
    main()
|