| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
- import requests
- import json
- from typing import Dict, Any, List
- from pymongo import MongoClient
- from pydub import AudioSegment
- import io
- from scipy.io import wavfile
################################ Connect to Milvus (A)
import os

# Connection settings. Each value can be overridden through an environment
# variable; the fallbacks reproduce the previous hard-coded configuration so
# existing runs keep working unchanged.
# NOTE(security): the fallback password is committed to source control. It
# should be rotated and then supplied only through MILVUS_PASSWORD.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}
print("正在连接 Milvus 数据库...")
# Registered under the "default" alias; the Collection/utility calls later in
# this file pass no explicit alias.
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")
################################ Connect to Milvus (B)
################################## Load the multimodal model #################
import torch
from PIL import Image
from transformers.utils.import_utils import is_flash_attn_2_available
from colpali_engine.models import ColQwen2_5Omni, ColQwen2_5OmniProcessor


def _load_colqwen_omni():
    """Load the ColQwen2.5-Omni retriever and its processor.

    Returns:
        A ``(model, processor)`` pair. The model is loaded in bfloat16 onto
        CUDA and switched to eval mode before the processor is loaded.
    """
    # FlashAttention-2 only when it is available; otherwise pass None and let
    # transformers choose its default attention implementation.
    attention = "flash_attention_2" if is_flash_attn_2_available() else None
    retriever = ColQwen2_5Omni.from_pretrained(
        "vidore/colqwen-omni-v0.1",
        torch_dtype=torch.bfloat16,
        device_map="cuda",  # or "mps" if on Apple Silicon
        attn_implementation=attention,
    )
    retriever.eval()
    # NOTE(review): the processor comes from the "manu/" repo id while the
    # weights come from "vidore/"; confirm this pairing is intended.
    return retriever, ColQwen2_5OmniProcessor.from_pretrained("manu/colqwen-omni-v0.1")


model, processor = _load_colqwen_omni()
################################## Model loaded #################
################################ Connect to embedding service (A)
# Note: per an earlier discussion, the remote service has to be forwarded over
# an SSH tunnel. On the local machine run:
#   ssh -R 8000:192.168.100.31:8000 username@server_ip
# NOTE(review): VLLM_SERVER_URL below targets 192.168.100.31 directly rather
# than a localhost tunnel endpoint; confirm which setup is actually used.
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"


def get_basic_embedding(text: str, model=DEFAULT_MODEL, timeout: float = 5):
    """Embed *text* via the HTTP embeddings endpoint at ``VLLM_SERVER_URL``.

    Args:
        text: Input sent as the request's ``input`` field.
        model: Model name/path sent as the request's ``model`` field.
        timeout: Seconds to wait for the HTTP response (default 5, the value
            previously hard-coded).

    Returns:
        ``data[0]["embedding"]`` from the JSON response.

    Raises:
        requests.HTTPError: If the server responds with an error status
            (via ``raise_for_status``).
        requests.exceptions.Timeout: If no response arrives within *timeout*.
    """
    payload = {"model": model, "input": text}
    response = requests.post(
        VLLM_SERVER_URL,
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()["data"][0]["embedding"]
def get_media_embedding(query: str, type: str):
    """Encode a single query with the module-level ColQwen model/processor.

    Args:
        query: A file path for "audio", "image" and "video"; the question
            text itself for "text".
        type: One of "audio", "image", "video", "text". (The name shadows the
            builtin but is kept for backward compatibility with callers.)

    Returns:
        The raw model output for a batch containing only this query.
        NOTE(review): for ColQwen-style models this is presumably a
        (1, n_tokens, dim) tensor — confirm against colpali_engine.

    Raises:
        ValueError: If *type* is not one of the supported values. (Previously
            this surfaced as an UnboundLocalError.)
    """
    if type == "audio":
        batch_queries = processor.process_audios([query])
    elif type == "image":
        # Keep the file open only while the processor reads the pixels.
        with Image.open(query) as query_image:
            batch_queries = processor.process_images([query_image])
    elif type == "video":
        batch_queries = processor.process_videos([query])
    elif type == "text":
        batch_queries = processor.process_queries([query])
    else:
        raise ValueError(
            f"unsupported type {type!r}; "
            "expected one of 'audio', 'image', 'video', 'text'"
        )
    batch_queries = batch_queries.to(model.device)
    # Inference only: no gradients needed.
    with torch.no_grad():
        return model(**batch_queries)
- # # scores = processor.score_multi_vector(query_embeddings, ds)
- # print("score is ", scores)
- # # get top-5 scores
- # return scores[0].topk(k).indices.tolist()
- # ################################连接Embedding service B
def parse_deconstruct_res(json_data) -> Dict[str, Dict[str, str]]:
    """Collect "what" values and media references from an already-parsed
    deconstruct result (e.g. the object loaded from deconstruct_res.json).

    Every "what" key anywhere in the structure is recorded under its dotted /
    bracketed path (e.g. ``"图片元素[0].what"``). Every entry of a "媒体引用"
    list that is a dict with type "image", "video" or "audio" and a truthy
    "content" is recorded under ``"<type>-<path>[<index>].content"``.

    Returns:
        ``{"what": {path: value, ...}, "media": {key: content, ...}}``, both
        in traversal (pre-order) order.
    """
    found_what: Dict[str, Any] = {}
    found_media: Dict[str, Any] = {}
    media_kinds = ("image", "video", "audio")

    def record_media(refs: list, ref_path) -> None:
        # Register each qualifying media entry of one "媒体引用" list.
        for position, item in enumerate(refs):
            if not isinstance(item, dict):
                continue
            kind = item.get("type")
            if kind not in media_kinds:
                continue
            content = item.get("content")
            if content:
                found_media[f"{kind}-{ref_path}[{position}].content"] = content

    def walk(node: Any, prefix="") -> None:
        # Depth-first walk; list items get "[i]", dict keys get ".key".
        if isinstance(node, list):
            for position, child in enumerate(node):
                walk(child, f"{prefix}[{position}]")
            return
        if not isinstance(node, dict):
            return
        for key, child in node.items():
            # No leading dot for keys at the root.
            child_path = f"{prefix}.{key}" if prefix else key
            if key == "what":
                found_what[child_path] = child
            elif key == "媒体引用" and isinstance(child, list):
                record_media(child, child_path)
            # Always descend, so nested "what"/"媒体引用" keys are found too.
            walk(child, child_path)

    walk(json_data)
    return {"what": found_what, "media": found_media}
# Usage: run this module as a script to ingest one deconstruct_res.json into
# MongoDB (raw document) and Milvus (text and multimodal embeddings).
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct"
DECONSTRUCT_JSON_PATH = "/home/ecs-user/project/colpali/src/deconstruct_res.json"
WHAT_COLLECTION = "deconstruct_what"
MEDIA_COLLECTION = "deconstruct_media"


def split_media_key(key: str):
    """Split a parse_deconstruct_res media key "<type>-<path>" into (type, path).

    Only the first "-" separates the parts, so the returned path has no
    leading "-" and any dashes inside the JSON path are preserved.
    """
    media_type, _, path = key.partition("-")
    return media_type, path


def as_embedding_text(value: Any) -> str:
    """Return *value* as text to embed: None -> "", str unchanged, else JSON."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False)


def to_vectors(embedding: Any) -> List[List[float]]:
    """Flatten a model output into a list of plain float vectors.

    Every axis except the last (the embedding dimension) is flattened, so a
    single-query multi-vector output becomes one vector per token.
    """
    # NOTE(review): get_media_embedding presumably returns a (1, n_tokens, dim)
    # bfloat16 tensor — confirm. Upcast to float32 before leaving torch.
    tensor = torch.as_tensor(embedding).detach().float().cpu()
    return tensor.reshape(-1, tensor.shape[-1]).tolist()


def multivector_rows(mongo_id: str, media_type: str, path: str,
                     vectors: List[List[float]]) -> List[Dict[str, Any]]:
    """Build one deconstruct_media row per vector; ``no`` is its index."""
    return [
        {"mongo_id": mongo_id, "type": media_type, "path": path,
         "no": no, "embedding": vec}
        for no, vec in enumerate(vectors)
    ]


def ensure_collection(name: str, scalar_fields: list, dim: int, description: str):
    """Return Milvus collection *name*, creating it first if it does not exist.

    A new collection gets an auto-id INT64 primary key ``id``, then
    *scalar_fields*, then a FLOAT_VECTOR ``embedding`` of dimension *dim*, and
    an IVF_FLAT / inner-product index on ``embedding``. *dim* is ignored when
    the collection already exists.
    """
    if utility.has_collection(name):
        return Collection(name=name)
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        *scalar_fields,
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ]
    collection = Collection(
        name=name, schema=CollectionSchema(fields, description=description)
    )
    index_params = {
        "metric_type": "IP",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index("embedding", index_params)
    return collection


def store_rows(name: str, scalar_fields: list, description: str,
               rows: List[Dict[str, Any]]) -> None:
    """Insert non-empty *rows* into collection *name* and flush.

    The collection is created on demand, with its vector dimension taken from
    the first row, so the schema matches the model that produced the vectors.
    """
    dim = len(rows[0]["embedding"])
    collection = ensure_collection(name, scalar_fields, dim, description)
    collection.insert(rows)
    collection.flush()


def media_scalar_fields() -> list:
    """Scalar (non-vector) fields of the deconstruct_media collection."""
    return [
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="no", dtype=DataType.INT32),
    ]


def store_media_rows(rows: List[Dict[str, Any]], label: str) -> None:
    """Insert multi-vector *rows* into deconstruct_media and report the count."""
    if rows:
        store_rows(MEDIA_COLLECTION, media_scalar_fields(),
                   "Deconstruct media embeddings", rows)
        print(f"已插入 {len(rows)} 条 {label}到 Milvus")
    else:
        print(f"未找到有效的 {label},未插入数据")


def main() -> None:
    """Store the deconstruct JSON in MongoDB, then index its embeddings in Milvus."""
    ##################### Store in MongoDB
    with open(DECONSTRUCT_JSON_PATH, "r", encoding="utf-8") as f:
        doc = json.load(f)
    # The client is only needed for this insert, so close it right away.
    with MongoClient(MONGO_URI) as client:
        inserted_id = client[DB_NAME][COLL_NAME].insert_one(doc).inserted_id
    print("已插入 MongoDB,文档 _id:", inserted_id)
    mongo_id = str(inserted_id)

    result = parse_deconstruct_res(doc)
    print("what 字段映射:", result["what"])
    print("媒体引用映射:", result["media"])
    # Non-string "what" values are serialized; empty ones carry nothing to embed.
    what_texts = {path: as_embedding_text(v) for path, v in result["what"].items()}
    what_texts = {path: text for path, text in what_texts.items() if text.strip()}

    ##################### "what" -> text embedding service -> deconstruct_what
    what_rows = [
        {"mongo_id": mongo_id, "path": path,
         "embedding": get_basic_embedding(text, model=DEFAULT_MODEL)}
        for path, text in what_texts.items()
    ]
    if what_rows:
        store_rows(
            WHAT_COLLECTION,
            [
                FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
                FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
            ],
            "Deconstruct what embeddings",
            what_rows,
        )
        print(f"已插入 {len(what_rows)} 条 what 字段向量到 Milvus")
    else:
        print("未找到 what 字段,未插入向量")

    ##################### media -> multimodal embedding -> deconstruct_media
    media_rows: List[Dict[str, Any]] = []
    for key, media_path in result["media"].items():
        media_type, path = split_media_key(key)
        vectors = to_vectors(get_media_embedding(media_path, media_type))
        media_rows.extend(multivector_rows(mongo_id, media_type, path, vectors))
    store_media_rows(media_rows, "media 字段向量")

    ##################### "what" -> multimodal text embedding -> deconstruct_media
    text_rows: List[Dict[str, Any]] = []
    for path, text in what_texts.items():
        vectors = to_vectors(get_media_embedding(text, "text"))
        text_rows.extend(multivector_rows(mongo_id, "text", path, vectors))
    store_media_rows(text_rows, "what 多模态向量")


if __name__ == "__main__":
    main()
|