from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType import requests import json from typing import Dict, Any, List from pymongo import MongoClient from pydub import AudioSegment import io from scipy.io import wavfile ################################连接milvus数据库 A # 配置信息 MILVUS_CONFIG = { "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com", "user": "root", "password": "Piaoquan@2025", "port": "19530", } print("正在连接 Milvus 数据库...") connections.connect("default", **MILVUS_CONFIG) print("连接成功!") ################################连接milvus数据库 B ##################################引入多模态模型################# import torch from PIL import Image from transformers.utils.import_utils import is_flash_attn_2_available from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor model = ColQwen2_5Omni.from_pretrained( "vidore/colqwen-omni-v0.1", torch_dtype=torch.bfloat16, device_map="cuda", # or "mps" if on Apple Silicon attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None, ).eval() processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1") ##################################引入模型################# ################################连接Embedding service A # 注意:根据之前的讨论,需要通过SSH隧道将远程服务转发到本地 # 在本地机器上执行: ssh -R 8000:192.168.100.31:8000 username@server_ip VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings" DEFAULT_MODEL = "/models/Qwen3-Embedding-4B" def get_basic_embedding(text: str, model=DEFAULT_MODEL): """通过HTTP调用在线embedding服务""" headers = { "Content-Type": "application/json" } data = { "model": model, "input": text } response = requests.post( VLLM_SERVER_URL, headers=headers, json=data, timeout=5 # 添加超时设置 ) response.raise_for_status() # 如果状态码不是200,抛出异常 result = response.json() return result["data"][0]["embedding"] def get_media_embedding(query: str, type: str): ''' query 是查询字符串或文件路径 type 是查询类型,可选值为 "audio", "image", "video", "text" k 是返回的结果数量,默认值为 3 audio image video 的query为路径 text的query为问题本身 ''' if type =="audio": batch_queries = processor.process_audios([query]).to(model.device) elif type =="image": query_image = Image.open(query) batch_queries = processor.process_images([query_image]).to(model.device) elif type =="video": batch_queries = processor.process_videos([query]).to(model.device) elif type =="text": batch_queries = processor.process_queries([query]).to(model.device) # Forward pass with torch.no_grad(): query_embeddings = model(**batch_queries) return query_embeddings # # scores = processor.score_multi_vector(query_embeddings, ds) # print("score is ", scores) # # get top-5 scores # return scores[0].topk(k).indices.tolist() # ################################连接Embedding service B def parse_deconstruct_res(json_data) -> Dict[str, Dict[str, str]]: """ 解析 deconstruct_res.json 文件,提取两类信息: 1. 所有 "what" 字段的 path 与 value 映射 2. 所有类型为 "image" 或 "video" 的媒体引用 path 与 content 值映射 返回: { "what": {path: value, ...}, "media": {path: value, ...} } """ data = json_data what_dict: Dict[str, Any] = {} media_dict: Dict[str, Any] = {} def traverse(obj: Any, current_path: str = ""): """递归遍历 JSON 结构,记录目标字段""" if isinstance(obj, dict): for k, v in obj.items(): # 构建新路径,避免在开头添加点号 new_path = f"{current_path}.{k}" if current_path else k if k == "what": what_dict[new_path] = v # 处理媒体引用字段 elif k == "媒体引用" and isinstance(v, list): # 遍历媒体引用数组 for idx, media_item in enumerate(v): if isinstance(media_item, dict) and media_item.get("type") in ("image", "video", "audio"): # 记录content字段作为媒体路径 content = media_item.get("content") type_nm = media_item.get("type") if content: # 生成正确格式的路径,如"图片元素[5].媒体引用[0].content" media_ref_path = f"{type_nm}-{new_path}[{idx}].content" media_dict[media_ref_path] = content # 继续递归遍历 traverse(v, new_path) elif isinstance(obj, list): for idx, item in enumerate(obj): # 对于数组元素,使用方括号索引 new_path = f"{current_path}[{idx}]" traverse(item, new_path) traverse(data) return {"what": what_dict, "media": media_dict} # 使用示例 if __name__ == "__main__": # 连接 MongoDB 数据库 ##################### 存储到mongoDB MONGO_URI = "mongodb://localhost:27017/" DB_NAME = "mydeconstruct" COLL_NAME = "deconstruct" client = MongoClient(MONGO_URI) db = client[DB_NAME] coll = db[COLL_NAME] # 读取并插入 JSON 文件 json_path = "/home/ecs-user/project/colpali/src/deconstruct_res.json" with open(json_path, "r", encoding="utf-8") as f: doc = json.load(f) insert_result = coll.insert_one(doc) inserted_id = insert_result.inserted_id print("已插入 MongoDB,文档 _id:", inserted_id) result = parse_deconstruct_res(doc) print("what 字段映射:", result["what"]) print("媒体引用映射:", result["media"]) ##################### 存储到mongoDB ##################### 将 result["what"] 中的每个 value 转换为向量并插入 Milvus ########## 文本向量库存一份what # 创建 Milvus 集合(如不存在) collection_name = "deconstruct_what" if not utility.has_collection(collection_name): fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64), FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=2560) ] schema = CollectionSchema(fields, description="Deconstruct what embeddings") collection = Collection(name=collection_name, schema=schema) # 创建 IVF_FLAT 索引 index_params = { "metric_type": "IP", "index_type": "IVF_FLAT", "params": {"nlist": 128} } collection.create_index("embedding", index_params) else: collection = Collection(name=collection_name) # 遍历 result["what"],生成 embeddings 并插入 Milvus entities = [] for key, value in result["what"].items(): embedding = get_basic_embedding(value, model=DEFAULT_MODEL) path = key entities.append({ "mongo_id": str(inserted_id), "path": path, "embedding": embedding }) if entities: collection.insert(entities) collection.flush() print(f"已插入 {len(entities)} 条 what 字段向量到 Milvus") else: print("未找到 what 字段,未插入向量") ##################### 将 result["what"] 中的每个 value 转换为向量并插入 Milvus #####################将 result["media"] 中的每个 value 调用多模态编码模型计算embedding并插入Milvus # 创建 Milvus 集合(如不存在) collection_name = "deconstruct_media" if not utility.has_collection(collection_name): fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64), FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64), FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512), FieldSchema(name="no", dtype=DataType.INT32), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=2560) ] schema = CollectionSchema(fields, description="Deconstruct media embeddings") collection = Collection(name=collection_name, schema=schema) # 创建 IVF_FLAT 索引 index_params = { "metric_type": "IP", "index_type": "IVF_FLAT", "params": {"nlist": 128} } collection.create_index("embedding", index_params) else: collection = Collection(name=collection_name) # 遍历 result["media"],生成 embeddings 并插入 Milvus #############存储一份media embedding到Milvus entities = [] for key, value in result["media"].items(): embedding = get_media_embedding(value, model=DEFAULT_MODEL) type = key[:key.index("-")] path = key[key.index("-"):] # 将 embedding 列表拆分为单条向量,并记录其在原列表中的位置 no if isinstance(embedding, list) and len(embedding) > 0: for idx, vec in enumerate(embedding): entities.append({ "mongo_id": str(inserted_id), "type": type, "path": path, "no": idx, "embedding": vec }) else: # 若 embedding 不是列表或长度为 0,则 no 记为 0 entities.append({ "mongo_id": str(inserted_id), "type": type, "path": path, "no": 0, "embedding": embedding }) # 将插入操作移到循环外部,避免重复插入和数据累积 if entities: collection.insert(entities) collection.flush() print(f"已插入 {len(entities)} 条 media 字段向量到 Milvus") else: print("未找到有效的 media 字段向量,未插入数据") #############存储一份what 多模态embedding 到Milvus entities = [] for key, value in result["what"].items(): embedding = get_media_embedding(value, model=DEFAULT_MODEL) # type = key[:key.index("-")] # path = key[key.index("-"):] path = key if isinstance(embedding, list) and len(embedding) > 0: for idx, vec in enumerate(embedding): entities.append({ "mongo_id": str(inserted_id), "type": "text", "path": path, "no": idx, "embedding": vec }) else: # 若 embedding 不是列表或长度为 0,则 no 记为 0 entities.append({ "mongo_id": str(inserted_id), "type": "text", "path": path, "no": 0, "embedding": embedding }) # 将插入操作移到循环外部,避免重复插入和数据累积 if entities: collection.insert(entities) collection.flush() print(f"已插入 {len(entities)} 条 what 多模态向量到 Milvus") else: print("未找到有效的 what 多模态向量,未插入数据") #############存储一份what 多模态embedding 到Milvus