"""Store a pattern-deconstruction JSON document in MongoDB and index it in Milvus.

Importing this module connects to Milvus immediately (see ``MILVUS_CONFIG``).
Running it as a script reads ``JSON_PATH``, extracts every pattern record with
``parse_pattern_res``, inserts the raw document into MongoDB, and inserts one
Milvus row per pattern into the ``deconstruct_pattern`` collection.
"""

import io
import json
import os
from typing import Any, Dict, List

import numpy as np
import requests
from pydub import AudioSegment
from pymilvus import (
    Collection,
    CollectionSchema,
    DataType,
    FieldSchema,
    connections,
    utility,
)
from pymongo import MongoClient
from scipy.io import wavfile

# ------------------------------------------------------------------ Milvus
# NOTE(review): the fallback literals below (including the password) are
# committed to source control. Each value can be overridden through the
# environment; the literals are kept only so existing deployments keep working
# and should be rotated/removed once the variables are set everywhere.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}

print("正在连接 Milvus 数据库...")
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")

# ------------------------------------------------------- Embedding service
# Author's note: the remote service has to be forwarded through an SSH tunnel,
# e.g. run on the local machine:
#   ssh -R 8000:192.168.100.31:8000 username@server_ip
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"

# Dimension of both vector fields in the Milvus schema.
EMBEDDING_DIM = 2560

# ------------------------------------------------------- Script parameters
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"
JSON_PATH = "/home/ecs-user/project/colpali/src/pattern_res.json"
COLLECTION_NAME = "deconstruct_pattern"


def get_basic_embedding(text: str, model: str = DEFAULT_MODEL) -> List[float]:
    """Request an embedding for *text* from the HTTP embedding service.

    Args:
        text: The text to embed.
        model: Model identifier sent in the request body.

    Returns:
        The ``embedding`` entry of the first item in the response's ``data``
        list (expected to be a list of floats).

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On connection failure or the 5 s timeout.
        KeyError, IndexError: If the response JSON lacks ``data[0].embedding``.
    """
    headers = {"Content-Type": "application/json"}
    payload = {"model": model, "input": text}
    response = requests.post(
        VLLM_SERVER_URL,
        headers=headers,
        json=payload,
        timeout=5,  # fail fast instead of hanging when the service is down
    )
    response.raise_for_status()
    result = response.json()
    return result["data"][0]["embedding"]


def parse_pattern_res(json_data: Any) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Collect every pattern record found anywhere in a parsed JSON structure.

    A pattern record is any dict that has a "模式ID" key. For each one, the
    values of "模式ID", "模式命名" and "模式说明" are captured; the latter two
    default to ``""`` when absent.

    Args:
        json_data: Parsed JSON content (nested dicts / lists / scalars).

    Returns:
        ``{"pattern": {path: {"模式ID": ..., "模式命名": ..., "模式说明": ...}}}``
        where ``path`` is the dotted / bracket-indexed location of the dict
        that holds the record (for example ``"a.items[0]"``; ``""`` for the
        root object).
    """
    pattern_dict: Dict[str, Dict[str, Any]] = {}

    def traverse(obj: Any, current_path: str = "") -> None:
        """Walk *obj* depth-first, recording pattern records by path."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                # Only prefix with a dot once there is a parent path.
                new_path = f"{current_path}.{k}" if current_path else k
                if k == "模式ID":
                    # Capture the sibling name/description stored alongside
                    # the ID, keyed by the path of the containing dict.
                    pattern_dict[current_path] = {
                        "模式ID": v,
                        "模式命名": obj.get("模式命名", ""),
                        "模式说明": obj.get("模式说明", ""),
                    }
                traverse(v, new_path)
        elif isinstance(obj, list):
            for idx, item in enumerate(obj):
                # List elements are addressed with a bracketed index.
                traverse(item, f"{current_path}[{idx}]")

    traverse(json_data)
    return {"pattern": pattern_dict}


def _get_or_create_collection(collection_name: str) -> Collection:
    """Return the Milvus collection, creating it with its indexes if absent."""
    if utility.has_collection(collection_name):
        return Collection(name=collection_name)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="pattern_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="pattern_name", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="pattern_desc", dtype=DataType.VARCHAR, max_length=2048),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(
            name="name_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM
        ),
        FieldSchema(
            name="desc_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM
        ),
    ]
    schema = CollectionSchema(fields, description="Deconstruct how embeddings")
    collection = Collection(name=collection_name, schema=schema)

    # IVF_FLAT with inner-product metric for both vector fields.
    index_params = {
        "metric_type": "IP",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    # Scalar index on pattern_id.
    collection.create_index("pattern_id", {"index_type": "INVERTED"})
    collection.create_index("name_embedding", index_params)
    collection.create_index("desc_embedding", index_params)
    return collection


def _build_entities(
    patterns: Dict[str, Dict[str, Any]], mongo_id: str
) -> List[Dict[str, Any]]:
    """Turn parsed pattern records into row dicts for the Milvus collection."""
    entities: List[Dict[str, Any]] = []
    for path, info in patterns.items():
        pattern_name = info["模式命名"]
        pattern_desc = info["模式说明"]
        # TODO: once the embedding service is reachable, replace the random
        # placeholders with
        #   get_basic_embedding(pattern_name, model=DEFAULT_MODEL)
        #   get_basic_embedding(pattern_desc, model=DEFAULT_MODEL)
        name_embedding = np.random.rand(EMBEDDING_DIM).tolist()
        desc_embedding = np.random.rand(EMBEDDING_DIM).tolist()
        entities.append(
            {
                "mongo_id": mongo_id,
                # pattern_id is a VARCHAR field; the JSON may carry a numeric
                # ID, so coerce it to str before inserting.
                "pattern_id": str(info["模式ID"]),
                "pattern_name": pattern_name,
                "pattern_desc": pattern_desc,
                "path": path,
                "name_embedding": name_embedding,
                "desc_embedding": desc_embedding,
            }
        )
    return entities


def main() -> None:
    """Load the JSON file, store it in MongoDB, and index patterns in Milvus."""
    with open(JSON_PATH, "r", encoding="utf-8") as f:
        doc = json.load(f)

    # Parse before inserting: insert_one adds an "_id" key to the document.
    result = parse_pattern_res(doc)
    for key, value in result["pattern"].items():
        print(f"pattern 字段 {key} 的值为: {value}")

    # The context manager closes the client once the document is stored.
    with MongoClient(MONGO_URI) as client:
        coll = client[DB_NAME][COLL_NAME]
        inserted_id = coll.insert_one(doc).inserted_id

    collection = _get_or_create_collection(COLLECTION_NAME)
    entities = _build_entities(result["pattern"], str(inserted_id))

    if entities:
        collection.insert(entities)
        collection.flush()
        print(f"已插入 {len(entities)} 条 how 字段向量到 Milvus")
    else:
        print("未找到 how 字段,未插入向量")


if __name__ == "__main__":
    main()