"""Query deconstruct records from Milvus and resolve them against MongoDB.

Importing/running this module connects to Milvus, opens a MongoDB client,
and then runs the search selected by ``search_mode``: it queries the Milvus
collection for records of the selected type and, for each record, prints the
record and the MongoDB sub-object addressed by its ``path`` field.
"""

from ast import Import
import io
import json
import os
from typing import Any, Dict, List

from bson import ObjectId
from pydub import AudioSegment
from pymilvus import (
    Collection,
    CollectionSchema,
    DataType,
    FieldSchema,
    connections,
    utility,
)
from pymongo import MongoClient
import requests
from scipy.io import wavfile

# --------------------------------------------------------------------------
# Milvus connection
# --------------------------------------------------------------------------
# SECURITY: the literal values below (including the password) were hard-coded
# in the original script and are kept only as fallbacks so existing runs keep
# working. Prefer supplying MILVUS_HOST / MILVUS_USER / MILVUS_PASSWORD /
# MILVUS_PORT via the environment, then rotate the password and remove the
# fallback from source control.
MILVUS_CONFIG = {
    "host": os.environ.get(
        "MILVUS_HOST", "c-981be0ee7225467b-internal.milvus.aliyuncs.com"
    ),
    "user": os.environ.get("MILVUS_USER", "root"),
    "password": os.environ.get("MILVUS_PASSWORD", "Piaoquan@2025"),
    "port": os.environ.get("MILVUS_PORT", "19530"),
}

print("正在连接 Milvus 数据库...")
connections.connect("default", **MILVUS_CONFIG)
print("连接成功!")

# --------------------------------------------------------------------------
# MongoDB connection
# --------------------------------------------------------------------------
MONGO_URI = "mongodb://localhost:27017/"
DB_NAME = "mydeconstruct"
COLL_NAME = "deconstruct_how"

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
coll = db[COLL_NAME]

# --------------------------------------------------------------------------
# Search settings
# --------------------------------------------------------------------------
# Name of the Milvus collection that is queried.
MILVUS_COLLECTION_NAME = "deconstruct_how"
# Maximum number of records returned by one Milvus query.
QUERY_LIMIT = 10
# Maps a ``search_mode`` value to the ``type`` value used in the Milvus filter.
SEARCH_MODE_TO_TYPE = {
    "how_search": "how",
    "why_search": "why",
}


def resolve_mongo_path(mongo_id: str, path: str) -> Any:
    """Return the object addressed by *path* inside a MongoDB document.

    The document is fetched from ``coll`` by ``_id``. *path* is a
    dot-separated list of keys; a segment may carry one list index in square
    brackets, e.g. ``'文本元素[1].子节点元素[0].what'``.

    Args:
        mongo_id: String form of the document's ``ObjectId``.
        path: Dot-separated access path into the document.

    Returns:
        The object found at *path*, or ``None`` when no document has the
        given id.

    Raises:
        KeyError: A key in *path* is not present.
        IndexError: A list index in *path* is out of range.
        ValueError: A bracketed index is not an integer.
    """
    doc = coll.find_one({"_id": ObjectId(mongo_id)})
    if not doc:
        return None

    current = doc
    for part in path.split("."):
        if "[" in part and part.endswith("]"):
            # Segment of the form ``key[idx]``: take the key, then the index.
            key, idx_str = part.split("[", 1)
            current = current[key][int(idx_str[:-1])]  # strip trailing ']'
        else:
            current = current[part]
    return current


def _query_and_resolve(
    record_type: str,
    collection_name: str = MILVUS_COLLECTION_NAME,
    limit: int = QUERY_LIMIT,
) -> List[Dict[str, Any]]:
    """Query Milvus for records of *record_type* and print each resolved item.

    Args:
        record_type: Value matched against the ``type`` field (e.g. ``"how"``).
        collection_name: Milvus collection to query.
        limit: Maximum number of records to fetch.

    Returns:
        The records returned by Milvus; an empty list when the collection
        does not exist or the query fails.
    """
    # Check existence first: constructing ``Collection`` for a collection that
    # does not exist raises before this message could ever be printed.
    if not utility.has_collection(collection_name):
        print(f"no collection named {collection_name}")
        return []

    milvus_client = Collection(name=collection_name)
    milvus_client.load()

    print(f"正在查询 collection '{collection_name}' 中的所有记录...")
    try:
        # Filtered query: only records of the requested type whose mongo_id
        # compares greater than "10000000"; at most ``limit`` rows.
        all_records = milvus_client.query(
            expr=f'mongo_id >"10000000" and type == "{record_type}"',
            output_fields=["mongo_id", "type", "path"],
            limit=limit,
        )
        print(f"共查询到 {len(all_records)} 条记录:")
        for record in all_records:
            print(record)
            try:
                rec = resolve_mongo_path(record["mongo_id"], record["path"])
            except (KeyError, IndexError, TypeError, ValueError) as e:
                # A single malformed path must not abort the remaining records.
                print(f"路径解析失败:{e}")
                continue
            print("定位items:", rec)
    except Exception as e:
        # Top-level boundary of the script: report the failure and carry on.
        print(f"查询失败:{e}")
        return []
    return all_records


# Select which record type to look up: "how_search" or "why_search".
search_mode = "why_search"

if search_mode in SEARCH_MODE_TO_TYPE:
    # ``all_records`` holds every record returned by Milvus for the chosen mode.
    all_records = _query_and_resolve(SEARCH_MODE_TO_TYPE[search_mode])