from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from .base_search import BaseSearch
from applications.utils.elastic_search import ElasticSearchStrategy


class HybridSearch(BaseSearch):
    """Two-stage search: an Elasticsearch pre-filter followed by a vector search.

    Elasticsearch is queried first to obtain candidate ids; those ids are then
    used as an ``id in [...]`` filter expression for the vector search.
    """

    def __init__(self, milvus_pool, es_pool):
        """Initialise the base search and build the Elasticsearch strategy.

        Args:
            milvus_pool: Forwarded unchanged to ``BaseSearch.__init__``.
            es_pool: Forwarded unchanged to ``BaseSearch.__init__``.
        """
        super().__init__(milvus_pool, es_pool)
        # NOTE(review): ``self.es_pool`` is presumably set by
        # ``BaseSearch.__init__`` -- confirm against the base class.
        self.es_strategy = ElasticSearchStrategy(self.es_pool)

    async def hybrid_search(
        self,
        filters: Dict[str, Any],  # condition filters
        query_vec: List[float],  # embedding vector of the query
        anns_field: str = "vector_text",  # vector field the query targets
        search_params: Optional[Dict[str, Any]] = None,  # vector distance settings
        query_text: Optional[str] = None,  # text for the topic inverted-index lookup
        _source: bool = False,  # whether to return metadata
        es_size: int = 10000,  # number of hits kept by the first-stage ES filter
        sort_by: Optional[str] = None,  # sort order
        milvus_size: int = 10,  # number of results returned by the Milvus coarse ranking
    ):
        """Filter candidates with Elasticsearch, then rank them by vector search.

        Args:
            filters: Passed to the Elasticsearch strategy as ``filters``.
            query_vec: Query vector passed to ``base_vector_search``.
            anns_field: Name of the vector field to search against.
            search_params: Passed to ``base_vector_search`` unchanged.
            query_text: Passed to the Elasticsearch strategy as ``text_query``.
            _source: Passed to the Elasticsearch strategy as ``_source``.
            es_size: Passed to the Elasticsearch strategy as ``size``.
            sort_by: Passed to the Elasticsearch strategy as ``sort_by``.
            milvus_size: Passed to ``base_vector_search`` as ``limit``.

        Returns:
            ``{"results": []}`` when the Elasticsearch stage yields nothing;
            otherwise whatever ``base_vector_search`` returns.
        """
        candidate_ids = await self.es_strategy.base_search(
            filters=filters,
            text_query=query_text,
            _source=_source,
            size=es_size,
            sort_by=sort_by,
        )
        if not candidate_ids:
            return {"results": []}

        # Stringify each id so the join also works when the ids are not
        # already strings (str.join raises TypeError on non-str items).
        # NOTE(review): assumes base_search returns a flat iterable of ids;
        # confirm the return shape when ``_source`` is True.
        id_list = ",".join(str(candidate_id) for candidate_id in candidate_ids)
        expr = f"id in [{id_list}]"

        return await self.base_vector_search(
            query_vec=query_vec,
            anns_field=anns_field,
            limit=milvus_size,
            expr=expr,
            search_params=search_params,
        )