import asyncio
from typing import Dict, List

import pymilvus


async def async_insert_chunk(collection: pymilvus.Collection, data: Dict) -> List[int]:
    """Insert a single row into a Milvus collection without blocking the event loop.

    The blocking ``collection.insert`` call is executed in a worker thread via
    ``asyncio.to_thread``.

    :param collection: target collection; its ``insert`` method is called.
    :param data: the row to insert. It is wrapped in a one-element list
        before being handed to ``collection.insert``.
    :return: the ``primary_keys`` attribute of the insert result.
    """
    # ``insert`` receives a list of rows, so the single row is wrapped here.
    result = await asyncio.to_thread(collection.insert, [data])
    return result.primary_keys


async def async_search_chunk(
    collection: pymilvus.Collection, query_vector, params: Dict
):
    """Run a vector search on a Milvus collection without blocking the event loop.

    The blocking ``collection.search`` call is executed in a worker thread via
    ``asyncio.to_thread``. The search always targets the ``"vector_text"``
    field with the ``COSINE`` metric and ``nprobe`` set to 10.

    :param collection: collection to search; its ``search`` method is called.
    :param query_vector: the query vector. It is wrapped in a one-element
        list, so exactly one query is issued per call.
    :param params: search options. ``params["limit"]`` (required) is passed
        as the result limit. ``params["expr"]`` (optional) is passed as the
        filter expression; when the key is absent no filter is applied.
    :return: whatever ``collection.search`` returns for the single query.
    :raises KeyError: if ``params`` has no ``"limit"`` key.
    """
    # Previously the filter was a hard-coded ``None`` local; reading it from
    # ``params`` keeps the old behaviour for callers that do not supply it.
    filter_expr = params.get("expr")
    return await asyncio.to_thread(
        collection.search,
        data=[query_vector],
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=params["limit"],
        anns_field="vector_text",
        expr=filter_expr,
    )