# milvus_how_insert.py 6.1 KB

  1. from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
  2. import requests
  3. import json
  4. from typing import Dict, Any, List
  5. from pymongo import MongoClient
  6. from pydub import AudioSegment
  7. import io
  8. from scipy.io import wavfile
  9. import numpy as np
  # ---------------- Milvus connection (begin) ----------------
  import os

  # Connection settings passed straight to connections.connect().
  # Each value can be overridden through an environment variable; the literals
  # are the previous hard-coded values, kept as fallbacks so existing runs behave
  # the same when no variable is set.
  MILVUS_CONFIG = {
      "host": os.environ.get(
          "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
      ),
      "user": os.environ.get("MILVUS_USER", "root"),
      # SECURITY: a real credential is committed here in plain text. Set
      # MILVUS_PASSWORD in the environment, rotate this password, and remove the
      # fallback literal once every deployment provides the variable.
      "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
      "port": os.environ.get("MILVUS_PORT", "19530"),
  }

  # NOTE: the connection is opened at import time, under the "default" alias.
  print("正在连接 Milvus 数据库...")
  connections.connect("default", **MILVUS_CONFIG)
  print("连接成功!")
  # ---------------- Milvus connection (end) ----------------
  # ---------------- Embedding service settings (begin) ----------------
  # Note from an earlier discussion: the remote service has to be forwarded to
  # the local machine through an SSH tunnel. Run on the local machine:
  #     ssh -R 8000:192.168.100.31:8000 username@server_ip
  # NOTE(review): the URL below targets 192.168.100.31 directly rather than a
  # forwarded local port - confirm which address is intended.
  DEFAULT_MODEL: str = "/models/Qwen3-Embedding-4B"
  VLLM_SERVER_URL: str = "http://192.168.100.31:8000/v1/embeddings"
  def get_basic_embedding(text: str, model=DEFAULT_MODEL, timeout: float = 5):
      """Fetch the embedding of *text* from the online embedding service over HTTP.

      POSTs a JSON body with ``model`` and ``input`` to ``VLLM_SERVER_URL`` and
      returns the ``embedding`` entry of the first item in the response's
      ``data`` list.

      Args:
          text: Text to embed.
          model: Model identifier sent to the service.
          timeout: Seconds to wait for the service before giving up. Defaults
              to 5, the value that used to be hard-coded.

      Returns:
          The value stored at ``data[0]["embedding"]`` in the JSON response.

      Raises:
          requests.HTTPError: The service answered with a 4xx/5xx status.
          requests.Timeout: The service did not answer within *timeout*.
          KeyError, IndexError: The response JSON lacks ``data[0]["embedding"]``.
      """
      payload = {"model": model, "input": text}
      response = requests.post(
          VLLM_SERVER_URL,
          headers={"Content-Type": "application/json"},
          json=payload,
          timeout=timeout,
      )
      # Turn HTTP error statuses (4xx/5xx) into exceptions instead of trying to
      # parse an error body as an embedding.
      response.raise_for_status()
      return response.json()["data"][0]["embedding"]
  def parse_how_res(json_data) -> Dict[str, Dict[str, str]]:
      """Collect every "how" and "why" entry found anywhere in parsed JSON data.

      The structure is walked depth-first. Each dict key named "how" or "why"
      is recorded under its full path: dict keys are joined with "." and list
      items are addressed as "[index]" (for example "steps[0].how"). The value
      is stored as found and is still descended into, so nested matches are
      recorded as well.

      Returns:
          {
              "how": {path: value, ...},
              "why": {path: value, ...}
          }
      """
      collected: Dict[str, Dict[str, Any]] = {"how": {}, "why": {}}

      def walk(node: Any, prefix: str = ""):
          """Yield (key, path, value) for each dict entry under *node*, parents first."""
          if isinstance(node, dict):
              for name, child in node.items():
                  # Top-level keys get no leading dot.
                  child_path = f"{prefix}.{name}" if prefix else name
                  yield name, child_path, child
                  yield from walk(child, child_path)
          elif isinstance(node, list):
              for position, element in enumerate(node):
                  # List items are addressed with a bracketed index.
                  yield from walk(element, f"{prefix}[{position}]")

      for name, found_path, found_value in walk(json_data):
          if name == "how":
              collected["how"][found_path] = found_value
          elif name == "why":
              collected["why"][found_path] = found_value

      return collected
  # Example usage: archive how_res.json in MongoDB, then index its "how" and
  # "why" fields in Milvus.

  # Vector width; must equal the dim of the "embedding" field in the schema below.
  _EMBEDDING_DIM = 2560


  def _build_entities(field_map, field_type, mongo_id):
      """Build one Milvus row per path in *field_map*, tagged with *field_type*."""
      # TODO: once the embedding service is reachable, iterate over
      # field_map.items() and use get_basic_embedding(value, model=DEFAULT_MODEL)
      # instead. The random vectors below are placeholders only.
      return [
          {
              "mongo_id": mongo_id,
              "type": field_type,
              "path": path,
              "embedding": np.random.rand(_EMBEDDING_DIM).tolist(),
          }
          for path in field_map
      ]


  def _insert_entities(collection, entities, field_type):
      """Insert *entities* into *collection*, flush, and print the outcome."""
      if not entities:
          print(f"未找到 {field_type} 字段,未插入向量")
          return
      collection.insert(entities)
      collection.flush()
      print(f"已插入 {len(entities)} 条 {field_type} 字段向量到 Milvus")


  if __name__ == "__main__":
      # ---- MongoDB target for the raw document ----
      MONGO_URI = "mongodb://localhost:27017/"
      DB_NAME = "mydeconstruct"
      COLL_NAME = "deconstruct_how"

      # Load the JSON document and extract its how/why fields.
      json_path = "/home/ecs-user/project/colpali/src/how_res.json"
      with open(json_path, "r", encoding="utf-8") as f:
          doc = json.load(f)
      result = parse_how_res(doc)

      for field_type in ("how", "why"):
          for key, value in result[field_type].items():
              print(f"{field_type} 字段 {key} 的值为: {value}")

      # Store the document in MongoDB. The client is closed as soon as the
      # insert is done, including when the insert fails.
      client = MongoClient(MONGO_URI)
      try:
          inserted_id = client[DB_NAME][COLL_NAME].insert_one(doc).inserted_id
      finally:
          client.close()

      # ---- Milvus: one vector per how/why path, linked back via mongo_id ----
      collection_name = "deconstruct_how"
      if utility.has_collection(collection_name):
          collection = Collection(name=collection_name)
      else:
          # First run: create the collection and its vector index.
          fields = [
              FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
              FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
              FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
              FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
              FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=_EMBEDDING_DIM),
          ]
          schema = CollectionSchema(fields, description="Deconstruct how embeddings")
          collection = Collection(name=collection_name, schema=schema)
          # IVF_FLAT index using inner-product similarity.
          index_params = {
              "metric_type": "IP",
              "index_type": "IVF_FLAT",
              "params": {"nlist": 128},
          }
          collection.create_index("embedding", index_params)

      # Insert the "how" rows first, then the "why" rows, each as its own batch.
      mongo_id = str(inserted_id)
      for field_type in ("how", "why"):
          entities = _build_entities(result[field_type], field_type, mongo_id)
          _insert_entities(collection, entities, field_type)
  158. else:
  159. print("未找到 why 字段,未插入向量")