  1. #!/usr/bin/env python
  2. """
  3. 知识库迁移脚本: SQLite -> Milvus
  4. 从旧的 SQLite 数据库迁移知识数据到新的 Milvus 向量数据库
  5. """
  6. import sys
  7. import json
  8. import sqlite3
  9. import asyncio
  10. from pathlib import Path
  11. from datetime import datetime
  12. # 添加项目路径
  13. sys.path.insert(0, str(Path(__file__).parent))
  14. from knowhub.vector_store import MilvusStore
  15. from knowhub.embeddings import get_embeddings_batch
  16. async def migrate_knowledge():
  17. """迁移知识数据"""
  18. # 源数据库路径
  19. source_db = Path("/root/knowhub_backup_20260309_204451.db")
  20. if not source_db.exists():
  21. print(f"❌ 源数据库不存在: {source_db}")
  22. return
  23. # 目标 Milvus 存储
  24. milvus_data_dir = Path(__file__).parent / "knowhub/milvus_data"
  25. target_store = MilvusStore(str(milvus_data_dir))
  26. print(f"📂 源数据库: {source_db}")
  27. print(f"📂 目标 Milvus: {milvus_data_dir}")
  28. print(f"📊 当前 Milvus 中的知识数量: {target_store.count()}")
  29. # 读取源数据
  30. print("\n📖 正在读取源数据...")
  31. conn = sqlite3.connect(str(source_db))
  32. conn.row_factory = sqlite3.Row
  33. cursor = conn.cursor()
  34. cursor.execute("SELECT * FROM knowledge ORDER BY created_at")
  35. rows = cursor.fetchall()
  36. conn.close()
  37. print(f"✅ 读取到 {len(rows)} 条知识数据")
  38. if len(rows) == 0:
  39. print("⚠️ 没有数据需要迁移")
  40. return
  41. # 显示迁移信息
  42. print(f"\n⚠️ 即将迁移 {len(rows)} 条知识到 Milvus")
  43. print(f" 当前 Milvus 中已有 {target_store.count()} 条知识")
  44. print(" 开始迁移...")
  45. # 转换数据格式
  46. print("\n🔄 正在转换数据格式...")
  47. knowledge_list = []
  48. tasks = [] # 用于批量生成 embedding
  49. for row in rows:
  50. try:
  51. # 解析 JSON 字段
  52. types = json.loads(row['types']) if row['types'] else ["strategy"]
  53. tags = json.loads(row['tags']) if row['tags'] else {}
  54. scopes = json.loads(row['scopes']) if row['scopes'] else ["org:cybertogether"]
  55. source = json.loads(row['source']) if row['source'] else {}
  56. eval_data = json.loads(row['eval']) if row['eval'] else {
  57. "score": 3, "helpful": 1, "harmful": 0, "confidence": 0.5,
  58. "helpful_history": [], "harmful_history": []
  59. }
  60. resource_ids = json.loads(row['resource_ids']) if row['resource_ids'] else []
  61. # 解析时间戳
  62. created_at = row['created_at']
  63. updated_at = row['updated_at'] if row['updated_at'] else created_at
  64. # 转换为时间戳(如果是 ISO 格式字符串)
  65. if isinstance(created_at, str):
  66. try:
  67. created_at = int(datetime.fromisoformat(created_at.replace('Z', '+00:00')).timestamp())
  68. except:
  69. created_at = int(datetime.now().timestamp())
  70. if isinstance(updated_at, str):
  71. try:
  72. updated_at = int(datetime.fromisoformat(updated_at.replace('Z', '+00:00')).timestamp())
  73. except:
  74. updated_at = created_at
  75. knowledge_list.append({
  76. "id": row['id'],
  77. "message_id": row['message_id'] or "",
  78. "task": row['task'],
  79. "content": row['content'],
  80. "types": types,
  81. "tags": tags,
  82. "scopes": scopes,
  83. "owner": row['owner'] or "agent:unknown",
  84. "resource_ids": resource_ids,
  85. "source": source,
  86. "eval": eval_data,
  87. "created_at": created_at,
  88. "updated_at": updated_at,
  89. })
  90. # 收集 task 用于生成 embedding(只基于 task)
  91. tasks.append(row['task'])
  92. except Exception as e:
  93. print(f"⚠️ 跳过无效数据 {row['id']}: {e}")
  94. continue
  95. print(f"✅ 成功转换 {len(knowledge_list)} 条知识")
  96. # 批量生成 embeddings
  97. print(f"\n🧮 正在生成 embeddings (只基于 task 字段)...")
  98. batch_size = 100
  99. all_embeddings = []
  100. for i in range(0, len(tasks), batch_size):
  101. batch_tasks = tasks[i:i+batch_size]
  102. print(f" 处理 {i+1}-{min(i+batch_size, len(tasks))}/{len(tasks)}...")
  103. try:
  104. embeddings = await get_embeddings_batch(batch_tasks)
  105. all_embeddings.extend(embeddings)
  106. except Exception as e:
  107. print(f"❌ 生成 embeddings 失败: {e}")
  108. return
  109. print(f"✅ 成功生成 {len(all_embeddings)} 个 embeddings")
  110. # 添加 embeddings 到知识数据
  111. for knowledge, embedding in zip(knowledge_list, all_embeddings):
  112. knowledge["embedding"] = embedding
  113. # 批量插入到 Milvus
  114. print(f"\n💾 正在插入数据到 Milvus...")
  115. batch_size = 100
  116. for i in range(0, len(knowledge_list), batch_size):
  117. batch = knowledge_list[i:i+batch_size]
  118. try:
  119. target_store.insert_batch(batch)
  120. print(f" 已插入 {min(i+batch_size, len(knowledge_list))}/{len(knowledge_list)}")
  121. except Exception as e:
  122. print(f"❌ 插入失败: {e}")
  123. print(f" 失败的批次: {i}-{i+batch_size}")
  124. # 尝试逐条插入
  125. for j, item in enumerate(batch):
  126. try:
  127. target_store.insert(item)
  128. except Exception as e2:
  129. print(f" ⚠️ 跳过 {item['id']}: {e2}")
  130. # 验证
  131. final_count = target_store.count()
  132. print(f"\n✅ 迁移完成!")
  133. print(f" Milvus 中的知识总数: {final_count}")
  134. print(f" 新增: {final_count - target_store.count() + len(knowledge_list)}")
  135. if __name__ == "__main__":
  136. asyncio.run(migrate_knowledge())