"""Archive a "how/why" deconstruction JSON in MongoDB and index it in Milvus.

Importing this module connects to Milvus immediately (see the
``connections.connect`` call below). Running it as a script additionally:

1. loads the JSON document at ``JSON_PATH``,
2. collects every ``"how"`` / ``"why"`` field together with its path,
3. stores the raw document in MongoDB,
4. inserts one vector row per collected field into a Milvus collection.
"""

import io
import json
import os
from typing import Any, Dict, List

import numpy as np
import requests
from pydub import AudioSegment
from pymilvus import (
    Collection,
    CollectionSchema,
    DataType,
    FieldSchema,
    connections,
    utility,
)
from pymongo import MongoClient
from scipy.io import wavfile

# ------------------------------------------------------------------ Milvus
# SECURITY: credentials should not live in source control. Each value can be
# overridden through the environment; the literals are kept only as
# backward-compatible defaults and should be rotated/removed.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}

print("正在连接 Milvus 数据库...")
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")

# ------------------------------------------------------- Embedding service
# NOTE: per the original author's note, the remote service has to be
# forwarded through an SSH tunnel, run on the local machine:
#   ssh -R 8000:192.168.100.31:8000 username@server_ip
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"

# Dimension of the Milvus vector field and of the placeholder vectors.
EMBEDDING_DIM = 2560

# While True, random vectors are inserted instead of calling the embedding
# service (the original script did this because the service was not always
# reachable). Set to False once VLLM_SERVER_URL is reachable.
USE_PLACEHOLDER_EMBEDDINGS = True

# ----------------------------------------------------------------- Script
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"
JSON_PATH = "/home/ecs-user/project/colpali/src/how_res.json"
MILVUS_COLLECTION_NAME = "deconstruct_how"


def get_basic_embedding(text: str, model=DEFAULT_MODEL):
    """Fetch the embedding of *text* from the HTTP embedding service.

    Args:
        text: Input sent as the ``"input"`` field of the request body.
        model: Model identifier sent as the ``"model"`` field.

    Returns:
        The value found at ``["data"][0]["embedding"]`` in the JSON response.

    Raises:
        requests.HTTPError: If the service answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or when the
            5-second timeout expires.
    """
    headers = {"Content-Type": "application/json"}
    data = {"model": model, "input": text}
    response = requests.post(
        VLLM_SERVER_URL,
        headers=headers,
        json=data,
        timeout=5,  # fail fast when the service is unreachable
    )
    response.raise_for_status()
    result = response.json()
    return result["data"][0]["embedding"]


def parse_how_res(json_data) -> Dict[str, Dict[str, Any]]:
    """Collect every ``"how"`` and ``"why"`` field of a parsed JSON document.

    The structure is walked recursively. Dict keys are joined with ``.``,
    list elements are addressed as ``[idx]`` appended to the parent path
    (so a top-level list yields paths starting with ``[0]``). A matching
    value is recorded as-is and is still descended into, so ``how``/``why``
    keys nested inside a matching value are recorded as well.

    Args:
        json_data: Already-parsed JSON (dicts, lists and scalars).

    Returns:
        ``{"how": {path: value, ...}, "why": {path: value, ...}}``.
    """
    how_dict: Dict[str, Any] = {}
    why_dict: Dict[str, Any] = {}

    def traverse(obj: Any, current_path: str = "") -> None:
        """Walk *obj*, recording target fields under their full path."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                # Avoid a leading dot for top-level keys.
                new_path = f"{current_path}.{k}" if current_path else k
                if k == "how":
                    how_dict[new_path] = v
                elif k == "why":
                    why_dict[new_path] = v
                traverse(v, new_path)
        elif isinstance(obj, list):
            for idx, item in enumerate(obj):
                traverse(item, f"{current_path}[{idx}]")

    traverse(json_data)
    return {"how": how_dict, "why": why_dict}


def _embed(value: Any) -> List[float]:
    """Return the vector stored for *value* (placeholder or real embedding)."""
    if USE_PLACEHOLDER_EMBEDDINGS:
        return np.random.rand(EMBEDDING_DIM).tolist()
    return get_basic_embedding(value, model=DEFAULT_MODEL)


def _get_or_create_collection(collection_name: str) -> Collection:
    """Open the Milvus collection, creating it with an IVF_FLAT index if absent."""
    if utility.has_collection(collection_name):
        return Collection(name=collection_name)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
    ]
    schema = CollectionSchema(fields, description="Deconstruct how embeddings")
    collection = Collection(name=collection_name, schema=schema)
    index_params = {
        "metric_type": "IP",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index("embedding", index_params)
    return collection


def _build_entities(
    mapping: Dict[str, Any], kind: str, mongo_id: str
) -> List[Dict[str, Any]]:
    """Build one Milvus row per ``path -> value`` entry of *mapping*."""
    return [
        {
            "mongo_id": mongo_id,
            "type": kind,
            "path": path,
            "embedding": _embed(value),
        }
        for path, value in mapping.items()
    ]


def _insert_entities(
    collection: Collection, entities: List[Dict[str, Any]], kind: str
) -> None:
    """Insert and flush *entities*, reporting the outcome on stdout."""
    if not entities:
        print(f"未找到 {kind} 字段,未插入向量")
        return
    collection.insert(entities)
    collection.flush()
    print(f"已插入 {len(entities)} 条 {kind} 字段向量到 Milvus")


def main() -> None:
    """Run the load -> parse -> MongoDB -> Milvus pipeline once."""
    with open(JSON_PATH, "r", encoding="utf-8") as f:
        doc = json.load(f)

    result = parse_how_res(doc)
    for kind in ("how", "why"):
        for key, value in result[kind].items():
            print(f"{kind} 字段 {key} 的值为: {value}")

    # Keep the raw document in MongoDB; its _id links the Milvus rows back.
    # The context manager closes the client, which the original never did.
    with MongoClient(MONGO_URI) as client:
        inserted_id = client[DB_NAME][COLL_NAME].insert_one(doc).inserted_id
    mongo_id = str(inserted_id)

    collection = _get_or_create_collection(MILVUS_COLLECTION_NAME)
    # "how" rows are inserted and flushed before "why" rows, as before.
    for kind in ("how", "why"):
        _insert_entities(collection, _build_entities(result[kind], kind, mongo_id), kind)


if __name__ == "__main__":
    main()