|
@@ -0,0 +1,312 @@
|
|
|
|
|
+from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
|
|
|
|
|
+import requests
|
|
|
|
|
+import json
|
|
|
|
|
+from typing import Dict, Any, List
|
|
|
|
|
+from pymongo import MongoClient
|
|
|
|
|
+
|
|
|
|
|
+from pydub import AudioSegment
|
|
|
|
|
+import io
|
|
|
|
|
+from scipy.io import wavfile
|
|
|
|
|
+
|
|
|
|
|
+################################连接milvus数据库 A
|
|
|
|
|
+# 配置信息
|
|
|
|
|
+MILVUS_CONFIG = {
|
|
|
|
|
+ "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com",
|
|
|
|
|
+ "user": "root",
|
|
|
|
|
+ "password": "Piaoquan@2025",
|
|
|
|
|
+ "port": "19530",
|
|
|
|
|
+}
|
|
|
|
|
+print("正在连接 Milvus 数据库...")
|
|
|
|
|
+connections.connect("default", **MILVUS_CONFIG)
|
|
|
|
|
+print("连接成功!")
|
|
|
|
|
+################################连接milvus数据库 B
|
|
|
|
|
+
|
|
|
|
|
+##################################引入多模态模型#################
|
|
|
|
|
+import torch
|
|
|
|
|
+from PIL import Image
|
|
|
|
|
+from transformers.utils.import_utils import is_flash_attn_2_available
|
|
|
|
|
+
|
|
|
|
|
+from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor
|
|
|
|
|
+
|
|
|
|
|
+model = ColQwen2_5Omni.from_pretrained(
|
|
|
|
|
+ "vidore/colqwen-omni-v0.1",
|
|
|
|
|
+ torch_dtype=torch.bfloat16,
|
|
|
|
|
+ device_map="cuda", # or "mps" if on Apple Silicon
|
|
|
|
|
+ attn_implementation="flash_attention_2" if is_flash_attn_2_available() else None,
|
|
|
|
|
+).eval()
|
|
|
|
|
+processor = ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")
|
|
|
|
|
+##################################引入模型#################
|
|
|
|
|
+
|
|
|
|
|
+################################连接Embedding service A
|
|
|
|
|
+# 注意:根据之前的讨论,需要通过SSH隧道将远程服务转发到本地
|
|
|
|
|
+# 在本地机器上执行: ssh -R 8000:192.168.100.31:8000 username@server_ip
|
|
|
|
|
+VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
|
|
|
|
|
+DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"
|
|
|
|
|
+
|
|
|
|
|
+def get_basic_embedding(text: str, model=DEFAULT_MODEL):
|
|
|
|
|
+ """通过HTTP调用在线embedding服务"""
|
|
|
|
|
+ headers = {
|
|
|
|
|
+ "Content-Type": "application/json"
|
|
|
|
|
+ }
|
|
|
|
|
+ data = {
|
|
|
|
|
+ "model": model,
|
|
|
|
|
+ "input": text
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ response = requests.post(
|
|
|
|
|
+ VLLM_SERVER_URL,
|
|
|
|
|
+ headers=headers,
|
|
|
|
|
+ json=data,
|
|
|
|
|
+ timeout=5 # 添加超时设置
|
|
|
|
|
+ )
|
|
|
|
|
+ response.raise_for_status() # 如果状态码不是200,抛出异常
|
|
|
|
|
+ result = response.json()
|
|
|
|
|
+ return result["data"][0]["embedding"]
|
|
|
|
|
+
|
|
|
|
|
+def get_media_embedding(query: str, type: str):
|
|
|
|
|
+ '''
|
|
|
|
|
+ query 是查询字符串或文件路径
|
|
|
|
|
+ type 是查询类型,可选值为 "audio", "image", "video", "text"
|
|
|
|
|
+ k 是返回的结果数量,默认值为 3
|
|
|
|
|
+ audio image video 的query为路径
|
|
|
|
|
+ text的query为问题本身
|
|
|
|
|
+ '''
|
|
|
|
|
+ if type =="audio":
|
|
|
|
|
+ batch_queries = processor.process_audios([query]).to(model.device)
|
|
|
|
|
+
|
|
|
|
|
+ elif type =="image":
|
|
|
|
|
+ query_image = Image.open(query)
|
|
|
|
|
+ batch_queries = processor.process_images([query_image]).to(model.device)
|
|
|
|
|
+ elif type =="video":
|
|
|
|
|
+ batch_queries = processor.process_videos([query]).to(model.device)
|
|
|
|
|
+ elif type =="text":
|
|
|
|
|
+ batch_queries = processor.process_queries([query]).to(model.device)
|
|
|
|
|
+ # Forward pass
|
|
|
|
|
+ with torch.no_grad():
|
|
|
|
|
+ query_embeddings = model(**batch_queries)
|
|
|
|
|
+ return query_embeddings
|
|
|
|
|
+ # # scores = processor.score_multi_vector(query_embeddings, ds)
|
|
|
|
|
+ # print("score is ", scores)
|
|
|
|
|
+ # # get top-5 scores
|
|
|
|
|
+ # return scores[0].topk(k).indices.tolist()
|
|
|
|
|
+
|
|
|
|
|
+# ################################连接Embedding service B
|
|
|
|
|
+
|
|
|
|
|
+def parse_deconstruct_res(json_data) -> Dict[str, Dict[str, str]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ 解析 deconstruct_res.json 文件,提取两类信息:
|
|
|
|
|
+ 1. 所有 "what" 字段的 path 与 value 映射
|
|
|
|
|
+ 2. 所有类型为 "image" 或 "video" 的媒体引用 path 与 content 值映射
|
|
|
|
|
+
|
|
|
|
|
+ 返回:
|
|
|
|
|
+ {
|
|
|
|
|
+ "what": {path: value, ...},
|
|
|
|
|
+ "media": {path: value, ...}
|
|
|
|
|
+ }
|
|
|
|
|
+ """
|
|
|
|
|
+ data = json_data
|
|
|
|
|
+ what_dict: Dict[str, Any] = {}
|
|
|
|
|
+ media_dict: Dict[str, Any] = {}
|
|
|
|
|
+
|
|
|
|
|
+ def traverse(obj: Any, current_path: str = ""):
|
|
|
|
|
+ """递归遍历 JSON 结构,记录目标字段"""
|
|
|
|
|
+ if isinstance(obj, dict):
|
|
|
|
|
+ for k, v in obj.items():
|
|
|
|
|
+ # 构建新路径,避免在开头添加点号
|
|
|
|
|
+ new_path = f"{current_path}.{k}" if current_path else k
|
|
|
|
|
+
|
|
|
|
|
+ if k == "what":
|
|
|
|
|
+ what_dict[new_path] = v
|
|
|
|
|
+ # 处理媒体引用字段
|
|
|
|
|
+ elif k == "媒体引用" and isinstance(v, list):
|
|
|
|
|
+ # 遍历媒体引用数组
|
|
|
|
|
+ for idx, media_item in enumerate(v):
|
|
|
|
|
+ if isinstance(media_item, dict) and media_item.get("type") in ("image", "video", "audio"):
|
|
|
|
|
+ # 记录content字段作为媒体路径
|
|
|
|
|
+ content = media_item.get("content")
|
|
|
|
|
+ type_nm = media_item.get("type")
|
|
|
|
|
+ if content:
|
|
|
|
|
+ # 生成正确格式的路径,如"图片元素[5].媒体引用[0].content"
|
|
|
|
|
+ media_ref_path = f"{type_nm}-{new_path}[{idx}].content"
|
|
|
|
|
+ media_dict[media_ref_path] = content
|
|
|
|
|
+
|
|
|
|
|
+ # 继续递归遍历
|
|
|
|
|
+ traverse(v, new_path)
|
|
|
|
|
+
|
|
|
|
|
+ elif isinstance(obj, list):
|
|
|
|
|
+ for idx, item in enumerate(obj):
|
|
|
|
|
+ # 对于数组元素,使用方括号索引
|
|
|
|
|
+ new_path = f"{current_path}[{idx}]"
|
|
|
|
|
+ traverse(item, new_path)
|
|
|
|
|
+
|
|
|
|
|
+ traverse(data)
|
|
|
|
|
+ return {"what": what_dict, "media": media_dict}
|
|
|
|
|
+
|
|
|
|
|
+# 使用示例
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+
|
|
|
|
|
+ # 连接 MongoDB 数据库
|
|
|
|
|
+ ##################### 存储到mongoDB
|
|
|
|
|
+
|
|
|
|
|
+ MONGO_URI = "mongodb://localhost:27017/"
|
|
|
|
|
+ DB_NAME = "mydeconstruct"
|
|
|
|
|
+ COLL_NAME = "deconstruct"
|
|
|
|
|
+
|
|
|
|
|
+ client = MongoClient(MONGO_URI)
|
|
|
|
|
+ db = client[DB_NAME]
|
|
|
|
|
+ coll = db[COLL_NAME]
|
|
|
|
|
+
|
|
|
|
|
+ # 读取并插入 JSON 文件
|
|
|
|
|
+ json_path = "/home/ecs-user/project/colpali/src/deconstruct_res.json"
|
|
|
|
|
+
|
|
|
|
|
+ with open(json_path, "r", encoding="utf-8") as f:
|
|
|
|
|
+ doc = json.load(f)
|
|
|
|
|
+
|
|
|
|
|
+ insert_result = coll.insert_one(doc)
|
|
|
|
|
+ inserted_id = insert_result.inserted_id
|
|
|
|
|
+ print("已插入 MongoDB,文档 _id:", inserted_id)
|
|
|
|
|
+
|
|
|
|
|
+ result = parse_deconstruct_res(doc)
|
|
|
|
|
+ print("what 字段映射:", result["what"])
|
|
|
|
|
+ print("媒体引用映射:", result["media"])
|
|
|
|
|
+
|
|
|
|
|
+ ##################### 存储到mongoDB
|
|
|
|
|
+
|
|
|
|
|
+ ##################### 将 result["what"] 中的每个 value 转换为向量并插入 Milvus
|
|
|
|
|
+ ########## 文本向量库存一份what
|
|
|
|
|
+ # 创建 Milvus 集合(如不存在)
|
|
|
|
|
+ collection_name = "deconstruct_what"
|
|
|
|
|
+ if not utility.has_collection(collection_name):
|
|
|
|
|
+ fields = [
|
|
|
|
|
+ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
|
|
|
|
|
+ FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
|
|
|
|
|
+ FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
|
|
|
|
|
+ FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=2560)
|
|
|
|
|
+ ]
|
|
|
|
|
+ schema = CollectionSchema(fields, description="Deconstruct what embeddings")
|
|
|
|
|
+ collection = Collection(name=collection_name, schema=schema)
|
|
|
|
|
+ # 创建 IVF_FLAT 索引
|
|
|
|
|
+ index_params = {
|
|
|
|
|
+ "metric_type": "IP",
|
|
|
|
|
+ "index_type": "IVF_FLAT",
|
|
|
|
|
+ "params": {"nlist": 128}
|
|
|
|
|
+ }
|
|
|
|
|
+ collection.create_index("embedding", index_params)
|
|
|
|
|
+ else:
|
|
|
|
|
+ collection = Collection(name=collection_name)
|
|
|
|
|
+
|
|
|
|
|
+ # 遍历 result["what"],生成 embeddings 并插入 Milvus
|
|
|
|
|
+ entities = []
|
|
|
|
|
+ for key, value in result["what"].items():
|
|
|
|
|
+ embedding = get_basic_embedding(value, model=DEFAULT_MODEL)
|
|
|
|
|
+ path = key
|
|
|
|
|
+ entities.append({
|
|
|
|
|
+ "mongo_id": str(inserted_id),
|
|
|
|
|
+ "path": path,
|
|
|
|
|
+ "embedding": embedding
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ if entities:
|
|
|
|
|
+ collection.insert(entities)
|
|
|
|
|
+ collection.flush()
|
|
|
|
|
+ print(f"已插入 {len(entities)} 条 what 字段向量到 Milvus")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("未找到 what 字段,未插入向量")
|
|
|
|
|
+ ##################### 将 result["what"] 中的每个 value 转换为向量并插入 Milvus
|
|
|
|
|
+
|
|
|
|
|
+ #####################将 result["media"] 中的每个 value 调用多模态编码模型计算embedding并插入Milvus
|
|
|
|
|
+ # 创建 Milvus 集合(如不存在)
|
|
|
|
|
+ collection_name = "deconstruct_media"
|
|
|
|
|
+ if not utility.has_collection(collection_name):
|
|
|
|
|
+ fields = [
|
|
|
|
|
+ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
|
|
|
|
|
+ FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
|
|
|
|
|
+ FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
|
|
|
|
|
+ FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
|
|
|
|
|
+ FieldSchema(name="no", dtype=DataType.INT32),
|
|
|
|
|
+ FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=2560)
|
|
|
|
|
+ ]
|
|
|
|
|
+ schema = CollectionSchema(fields, description="Deconstruct media embeddings")
|
|
|
|
|
+ collection = Collection(name=collection_name, schema=schema)
|
|
|
|
|
+ # 创建 IVF_FLAT 索引
|
|
|
|
|
+ index_params = {
|
|
|
|
|
+ "metric_type": "IP",
|
|
|
|
|
+ "index_type": "IVF_FLAT",
|
|
|
|
|
+ "params": {"nlist": 128}
|
|
|
|
|
+ }
|
|
|
|
|
+ collection.create_index("embedding", index_params)
|
|
|
|
|
+ else:
|
|
|
|
|
+ collection = Collection(name=collection_name)
|
|
|
|
|
+ # 遍历 result["media"],生成 embeddings 并插入 Milvus
|
|
|
|
|
+ #############存储一份media embedding到Milvus
|
|
|
|
|
+ entities = []
|
|
|
|
|
+ for key, value in result["media"].items():
|
|
|
|
|
+ embedding = get_media_embedding(value, model=DEFAULT_MODEL)
|
|
|
|
|
+ type = key[:key.index("-")]
|
|
|
|
|
+ path = key[key.index("-"):]
|
|
|
|
|
+ # 将 embedding 列表拆分为单条向量,并记录其在原列表中的位置 no
|
|
|
|
|
+ if isinstance(embedding, list) and len(embedding) > 0:
|
|
|
|
|
+ for idx, vec in enumerate(embedding):
|
|
|
|
|
+ entities.append({
|
|
|
|
|
+ "mongo_id": str(inserted_id),
|
|
|
|
|
+ "type": type,
|
|
|
|
|
+ "path": path,
|
|
|
|
|
+ "no": idx,
|
|
|
|
|
+ "embedding": vec
|
|
|
|
|
+ })
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 若 embedding 不是列表或长度为 0,则 no 记为 0
|
|
|
|
|
+ entities.append({
|
|
|
|
|
+ "mongo_id": str(inserted_id),
|
|
|
|
|
+ "type": type,
|
|
|
|
|
+ "path": path,
|
|
|
|
|
+ "no": 0,
|
|
|
|
|
+ "embedding": embedding
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ # 将插入操作移到循环外部,避免重复插入和数据累积
|
|
|
|
|
+ if entities:
|
|
|
|
|
+ collection.insert(entities)
|
|
|
|
|
+ collection.flush()
|
|
|
|
|
+ print(f"已插入 {len(entities)} 条 media 字段向量到 Milvus")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("未找到有效的 media 字段向量,未插入数据")
|
|
|
|
|
+
|
|
|
|
|
+ #############存储一份what 多模态embedding 到Milvus
|
|
|
|
|
+ entities = []
|
|
|
|
|
+ for key, value in result["what"].items():
|
|
|
|
|
+ embedding = get_media_embedding(value, model=DEFAULT_MODEL)
|
|
|
|
|
+ # type = key[:key.index("-")]
|
|
|
|
|
+ # path = key[key.index("-"):]
|
|
|
|
|
+ path = key
|
|
|
|
|
+ if isinstance(embedding, list) and len(embedding) > 0:
|
|
|
|
|
+ for idx, vec in enumerate(embedding):
|
|
|
|
|
+ entities.append({
|
|
|
|
|
+ "mongo_id": str(inserted_id),
|
|
|
|
|
+ "type": "text",
|
|
|
|
|
+ "path": path,
|
|
|
|
|
+ "no": idx,
|
|
|
|
|
+ "embedding": vec
|
|
|
|
|
+ })
|
|
|
|
|
+ else:
|
|
|
|
|
+ # 若 embedding 不是列表或长度为 0,则 no 记为 0
|
|
|
|
|
+ entities.append({
|
|
|
|
|
+ "mongo_id": str(inserted_id),
|
|
|
|
|
+ "type": "text",
|
|
|
|
|
+ "path": path,
|
|
|
|
|
+ "no": 0,
|
|
|
|
|
+ "embedding": embedding
|
|
|
|
|
+ })
|
|
|
|
|
+
|
|
|
|
|
+ # 将插入操作移到循环外部,避免重复插入和数据累积
|
|
|
|
|
+ if entities:
|
|
|
|
|
+ collection.insert(entities)
|
|
|
|
|
+ collection.flush()
|
|
|
|
|
+ print(f"已插入 {len(entities)} 条 what 多模态向量到 Milvus")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print("未找到有效的 what 多模态向量,未插入数据")
|
|
|
|
|
+ #############存储一份what 多模态embedding 到Milvus
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|