| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- #!/usr/bin/env python3
- """
- 迁移 SQLite resources 表中的 tool 数据到 PostgreSQL tool_table
- """
- import os
- import json
- import sqlite3
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from dotenv import load_dotenv
- from datetime import datetime
# Load key/value pairs from a .env file into the process environment at import
# time; migrate_tools() reads its PostgreSQL connection settings via os.getenv().
- load_dotenv()
def _tool_row_to_params(tool):
    """Build the tool_table upsert parameters from one SQLite ``resources`` row.

    Args:
        tool: A row (indexable by column name) providing ``id``, ``metadata``
            (a JSON string or empty) and ``updated_at`` (an ISO-8601 string
            or empty).

    Returns:
        A 12-tuple ordered like the column list of the INSERT statement in
        ``migrate_tools``.

    Raises:
        ValueError: If ``metadata`` is not valid JSON or ``updated_at`` is not
            a parseable ISO-8601 string.
    """
    metadata = json.loads(tool['metadata']) if tool['metadata'] else {}

    updated_time = None
    if tool['updated_at']:
        # Parse the UTC offset instead of stripping it. Stripping "+00:00"
        # produced a naive datetime, which timestamp() interprets in the
        # host's local timezone and therefore shifted the epoch value on any
        # machine not running in UTC.
        # NOTE(review): values stored without an offset are still interpreted
        # as local time, as before - confirm what the source data contains.
        updated_time = int(datetime.fromisoformat(tool['updated_at']).timestamp())

    tool_input = metadata.get('input')
    tool_output = metadata.get('output')

    return (
        tool['id'],
        metadata.get('tool_name'),
        metadata.get('version'),
        metadata.get('description'),
        metadata.get('usage'),
        json.dumps(tool_input) if tool_input else None,
        json.dumps(tool_output) if tool_output else None,
        updated_time,
        metadata.get('status', '未接入'),
        json.dumps(metadata.get('knowledge_ids', [])),
        # case_knowledge / process_knowledge start out as empty JSON arrays.
        json.dumps([]),
        json.dumps([]),
    )


def migrate_tools():
    """Copy every ``content_type='tool'`` row from SQLite into PostgreSQL ``tool_table``.

    Rows are upserted by ``id``. On conflict every column is overwritten
    except ``case_knowledge`` and ``process_knowledge``, which keep whatever
    the existing PostgreSQL row holds.

    Each row is written inside its own SAVEPOINT so that a single bad row is
    rolled back and counted as failed without aborting the surrounding
    transaction; all successful rows are committed together at the end.

    Connection settings for PostgreSQL come from the ``KNOWHUB_DB``,
    ``KNOWHUB_PORT`` (default 5432), ``KNOWHUB_USER``, ``KNOWHUB_PASSWORD``
    and ``KNOWHUB_DB_NAME`` environment variables.

    Raises:
        Exception: Connection errors and failures outside the per-row handling
            propagate to the caller; both connections are closed first.
    """
    upsert_sql = """
        INSERT INTO tool_table (
            id, name, version, introduction, tutorial, input, output,
            updated_time, status, knowledge, case_knowledge, process_knowledge
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            version = EXCLUDED.version,
            introduction = EXCLUDED.introduction,
            tutorial = EXCLUDED.tutorial,
            input = EXCLUDED.input,
            output = EXCLUDED.output,
            updated_time = EXCLUDED.updated_time,
            status = EXCLUDED.status,
            knowledge = EXCLUDED.knowledge
    """

    # Source database (SQLite).
    sqlite_conn = sqlite3.connect('/root/Agent/knowhub/knowhub.db')
    try:
        sqlite_conn.row_factory = sqlite3.Row
        sqlite_cur = sqlite_conn.cursor()

        # Target database (PostgreSQL).
        pg_conn = psycopg2.connect(
            host=os.getenv('KNOWHUB_DB'),
            port=int(os.getenv('KNOWHUB_PORT', 5432)),
            user=os.getenv('KNOWHUB_USER'),
            password=os.getenv('KNOWHUB_PASSWORD'),
            database=os.getenv('KNOWHUB_DB_NAME')
        )
        try:
            pg_conn.autocommit = False
            pg_cur = pg_conn.cursor(cursor_factory=RealDictCursor)

            # Read every tool-type resource.
            sqlite_cur.execute("SELECT * FROM resources WHERE content_type='tool';")
            tools = sqlite_cur.fetchall()
            print(f"找到 {len(tools)} 条 tool 数据")

            success = 0
            failed = 0
            for tool in tools:
                # Without a savepoint, the first failing statement would put
                # the whole transaction into the aborted state: every later
                # INSERT would fail and the final commit would persist nothing.
                pg_cur.execute("SAVEPOINT tool_row")
                try:
                    pg_cur.execute(upsert_sql, _tool_row_to_params(tool))
                except Exception as e:
                    # Best-effort migration: undo only this row and carry on.
                    pg_cur.execute("ROLLBACK TO SAVEPOINT tool_row")
                    pg_cur.execute("RELEASE SAVEPOINT tool_row")
                    print(f"❌ 迁移失败 {tool['id']}: {e}")
                    failed += 1
                else:
                    pg_cur.execute("RELEASE SAVEPOINT tool_row")
                    success += 1

            pg_conn.commit()
            print(f"\n✅ 迁移完成: 成功 {success} 条, 失败 {failed} 条")
        finally:
            # Closing without a commit discards any uncommitted work.
            pg_conn.close()
    finally:
        sqlite_conn.close()
# Script entry point: run the migration only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    migrate_tools()
|