client.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. from elasticsearch import AsyncElasticsearch
  2. from elasticsearch.helpers import async_bulk
  3. from applications.utils.async_utils import run_tasks_with_asyncio_task_group
  4. class AsyncElasticSearchClient:
  5. def __init__(self, index_name, hosts, password):
  6. self.es = AsyncElasticsearch(hosts=hosts, basic_auth=("elastic", password))
  7. self.index_name = index_name
  8. async def create_index(self, settings, mappings):
  9. if await self.es.ping():
  10. print("ElasticSearch client is up and running")
  11. else:
  12. print("ElasticSearch client is not up and running")
  13. exists = await self.es.indices.exists(index=self.index_name)
  14. if exists:
  15. print("index exists")
  16. await self.es.indices.delete(index=self.index_name)
  17. print("already delete index")
  18. try:
  19. await self.es.indices.create(
  20. index=self.index_name, settings=settings, mappings=mappings
  21. )
  22. print("Index created successfully")
  23. except Exception as e:
  24. print("fail to create index, reason:", e)
  25. async def search(self, query):
  26. resp = await self.es.search(index=self.index_name, body=query)
  27. return resp
  28. async def update(self, obj):
  29. return await self.es.update(
  30. index=self.index_name, id=obj["es_id"], body=obj["doc"]
  31. )
  32. async def update_by_filed(self, field_name: str, field_value: str, doc: dict):
  33. try:
  34. # 先查出 doc_id
  35. query = {"query": {"term": {field_name: field_value}}}
  36. resp = await self.es.search(index=self.index_name, body=query)
  37. if not resp["hits"]["hits"]:
  38. print(f"No document found with {field_name}={field_value}")
  39. return None
  40. task_list = [
  41. {"es_id": hit["_id"], "doc": doc} for hit in resp["hits"]["hits"]
  42. ]
  43. # update by ids
  44. return await run_tasks_with_asyncio_task_group(
  45. task_list=task_list,
  46. handler=self.es.update,
  47. description="update by filed",
  48. unit="document",
  49. max_concurrency=10,
  50. )
  51. except Exception as e:
  52. print(f"fail to update by {field_name}={field_value}, reason:", e)
  53. return None
  54. async def bulk_insert(self, docs):
  55. success, errors = await async_bulk(self.es, docs, request_timeout=10)
  56. return {"success": success, "failed": len(errors), "errors": errors}
  57. async def close(self):
  58. await self.es.close()
  59. async def __aenter__(self):
  60. return self
  61. async def __aexit__(self, exc_type, exc_val, exc_tb):
  62. await self.es.close()