| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- #!/usr/bin/env python3
- """
- 填充 atomic_capability 和 requirement_table 的初始数据
- 数据来源:
- - atomic_capabilities.md -> atomic_capability
- - requirements_sorted.json -> requirement_table
- """
- import os, sys, json
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from dotenv import load_dotenv
# Resolve the directory two levels above this script and load the `.env`
# file located there, so the KNOWHUB_* settings are available via os.getenv.
_dir = os.path.dirname(os.path.abspath(__file__))
_root = os.path.normpath(os.path.join(_dir, os.pardir, os.pardir))
load_dotenv(os.path.join(_root, '.env'))
def get_conn():
    """Open a new psycopg2 connection configured from environment variables.

    Reads KNOWHUB_DB (used as the host), KNOWHUB_PORT (5432 when unset),
    KNOWHUB_USER, KNOWHUB_PASSWORD and KNOWHUB_DB_NAME, and applies a
    10-second connect timeout.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    env = os.getenv
    settings = {
        'host': env('KNOWHUB_DB'),
        'port': int(env('KNOWHUB_PORT', 5432)),
        'user': env('KNOWHUB_USER'),
        'password': env('KNOWHUB_PASSWORD'),
        'database': env('KNOWHUB_DB_NAME'),
        'connect_timeout': 10,
    }
    return psycopg2.connect(**settings)
# ── Parse atomic_capabilities.md ──
def parse_caps(path):
    """Parse capability entries out of a markdown file.

    A new entry starts at every line beginning with ``### CAP-``; the text
    before the first ``:`` (minus the ``### `` marker) is the id and the text
    after it is the name. Within an entry:

    * a ``- **label**: value`` bullet whose *label* contains ``功能描述`` sets
      ``description``; one whose label contains ``判定标准`` sets ``criterion``;
    * a bullet starting with one of the known tool names is recorded in
      ``implements`` as ``{tool name: text after the first ':'}``.

    Lines before the first heading, and any other lines, are ignored.

    Args:
        path: Path of the markdown file (read as UTF-8).

    Returns:
        A list of dicts with keys ``id``, ``name``, ``criterion``,
        ``description``, ``implements``, ``tools``, ``source_knowledge`` and
        ``requirements``. The last three are always empty lists here.
    """
    # Bullets that describe how a specific tool implements the capability.
    tool_prefixes = ('- ComfyUI', '- FLUX', '- Midjourney', '- Nano Banana', '- Seedream')

    caps = []
    cur = None
    with open(path, 'r', encoding='utf-8') as f:
        for raw in f:
            line = raw.rstrip()

            if line.startswith('### CAP-'):
                if cur:
                    caps.append(cur)
                head, _, tail = line.partition(':')
                cur = {
                    'id': head.replace('### ', '').strip(),
                    'name': tail.strip(),
                    'criterion': '',
                    'description': '',
                    'implements': {},
                    'tools': [],
                    'source_knowledge': [],
                    'requirements': [],
                }
                continue

            if not cur:
                # Nothing to attach to until the first capability heading.
                continue

            if line.startswith('- **'):
                # Match the keyword against the label only. Matching against
                # the whole line would misfile a bullet whose *value* merely
                # mentions the other field's keyword.
                label, _, val = line.partition(':')
                val = val.strip()
                if '功能描述' in label:
                    cur['description'] = val
                elif '判定标准' in label:
                    cur['criterion'] = val
                continue

            item = line.strip()
            if item.startswith(tool_prefixes):
                # Drop exactly the leading "- " bullet marker.
                name, _, desc = item[2:].partition(':')
                cur['implements'][name.strip()] = desc.strip()

    if cur:
        caps.append(cur)
    return caps
# ── Parse requirements_sorted.json ──
def parse_reqs(path):
    """Load requirement records from a JSON file and reshape them into rows.

    The file is expected to hold an object with a ``requirements`` list. For
    each record the result contains:

    * ``id`` / ``description``: from ``requirement_id`` / ``requirement_text``
      (both required; a missing key raises ``KeyError``);
    * ``atomics``: ``matched_capabilities`` or an empty list;
    * ``source_nodes``: one ``{'node_name', 'posts'}`` dict per entry of
      ``source_nodes``, each carrying the record's ``source_posts``;
    * ``status``: ``'已满足'`` when ``match_status`` equals ``'完全满足'``,
      otherwise ``'未满足'``;
    * ``match_result``: the non-empty ``capability_combination`` and
      ``research_note`` values joined with a newline.

    Args:
        path: Path of the JSON file (read as UTF-8).

    Returns:
        A list of row dicts in the same order as the input records.
    """
    with open(path, 'r', encoding='utf-8') as fh:
        payload = json.load(fh)

    rows = []
    for req in payload.get('requirements', []):
        source_nodes = []
        for node_name in req.get('source_nodes', []):
            source_nodes.append({
                'node_name': node_name,
                'posts': req.get('source_posts', []),
            })

        if req.get('match_status') == '完全满足':
            status = '已满足'
        else:
            status = '未满足'

        # Keep only the note fields that are present and non-empty.
        notes = [
            req[key]
            for key in ('capability_combination', 'research_note')
            if req.get(key)
        ]

        rows.append({
            'id': req['requirement_id'],
            'description': req['requirement_text'],
            'atomics': req.get('matched_capabilities', []),
            'source_nodes': source_nodes,
            'status': status,
            'match_result': '\n'.join(notes),
        })
    return rows
# ── Main flow ──
def _upsert_rows(cur, sql, rows, to_params):
    """Run ``sql`` once per row and return how many executions succeeded.

    A failing row is reported on stdout and skipped so the remaining rows
    are still attempted (best-effort seeding).
    """
    ok = 0
    for row in rows:
        try:
            cur.execute(sql, to_params(row))
            ok += 1
        except Exception as e:
            print(f' FAIL {row["id"]}: {e}')
    return ok


def main():
    """Seed ``atomic_capability`` and ``requirement_table`` from the example files.

    Steps:
      1. Upsert capabilities parsed from
         ``examples/tool_research/atomic_capabilities.md``.
      2. Upsert requirements parsed from
         ``examples/tool_research/requirements_sorted.json``.
      3. Print the row count of both tables.

    A missing input file is reported and that step is skipped. The cursor and
    connection are always closed, even if a step raises.
    """
    conn = get_conn()
    try:
        # Autocommit: every statement is committed on its own, so one failed
        # row does not abort the rows inserted before or after it.
        conn.autocommit = True
        cur = conn.cursor(cursor_factory=RealDictCursor)
        try:
            print('Connected.')

            # 1. Populate atomic_capability
            md = os.path.join(_root, 'examples', 'tool_research', 'atomic_capabilities.md')
            print('\n--- atomic_capability ---')
            if not os.path.exists(md):
                print(f'File not found: {md}')
            else:
                caps = parse_caps(md)
                # NOTE(review): on conflict only name/criterion/description/
                # implements are refreshed; requirements, tools and
                # source_knowledge keep their stored values — presumably so
                # data maintained elsewhere is not overwritten. Confirm.
                ok = _upsert_rows(
                    cur,
                    """
                    INSERT INTO atomic_capability (id, name, criterion, description, requirements, implements, tools, source_knowledge)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
                    ON CONFLICT (id) DO UPDATE SET
                    name=EXCLUDED.name, criterion=EXCLUDED.criterion, description=EXCLUDED.description, implements=EXCLUDED.implements
                    """,
                    caps,
                    lambda c: (
                        c['id'], c['name'], c['criterion'], c['description'],
                        json.dumps(c['requirements']), json.dumps(c['implements']),
                        json.dumps(c['tools']), json.dumps(c['source_knowledge']),
                    ),
                )
                print(f'Inserted/updated {ok}/{len(caps)} capabilities.')

            # 2. Populate requirement_table
            jf = os.path.join(_root, 'examples', 'tool_research', 'requirements_sorted.json')
            print('\n--- requirement_table ---')
            if not os.path.exists(jf):
                print(f'File not found: {jf}')
            else:
                reqs = parse_reqs(jf)
                ok = _upsert_rows(
                    cur,
                    """
                    INSERT INTO requirement_table (id, description, atomics, source_nodes, status, match_result)
                    VALUES (%s,%s,%s,%s,%s,%s)
                    ON CONFLICT (id) DO UPDATE SET
                    description=EXCLUDED.description, atomics=EXCLUDED.atomics,
                    source_nodes=EXCLUDED.source_nodes, status=EXCLUDED.status, match_result=EXCLUDED.match_result
                    """,
                    reqs,
                    lambda r: (
                        r['id'], r['description'], json.dumps(r['atomics']),
                        json.dumps(r['source_nodes']), r['status'], r['match_result'],
                    ),
                )
                print(f'Inserted/updated {ok}/{len(reqs)} requirements.')

            # 3. Verify
            print('\n--- verify ---')
            # Table names are fixed literals, so interpolating them is safe.
            for t in ['atomic_capability', 'requirement_table']:
                cur.execute(f'SELECT COUNT(*) as cnt FROM {t}')
                print(f'{t}: {cur.fetchone()["cnt"]} rows')
        finally:
            cur.close()
    finally:
        conn.close()
    print('\nDone.')
# Run the seeding routine only when this file is executed as a script.
if __name__ == "__main__":
    main()
|