| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
import io
import json
import os
from typing import Any, Dict, List

import numpy as np
import requests
from pydub import AudioSegment
from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections, utility
from pymongo import MongoClient
from scipy.io import wavfile
# ---------------------------------------------------------------------------
# Connect to the Milvus vector database (runs at import time).
# ---------------------------------------------------------------------------
# Every setting can be overridden through an environment variable, so the
# credentials no longer have to live in source code.
# NOTE(security): a plaintext password is still committed below as the
# fallback value for backward compatibility; rotate this credential and drop
# the fallback once MILVUS_PASSWORD is provided by the environment.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}
print("正在连接 Milvus 数据库...")
# Registers the connection under the alias "default", which the Collection /
# utility calls below use implicitly.
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")
# ---------------------------------------------------------------------------
# Embedding service (OpenAI-compatible /v1/embeddings endpoint served by vLLM).
# ---------------------------------------------------------------------------
# The original setup note says the service must be reached through an SSH
# tunnel, created on the local machine with:
#     ssh -R 8000:192.168.100.31:8000 username@server_ip
# A -R tunnel makes the service listen on port 8000 of *server_ip* itself, so
# when the tunnel is in use, run this script with
#     VLLM_SERVER_URL=http://localhost:8000/v1/embeddings
# The default below keeps the original direct LAN address.
VLLM_SERVER_URL = os.environ.get(
    "VLLM_SERVER_URL", "http://192.168.100.31:8000/v1/embeddings"
)
DEFAULT_MODEL = "/models/Qwen3-Embedding-4B"


def get_basic_embedding(text: str, model=DEFAULT_MODEL, timeout: float = 5) -> List[float]:
    """Return the embedding vector for *text* from the online embedding service.

    Posts ``{"model": model, "input": text}`` as JSON to ``VLLM_SERVER_URL``
    and returns the ``embedding`` of the first entry in the response's
    ``data`` list.

    Args:
        text: Text to embed.
        model: Model name/path forwarded to the server.
        timeout: Seconds to wait for the HTTP request (defaults to 5, the
            previously hard-coded value).

    Returns:
        The embedding as a list of floats.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection errors or timeout.
    """
    payload = {"model": model, "input": text}
    # json= serializes the payload and sets Content-Type: application/json.
    response = requests.post(VLLM_SERVER_URL, json=payload, timeout=timeout)
    response.raise_for_status()
    return response.json()["data"][0]["embedding"]
def parse_how_res(json_data) -> Dict[str, Dict[str, str]]:
    """Collect every "how" and "why" entry of a parsed how_res.json document.

    The document is walked depth-first in document order. Each dict entry
    whose key is "how" or "why" is recorded under its full path. Dict keys are
    joined with ".", and list elements are addressed as "[i]". For example,
    ``{"a": [{"how": "x"}]}`` yields the path ``"a[0].how"``. Matching entries
    are still descended into, so a nested "how" inside a "how" is recorded too.

    Returns:
        ``{"how": {path: value, ...}, "why": {path: value, ...}}``. The values
        are whatever the document holds at that path (not necessarily str).
    """

    def walk(node: Any, prefix: str = ""):
        """Yield (key, path, value) for every dict entry below *node*, in preorder."""
        if isinstance(node, dict):
            for key, child in node.items():
                # The first path segment gets no leading dot.
                child_path = f"{prefix}.{key}" if prefix else key
                yield key, child_path, child
                yield from walk(child, child_path)
        elif isinstance(node, list):
            for position, element in enumerate(node):
                yield from walk(element, f"{prefix}[{position}]")

    collected: Dict[str, Dict[str, Any]] = {"how": {}, "why": {}}
    for key, path, value in walk(json_data):
        if key in collected:
            collected[key][path] = value
    return collected
# ---------------------------------------------------------------------------
# Example usage: parse how_res.json, store the raw document in MongoDB, then
# write one Milvus row per "how"/"why" field path, linked back to the MongoDB
# document through its _id.
# ---------------------------------------------------------------------------

# Dimension of the stored vectors. Presumably the output size of DEFAULT_MODEL
# (Qwen3-Embedding-4B) -- confirm before switching to real embeddings.
EMBEDDING_DIM = 2560


def _get_or_create_collection(collection_name: str) -> Collection:
    """Return the Milvus collection, creating it with its schema and index if absent."""
    if utility.has_collection(collection_name):
        return Collection(name=collection_name)
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="mongo_id", dtype=DataType.VARCHAR, max_length=64),
        FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=64),
        # NOTE(review): paths longer than max_length will presumably be
        # rejected by Milvus (failing the whole batch) -- confirm for deeply
        # nested documents.
        FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=512),
        FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM),
    ]
    schema = CollectionSchema(fields, description="Deconstruct how embeddings")
    collection = Collection(name=collection_name, schema=schema)
    # IVF_FLAT index with inner-product ("IP") similarity on the vector field.
    index_params = {
        "metric_type": "IP",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128},
    }
    collection.create_index("embedding", index_params)
    return collection


def _insert_field_vectors(
    collection: Collection, field_map: Dict[str, Any], field_type: str, mongo_id: str
) -> int:
    """Insert one row per (path, value) of *field_map*; return how many rows were inserted."""
    entities = []
    for path, value in field_map.items():
        # Placeholder vector: the embedding service is not reachable yet.
        # Once it is, replace this line with:
        #     embedding = get_basic_embedding(value, model=DEFAULT_MODEL)
        embedding = np.random.rand(EMBEDDING_DIM).tolist()
        entities.append({
            "mongo_id": mongo_id,
            "type": field_type,
            "path": path,
            "embedding": embedding,
        })
    if entities:
        collection.insert(entities)
        collection.flush()
    return len(entities)


if __name__ == "__main__":
    # MongoDB location of the raw deconstruction documents.
    MONGO_URI = "mongodb://localhost:27017/"
    DB_NAME = "mydeconstruct"
    COLL_NAME = "deconstruct_how"

    # Read and parse the JSON file.
    json_path = "/home/ecs-user/project/colpali/src/how_res.json"
    with open(json_path, "r", encoding="utf-8") as f:
        doc = json.load(f)
    result = parse_how_res(doc)
    for key, value in result["how"].items():
        print(f"how 字段 {key} 的值为: {value}")
    for key, value in result["why"].items():
        print(f"why 字段 {key} 的值为: {value}")

    # Store the raw document in MongoDB; always release the client afterwards.
    client = MongoClient(MONGO_URI)
    try:
        inserted_id = client[DB_NAME][COLL_NAME].insert_one(doc).inserted_id
    finally:
        client.close()

    # Write one vector row per "how" and "why" field path into Milvus.
    mongo_id = str(inserted_id)
    collection = _get_or_create_collection("deconstruct_how")
    for field_type in ("how", "why"):
        count = _insert_field_vectors(collection, result[field_type], field_type, mongo_id)
        if count:
            print(f"已插入 {count} 条 {field_type} 字段向量到 Milvus")
        else:
            print(f"未找到 {field_type} 字段,未插入向量")
|