from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
import requests
import json
from typing import Dict, Any, List, Tuple
from collections import defaultdict
from pymongo import MongoClient
from bson import ObjectId
from pydub import AudioSegment  # audio helpers, currently unused in this script
import io, os
from scipy.io import wavfile  # audio helpers, currently unused in this script
import numpy as np
################################ Connect to the Milvus database (begin)
# Connection settings
MILVUS_CONFIG = {
    "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com",
    "user": "root",
    "password": "Piaoquan@2025",
    "port": "19530",
}
print("Connecting to Milvus...")
connections.connect("default", **MILVUS_CONFIG)
print("Connected.")
################################ Connect to the Milvus database (end)
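
# --- Illustrative sketch (not part of the original pipeline) ------------------
# The searches below assume collections whose schema carries "mongo_id"/"type"/
# "path" payload fields plus a 2560-dim "embedding" vector field queried with the
# IP metric. This helper shows how such a collection *could* be created; field
# lengths and index parameters are assumptions, adjust them to the real setup.
def create_deconstruct_collection(name: str, dim: int = 2560) -> Collection:
    if utility.has_collection(name):
        return Collection(name=name)
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=32),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    schema = CollectionSchema(fields, description="deconstruct embeddings")
    col = Collection(name=name, schema=schema)
    col.create_index(
        field_name="embedding",
        index_params={"index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 1024}},
    )
    return col
# -------------------------------------------------------------------------------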
################################## Load the multimodal embedding model #################
import torch
from PIL import Image
from transformers.utils.import_utils import is_flash_attn_2_available
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor

model = ColQwen2_5Omni.from_pretrained(
    "vidore/colqwen-omni-v0.1",
    torch_dtype=torch.bfloat16,
    device_map="cuda",  # or "mps" if on Apple Silicon
    attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
).eval()
processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")
################################## Load the multimodal embedding model #################
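
# --- Illustrative sketch (assumption) ------------------------------------------
# colpali_engine processors also expose process_queries() for plain text, so the
# same multi-vector index could serve text-to-image retrieval. This helper is not
# used below and is shown only as a usage sketch.
def embed_text_query_multivector(text: str) -> List[List[float]]:
    batch = processor.process_queries([text]).to(model.device)
    with torch.no_grad():
        embeddings = model(**batch)  # (1, num_tokens, dim)
    return embeddings.cpu().to(dtype=torch.float32).numpy().tolist()[0]
# -------------------------------------------------------------------------------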
##################### MongoDB
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct"
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
coll = db[COLL_NAME]
##################### MongoDB
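
# --- Illustrative helper sketch (not part of the original flow) -----------------
# Milvus hits store the Mongo _id as a string; this wrapper converts it back to an
# ObjectId and tolerates malformed ids or missing documents, which keeps the result
# loops at the bottom of the script simpler.
def fetch_doc(mongo_id: str):
    try:
        return coll.find_one({"_id": ObjectId(mongo_id)})
    except Exception as exc:
        print(f"Mongo lookup failed for {mongo_id}: {exc}")
        return None
# -------------------------------------------------------------------------------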
##################### Text embedding service
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"

def get_basic_embedding(text: str, model=DEFAULT_MODEL):
    """Call the remote embedding service over HTTP and return the embedding vector."""
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        "model": model,
        "input": text
    }
    response = requests.post(
        VLLM_SERVER_URL,
        headers=headers,
        json=data,
        timeout=5  # request timeout in seconds
    )
    response.raise_for_status()  # raise if the status code is not 2xx
    result = response.json()
    return result["data"][0]["embedding"]
##################### Text embedding service
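
# --- Minimal usage sketch (assumes the vLLM embedding server is reachable and ---
# returns 2560-dim vectors, matching the Milvus collections used below) ----------
def embed_or_none(text: str):
    """Wrap get_basic_embedding with basic error handling so a transient service
    failure does not crash the whole search script."""
    try:
        return get_basic_embedding(text)
    except requests.RequestException as exc:
        print(f"Embedding service call failed: {exc}")
        return None
# -------------------------------------------------------------------------------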
##################### Multi-vector search
def search_topk_multi(
    collection: Collection,
    query_vecs: List[List[float]],  # list of query vectors [vec1, vec2, ...]
    topk: int = 2
) -> List[Tuple[str, float]]:
    """
    Search with a list of query vectors, compute each object's average maximum
    similarity across the queries, and return the top-K objects.

    Args:
        collection: Milvus collection instance
        query_vecs: list of query vectors (each must match the collection's dimension)
        topk: number of results to return

    Returns:
        A sorted list of (object_id, average maximum similarity) tuples.
    """
    # Step 1: search each query vector and record, per object, the maximum similarity
    all_query_results = []  # one {object_id: max similarity} dict per query
    for q_idx, q_vec in enumerate(query_vecs):
        # Search with the current query vector
        search_params = {
            "metric_type": "IP",
            "params": {"nprobe": 10}
        }
        results = collection.search(
            data=[q_vec],
            anns_field="embedding",
            param=search_params,
            limit=16384,  # maximum top-k allowed by Milvus
            output_fields=["mongo_id", "type", "path"],
            expr='type == "image"'  # only search records whose type is "image"
        )
        # Group hits by object_id and keep the maximum similarity
        query_object_sim = defaultdict(float)
        for hit in results[0]:
            obj_id = hit.entity.get("mongo_id")
            sim = hit.score
            if sim > query_object_sim[obj_id]:
                query_object_sim[obj_id] = sim

        all_query_results.append(query_object_sim)
        print(f"Query vector {q_idx+1}/{len(query_vecs)} processed, {len(query_object_sim)} objects covered")

    # Step 2: compute each object's average maximum similarity
    all_object_ids = set()
    for res in all_query_results:
        all_object_ids.update(res.keys())  # collect every object that appeared

    object_avg_sim = {}
    for obj_id in all_object_ids:
        sims = [res.get(obj_id, 0.0) for res in all_query_results]  # unmatched queries count as 0
        avg_sim = sum(sims) / len(query_vecs)  # average over all queries
        object_avg_sim[obj_id] = avg_sim

    # Step 3: sort by average similarity and take the top-K
    sorted_objects = sorted(
        object_avg_sim.items(),
        key=lambda x: x[1],
        reverse=True
    )[:topk]
    return sorted_objects
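
# --- Toy sketch of the scoring rule above (illustrative only) -------------------
# For each object: take the maximum inner-product similarity per query vector, then
# average those maxima over all query vectors. This is the same late-interaction
# style score computed against Milvus results in search_topk_multi, here shown on
# in-memory vectors with made-up names.
def avg_max_sim(
    query_vecs: List[List[float]],
    doc_vecs_by_id: Dict[str, List[List[float]]],
) -> Dict[str, float]:
    scores = {}
    for obj_id, doc_vecs in doc_vecs_by_id.items():
        sims = np.asarray(query_vecs) @ np.asarray(doc_vecs).T  # (n_queries, n_doc_vecs)
        scores[obj_id] = float(sims.max(axis=1).mean())          # max per query, then mean
    return scores
# -------------------------------------------------------------------------------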
################## Text-collection search
############### Single-vector search
def search_topk_single(
    collection: Collection,
    query_vec: List[float],  # query vector
    topk: int = 2
) -> List[dict]:
    """
    Search with a single query vector and return the top-K hits.

    Args:
        collection: Milvus collection instance
        query_vec: query vector (must match the collection's dimension)
        topk: number of results to return

    Returns:
        The list of top-K hits, each carrying its mongo_id and similarity score.
    """
    # Step 1: search with the query vector
    search_params = {
        "metric_type": "IP",
        "params": {"nprobe": 10}
    }
    results = collection.search(
        data=[query_vec],
        anns_field="embedding",
        param=search_params,
        limit=topk,  # number of hits to return
        output_fields=["mongo_id", "path"],
    )
    return results[0]
############### Single-vector search
search_mode = "what_search"
if search_mode == "what_search":
    ################## what_search: text query
    ######## simulated query embedding
    query = '#假如食物会说话'
    # queries_embeddings = get_basic_embedding(text=query)
    ######### temporary stand-in for the real embedding
    q_vec = list(np.random.randn(2560))
    ######### temporary stand-in
    milvus_client = Collection(name="deconstruct_what")
    milvus_client.load()
    results = search_topk_single(milvus_client, q_vec, topk=3)
    print("results is ", results)
    for i, record in enumerate(results):
        ######### temporary stand-in ############
        if record.entity.get("mongo_id") == '10000000':
            mongo_id = '68f894176a7850acc4851b27'
        else:
            mongo_id = record.entity.get("mongo_id")
        ######### temporary stand-in ############
        docres = coll.find_one({"_id": ObjectId(mongo_id)})
        print(f"Result {i+1} *********************: {docres}\n")
elif search_mode == "media_search":
    ################## Multimodal search
    ##################
    queries = os.path.join("../src", "dragon_mother.jpeg")
    query_image = Image.open(queries)
    # Process the inputs
    batch_queries = processor.process_images([query_image]).to(model.device)
    # Forward pass
    with torch.no_grad():
        query_embeddings = model(**batch_queries)
    ################## media_search
    milvus_client = Collection(name="deconstruct_media")
    milvus_client.load()
    # (batch, num_patches, dim) -> list of patch vectors for the single query image
    query_embeddings = query_embeddings.cpu().to(dtype=torch.float32).numpy().tolist()
    query_embeddings = query_embeddings[0]
    scores = search_topk_multi(milvus_client, query_embeddings, topk=3)
    print("search_topk_multi results:", scores)
    #### print the results
    for i, record in enumerate(scores):
        docres = coll.find_one({"_id": ObjectId(record[0])})
        print(f"Result {i+1} *********************: {docres}\n")