from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

from applications.utils.async_utils import run_tasks_with_asyncio_task_group


class AsyncElasticSearchClient:
    """Thin async wrapper around ``AsyncElasticsearch`` bound to one index.

    Can be used as an async context manager; the underlying client is
    closed on exit.
    """

    def __init__(self, index_name, hosts, password):
        """Create the underlying client.

        Args:
            index_name: Name of the index every operation targets.
            hosts: Host specification forwarded to ``AsyncElasticsearch``.
            password: Password for the built-in ``elastic`` user.
        """
        self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
        self.index_name = index_name

    async def create_index(self, settings, mappings):
        """(Re)create the index with the given settings and mappings.

        WARNING: if the index already exists it is deleted first, so all
        documents stored in it are lost.

        Creation failures are printed, not raised.
        """
        # The ping result is informational only; the method carries on
        # regardless of whether the cluster answered.
        if await self.es.ping():
            print("ElasticSearch client is up and running")
        else:
            print("ElasticSearch client is not up and running")

        exists = await self.es.indices.exists(index=self.index_name)
        if exists:
            print("index exists")
            await self.es.indices.delete(index=self.index_name)
            print("already delete index")

        try:
            await self.es.indices.create(
                index=self.index_name, settings=settings, mappings=mappings
            )
            print("Index created successfully")
        except Exception as e:
            print("fail to create index, reason:", e)

    async def search(self, query):
        """Run ``query`` (a full search request body) and return the response."""
        resp = await self.es.search(index=self.index_name, body=query)
        return resp

    async def update(self, obj):
        """Update a single document.

        Args:
            obj: Mapping with ``"es_id"`` (the document id) and ``"doc"``
                (passed verbatim as the update request body).

        Returns:
            The response of the underlying ``update`` call.
        """
        return await self.es.update(
            index=self.index_name, id=obj["es_id"], body=obj["doc"]
        )

    async def update_by_filed(self, field_name: str, field_value: str, doc: dict):
        """Update every document whose ``field_name`` term equals ``field_value``.

        Args:
            field_name: Field used in the ``term`` query.
            field_value: Value the field must match.
            doc: Update body applied to each matching document (see
                :meth:`update`).

        Returns:
            Whatever ``run_tasks_with_asyncio_task_group`` returns, or
            ``None`` when nothing matched or any step failed (the failure
            is printed, not raised).
        """
        try:
            # Look up the ids of the matching documents first.
            # NOTE(review): the query sets no explicit "size", so only the
            # server's default page of hits gets updated - confirm intended.
            query = {"query": {"term": {field_name: field_value}}}
            resp = await self.es.search(index=self.index_name, body=query)
            hits = resp["hits"]["hits"]
            if not hits:
                print(f"No document found with {field_name}={field_value}")
                return None

            task_list = [{"es_id": hit["_id"], "doc": doc} for hit in hits]

            # Update by ids. The handler must be self.update: each task is an
            # {"es_id", "doc"} mapping, which is the shape self.update
            # unpacks. The raw self.es.update only takes keyword arguments
            # and cannot consume such a task object.
            # (Presumably the helper calls handler once per task - confirm.)
            return await run_tasks_with_asyncio_task_group(
                task_list=task_list,
                handler=self.update,
                description="update by filed",
                unit="document",
                max_concurrency=10,
            )
        except Exception as e:
            print(f"fail to update by {field_name}={field_value}, reason:", e)
            return None

    async def bulk_insert(self, docs):
        """Bulk-index ``docs`` via ``async_bulk`` with a 10 second request timeout.

        Returns:
            Dict with ``"success"`` (first value returned by ``async_bulk``),
            ``"failed"`` (number of reported errors) and ``"errors"`` (the
            errors themselves).
        """
        success, errors = await async_bulk(self.es, docs, request_timeout=10)
        return {"success": success, "failed": len(errors), "errors": errors}

    async def close(self):
        """Close the underlying Elasticsearch client."""
        await self.es.close()

    async def __aenter__(self):
        """Enter the async context; returns this client."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the client on context exit; exceptions are not suppressed."""
        await self.close()