elastic_search.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. import ssl
  2. from elasticsearch import AsyncElasticsearch
  3. from elasticsearch.helpers import async_bulk
  4. class AsyncElasticSearchClient:
  5. def __init__(self, index_):
  6. self.password = "nkvvASQuQ0XUGRq5OLvm"
  7. self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
  8. self.ctx = ssl.create_default_context(cafile="app/core/config/cert/es_certs.crt")
  9. self.es = AsyncElasticsearch(
  10. self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
  11. )
  12. self.index_name = index_
  13. async def create_index(self, settings, mappings):
  14. exists = await self.es.indices.exists(index=self.index_name)
  15. if exists:
  16. await self.es.indices.delete(index=self.index_name)
  17. try:
  18. await self.es.indices.create(
  19. index=self.index_name, settings=settings, mappings=mappings
  20. )
  21. print("Index created successfully")
  22. except Exception as e:
  23. print("fail to create index, reason:", e)
  24. async def get_max_article_id(self):
  25. response = await self.es.search(
  26. index=self.index_name,
  27. size=1,
  28. sort="article_id:desc",
  29. _source=["article_id"],
  30. )
  31. return response["hits"]["hits"][0]["_source"]["article_id"]
  32. async def search(self, search_keys, size=10):
  33. query = {
  34. "query": {"match": {"title": search_keys}},
  35. "_source": ["article_id", "title"],
  36. "size": size,
  37. }
  38. resp = await self.es.search(index=self.index_name, body=query)
  39. return [i["_source"] for i in resp["hits"]["hits"]]
  40. async def bulk_insert(self, docs):
  41. await async_bulk(self.es, docs)
  42. async def close(self):
  43. await self.es.close()
  44. async def __aenter__(self):
  45. return self
  46. async def __aexit__(self, exc_type, exc_val, exc_tb):
  47. await self.es.close()