# milvus_how_search.py

from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
import requests
import json
from typing import Dict, Any, List
from pymongo import MongoClient
from bson import ObjectId
from pydub import AudioSegment
import io, os
from scipy.io import wavfile
import numpy
################################ Connect to the Milvus database A
# Connection settings
MILVUS_CONFIG = {
    "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com",
    "user": "root",
    "password": "Piaoquan@2025",
    "port": "19530",
}
print("Connecting to the Milvus database...")
connections.connect("default", **MILVUS_CONFIG)
print("Connected!")
################################ Connect to the Milvus database B
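
# --- Example usage (sketch, not executed by default): sanity-check that the
# collection searched further down actually exists on this Milvus instance.
# `utility` is already imported from pymilvus above.
RUN_CONNECTION_CHECK = False
if RUN_CONNECTION_CHECK:
    print("deconstruct_how exists:", utility.has_collection("deconstruct_how"))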
################################## Load the multimodal embedding model #################
import torch
from PIL import Image
from transformers.utils.import_utils import is_flash_attn_2_available
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor

model = ColQwen2_5Omni.from_pretrained(
    "vidore/colqwen-omni-v0.1",
    torch_dtype=torch.bfloat16,
    device_map="cuda",  # or "mps" if on Apple Silicon
    attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
).eval()
processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")
################################## Load the multimodal embedding model #################
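
# --- Example usage (sketch, not executed by default): embed a text query into
# ColQwen multi-vectors. This assumes the standard colpali_engine processor API
# (process_queries) also applies to the Omni variant; the per-token vectors can
# then be passed to search_topk_multi defined below.
RUN_COLQWEN_DEMO = False
if RUN_COLQWEN_DEMO:
    batch = processor.process_queries(["#假如食物会说话"]).to(model.device)
    with torch.no_grad():
        query_embeddings = model(**batch)  # shape: (batch, num_tokens, dim)
    colqwen_query_vecs = query_embeddings[0].float().cpu().tolist()
    print("query tokens:", len(colqwen_query_vecs), "dim:", len(colqwen_query_vecs[0]))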
##################### MongoDB
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
coll = db[COLL_NAME]
##################### MongoDB
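
# --- Example usage (sketch, not executed by default): verify the MongoDB
# connection and that the collection is populated before resolving search hits.
RUN_MONGO_CHECK = False
if RUN_MONGO_CHECK:
    client.admin.command("ping")  # raises if the server is unreachable
    print(f"documents in {COLL_NAME}:", coll.estimated_document_count())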
##################### text embedding service
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"


def get_basic_embedding(text: str, model=DEFAULT_MODEL):
    """Call the remote embedding service over HTTP and return the embedding vector."""
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        "model": model,
        "input": text
    }
    response = requests.post(
        VLLM_SERVER_URL,
        headers=headers,
        json=data,
        timeout=5  # request timeout
    )
    response.raise_for_status()  # raise if the status code is not 200
    result = response.json()
    return result["data"][0]["embedding"]
##################### text embedding service
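
# --- Example usage (sketch, not executed by default): call the embedding service
# once and confirm the vector dimension matches the Milvus collection (2560 for
# Qwen3-Embedding-4B, the same dimension as the stand-in vectors used below).
RUN_EMBEDDING_DEMO = False
if RUN_EMBEDDING_DEMO:
    demo_vec = get_basic_embedding("#假如食物会说话")
    print("embedding dimension:", len(demo_vec))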
##################### multi vector search
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple


############### multi vector search
def search_topk_multi(
    collection: Collection,
    query_vecs: List[List[float]],  # list of query vectors [vec1, vec2, ...]
    topk: int = 2
) -> List[Tuple[str, float]]:
    """
    Search with a list of query vectors, compute each object's average maximum
    similarity across the queries, and return the TopK objects.
    Args:
        collection: Milvus collection instance
        query_vecs: list of query vectors (each must match the collection's dimension)
        topk: number of results to return
    Returns:
        Sorted list of (object_id, average maximum similarity) tuples
    """
    # Step 1: search each query vector and record, per object, the maximum similarity
    all_query_results = []  # one {object_id: max similarity} dict per query
    for q_idx, q_vec in enumerate(query_vecs):
        # Search the current query vector
        search_params = {
            "metric_type": "IP",
            "params": {"nprobe": 10}
        }
        results = collection.search(
            data=[q_vec],
            anns_field="embedding",
            param=search_params,
            limit=16384,  # maximum topk allowed by Milvus
            output_fields=["mongo_id", "type", "path"],
            expr='type == "image"'  # only search records whose type is "image"
        )
        # Group hits by object_id and keep the maximum similarity
        query_object_sim = defaultdict(float)
        for hit in results[0]:
            obj_id = hit.entity.get("mongo_id")
            sim = hit.score
            if sim > query_object_sim[obj_id]:
                query_object_sim[obj_id] = sim
        all_query_results.append(query_object_sim)
        print(f"Query vector {q_idx+1}/{len(query_vecs)} done, covering {len(query_object_sim)} objects")
    # Step 2: compute each object's average maximum similarity
    all_object_ids = set()
    for res in all_query_results:
        all_object_ids.update(res.keys())  # collect every object that appeared
    object_avg_sim = {}
    for obj_id in all_object_ids:
        sims = [res.get(obj_id, 0.0) for res in all_query_results]  # unmatched queries count as 0
        avg_sim = sum(sims) / len(query_vecs)  # average over all queries
        object_avg_sim[obj_id] = avg_sim
    # Step 3: sort by average similarity and keep the TopK
    sorted_objects = sorted(
        object_avg_sim.items(),
        key=lambda x: x[1],
        reverse=True
    )[:topk]
    return sorted_objects
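
# --- Example usage (sketch, not executed by default): multi-vector retrieval
# against an image collection. The collection name "deconstruct_image" and the
# 128-dim stand-in vectors are assumptions for illustration only; in practice the
# query vectors would come from the ColQwen model loaded above.
RUN_MULTI_SEARCH_DEMO = False
if RUN_MULTI_SEARCH_DEMO:
    image_collection = Collection(name="deconstruct_image")  # hypothetical collection
    image_collection.load()
    demo_query_vecs = [list(np.random.randn(128)) for _ in range(5)]
    for obj_id, avg_sim in search_topk_multi(image_collection, demo_query_vecs, topk=2):
        print(obj_id, round(avg_sim, 4))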
################## text database search
############### single vector search
def search_topk_single(
    collection: Collection,
    query_vec: List[float],  # query vector
    topk: int = 2,
    expr='type=="why"',
) -> List[dict]:
    """
    Search with a single query vector and return the TopK hits by similarity.
    Args:
        collection: Milvus collection instance
        query_vec: query vector (must match the collection's dimension)
        topk: number of results to return
    Returns:
        List of hits sorted by similarity, each exposing (mongo_id, similarity)
    """
    # Step 1: search the query vector
    search_params = {
        "metric_type": "IP",
        "params": {"nprobe": 10}
    }
    results = collection.search(
        data=[query_vec],
        anns_field="embedding",
        param=search_params,
        limit=topk,  # return at most topk hits
        output_fields=["mongo_id", "type", "path"],
        expr=expr
    )
    return results[0]
############### single vector search
search_mode = "how_search"  # or "why_search"
if search_mode == "how_search":
    ################## how_search
    ######## simulate the computed embedding
    query = '#假如食物会说话'
    # queries_embeddings = get_basic_embedding(text=query)
    ######## temporary stand-in
    import numpy as np
    q_vec = list(np.random.randn(2560))
    ######## temporary stand-in
    milvus_client = Collection(name="deconstruct_how")
    milvus_client.load()
    collection_name = "deconstruct_how"
    search_params = {
        "metric_type": "IP",
        "params": {"nprobe": 10}
    }
    results = search_topk_single(milvus_client, q_vec, topk=3, expr='type=="how"')
    print("results is ", results)
    for i, record in enumerate(results):
        ######## temporary stand-in ############
        if record.entity.get('mongo_id') == '10000000':
            mongo_id = '68f894176a7850acc4851b27'
        else:
            mongo_id = record.entity.get('mongo_id')
        ######## temporary stand-in ############
        docres = coll.find_one({"_id": ObjectId(mongo_id)})
        # print(f"Result {i+1} *********************: {docres}\n")
elif search_mode == "why_search":
    ################## why_search
    ######## simulate the computed embedding
    query = '#假如食物会说话'
    # queries_embeddings = get_basic_embedding(text=query)
    ######## temporary stand-in
    import numpy as np
    q_vec = list(np.random.randn(2560))
    ######## temporary stand-in
    milvus_client = Collection(name="deconstruct_how")
    milvus_client.load()
    collection_name = "deconstruct_how"
    search_params = {
        "metric_type": "IP",
        "params": {"nprobe": 10}
    }
    results = search_topk_single(milvus_client, q_vec, topk=3, expr='type=="why"')
    print("results is ", results)
    for i, record in enumerate(results):
        ######## temporary stand-in ############
        if record.entity.get('mongo_id') == '10000000':
            mongo_id = '68f894176a7850acc4851b27'
        else:
            mongo_id = record.entity.get('mongo_id')
        ######## temporary stand-in ############
        docres = coll.find_one({"_id": ObjectId(mongo_id)})
        print(f"Result {i+1} *********************: {docres}\n")