#!/usr/bin/env python3 """ 填充 atomic_capability 和 requirement_table 的初始数据 数据来源: - atomic_capabilities.md -> atomic_capability - requirements_sorted.json -> requirement_table """ import os, sys, json import psycopg2 from psycopg2.extras import RealDictCursor from dotenv import load_dotenv _dir = os.path.dirname(os.path.abspath(__file__)) _root = os.path.normpath(os.path.join(_dir, '..', '..')) load_dotenv(os.path.join(_root, '.env')) def get_conn(): return psycopg2.connect( host=os.getenv('KNOWHUB_DB'), port=int(os.getenv('KNOWHUB_PORT', 5432)), user=os.getenv('KNOWHUB_USER'), password=os.getenv('KNOWHUB_PASSWORD'), database=os.getenv('KNOWHUB_DB_NAME'), connect_timeout=10 ) # ── 解析 atomic_capabilities.md ── def parse_caps(path): caps = [] cur = None with open(path, 'r', encoding='utf-8') as f: for line in f: line = line.rstrip() if line.startswith('### CAP-'): if cur: caps.append(cur) parts = line.split(':', 1) cap_id = parts[0].replace('### ', '').strip() cap_name = parts[1].strip() if len(parts) > 1 else '' cur = {'id': cap_id, 'name': cap_name, 'criterion': '', 'description': '', 'implements': {}, 'tools': [], 'source_knowledge': [], 'requirements': []} elif cur: if line.startswith('- **'): val = line.split(':', 1)[1].strip() if ':' in line else '' if '功能描述' in line: cur['description'] = val elif '判定标准' in line: cur['criterion'] = val elif line.strip().startswith('- ComfyUI') or line.strip().startswith('- FLUX') or \ line.strip().startswith('- Midjourney') or line.strip().startswith('- Nano Banana') or \ line.strip().startswith('- Seedream'): t = line.strip().lstrip('- ') name = t.split(':')[0].strip() desc = t.split(':', 1)[1].strip() if ':' in t else '' cur['implements'][name] = desc if cur: caps.append(cur) return caps # ── 解析 requirements_sorted.json ── def parse_reqs(path): with open(path, 'r', encoding='utf-8') as f: data = json.load(f) out = [] for r in data.get('requirements', []): nodes = [{'node_name': n, 'posts': r.get('source_posts', [])} for n in r.get('source_nodes', [])] status = '已满足' if r.get('match_status') == '完全满足' else '未满足' parts = [] if r.get('capability_combination'): parts.append(r['capability_combination']) if r.get('research_note'): parts.append(r['research_note']) out.append({ 'id': r['requirement_id'], 'description': r['requirement_text'], 'atomics': r.get('matched_capabilities', []), 'source_nodes': nodes, 'status': status, 'match_result': '\n'.join(parts), }) return out # ── 主流程 ── def main(): conn = get_conn() conn.autocommit = True cur = conn.cursor(cursor_factory=RealDictCursor) print('Connected.') # 1. 填充 atomic_capability md = os.path.join(_root, 'examples', 'tool_research', 'atomic_capabilities.md') print(f'\n--- atomic_capability ---') if not os.path.exists(md): print(f'File not found: {md}') else: caps = parse_caps(md) ok = 0 for c in caps: try: cur.execute(""" INSERT INTO atomic_capability (id, name, criterion, description, requirements, implements, tools, source_knowledge) VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET name=EXCLUDED.name, criterion=EXCLUDED.criterion, description=EXCLUDED.description, implements=EXCLUDED.implements """, (c['id'], c['name'], c['criterion'], c['description'], json.dumps(c['requirements']), json.dumps(c['implements']), json.dumps(c['tools']), json.dumps(c['source_knowledge']))) ok += 1 except Exception as e: print(f' FAIL {c["id"]}: {e}') print(f'Inserted/updated {ok}/{len(caps)} capabilities.') # 2. 填充 requirement_table jf = os.path.join(_root, 'examples', 'tool_research', 'requirements_sorted.json') print(f'\n--- requirement_table ---') if not os.path.exists(jf): print(f'File not found: {jf}') else: reqs = parse_reqs(jf) ok = 0 for r in reqs: try: cur.execute(""" INSERT INTO requirement_table (id, description, atomics, source_nodes, status, match_result) VALUES (%s,%s,%s,%s,%s,%s) ON CONFLICT (id) DO UPDATE SET description=EXCLUDED.description, atomics=EXCLUDED.atomics, source_nodes=EXCLUDED.source_nodes, status=EXCLUDED.status, match_result=EXCLUDED.match_result """, (r['id'], r['description'], json.dumps(r['atomics']), json.dumps(r['source_nodes']), r['status'], r['match_result'])) ok += 1 except Exception as e: print(f' FAIL {r["id"]}: {e}') print(f'Inserted/updated {ok}/{len(reqs)} requirements.') # 3. 验证 print('\n--- verify ---') for t in ['atomic_capability', 'requirement_table']: cur.execute(f'SELECT COUNT(*) as cnt FROM {t}') print(f'{t}: {cur.fetchone()["cnt"]} rows') cur.close() conn.close() print('\nDone.') if __name__ == '__main__': main()