import ssl import base64 from elasticsearch import AsyncElasticsearch from elasticsearch.helpers import async_bulk from app.core.config import GlobalConfigSettings class AsyncElasticSearchClient: def __init__(self, global_config: GlobalConfigSettings): config = global_config.elasticsearch self.hosts = config.hosts # 优先使用 cert_content (base64 编码的证书内容),其次使用 cert_path if config.cert_content: cert_data = base64.b64decode(config.cert_content).decode("utf-8") self.ctx = ssl.create_default_context(cadata=cert_data) elif config.cert_path: self.ctx = ssl.create_default_context(cafile=config.cert_path) else: self.ctx = ssl.create_default_context() self.es = AsyncElasticsearch( self.hosts, basic_auth=(config.username, config.password), ssl_context=self.ctx, ) self.index_name = config.index_name async def create_index(self, settings, mappings): exists = await self.es.indices.exists(index=self.index_name) if exists: await self.es.indices.delete(index=self.index_name) try: await self.es.indices.create( index=self.index_name, settings=settings, mappings=mappings ) print("Index created successfully") except Exception as e: print("fail to create index, reason:", e) async def get_max_article_id(self): response = await self.es.search( index=self.index_name, size=1, sort="article_id:desc", _source=["article_id"], ) return response["hits"]["hits"][0]["_source"]["article_id"] async def search(self, search_keys, size=10): query = { "query": {"match": {"title": search_keys}}, "_source": ["article_id", "title"], "size": size, } resp = await self.es.search(index=self.index_name, body=query) return [i["_source"] for i in resp["hits"]["hits"]] async def bulk_insert(self, docs): await async_bulk(self.es, docs) async def close(self): await self.es.close() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.es.close()