search_strategy.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. from typing import List, Dict
  2. class ElasticSearchStrategy:
  3. def __init__(self, es):
  4. self.es = es
  5. async def base_search(
  6. self,
  7. filters: Dict[str, List],
  8. text_query: str = None,
  9. _source=False,
  10. size: int = 10000,
  11. sort_by: str = None,
  12. ) -> List:
  13. must_clauses = []
  14. for field, value in filters.items():
  15. must_clauses.append({"terms": {field: value}})
  16. if text_query:
  17. must_clauses.append({"match": {"topic": text_query}})
  18. query = {
  19. "query": {"bool": {"must": must_clauses}},
  20. "size": size,
  21. "_source": _source,
  22. }
  23. try:
  24. resp = await self.es.search(query=query)
  25. return [
  26. hit["_source"] if _source else hit["_id"]
  27. for hit in resp["hits"]["hits"]
  28. ]
  29. except Exception as e:
  30. print(f"search failed: {e}")
  31. return []
  32. async def search_strategy(self, query):
  33. pass