123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- import traceback
- import uuid
- from typing import Dict, Any
- from quart import Blueprint, jsonify, request
- from applications.config import (
- DEFAULT_MODEL,
- LOCAL_MODEL_CONFIG,
- ChunkerConfig,
- BASE_MILVUS_SEARCH_PARAMS,
- )
- from applications.resource import get_resource_manager
- from applications.api import get_basic_embedding
- from applications.api import get_img_embedding
- from applications.async_task import ChunkEmbeddingTask, DeleteTask
- from applications.search import HybridSearch
# Blueprint that the route decorators below register on; its URL prefix is "/api".
- server_bp = Blueprint("api", __name__, url_prefix="/api")
@server_bp.route("/embed", methods=["POST"])
async def embed():
    """Return the embedding of the posted text.

    Request JSON fields:
        text: the text to embed (required).
        model: optional model name; defaults to ``DEFAULT_MODEL`` and must
            have a truthy entry in ``LOCAL_MODEL_CONFIG``.

    Returns:
        ``{"embedding": ...}`` with the result of ``get_basic_embedding``,
        or an ``{"error": ...}`` payload when the model is not configured
        or ``text`` is missing/empty. No explicit status code is set for
        the error payloads, matching the other handlers in this module.
    """
    body = await request.get_json()
    # get_json() may yield None (non-JSON request) or a non-object JSON
    # value; treat both as an empty object instead of crashing on `.get`.
    if not isinstance(body, dict):
        body = {}

    model_name = body.get("model", DEFAULT_MODEL)
    if not LOCAL_MODEL_CONFIG.get(model_name):
        return jsonify({"error": "error model"})

    text = body.get("text")
    if not text:
        # Reject missing/empty input up front rather than handing None to
        # the embedding backend.
        return jsonify({"error": "error text"})

    embedding = await get_basic_embedding(text, model_name)
    return jsonify({"embedding": embedding})
@server_bp.route("/img_embed", methods=["POST"])
async def img_embed():
    """Return image embeddings for the posted list of URLs.

    Request JSON fields:
        url_list: non-empty value forwarded unchanged to
            ``get_img_embedding`` (presumably a list of image URLs —
            confirm against that helper).

    Returns:
        The JSON-encoded result of ``get_img_embedding``, or
        ``{"error": "error url_list"}`` when ``url_list`` is missing or
        empty, or when the body is not a JSON object.
    """
    body = await request.get_json()
    # get_json() may yield None (non-JSON request) or a non-object JSON
    # value; treat both as an empty object instead of crashing on `.get`.
    if not isinstance(body, dict):
        body = {}

    url_list = body.get("url_list")
    if not url_list:
        return jsonify({"error": "error url_list"})

    embedding = await get_img_embedding(url_list)
    return jsonify(embedding)
@server_bp.route("/delete", methods=["POST"])
async def delete():
    """Run a delete task described by the posted ``level`` and ``params``.

    Request JSON fields:
        level: required, truthy; passed as the first argument to
            ``DeleteTask.deal``.
        params: required, truthy; passed as the second argument to
            ``DeleteTask.deal``.

    Returns:
        The JSON-encoded result of ``DeleteTask.deal``, or
        ``{"error": "error level or params"}`` when either field is
        missing/empty or the body is not a JSON object.
    """
    body = await request.get_json()
    # get_json() may yield None (non-JSON request) or a non-object JSON
    # value; treat both as an empty object instead of crashing on `.get`.
    if not isinstance(body, dict):
        body = {}

    level = body.get("level")
    params = body.get("params")
    if not (level and params):
        return jsonify({"error": "error level or params"})

    delete_task = DeleteTask(get_resource_manager())
    response = await delete_task.deal(level, params)
    return jsonify(response)
@server_bp.route("/chunk", methods=["POST"])
async def chunk():
    """Start a chunk-and-embed task for the posted document.

    Request JSON fields:
        text: required string that is non-empty after stripping
            whitespace. The whole request body (not just ``text``) is
            handed to ``ChunkEmbeddingTask.deal``.

    Returns:
        ``{"doc_id": ...}`` carrying the value returned by
        ``ChunkEmbeddingTask.deal``, or ``{"error": "error text"}`` when
        ``text`` is missing, blank, not a string, or the body is not a
        JSON object.
    """
    body = await request.get_json()
    # get_json() may yield None (non-JSON request) or a non-object JSON
    # value; there is no usable text in either case.
    if not isinstance(body, dict):
        return jsonify({"error": "error text"})

    text = body.get("text", "")
    # A JSON null or number under "text" has no .strip(); reject it with
    # the same validation error instead of raising AttributeError.
    if not isinstance(text, str) or not text.strip():
        return jsonify({"error": "error text"})

    resource = get_resource_manager()
    chunk_task = ChunkEmbeddingTask(
        resource.mysql_client,
        resource.milvus_client,
        cfg=ChunkerConfig(),
        doc_id=f"doc-{uuid.uuid4()}",
        es_pool=resource.es_client,
    )
    # The response reports whatever id the task returns, which replaces
    # the locally generated one.
    doc_id = await chunk_task.deal(body)
    return jsonify({"doc_id": doc_id})
@server_bp.route("/search", methods=["POST"])
async def search():
    """Run a vector ("base") or hybrid search for the posted query text.

    Request JSON fields:
        search_type: ``"base"`` or ``"hybrid"``. ``"strategy"`` is answered
            with HTTP 405; any other value gets an ``"error search_type"``
            payload.
        query_text: required; embedded with ``DEFAULT_MODEL`` to build the
            query vector.
        filters: filter conditions for the hybrid search (default ``{}``).
        anns_field: vector field to search (default ``"vector_text"``).
        search_params: vector search parameters (default
            ``BASE_MILVUS_SEARCH_PARAMS``).
        es_size: size of the first-stage Elasticsearch filter for the
            hybrid search (default 10000).
        sort_by: sort key for the hybrid search (default ``None``).
        milvus_size: number of results returned by the Milvus coarse
            ranking in the hybrid search (default 20). The legacy key
            ``"milvus"`` is still accepted.
        limit: number of results for the base search (default 10).

    Returns:
        The JSON-encoded search response with status 200, an
        ``{"error": ...}`` payload for invalid input, or status 500 with
        the error message and traceback when the search raises.
    """
    body = await request.get_json()
    # get_json() may yield None (non-JSON request) or a non-object JSON
    # value; treat both as an empty object instead of crashing on `.get`.
    if not isinstance(body, dict):
        body = {}

    query_text = body.get("query_text")
    if not query_text:
        return jsonify({"error": "error query_text"})

    # Validate the search type before paying for the embedding call and
    # building the search engine.
    search_type = body.get("search_type")
    if search_type == "strategy":
        return jsonify({"error": "strategy not implemented"}), 405
    if search_type not in ("base", "hybrid"):
        return jsonify({"error": "error search_type"}), 200

    anns_field = body.get("anns_field", "vector_text")
    search_params = body.get("search_params", BASE_MILVUS_SEARCH_PARAMS)
    # NOTE(review): "_source" and "query_text" appear in the documented
    # search options but are not forwarded to hybrid_search — confirm
    # whether that is intentional.

    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
    resource = get_resource_manager()
    search_engine = HybridSearch(
        milvus_pool=resource.milvus_client, es_pool=resource.es_client
    )

    try:
        if search_type == "base":
            response = await search_engine.base_vector_search(
                query_vec=query_vector,
                anns_field=anns_field,
                search_params=search_params,
                limit=body.get("limit", 10),
            )
        else:
            response = await search_engine.hybrid_search(
                filters=body.get("filters", {}),
                query_vec=query_vector,
                anns_field=anns_field,
                search_params=search_params,
                es_size=body.get("es_size", 10000),
                sort_by=body.get("sort_by"),
                # Prefer the documented "milvus_size" key; fall back to the
                # historical "milvus" key so existing clients keep working.
                milvus_size=body.get("milvus_size", body.get("milvus", 20)),
            )
        return jsonify(response), 200
    except Exception as e:
        # NOTE(review): the full traceback is sent to the client; consider
        # logging it server-side instead if this endpoint is exposed.
        return jsonify({"error": str(e), "traceback": traceback.format_exc()}), 500
|