"""Multimodal / text vector search demo.

Connects to Milvus and MongoDB, loads the ColQwen omni multimodal embedding
model, and runs a top-k inner-product search whose hits are resolved back to
MongoDB documents.

Module import has side effects: it opens a Milvus connection, downloads and
loads the model onto the GPU, and creates a MongoDB client.
"""

from ast import Import  # NOTE(review): unused — looks like an IDE artifact; confirm and delete.
from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
import requests
import json
from typing import Dict, Any, List, Tuple
from pymongo import MongoClient
from bson import ObjectId
from pydub import AudioSegment
import io
import os
from scipy.io import wavfile
import numpy as np
from collections import defaultdict

# ---------------------------------------------------------------- Milvus ----
# NOTE(review): credentials are hard-coded; move them to environment variables
# or a secrets store before this leaves a demo context.
MILVUS_CONFIG = {
    "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com",
    "user": "root",
    "password": "Piaoquan@2025",
    "port": "19530",
}

print("正在连接 Milvus 数据库...")
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")

# ---------------------------------------- multimodal embedding model ----
import torch
from PIL import Image
from transformers.utils.import_utils import is_flash_attn_2_available
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor

model = ColQwen2_5Omni.from_pretrained(
    "vidore/colqwen-omni-v0.1",
    torch_dtype=torch.bfloat16,
    device_map="cuda",  # or "mps" if on Apple Silicon
    attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
).eval()
# NOTE(review): processor repo ("manu/...") differs from the model repo
# ("vidore/...") — confirm this pairing is intentional.
processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")

# --------------------------------------------------------------- MongoDB ----
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
coll = db[COLL_NAME]

# ------------------------------------------- text embedding service ----
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"


def get_basic_embedding(text: str, model: str = DEFAULT_MODEL) -> List[float]:
    """Fetch a text embedding from the OpenAI-compatible vLLM HTTP service.

    Args:
        text: input text to embed.
        model: model identifier understood by the embedding server.

    Returns:
        The embedding vector for ``text``.

    Raises:
        requests.HTTPError: on a non-2xx response.
        requests.Timeout: if the server does not answer within 5 seconds.
    """
    response = requests.post(
        VLLM_SERVER_URL,
        headers={"Content-Type": "application/json"},
        json={"model": model, "input": text},
        timeout=5,  # fail fast instead of hanging on a dead server
    )
    response.raise_for_status()
    return response.json()["data"][0]["embedding"]


def search_topk_multi(
    collection: Collection,
    query_vecs: List[List[float]],  # list of query vectors [vec1, vec2, ...]
    topk: int = 2,
) -> List[Tuple[str, float]]:
    """Multi-vector search: rank objects by the average of per-query max similarity.

    For each query vector every matched object keeps its best (max) IP score;
    objects are then ranked by the mean of those maxima over all queries
    (a query that did not match an object contributes 0).

    Args:
        collection: Milvus collection instance (vector dim must match queries).
        query_vecs: query vectors; each dim must match the collection.
        topk: number of objects to return.

    Returns:
        ``[(mongo_id, average max similarity), ...]`` sorted descending.
    """
    if not query_vecs:
        # Guard: empty input would otherwise divide by zero in step 2.
        return []

    # Step 1: search each query vector, keeping each object's max similarity.
    all_query_results: List[Dict[str, float]] = []
    for q_idx, q_vec in enumerate(query_vecs):  # BUG FIX: was enumerate[List[float]](...) -> TypeError
        results = collection.search(
            data=[q_vec],
            anns_field="embedding",
            param={"metric_type": "IP", "params": {"nprobe": 10}},
            limit=16384,  # Milvus' maximum allowed topk
            output_fields=["mongo_id", "type", "path"],
            expr='type == "image"',  # only records whose type is "image"
        )
        query_object_sim: Dict[str, float] = defaultdict(float)
        for hit in results[0]:
            obj_id = hit.entity.get("mongo_id")
            if hit.score > query_object_sim[obj_id]:
                query_object_sim[obj_id] = hit.score
        all_query_results.append(query_object_sim)
        print(f"查询向量 {q_idx+1}/{len(query_vecs)} 处理完成,覆盖 {len(query_object_sim)} 个对象")

    # Step 2: average of max similarities; unmatched queries count as 0.0.
    all_object_ids = set()
    for res in all_query_results:
        all_object_ids.update(res.keys())
    object_avg_sim = {
        obj_id: sum(res.get(obj_id, 0.0) for res in all_query_results) / len(query_vecs)
        for obj_id in all_object_ids
    }

    # Step 3: sort by average similarity, descending, keep top-k.
    return sorted(object_avg_sim.items(), key=lambda x: x[1], reverse=True)[:topk]


def search_topk_single(
    collection: Collection,
    query_vec: List[float],
    topk: int = 2,
    expr: str = 'type=="why"',
) -> List[dict]:
    """Single-vector IP search; return the raw Milvus hits for one query.

    Args:
        collection: Milvus collection instance.
        query_vec: query vector (dim must match the collection).
        topk: number of hits to return.
        expr: Milvus boolean filter expression applied to the search.

    Returns:
        The hits for the single query (``results[0]``), each carrying the
        ``mongo_id``, ``type`` and ``path`` output fields.
    """
    results = collection.search(
        data=[query_vec],
        anns_field="embedding",
        param={"metric_type": "IP", "params": {"nprobe": 10}},
        limit=topk,
        output_fields=["mongo_id", "type", "path"],
        expr=expr,
    )
    return results[0]


def _demo_search(type_expr: str, print_docs: bool) -> None:
    """Run one demo search for a record type and resolve hits in MongoDB.

    Args:
        type_expr: Milvus filter, e.g. ``'type=="how"'``.
        print_docs: whether to print the resolved MongoDB documents.
    """
    query = '#假如食物会说话'
    # queries_embeddings = get_basic_embedding(text=query)
    # 暂时代替: random vector stands in for the real embedding for now.
    q_vec = list(np.random.randn(2560))

    milvus_client = Collection(name="deconstruct_how")
    milvus_client.load()

    results = search_topk_single(milvus_client, q_vec, topk=3, expr=type_expr)
    print("results is ", results)

    for i, record in enumerate(results):
        # 暂时代替: map the placeholder id to a known document.
        if record['mongo_id'] == '10000000':
            mongo_id = '68f894176a7850acc4851b27'
        else:
            mongo_id = record['mongo_id']
        docres = coll.find_one({"_id": ObjectId(mongo_id)})
        if print_docs:
            print(f"第{i+1}个结果*********************:{docres}\n")


search_mode = "how_search"  # or "why_search"

if search_mode == "how_search":
    # Original branch had the per-document print commented out.
    _demo_search('type=="how"', print_docs=False)
elif search_mode == "why_search":
    _demo_search('type=="why"', print_docs=True)