| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
import io
import json
import os
from typing import Any, Dict, List

import numpy as np
import requests
from pydub import AudioSegment
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections, utility
from pymongo import MongoClient
from scipy.io import wavfile
################################ Connect to Milvus: begin
# Connection settings. Each value can be overridden through an environment
# variable so that deployment-specific settings and credentials do not have
# to be edited in the source; the literals are fallbacks that keep the
# previous behaviour when no variable is set.
# NOTE(review): a real password is still committed here as the fallback.
# Rotate it and drop the default once MILVUS_PASSWORD is set everywhere.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}
print("正在连接 Milvus 数据库...")
# Registers the connection under the alias "default"; this runs at import time.
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")
################################ Connect to Milvus: end
################################ Embedding service: begin
# NOTE: per an earlier discussion, the remote service has to be forwarded to
# the local machine through an SSH tunnel.
# Run on the local machine: ssh -R 8000:192.168.100.31:8000 username@server_ip
VLLM_SERVER_URL = "http://192.168.100.31:8000/v1/embeddings"
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"


def get_basic_embedding(text: str, model=DEFAULT_MODEL):
    """Request the embedding of ``text`` from the online embedding service.

    Sends a JSON POST to ``VLLM_SERVER_URL`` with the model name and the
    input text, using a 5 second timeout.

    Args:
        text: The text to embed.
        model: Model identifier sent in the request body.

    Returns:
        The ``embedding`` entry of the first item under ``data`` in the JSON
        reply (presumably a list of floats -- confirm against the service).

    Raises:
        requests.HTTPError: If the service answers with an error status.
    """
    payload = {"model": model, "input": text}
    reply = requests.post(
        VLLM_SERVER_URL,
        headers={"Content-Type": "application/json"},
        json=payload,
        timeout=5,  # do not hang forever on an unreachable service
    )
    # Surface HTTP error statuses as exceptions instead of parsing the body.
    reply.raise_for_status()
    return reply.json()["data"][0]["embedding"]
def parse_pattern_res(json_data) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """Collect every pattern record found anywhere in a parsed JSON document.

    A pattern record is any dict that contains the key "模式ID". For each such
    dict the ID is stored together with the sibling "模式命名" and "模式说明"
    values (an empty string is used when a sibling key is missing), keyed by
    the path of the dict that holds them.

    Paths join dict keys with ``.`` and mark list positions with ``[i]``,
    e.g. ``"groups[1].inner"``. A record located at the document root gets
    the empty path ``""``.

    Args:
        json_data: A parsed JSON value (nested dicts, lists and scalars).

    Returns:
        ``{"pattern": {path: {"模式ID": ..., "模式命名": ..., "模式说明": ...}}}``.
        The inner mapping is empty when no "模式ID" key exists.
    """
    pattern_dict: Dict[str, Dict[str, Any]] = {}

    def traverse(obj: Any, current_path: str = "") -> None:
        """Walk ``obj`` depth-first, recording pattern records on the way."""
        if isinstance(obj, dict):
            for k, v in obj.items():
                # Avoid a leading dot for keys of the root object.
                new_path = f"{current_path}.{k}" if current_path else k
                if k == "模式ID":
                    # Record the ID with its same-level name and description,
                    # keyed by the path of the containing dict (not the key).
                    pattern_dict[current_path] = {
                        "模式ID": v,
                        "模式命名": obj.get("模式命名", ""),
                        "模式说明": obj.get("模式说明", ""),
                    }
                # Always descend: nested values may hold further records.
                traverse(v, new_path)
        elif isinstance(obj, list):
            for idx, item in enumerate(obj):
                # List elements are addressed with a bracketed index.
                traverse(item, f"{current_path}[{idx}]")

    traverse(json_data)
    return {"pattern": pattern_dict}
# Example usage: store a pattern JSON document in MongoDB and index its
# pattern records in Milvus.

# Dimension shared by the two vector fields and by the vectors inserted below.
EMBEDDING_DIM = 2560


def _get_or_create_collection(collection_name: str) -> "Collection":
    """Return the Milvus collection, creating schema and indexes if absent."""
    if utility.has_collection(collection_name):
        return Collection(name=collection_name)

    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="pattern_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="pattern_name", dtype=DataType.VARCHAR, max_length=128),
        FieldSchema(name="pattern_desc", dtype=DataType.VARCHAR, max_length=2048),
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="name_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
        FieldSchema(name="desc_embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
    ]
    schema = CollectionSchema(fields, description="Deconstruct how embeddings")
    collection = Collection(name=collection_name, schema=schema)

    # Scalar index on the string field pattern_id.
    collection.create_index("pattern_id", {"index_type": "INVERTED"})

    # IVF_FLAT index with inner-product metric on both vector fields.
    vector_index_params = {
        "metric_type": "IP",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index("name_embedding", vector_index_params)
    collection.create_index("desc_embedding", vector_index_params)
    return collection


def _build_entities(patterns: Dict[str, Dict[str, Any]], mongo_id: str) -> List[Dict[str, Any]]:
    """Turn parsed pattern records into row dicts for the Milvus collection."""
    entities = []
    for path, record in patterns.items():
        # TODO: once the embedding service is reachable, replace the random
        # placeholders below with
        #   get_basic_embedding(record["模式命名"], model=DEFAULT_MODEL)
        #   get_basic_embedding(record["模式说明"], model=DEFAULT_MODEL)
        entities.append({
            "mongo_id": mongo_id,
            "pattern_id": record["模式ID"],
            "pattern_name": record["模式命名"],
            "pattern_desc": record["模式说明"],
            "path": path,
            "name_embedding": np.random.rand(EMBEDDING_DIM).tolist(),
            "desc_embedding": np.random.rand(EMBEDDING_DIM).tolist(),
        })
    return entities


if __name__ == "__main__":
    MONGO_URI = "mongodb://localhost:27017/"
    DB_NAME = "mydeconstruct"
    COLL_NAME = "deconstruct_how"
    json_path = "/home/ecs-user/project/colpali/src/pattern_res.json"

    # Load the JSON document and extract its pattern records.
    with open(json_path, "r", encoding="utf-8") as f:
        doc = json.load(f)
    result = parse_pattern_res(doc)
    for key, value in result["pattern"].items():
        print(f"pattern 字段 {key} 的值为: {value}")

    # Store the whole document in MongoDB; the client is closed even if the
    # insert fails, and only the generated id is needed afterwards.
    client = MongoClient(MONGO_URI)
    try:
        insert_result = client[DB_NAME][COLL_NAME].insert_one(doc)
    finally:
        client.close()
    inserted_id = insert_result.inserted_id

    # Insert one row per pattern record into Milvus, linked back to the
    # MongoDB document through mongo_id.
    collection = _get_or_create_collection("deconstruct_pattern")
    entities = _build_entities(result["pattern"], str(inserted_id))

    # NOTE(review): the messages below say "how" although pattern records are
    # inserted -- they look copied from a sibling script; confirm before
    # rewording them.
    if entities:
        collection.insert(entities)
        collection.flush()
        print(f"已插入 {len(entities)} 条 how 字段向量到 Milvus")
    else:
        print("未找到 how 字段,未插入向量")
|