elastic_search_api.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import ssl
  2. from elasticsearch import AsyncElasticsearch
  3. from elasticsearch.helpers import async_bulk
  4. from applications.config import es_index
  5. class AsyncElasticSearchClient:
  6. def __init__(self, index_=es_index):
  7. self.password = "nkvvASQuQ0XUGRq5OLvm"
  8. self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
  9. self.ctx = ssl.create_default_context(cafile="applications/config/es_certs.crt")
  10. self.es = AsyncElasticsearch(
  11. self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
  12. )
  13. self.index_name = index_
  14. async def create_index(self, settings, mappings):
  15. exists = await self.es.indices.exists(index=self.index_name)
  16. if exists:
  17. await self.es.indices.delete(index=self.index_name)
  18. try:
  19. await self.es.indices.create(
  20. index=self.index_name, settings=settings, mappings=mappings
  21. )
  22. print("Index created successfully")
  23. except Exception as e:
  24. print("fail to create index, reason:", e)
  25. async def get_max_article_id(self):
  26. response = await self.es.search(
  27. index=self.index_name,
  28. size=1,
  29. sort="article_id:desc",
  30. _source=["article_id"],
  31. )
  32. return response["hits"]["hits"][0]["_source"]["article_id"]
  33. async def search(self, search_keys, size=10):
  34. query = {
  35. "query": {"match": {"title": search_keys}},
  36. "_source": ["article_id", "title"],
  37. "size": size,
  38. }
  39. resp = await self.es.search(index=self.index_name, body=query)
  40. return [i["_source"] for i in resp["hits"]["hits"]]
  41. async def bulk_insert(self, docs):
  42. await async_bulk(self.es, docs)
  43. async def close(self):
  44. await self.es.close()
  45. async def __aenter__(self):
  46. return self
  47. async def __aexit__(self, exc_type, exc_val, exc_tb):
  48. await self.es.close()