# milvus_how_query.py
import io
import json
import os
from ast import Import
from typing import Dict, Any, List

import requests
from bson import ObjectId
from pydub import AudioSegment
from pymilvus import Collection, connections, utility, FieldSchema, CollectionSchema, DataType
from pymongo import MongoClient
from scipy.io import wavfile
  11. ################################连接milvus数据库 A
  12. # 配置信息
  13. MILVUS_CONFIG = {
  14. "host": "c-981be0ee7225467b-internal.milvus.aliyuncs.com",
  15. "user": "root",
  16. "password": "Piaoquan@2025",
  17. "port": "19530",
  18. }
  19. print("正在连接 Milvus 数据库...")
  20. connections.connect("default", **MILVUS_CONFIG)
  21. print("连接成功!")
  22. ################################连接milvus数据库 B
  23. ##################### mongoDB
  24. MONGO_URI = "mongodb://localhost:27017/"
  25. DB_NAME = "mydeconstruct"
  26. COLL_NAME = "deconstruct_how"
  27. client = MongoClient(MONGO_URI)
  28. db = client[DB_NAME]
  29. coll = db[COLL_NAME]
  30. ##################### mongoDB
  31. ##################### 路径解析返回
  32. def resolve_mongo_path(mongo_id: str, path: str):
  33. """
  34. 根据 mongo_id 与形如 '文本元素[1].子节点元素[0].what' 的路径字符串,
  35. 从 MongoDB 中定位并返回对应的对象。
  36. """
  37. doc = coll.find_one({"_id": ObjectId(mongo_id)})
  38. if not doc:
  39. return None
  40. # 将路径按 '.' 分割,逐级访问
  41. parts = path.split('.')
  42. current = doc
  43. for part in parts:
  44. # 处理数组索引,如 子节点元素[0]
  45. if '[' in part and part.endswith(']'):
  46. key, idx_str = part.split('[', 1)
  47. idx = int(idx_str[:-1]) # 去掉 ']'
  48. current = current[key][idx]
  49. else:
  50. current = current[part]
  51. return current
  52. ##################### 路径解析返回
  53. search_mode ="why_search" # "why_search"
  54. if search_mode == "how_search":
  55. ##################query what
  56. ##################
  57. milvus_client = Collection(name="deconstruct_how")
  58. milvus_client.load()
  59. collection_name = "deconstruct_how"
  60. if not utility.has_collection(collection_name):
  61. print(f"no collection named {collection_name}")
  62. else:
  63. # 查询并打印 collection 中的所有记录
  64. print(f"正在查询 collection '{collection_name}' 中的所有记录...")
  65. try:
  66. # 使用 query 方法获取所有记录,不设置过滤条件
  67. all_records = milvus_client.query(
  68. expr="mongo_id >\"10000000\" and type == \"how\"", # 空表达式表示查询所有
  69. output_fields=["mongo_id","type","path"], # 输出所有字段
  70. limit=10 # 设置一个较大的上限,确保能获取全部
  71. )
  72. print(f"共查询到 {len(all_records)} 条记录:")
  73. for record in all_records:
  74. print(record)
  75. rec = resolve_mongo_path(record["mongo_id"], record["path"])
  76. print("定位items:",rec)
  77. # docres = coll.find_one({"_id": ObjectId(record["mongo_id"])})
  78. # print(docres)
  79. except Exception as e:
  80. print(f"查询失败:{e}")
  81. ##############all_records返回存储的每个record, rec返回解析后的对象
  82. elif search_mode == "why_search":
  83. ##################query why
  84. ##################
  85. milvus_client = Collection(name="deconstruct_how")
  86. milvus_client.load()
  87. collection_name = "deconstruct_how"
  88. if not utility.has_collection(collection_name):
  89. print(f"no collection named {collection_name}")
  90. else:
  91. # 查询并打印 collection 中的所有记录
  92. print(f"正在查询 collection '{collection_name}' 中的所有记录...")
  93. try:
  94. # 使用 query 方法获取所有记录,不设置过滤条件
  95. all_records = milvus_client.query(
  96. expr="mongo_id >\"10000000\" and type == \"why\"", # 空表达式表示查询所有
  97. output_fields=["mongo_id","type","path"], # 输出所有字段
  98. limit=10 # 设置一个较大的上限,确保能获取全部
  99. )
  100. print(f"共查询到 {len(all_records)} 条记录:")
  101. for record in all_records:
  102. print(record)
  103. rec = resolve_mongo_path(record["mongo_id"], record["path"])
  104. print("定位items:",rec)
  105. # docres = coll.find_one({"_id": ObjectId(record["mongo_id"])})
  106. # print(docres)
  107. except Exception as e:
  108. print(f"查询失败:{e}")
  109. ##############all_records返回存储的每个record, rec返回解析后的对象