# elastic_search.py (2.3 KB)
import base64
import ssl
from typing import Any, Dict, List, Optional

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

from app.core.config import GlobalConfigSettings
  6. class AsyncElasticSearchClient:
  7. def __init__(self, global_config: GlobalConfigSettings):
  8. config = global_config.elasticsearch
  9. self.hosts = config.hosts
  10. # 优先使用 cert_content (base64 编码的证书内容),其次使用 cert_path
  11. if config.cert_content:
  12. cert_data = base64.b64decode(config.cert_content).decode("utf-8")
  13. self.ctx = ssl.create_default_context(cadata=cert_data)
  14. elif config.cert_path:
  15. self.ctx = ssl.create_default_context(cafile=config.cert_path)
  16. else:
  17. self.ctx = ssl.create_default_context()
  18. self.es = AsyncElasticsearch(
  19. self.hosts,
  20. basic_auth=(config.username, config.password),
  21. ssl_context=self.ctx,
  22. )
  23. self.index_name = config.index_name
  24. async def create_index(self, settings, mappings):
  25. exists = await self.es.indices.exists(index=self.index_name)
  26. if exists:
  27. await self.es.indices.delete(index=self.index_name)
  28. try:
  29. await self.es.indices.create(
  30. index=self.index_name, settings=settings, mappings=mappings
  31. )
  32. print("Index created successfully")
  33. except Exception as e:
  34. print("fail to create index, reason:", e)
  35. async def get_max_article_id(self):
  36. response = await self.es.search(
  37. index=self.index_name,
  38. size=1,
  39. sort="article_id:desc",
  40. _source=["article_id"],
  41. )
  42. return response["hits"]["hits"][0]["_source"]["article_id"]
  43. async def search(self, search_keys, size=10):
  44. query = {
  45. "query": {"match": {"title": search_keys}},
  46. "_source": ["article_id", "title"],
  47. "size": size,
  48. }
  49. resp = await self.es.search(index=self.index_name, body=query)
  50. return [i["_source"] for i in resp["hits"]["hits"]]
  51. async def bulk_insert(self, docs):
  52. await async_bulk(self.es, docs)
  53. async def close(self):
  54. await self.es.close()
  55. async def __aenter__(self):
  56. return self
  57. async def __aexit__(self, exc_type, exc_val, exc_tb):
  58. await self.es.close()