from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk class AsyncElasticSearchClient: def __init__(self, index_name, hosts, password): self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password)) self.index_name = index_name async def create_index(self, settings, mappings): if await self.es.ping(): print("ElasticSearch client is up and running") else: print("ElasticSearch client is not up and running") exists = await self.es.indices.exists(index=self.index_name) if exists: print("index exists") await self.es.indices.delete(index=self.index_name) print("already delete index") try: await self.es.indices.create( index=self.index_name, settings=settings, mappings=mappings ) print("Index created successfully") except Exception as e: print("fail to create index, reason:", e) async def async_search(self, query): resp = await self.es.search(index=self.index_name, body=query) return resp async def async_update(self, obj): return await self.es.update( index=self.index_name, id=obj["es_id"], body=obj["doc"] ) async def async_delete(self, es_id): await self.es.delete(index=self.index_name, id=es_id) async def async_delete_by_query(self, query): await self.es.delete_by_query(index=self.index_name, body=query, conflicts="proceed", refresh=True) async def bulk_insert(self, docs): success, errors = await async_bulk(self.es, docs, request_timeout=10) return {"success": success, "failed": len(errors), "errors": errors} async def close(self): await self.es.close() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.es.close()