import asyncio from typing import Dict import pymilvus async def async_insert_chunk(collection: pymilvus.Collection, data: Dict): """ :param collection: :param data: insert data :return: """ return await asyncio.to_thread(collection.insert, [data]) async def async_search_chunk( collection: pymilvus.Collection, query_vector, params: Dict ): """ :param query_vector: query 向量 :param collection: :param params: search 参数 :return: """ expr = None return await asyncio.to_thread( collection.search, data=[query_vector], param={"metric_type": "COSINE", "params": {"nprobe": 10}}, limit=params["limit"], anns_field="vector_text", expr=expr, )