functions.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import asyncio
  2. from typing import Dict, List
  3. import pymilvus
  4. async def async_insert_chunk(collection: pymilvus.Collection, data: Dict) -> List[int]:
  5. """
  6. :param collection:
  7. :param data: insert data
  8. :return:
  9. """
  10. result = await asyncio.to_thread(collection.insert, [data])
  11. return result.primary_keys
  12. async def async_update_embedding(collection: pymilvus.Collection, data: Dict, flush: bool = True) -> List[int]:
  13. """
  14. Update entities by ids from a Milvus collection asynchronously.
  15. :param collection: Milvus Collection object
  16. :param data: Dictionary of fields to update, with keys as field names and values as new values
  17. :param flush: Whether to flush the collection after update
  18. :return: List of primary key ids that were updated
  19. """
  20. result = await asyncio.to_thread(
  21. collection.upsert,
  22. data=data
  23. )
  24. if flush:
  25. await asyncio.to_thread(collection.flush)
  26. # Prefer server-returned PKs; fallback to input pk
  27. try:
  28. print("update successful")
  29. return list(result.primary_keys)
  30. except Exception:
  31. return [data['id']]
  32. async def async_delete_chunk(
  33. collection: pymilvus.Collection, ids: List[int]
  34. ) -> List[int]:
  35. """
  36. Delete entities by ids from a Milvus collection asynchronously.
  37. :param collection: Milvus Collection object
  38. :param ids: List of primary key ids to delete
  39. :return: List of successfully deleted ids
  40. """
  41. if not ids:
  42. return []
  43. expr = f"id in {ids}"
  44. result = await asyncio.to_thread(collection.delete, expr)
  45. await asyncio.to_thread(collection.flush)
  46. success_count = result.succ_count
  47. if success_count == len(ids):
  48. return ids
  49. else:
  50. return ids[:success_count]