client.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. from elasticsearch import AsyncElasticsearch
  2. from elasticsearch.helpers import async_bulk
  3. class AsyncElasticSearchClient:
  4. def __init__(self, index_name, hosts, password):
  5. self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
  6. self.index_name = index_name
  7. async def create_index(self, settings, mappings):
  8. if await self.es.ping():
  9. print("ElasticSearch client is up and running")
  10. else:
  11. print("ElasticSearch client is not up and running")
  12. exists = await self.es.indices.exists(index=self.index_name)
  13. if exists:
  14. print("index exists")
  15. await self.es.indices.delete(index=self.index_name)
  16. print("already delete index")
  17. try:
  18. await self.es.indices.create(
  19. index=self.index_name, settings=settings, mappings=mappings
  20. )
  21. print("Index created successfully")
  22. except Exception as e:
  23. print("fail to create index, reason:", e)
  24. async def async_search(self, query):
  25. resp = await self.es.search(index=self.index_name, body=query)
  26. return resp
  27. async def async_update(self, obj):
  28. return await self.es.update(
  29. index=self.index_name, id=obj["es_id"], body=obj["doc"]
  30. )
  31. async def async_delete(self, es_id):
  32. await self.es.delete(index=self.index_name, id=es_id)
  33. async def async_delete_by_query(self, query):
  34. await self.es.delete_by_query(index=self.index_name, body=query, conflicts="proceed", refresh=True)
  35. async def bulk_insert(self, docs):
  36. success, errors = await async_bulk(self.es, docs, request_timeout=10)
  37. return {"success": success, "failed": len(errors), "errors": errors}
  38. async def close(self):
  39. await self.es.close()
  40. async def __aenter__(self):
  41. return self
  42. async def __aexit__(self, exc_type, exc_val, exc_tb):
  43. await self.es.close()