| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- from ast import Import
- from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
- import requests
- import json
- from typing import Dict, Any, List
- from pymongo import MongoClient
- from bson import ObjectId
- from pydub import AudioSegment
- import io
- from scipy.io import wavfile
# ---------------- Connect to the Milvus database (begin) ----------------
# Connection settings. Each value can be overridden through an environment
# variable; the fallbacks are the values this script has always used, so the
# default behaviour is unchanged.
# NOTE(security): the fallback credentials are committed to source control.
# They are kept only for backward compatibility -- rotate them and supply the
# real values through the environment instead.
import os

MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}

print("正在连接 Milvus 数据库...")
# Registers the connection under the "default" alias used by the rest of the script.
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")
# ---------------- Connect to the Milvus database (end) ----------------
# ---------------- MongoDB (begin) ----------------
# Where the deconstructed documents live: server URI, database, collection.
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"

# Module-level handles: `client` -> `db` -> `coll`, each derived from the
# previous one using the constants above.
client = MongoClient(MONGO_URI)
db = client.get_database(DB_NAME)
coll = db.get_collection(COLL_NAME)
# ---------------- MongoDB (end) ----------------
# ---------------- Path resolution (begin) ----------------
def resolve_mongo_path(mongo_id: str, path: str):
    """Return the object that ``path`` addresses inside a MongoDB document.

    The document is fetched from the module-level ``coll`` collection by its
    ``_id``. ``path`` is a dot-separated chain of keys; a segment may end with
    one or more list indices, e.g. ``'文本元素[1].子节点元素[0].what'`` or
    ``'grid[0][2]'``.

    Args:
        mongo_id: String form of the document's ``_id``; it is converted with
            ``ObjectId`` before the lookup.
        path: Dot-separated path to walk inside the document.

    Returns:
        The value found at ``path``, or ``None`` when no document with that
        ``_id`` exists.

    Raises:
        KeyError, IndexError, TypeError: ``path`` does not match the
            document's structure.
        ValueError: an index between brackets is not an integer.
        Any error raised by ``ObjectId(mongo_id)`` propagates unchanged.
    """
    doc = coll.find_one({"_id": ObjectId(mongo_id)})
    if not doc:
        return None

    current = doc
    # Walk the path one dot-separated segment at a time.
    for part in path.split('.'):
        key, bracket, rest = part.partition('[')
        if not bracket or not part.endswith(']'):
            # Plain key. Segments with an unbalanced '[' are also used
            # verbatim as a key, as before.
            current = current[part]
            continue

        current = current[key]
        # `rest` looks like '0]' or '0][1]': drop the final ']' and split on
        # '][' so that chained indices are applied one after another.
        for idx_str in rest[:-1].split(']['):
            current = current[int(idx_str)]
    return current
# ---------------- Path resolution (end) ----------------
# ---------------- Query Milvus and resolve each hit in MongoDB ----------------
# Selects which kind of record to query: "how_search" or "why_search".
# Any other value makes this section a no-op.
search_mode = "why_search"

# Both modes run the same query; only the value compared against the `type`
# field differs.
_RECORD_TYPE_BY_MODE = {"how_search": "how", "why_search": "why"}
record_type = _RECORD_TYPE_BY_MODE.get(search_mode)

if record_type is not None:
    collection_name = "deconstruct_how"
    # Check that the collection exists BEFORE building and loading the
    # Collection handle: pymilvus cannot construct a schema-less Collection
    # for a name that does not exist, so the old order made this message
    # unreachable.
    if not utility.has_collection(collection_name):
        print(f"no collection named {collection_name}")
    else:
        milvus_client = Collection(name=collection_name)
        milvus_client.load()
        print(f"正在查询 collection '{collection_name}' 中的所有记录...")
        try:
            # Scalar query (no vector search): records whose mongo_id string
            # compares greater than "10000000" and whose type matches the
            # selected mode. At most 10 records are returned.
            all_records = milvus_client.query(
                expr=f'mongo_id >"10000000" and type == "{record_type}"',
                output_fields=["mongo_id", "type", "path"],
                limit=10,
            )
            print(f"共查询到 {len(all_records)} 条记录:")
            for record in all_records:
                print(record)
                # Follow the stored path into the MongoDB document.
                rec = resolve_mongo_path(record["mongo_id"], record["path"])
                print("定位items:", rec)
        except Exception as e:
            # Top-level script boundary: report the failure and carry on.
            print(f"查询失败:{e}")
# After this section, `all_records` holds the raw Milvus records and `rec`
# holds the object resolved for the last record processed.
|