1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- from elasticsearch import AsyncElasticsearch
- from elasticsearch.helpers import async_bulk
- from applications.utils.async_utils import run_tasks_with_asyncio_task_group
class AsyncElasticSearchClient:
    """Async wrapper around ``AsyncElasticsearch`` bound to a single index.

    Every method operates on ``self.index_name``. The object can be used as an
    async context manager (``async with ...``); the underlying client is closed
    on exit.
    """

    def __init__(self, index_name: str, hosts, password: str):
        """Create the underlying async client.

        Args:
            index_name: Name of the index all methods operate on.
            hosts: Passed unchanged to ``AsyncElasticsearch(hosts=...)``.
            password: Password for the ``elastic`` user (HTTP basic auth).
        """
        self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
        self.index_name = index_name

    async def create_index(self, settings: dict, mappings: dict) -> None:
        """(Re)create ``self.index_name`` with the given settings and mappings.

        WARNING: if the index already exists it is deleted first, together
        with all of its documents.

        The ping result is only reported; execution continues either way.
        An exception from the final ``indices.create`` call is printed and
        swallowed. Exceptions from ``exists``/``delete`` propagate.
        """
        if await self.es.ping():
            print("ElasticSearch client is up and running")
        else:
            print("ElasticSearch client is not up and running")
        exists = await self.es.indices.exists(index=self.index_name)
        if exists:
            print("index exists")
            await self.es.indices.delete(index=self.index_name)
            print("already delete index")
        try:
            await self.es.indices.create(
                index=self.index_name, settings=settings, mappings=mappings
            )
            print("Index created successfully")
        except Exception as e:
            print("fail to create index, reason:", e)

    async def search(self, query: dict):
        """Run ``query`` (a full search request body) against the index.

        Returns the raw client response unchanged.
        """
        return await self.es.search(index=self.index_name, body=query)

    async def update(self, obj: dict):
        """Update a single document in the index.

        Args:
            obj: Mapping with ``"es_id"`` (the document ``_id``) and ``"doc"``.
                ``obj["doc"]`` is sent verbatim as the update request body, so
                per the Update API it must look like ``{"doc": {...}}`` (partial
                document) or contain a ``"script"``.

        Returns:
            The raw client response.
        """
        return await self.es.update(
            index=self.index_name, id=obj["es_id"], body=obj["doc"]
        )

    async def update_by_filed(
        self,
        field_name: str,
        field_value: str,
        doc: dict,
        max_hits: int = 10,
        max_concurrency: int = 10,
    ):
        """Apply the same update body to every document where ``field_name == field_value``.

        The lookup is a ``term`` query, i.e. an exact, unanalyzed match. Only
        the first ``max_hits`` matching documents are updated. The default of
        10 equals Elasticsearch's default search size, which the previous
        implementation relied on implicitly.

        Args:
            field_name: Field to match on.
            field_value: Exact value to match.
            doc: Update request body applied to each match (see ``update``).
            max_hits: Maximum number of matching documents to update.
            max_concurrency: Maximum number of concurrent update requests.

        Returns:
            The result of ``run_tasks_with_asyncio_task_group``. Returns
            ``None`` when nothing matches or any error occurs; errors are
            printed, not raised.
        """
        try:
            # Resolve the matching document ids first.
            query = {"query": {"term": {field_name: field_value}}, "size": max_hits}
            resp = await self.es.search(index=self.index_name, body=query)
            hits = resp["hits"]["hits"]
            if not hits:
                print(f"No document found with {field_name}={field_value}")
                return None
            task_list = [{"es_id": hit["_id"], "doc": doc} for hit in hits]
            # Each task dict has the shape self.update(obj) expects. The raw
            # client's es.update() needs separate index/id arguments and cannot
            # consume these dicts.
            # NOTE(review): assumes the helper calls handler(task) once per
            # task_list item — confirm against run_tasks_with_asyncio_task_group.
            return await run_tasks_with_asyncio_task_group(
                task_list=task_list,
                handler=self.update,
                description="update by filed",
                unit="document",
                max_concurrency=max_concurrency,
            )
        except Exception as e:
            print(f"fail to update by {field_name}={field_value}, reason:", e)
            return None

    # Correctly spelled alias; the original (misspelled) name is kept for
    # existing callers.
    update_by_field = update_by_filed

    async def bulk_insert(self, docs) -> dict:
        """Bulk-index ``docs`` with ``elasticsearch.helpers.async_bulk``.

        ``docs`` is an iterable of bulk actions. Actions that do not set their
        own ``_index`` go to ``self.index_name``; ``index`` is forwarded to the
        Bulk API as its default index.

        With the helper's default ``raise_on_error=True``, per-document
        failures raise ``BulkIndexError`` instead of appearing in ``errors``.

        Returns:
            ``{"success": <count>, "failed": <len(errors)>, "errors": <list>}``.
        """
        success, errors = await async_bulk(
            self.es, docs, index=self.index_name, request_timeout=10
        )
        return {"success": success, "failed": len(errors), "errors": errors}

    async def close(self) -> None:
        """Close the underlying ``AsyncElasticsearch`` client."""
        await self.es.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Single shutdown path. Returning None (falsy) lets any exception
        # raised inside the ``async with`` body propagate.
        await self.close()
|