import logging
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)


class ElasticSearchStrategy:
    """Build and run bool/terms searches through an injected search client.

    The client is stored as ``self.es`` and must expose an awaitable
    ``search(query=...)`` method whose result is subscriptable as
    ``resp["hits"]["hits"]``.
    """

    def __init__(self, es):
        """Store the search client used by :meth:`base_search`."""
        self.es = es

    async def base_search(
        self,
        filters: Dict[str, List],
        text_query: Optional[str] = None,
        _source=False,
        size: int = 10000,
        sort_by: Optional[str] = None,
    ) -> List:
        """Run a filtered search and return the matching ids or documents.

        Args:
            filters: Mapping of field name to the list of accepted values;
                each entry becomes one ``terms`` clause under ``bool.must``.
            text_query: When truthy, adds a ``match`` clause on the
                ``topic`` field.
            _source: Sent as the request's ``_source`` option. When truthy,
                each hit's ``_source`` is returned; otherwise each hit's
                ``_id`` is returned.
            size: Sent as the request's ``size`` option.
            sort_by: When truthy, sent as a single-entry ``sort`` list.
                Omitted from the request otherwise.

        Returns:
            A list with one item per hit, or an empty list if the search
            call or the parsing of its response raised.
        """
        must_clauses = [
            {"terms": {field: values}} for field, values in filters.items()
        ]
        if text_query:
            must_clauses.append({"match": {"topic": text_query}})

        query = {
            "query": {"bool": {"must": must_clauses}},
            "size": size,
            "_source": _source,
        }
        if sort_by:
            # Previously this argument was accepted but silently ignored.
            query["sort"] = [sort_by]

        # NOTE(review): the whole request body (query, size, _source, sort)
        # is passed under the ``query=`` keyword. Confirm that the client
        # behind ``self.es`` expects a full body there and not only the
        # query clause.
        try:
            resp = await self.es.search(query=query)
            return [
                hit["_source"] if _source else hit["_id"]
                for hit in resp["hits"]["hits"]
            ]
        except Exception as e:
            # Best-effort contract: callers get an empty result on failure,
            # but the traceback is kept in the log rather than lost.
            logger.exception("search failed: %s", e)
            return []

    async def search_strategy(self, query):
        """Placeholder with no implementation; returns ``None``.

        NOTE(review): presumably meant to be overridden or filled in later;
        confirm the intended contract.
        """
        pass