| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import ssl
- import base64
- from elasticsearch import AsyncElasticsearch
- from elasticsearch.helpers import async_bulk
- from app.core.config import GlobalConfigSettings
- class AsyncElasticSearchClient:
- def __init__(self, global_config: GlobalConfigSettings):
- config = global_config.elasticsearch
- self.hosts = config.hosts
- # 优先使用 cert_content (base64 编码的证书内容),其次使用 cert_path
- if config.cert_content:
- cert_data = base64.b64decode(config.cert_content).decode("utf-8")
- self.ctx = ssl.create_default_context(cadata=cert_data)
- elif config.cert_path:
- self.ctx = ssl.create_default_context(cafile=config.cert_path)
- else:
- self.ctx = ssl.create_default_context()
- self.es = AsyncElasticsearch(
- self.hosts,
- basic_auth=(config.username, config.password),
- ssl_context=self.ctx,
- )
- self.index_name = config.index_name
- async def create_index(self, settings, mappings):
- exists = await self.es.indices.exists(index=self.index_name)
- if exists:
- await self.es.indices.delete(index=self.index_name)
- try:
- await self.es.indices.create(
- index=self.index_name, settings=settings, mappings=mappings
- )
- print("Index created successfully")
- except Exception as e:
- print("fail to create index, reason:", e)
- async def get_max_article_id(self):
- response = await self.es.search(
- index=self.index_name,
- size=1,
- sort="article_id:desc",
- _source=["article_id"],
- )
- return response["hits"]["hits"][0]["_source"]["article_id"]
- async def search(self, search_keys, size=10):
- query = {
- "query": {"match": {"title": search_keys}},
- "_source": ["article_id", "title"],
- "size": size,
- }
- resp = await self.es.search(index=self.index_name, body=query)
- return [i["_source"] for i in resp["hits"]["hits"]]
- async def bulk_insert(self, docs):
- await async_bulk(self.es, docs)
- async def close(self):
- await self.es.close()
- async def __aenter__(self):
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.es.close()
|