123456789101112131415161718192021222324252627282930313233343536373839 |
- from typing import List, Dict
- class ElasticSearchStrategy:
- def __init__(self, es):
- self.es_client = es
- async def base_search(
- self,
- filters: Dict[str, List],
- text_query: str = None,
- _source=False,
- size: int = 10000,
- sort_by: str = None,
- ) -> List:
- must_clauses = []
- for field, value in filters.items():
- must_clauses.append({"terms": {field: value}})
- if text_query:
- must_clauses.append({"match": {"topic": text_query}})
- query = {
- "query": {"bool": {"must": must_clauses}},
- "size": size,
- "_source": _source,
- }
- try:
- resp = await self.es_client.async_search(query=query)
- return [
- hit["_source"] if _source else hit["_id"]
- for hit in resp["hits"]["hits"]
- ]
- except Exception as e:
- print(f"search failed: {e}")
- return []
- async def search_strategy(self, query):
- pass
|