123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- import uuid
- from quart import Blueprint, jsonify, request
- from applications.config import DEFAULT_MODEL, LOCAL_MODEL_CONFIG, ChunkerConfig, WEIGHT_MAP
- from applications.api import get_basic_embedding
- from applications.async_task import ChunkEmbeddingTask
- from applications.utils.milvus import MilvusSearcher
# Blueprint that collects this module's HTTP routes; every route registered on
# it is served under the "/api" URL prefix.
server_bp = Blueprint(
    name="api",
    import_name=__name__,
    url_prefix="/api",
)
def server_routes(mysql_db, vector_db):
    """Attach the ``/embed``, ``/chunk`` and ``/search`` POST routes to ``server_bp``.

    Request-validation failures are answered with HTTP 400 on every route, and
    a missing or non-object JSON body is treated as an empty object so that it
    is reported as a validation error instead of crashing the handler.

    Args:
        mysql_db: Handle forwarded unchanged to ``ChunkEmbeddingTask``.
        vector_db: Handle forwarded to ``ChunkEmbeddingTask`` and
            ``MilvusSearcher``.

    Returns:
        The module-level ``server_bp`` blueprint, with the routes registered.
    """
    # Function-scope import keeps this block self-contained; only the /search
    # error boundary below needs a logger.
    import logging

    logger = logging.getLogger(__name__)

    class _BadRequest(Exception):
        """Raised by a search strategy when a required request field is missing."""

    async def _json_body():
        """Return the request JSON if it is an object, otherwise ``{}``."""
        payload = await request.get_json()
        # get_json() may hand back None or a non-dict JSON value (list, str,
        # number); the handlers below rely on ``.get``.
        return payload if isinstance(payload, dict) else {}

    @server_bp.route("/embed", methods=["POST"])
    async def embed():
        """Return the embedding of ``text`` for the requested (or default) model."""
        body = await _json_body()
        text = body.get("text")
        model_name = body.get("model", DEFAULT_MODEL)
        if not LOCAL_MODEL_CONFIG.get(model_name):
            return jsonify({"error": "error model"}), 400
        if text is None:
            # Same message /chunk uses for unusable text.
            return jsonify({"error": "error text"}), 400
        embedding = await get_basic_embedding(text, model_name)
        return jsonify({"embedding": embedding})

    @server_bp.route("/chunk", methods=["POST"])
    async def chunk():
        """Run a ``ChunkEmbeddingTask`` over the request body and return its doc id."""
        body = await _json_body()
        text = body.get("text", "")
        # Reject null / non-string / whitespace-only text before doing any work.
        if not isinstance(text, str) or not text.strip():
            return jsonify({"error": "error text"}), 400
        doc_id = f"doc-{uuid.uuid4()}"
        chunk_task = ChunkEmbeddingTask(
            mysql_db, vector_db, cfg=ChunkerConfig(), doc_id=doc_id
        )
        # The id reported to the client is whatever deal() returns.
        doc_id = await chunk_task.deal(body)
        return jsonify({"doc_id": doc_id})

    @server_bp.route("/search", methods=["POST"])
    async def search():
        """Dispatch a search request to the strategy named by ``search_type``.

        Responds with 400 for a missing/unknown ``search_type`` or a missing
        strategy-specific field, and with 500 (after logging the traceback)
        when the selected strategy raises.
        """
        body = await _json_body()
        search_type = body.get("search_type")
        if not search_type:
            return jsonify({"error": "missing search_type"}), 400

        searcher = MilvusSearcher(vector_db)

        # Parameters shared by several strategies.
        expr = body.get("expr")
        search_params = body.get("search_params") or {
            "metric_type": "COSINE",
            "params": {"ef": 64},
        }
        limit = body.get("limit", 5)
        query = body.get("query")

        async def embed_query():
            """Embed ``query`` with the default model, or fail when it is missing."""
            if not query:
                raise _BadRequest("missing query")
            return await get_basic_embedding(text=query, model=DEFAULT_MODEL)

        # The individual search strategies.
        async def by_pk_id():
            pk_id = body.get("id")
            if not pk_id:
                raise _BadRequest("missing id")
            return await searcher.get_by_id(pk_id)

        async def by_doc_id():
            doc_id, chunk_id = body.get("doc_id"), body.get("chunk_id")
            # chunk_id is compared against None so that a chunk_id of 0 is accepted.
            if not doc_id or chunk_id is None:
                raise _BadRequest("missing doc_id or chunk_id")
            return await searcher.get_by_doc_and_chunk(doc_id, chunk_id)

        async def by_vector():
            query_vec = await embed_query()
            return await searcher.vector_search(
                query_vec=query_vec,
                anns_field=body.get("field", "vector_text"),
                expr=expr,
                search_params=search_params,
                limit=limit,
            )

        async def by_filter():
            filter_map = body.get("filter_map")
            if not filter_map:
                raise _BadRequest("missing filter_map")
            return await searcher.filter_search(filter_map)

        async def hybrid():
            query_vec = await embed_query()
            return await searcher.hybrid_search(
                query_vec=query_vec,
                anns_field=body.get("field", "vector_text"),
                filters=body.get("filter_map"),
                limit=limit,
            )

        async def strategy():
            query_vec = await embed_query()
            return await searcher.search_by_strategy(
                query_vec=query_vec,
                weight_map=body.get("weight_map", WEIGHT_MAP),
                expr=expr,
                limit=limit,
            )

        # Dispatch table: the accepted ``search_type`` values.
        handlers = {
            "pk_id": by_pk_id,
            "by_doc_id": by_doc_id,
            "by_vector": by_vector,
            "by_filter": by_filter,
            "hybrid": hybrid,
            "strategy": strategy,
        }
        # The isinstance guard keeps an unhashable search_type (list/dict) from
        # raising TypeError on the membership test.
        if not isinstance(search_type, str) or search_type not in handlers:
            return jsonify({"error": "invalid search_type"}), 400

        try:
            result = await handlers[search_type]()
        except _BadRequest as exc:
            return jsonify({"error": str(exc)}), 400
        except Exception as exc:
            # Top-level boundary for the route: keep the traceback in the logs
            # and answer with a 500 instead of propagating.
            logger.exception("search request failed (search_type=%s)", search_type)
            return jsonify({"error": str(exc)}), 500
        return jsonify(result)

    return server_bp
|