#!/usr/bin/env python
"""Knowledge-base migration script: SQLite -> Milvus.

Reads knowledge records from the legacy SQLite backup, converts each row to
the Milvus document format, generates embeddings (based on the `task` field
only) in batches, and bulk-inserts the results into the Milvus vector store.
"""
import sys
import json
import sqlite3
import asyncio
from pathlib import Path
from datetime import datetime

# Make the local `knowhub` package importable when run as a script.
sys.path.insert(0, str(Path(__file__).parent))


def parse_timestamp(value, fallback):
    """Return `value` as an integer Unix timestamp.

    Non-string values are assumed to already be epoch timestamps and are
    returned unchanged.  ISO-8601 strings (with a trailing 'Z' or an
    explicit UTC offset) are parsed; on parse failure `fallback` is
    returned instead.
    """
    if not isinstance(value, str):
        return value
    try:
        # fromisoformat() does not accept a bare 'Z' suffix on older
        # Pythons, so normalize it to an explicit offset first.
        return int(datetime.fromisoformat(value.replace('Z', '+00:00')).timestamp())
    except (ValueError, OverflowError, OSError):
        return fallback


def row_to_knowledge(row):
    """Convert one `knowledge` table row (mapping-style access, e.g.
    `sqlite3.Row`) into the dict shape expected by the Milvus store.

    The `embedding` key is added later by the caller.  Malformed JSON in
    any column raises, letting the caller log and skip the record.
    """
    created_at = parse_timestamp(row['created_at'], int(datetime.now().timestamp()))
    # Missing updated_at falls back to created_at (same as the original data).
    updated_at = parse_timestamp(
        row['updated_at'] if row['updated_at'] else created_at, created_at
    )
    return {
        "id": row['id'],
        "message_id": row['message_id'] or "",
        "task": row['task'],
        "content": row['content'],
        "types": json.loads(row['types']) if row['types'] else ["strategy"],
        "tags": json.loads(row['tags']) if row['tags'] else {},
        "scopes": json.loads(row['scopes']) if row['scopes'] else ["org:cybertogether"],
        "owner": row['owner'] or "agent:unknown",
        "resource_ids": json.loads(row['resource_ids']) if row['resource_ids'] else [],
        "source": json.loads(row['source']) if row['source'] else {},
        # Fresh literal per call so history lists are never shared between records.
        "eval": json.loads(row['eval']) if row['eval'] else {
            "score": 3,
            "helpful": 1,
            "harmful": 0,
            "confidence": 0.5,
            "helpful_history": [],
            "harmful_history": [],
        },
        "created_at": created_at,
        "updated_at": updated_at,
    }


async def migrate_knowledge():
    """Migrate all knowledge rows from the SQLite backup into Milvus."""
    # Deferred so the pure helpers above stay importable without `knowhub`;
    # the sys.path tweak above has already run by the time this executes.
    from knowhub.vector_store import MilvusStore
    from knowhub.embeddings import get_embeddings_batch

    # Source database path.
    source_db = Path("/root/knowhub_backup_20260309_204451.db")
    if not source_db.exists():
        print(f"❌ 源数据库不存在: {source_db}")
        return

    # Target Milvus store.
    milvus_data_dir = Path(__file__).parent / "knowhub/milvus_data"
    target_store = MilvusStore(str(milvus_data_dir))
    # Snapshot BEFORE inserting so the final "new rows" report is accurate.
    initial_count = target_store.count()

    print(f"📂 源数据库: {source_db}")
    print(f"📂 目标 Milvus: {milvus_data_dir}")
    print(f"📊 当前 Milvus 中的知识数量: {initial_count}")

    # --- Read source rows -------------------------------------------------
    print("\n📖 正在读取源数据...")
    conn = sqlite3.connect(str(source_db))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM knowledge ORDER BY created_at")
        rows = cursor.fetchall()
    finally:
        # Close even if the query raises.
        conn.close()

    print(f"✅ 读取到 {len(rows)} 条知识数据")
    if not rows:
        print("⚠️ 没有数据需要迁移")
        return

    print(f"\n⚠️ 即将迁移 {len(rows)} 条知识到 Milvus")
    print(f" 当前 Milvus 中已有 {initial_count} 条知识")
    print(" 开始迁移...")

    # --- Convert rows -----------------------------------------------------
    print("\n🔄 正在转换数据格式...")
    knowledge_list = []
    tasks = []  # embedding input: the `task` text only
    for row in rows:
        try:
            knowledge_list.append(row_to_knowledge(row))
            tasks.append(row['task'])
        except Exception as e:
            # Skip records with malformed JSON rather than aborting the run.
            print(f"⚠️ 跳过无效数据 {row['id']}: {e}")
            continue

    print(f"✅ 成功转换 {len(knowledge_list)} 条知识")

    # --- Generate embeddings in batches ------------------------------------
    print(f"\n🧮 正在生成 embeddings (只基于 task 字段)...")
    batch_size = 100
    all_embeddings = []
    for i in range(0, len(tasks), batch_size):
        batch_tasks = tasks[i:i + batch_size]
        print(f" 处理 {i+1}-{min(i+batch_size, len(tasks))}/{len(tasks)}...")
        try:
            embeddings = await get_embeddings_batch(batch_tasks)
            all_embeddings.extend(embeddings)
        except Exception as e:
            # Without embeddings the records cannot be inserted; abort.
            print(f"❌ 生成 embeddings 失败: {e}")
            return

    print(f"✅ 成功生成 {len(all_embeddings)} 个 embeddings")

    # Attach each embedding to its knowledge record (same order as `tasks`).
    for knowledge, embedding in zip(knowledge_list, all_embeddings):
        knowledge["embedding"] = embedding

    # --- Insert into Milvus -------------------------------------------------
    print(f"\n💾 正在插入数据到 Milvus...")
    for i in range(0, len(knowledge_list), batch_size):
        batch = knowledge_list[i:i + batch_size]
        try:
            target_store.insert_batch(batch)
            print(f" 已插入 {min(i+batch_size, len(knowledge_list))}/{len(knowledge_list)}")
        except Exception as e:
            print(f"❌ 插入失败: {e}")
            print(f" 失败的批次: {i}-{i+batch_size}")
            # Fall back to per-item inserts so one bad record doesn't
            # drop the whole batch.
            for item in batch:
                try:
                    target_store.insert(item)
                except Exception as e2:
                    print(f" ⚠️ 跳过 {item['id']}: {e2}")

    # --- Verify -------------------------------------------------------------
    final_count = target_store.count()
    print(f"\n✅ 迁移完成!")
    print(f" Milvus 中的知识总数: {final_count}")
    # BUGFIX: the original printed `final_count - target_store.count() +
    # len(knowledge_list)`, which always equals len(knowledge_list) (the two
    # counts cancel out).  Report the true delta against the pre-migration
    # snapshot instead.
    print(f" 新增: {final_count - initial_count}")


if __name__ == "__main__":
    asyncio.run(migrate_knowledge())