# client.py
from typing import Any, Dict

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
  3. class AsyncElasticSearchClient:
  4. def __init__(self, index_name, hosts, password):
  5. self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
  6. self.index_name = index_name
  7. async def create_index(self, settings, mappings):
  8. if await self.es.ping():
  9. print("ElasticSearch client is up and running")
  10. else:
  11. print("ElasticSearch client is not up and running")
  12. exists = await self.es.indices.exists(index=self.index_name)
  13. if exists:
  14. print("index exists")
  15. await self.es.indices.delete(index=self.index_name)
  16. print("already delete index")
  17. try:
  18. await self.es.indices.create(
  19. index=self.index_name, settings=settings, mappings=mappings
  20. )
  21. print("Index created successfully")
  22. except Exception as e:
  23. print("fail to create index, reason:", e)
  24. async def async_search(self, query):
  25. resp = await self.es.search(index=self.index_name, body=query)
  26. return resp
  27. async def async_update(self, obj):
  28. return await self.es.update(
  29. index=self.index_name, id=obj["es_id"], body=obj["doc"]
  30. )
  31. async def async_delete(self, es_id):
  32. await self.es.delete(index=self.index_name, id=es_id)
  33. async def async_delete_by_query(self, query):
  34. await self.es.delete_by_query(
  35. index=self.index_name, body=query, conflicts="proceed", refresh=True
  36. )
  37. async def bulk_insert(self, docs):
  38. success, errors = await async_bulk(self.es, docs, request_timeout=10)
  39. return {"success": success, "failed": len(errors), "errors": errors}
  40. async def close(self):
  41. await self.es.close()
  42. async def __aenter__(self):
  43. return self
  44. async def __aexit__(self, exc_type, exc_val, exc_tb):
  45. await self.es.close()