123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- from elasticsearch import AsyncElasticsearch
- from elasticsearch.helpers import async_bulk
class AsyncElasticSearchClient:
    """Small async convenience wrapper around ``AsyncElasticsearch``.

    Every operation except ``bulk_insert`` targets the single index named
    at construction time. The instance can be used as an async context
    manager; leaving the ``async with`` block closes the underlying client.
    """

    def __init__(self, index_name, hosts, password):
        """Create the underlying client.

        Args:
            index_name: Name of the index all operations are bound to.
            hosts: Forwarded unchanged as ``hosts=`` to ``AsyncElasticsearch``.
            password: Password used for basic auth with the fixed user
                name ``"elastic"``.
        """
        self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
        self.index_name = index_name

    async def create_index(self, settings, mappings):
        """(Re)create the bound index with the given settings and mappings.

        WARNING: if the index already exists it is deleted first, so any
        documents stored in it are lost.

        A failed ping is only reported on stdout; the method still goes on
        to the exists/delete/create calls. Errors raised by the create call
        are printed and swallowed (best-effort), while errors from the
        exists/delete calls propagate to the caller.

        Args:
            settings: Forwarded as ``settings=`` to ``indices.create``.
            mappings: Forwarded as ``mappings=`` to ``indices.create``.
        """
        if await self.es.ping():
            print("ElasticSearch client is up and running")
        else:
            print("ElasticSearch client is not up and running")

        if await self.es.indices.exists(index=self.index_name):
            print("index exists")
            await self.es.indices.delete(index=self.index_name)
            print("already delete index")

        try:
            await self.es.indices.create(
                index=self.index_name, settings=settings, mappings=mappings
            )
        except Exception as e:
            # Deliberately best-effort: report the failure and carry on.
            print("fail to create index, reason:", e)
        else:
            print("Index created successfully")

    async def async_search(self, query):
        """Run ``query`` (sent as the request body) and return the response."""
        return await self.es.search(index=self.index_name, body=query)

    async def async_update(self, obj):
        """Update one document.

        Args:
            obj: Mapping with key ``"es_id"`` (the document id) and key
                ``"doc"`` (sent verbatim as the request body).
                NOTE(review): the Update API expects a body such as
                ``{"doc": {...}}`` or a script -- confirm callers pass
                that shape under ``obj["doc"]``.

        Returns:
            The response of the underlying ``update`` call.
        """
        return await self.es.update(
            index=self.index_name, id=obj["es_id"], body=obj["doc"]
        )

    async def async_delete(self, es_id):
        """Delete the document with id ``es_id`` from the bound index."""
        await self.es.delete(index=self.index_name, id=es_id)

    async def async_delete_by_query(self, query):
        """Delete every document matched by ``query`` (sent as the body).

        The request is issued with ``conflicts="proceed"`` and
        ``refresh=True``.
        """
        await self.es.delete_by_query(
            index=self.index_name, body=query, conflicts="proceed", refresh=True
        )

    async def bulk_insert(self, docs):
        """Send ``docs`` through the ``async_bulk`` helper and report results.

        ``self.index_name`` is not passed to the helper, so the target index
        has to come from the actions themselves.

        Args:
            docs: Iterable of bulk actions, forwarded unchanged.

        Returns:
            Dict with ``"success"`` (first value returned by the helper),
            ``"failed"`` (number of error items) and ``"errors"`` (the
            error items themselves).
        """
        # raise_on_error=False is required for the report below to be
        # meaningful: with the helper's default (True) a BulkIndexError is
        # raised on any per-document failure, so the returned error list
        # could never be non-empty.
        success, errors = await async_bulk(
            self.es, docs, request_timeout=10, raise_on_error=False
        )
        return {"success": success, "failed": len(errors), "errors": errors}

    async def close(self):
        """Close the underlying client."""
        await self.es.close()

    async def __aenter__(self):
        """Enter the async context; returns this wrapper itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the client on context exit; exceptions are not suppressed."""
        await self.close()
|