import ssl from elasticsearch import Elasticsearch, ApiError from elasticsearch import helpers from config.es_mappings import index_name class ElasticSearchClient: def __init__(self, index_=index_name): self.password = "nkvvASQuQ0XUGRq5OLvm" self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"] self.ctx = ssl.create_default_context(cafile="config/es_certs.crt") self.es = Elasticsearch( self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx ) self.index_name = index_ def create_index(self, settings, mappings): if self.es.indices.exists(index=self.index_name): self.es.indices.delete(index=self.index_name) try: self.es.indices.create( index=self.index_name, settings=settings, mappings=mappings ) print("Index created successfully") except ApiError as e: print(f"❌ Failed: {e.meta.error['type']} – {e.meta.error['reason']}") def get_max_article_id(self): response = self.es.search( index=self.index_name, size=1, sort="article_id:desc", _source=["article_id"], ) return response["hits"]["hits"][0]["_source"]["article_id"] def search(self, search_keys, size=10): query = { "query": {"match": {"title": search_keys}}, "_source": ["article_id", "title"], "size": size } resp = self.es.search(index=index_name, body=query) return [i["_source"] for i in resp["hits"]["hits"]] def bulk_insert(self, docs): helpers.bulk(self.es, docs)