elastic_search.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import ssl
  2. from elasticsearch import AsyncElasticsearch
  3. from elasticsearch.helpers import async_bulk
  4. class AsyncElasticSearchClient:
  5. def __init__(self, index_):
  6. self.password = "nkvvASQuQ0XUGRq5OLvm"
  7. self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
  8. self.ctx = ssl.create_default_context(
  9. cafile="app/core/config/cert/es_certs.crt"
  10. )
  11. self.es = AsyncElasticsearch(
  12. self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
  13. )
  14. self.index_name = index_
  15. async def create_index(self, settings, mappings):
  16. exists = await self.es.indices.exists(index=self.index_name)
  17. if exists:
  18. await self.es.indices.delete(index=self.index_name)
  19. try:
  20. await self.es.indices.create(
  21. index=self.index_name, settings=settings, mappings=mappings
  22. )
  23. print("Index created successfully")
  24. except Exception as e:
  25. print("fail to create index, reason:", e)
  26. async def get_max_article_id(self):
  27. response = await self.es.search(
  28. index=self.index_name,
  29. size=1,
  30. sort="article_id:desc",
  31. _source=["article_id"],
  32. )
  33. return response["hits"]["hits"][0]["_source"]["article_id"]
  34. async def search(self, search_keys, size=10):
  35. query = {
  36. "query": {"match": {"title": search_keys}},
  37. "_source": ["article_id", "title"],
  38. "size": size,
  39. }
  40. resp = await self.es.search(index=self.index_name, body=query)
  41. return [i["_source"] for i in resp["hits"]["hits"]]
  42. async def bulk_insert(self, docs):
  43. await async_bulk(self.es, docs)
  44. async def close(self):
  45. await self.es.close()
  46. async def __aenter__(self):
  47. return self
  48. async def __aexit__(self, exc_type, exc_val, exc_tb):
  49. await self.es.close()