| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- #!/usr/bin/env python
- """
- 知识库迁移脚本: SQLite -> Milvus
- 从旧的 SQLite 数据库迁移知识数据到新的 Milvus 向量数据库
- """
- import sys
- import json
- import sqlite3
- import asyncio
- from pathlib import Path
- from datetime import datetime
- # 添加项目路径
- sys.path.insert(0, str(Path(__file__).parent))
- from knowhub.vector_store import MilvusStore
- from knowhub.embeddings import get_embeddings_batch
def _to_epoch(value, fallback):
    """Coerce a stored timestamp to epoch seconds.

    Non-string values (already numeric timestamps) pass through unchanged.
    ISO-format strings (optionally 'Z'-suffixed) are parsed to an int epoch;
    on parse failure the caller-supplied *fallback* integer is returned.
    """
    if isinstance(value, str):
        try:
            return int(datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp())
        except (ValueError, TypeError):
            # Narrowed from a bare except: only parse errors fall back.
            return fallback
    return value


def _row_to_knowledge(row):
    """Convert one SQLite `knowledge` row into the Milvus record dict.

    JSON columns are decoded with defaults when NULL/empty. Raises
    (json.JSONDecodeError, KeyError, ...) on malformed rows so the caller
    can log and skip them.
    """
    types = json.loads(row['types']) if row['types'] else ["strategy"]
    tags = json.loads(row['tags']) if row['tags'] else {}
    scopes = json.loads(row['scopes']) if row['scopes'] else ["org:cybertogether"]
    source = json.loads(row['source']) if row['source'] else {}
    eval_data = json.loads(row['eval']) if row['eval'] else {
        "score": 3, "helpful": 1, "harmful": 0, "confidence": 0.5,
        "helpful_history": [], "harmful_history": []
    }
    resource_ids = json.loads(row['resource_ids']) if row['resource_ids'] else []

    # created_at falls back to "now"; updated_at falls back to created_at.
    created_at = _to_epoch(row['created_at'], int(datetime.now().timestamp()))
    updated_at = _to_epoch(row['updated_at'] if row['updated_at'] else created_at,
                           created_at)

    return {
        "id": row['id'],
        "message_id": row['message_id'] or "",
        "task": row['task'],
        "content": row['content'],
        "types": types,
        "tags": tags,
        # Denormalized tag-name index used for filtered search.
        "tag_keys": list(tags.keys()) if isinstance(tags, dict) else [],
        "scopes": scopes,
        "owner": row['owner'] or "agent:unknown",
        "resource_ids": resource_ids,
        "source": source,
        "eval": eval_data,
        "created_at": created_at,
        "updated_at": updated_at,
    }


async def migrate_knowledge():
    """Migrate knowledge records from the SQLite backup into Milvus.

    Steps: read all rows from the backup DB, convert each to the Milvus
    schema, batch-generate embeddings from the task field, batch-insert
    (falling back to per-item inserts on batch failure), then report the
    number of records actually added.
    """
    # Source database path (fixed backup snapshot).
    source_db = Path("/root/knowhub_backup_20260309_204451.db")
    if not source_db.exists():
        print(f"❌ 源数据库不存在: {source_db}")
        return

    # Target Milvus store lives next to this script.
    milvus_data_dir = Path(__file__).parent / "knowhub/milvus_data"
    target_store = MilvusStore(str(milvus_data_dir))

    # Snapshot the pre-migration count ONCE. The original code diffed
    # count() against itself at the end, which always reported
    # len(knowledge_list) instead of the real number of new records.
    initial_count = target_store.count()

    print(f"📂 源数据库: {source_db}")
    print(f"📂 目标 Milvus: {milvus_data_dir}")
    print(f"📊 当前 Milvus 中的知识数量: {initial_count}")

    # Read source rows; close the connection even if the query fails.
    print("\n📖 正在读取源数据...")
    conn = sqlite3.connect(str(source_db))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM knowledge ORDER BY created_at")
        rows = cursor.fetchall()
    finally:
        conn.close()
    print(f"✅ 读取到 {len(rows)} 条知识数据")

    if len(rows) == 0:
        print("⚠️ 没有数据需要迁移")
        return

    print(f"\n⚠️ 即将迁移 {len(rows)} 条知识到 Milvus")
    print(f" 当前 Milvus 中已有 {initial_count} 条知识")
    print(" 开始迁移...")

    # Convert rows; skip (and log) malformed ones.
    print("\n🔄 正在转换数据格式...")
    knowledge_list = []
    tasks = []  # task texts, embedded in batches below
    for row in rows:
        try:
            knowledge_list.append(_row_to_knowledge(row))
            # Embeddings are generated from the task field only.
            tasks.append(row['task'])
        except Exception as e:
            print(f"⚠️ 跳过无效数据 {row['id']}: {e}")
            continue
    print(f"✅ 成功转换 {len(knowledge_list)} 条知识")

    # Batch-generate embeddings; abort on the first failure.
    print(f"\n🧮 正在生成 embeddings (只基于 task 字段)...")
    batch_size = 100
    all_embeddings = []
    for i in range(0, len(tasks), batch_size):
        batch_tasks = tasks[i:i + batch_size]
        print(f" 处理 {i+1}-{min(i+batch_size, len(tasks))}/{len(tasks)}...")
        try:
            embeddings = await get_embeddings_batch(batch_tasks)
            all_embeddings.extend(embeddings)
        except Exception as e:
            print(f"❌ 生成 embeddings 失败: {e}")
            return
    print(f"✅ 成功生成 {len(all_embeddings)} 个 embeddings")

    # Guard against silent zip() truncation if the embedding service
    # returned fewer vectors than requested.
    if len(all_embeddings) != len(knowledge_list):
        print(f"❌ embeddings 数量 ({len(all_embeddings)}) 与知识数量 ({len(knowledge_list)}) 不一致, 中止迁移")
        return

    for knowledge, embedding in zip(knowledge_list, all_embeddings):
        knowledge["embedding"] = embedding

    # Batch-insert; on batch failure, retry items one by one so a single
    # bad record does not lose the whole batch.
    print(f"\n💾 正在插入数据到 Milvus...")
    for i in range(0, len(knowledge_list), batch_size):
        batch = knowledge_list[i:i + batch_size]
        try:
            target_store.insert_batch(batch)
            print(f" 已插入 {min(i+batch_size, len(knowledge_list))}/{len(knowledge_list)}")
        except Exception as e:
            print(f"❌ 插入失败: {e}")
            print(f" 失败的批次: {i}-{i+batch_size}")
            for item in batch:
                try:
                    target_store.insert(item)
                except Exception as e2:
                    print(f" ⚠️ 跳过 {item['id']}: {e2}")

    # Verify: new records = post-migration count minus the snapshot taken
    # before any insert.
    final_count = target_store.count()
    print(f"\n✅ 迁移完成!")
    print(f" Milvus 中的知识总数: {final_count}")
    print(f" 新增: {final_count - initial_count}")
# Script entry point: drive the async migration to completion on a
# fresh event loop.
if __name__ == "__main__":
    asyncio.run(migrate_knowledge())
|