# blueprint.py — Quart API blueprint: embedding, chunking, search, chat and dataset management endpoints.
  1. import asyncio
  2. import json
  3. import traceback
  4. import uuid
  5. from typing import Dict, Any
  6. from quart import Blueprint, jsonify, request
  7. from quart_cors import cors
  8. from applications.api import get_basic_embedding
  9. from applications.api import get_img_embedding
  10. from applications.async_task import AutoRechunkTask, BuildGraph
  11. from applications.async_task import ChunkEmbeddingTask, DeleteTask
  12. from applications.config import (
  13. DEFAULT_MODEL,
  14. LOCAL_MODEL_CONFIG,
  15. BASE_MILVUS_SEARCH_PARAMS,
  16. )
  17. from applications.resource import get_resource_manager
  18. from applications.search import HybridSearch
  19. from applications.utils.chat import ChatClassifier
  20. from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult
  21. server_bp = Blueprint("api", __name__, url_prefix="/api")
  22. server_bp = cors(server_bp, allow_origin="*")
  23. @server_bp.route("/embed", methods=["POST"])
  24. async def embed():
  25. body = await request.get_json()
  26. text = body.get("text")
  27. model_name = body.get("model", DEFAULT_MODEL)
  28. if not LOCAL_MODEL_CONFIG.get(model_name):
  29. return jsonify({"error": "error model"})
  30. embedding = await get_basic_embedding(text, model_name)
  31. return jsonify({"embedding": embedding})
  32. @server_bp.route("/img_embed", methods=["POST"])
  33. async def img_embed():
  34. body = await request.get_json()
  35. url_list = body.get("url_list")
  36. if not url_list:
  37. return jsonify({"error": "error url_list"})
  38. embedding = await get_img_embedding(url_list)
  39. return jsonify(embedding)
  40. @server_bp.route("/delete", methods=["POST"])
  41. async def delete():
  42. body = await request.get_json()
  43. level = body.get("level")
  44. params = body.get("params")
  45. if not level or not params:
  46. return jsonify({"error": "error level or params"})
  47. resource = get_resource_manager()
  48. delete_task = DeleteTask(resource)
  49. response = await delete_task.deal(level, params)
  50. return jsonify(response)
  51. @server_bp.route("/chunk", methods=["POST"])
  52. async def chunk():
  53. body = await request.get_json()
  54. text = body.get("text", "")
  55. ori_doc_id = body.get("doc_id")
  56. text = text.strip()
  57. if not text:
  58. return jsonify({"error": "error text"})
  59. resource = get_resource_manager()
  60. # generate doc id
  61. if ori_doc_id:
  62. body["re_chunk"] = True
  63. doc_id = ori_doc_id
  64. else:
  65. doc_id = f"doc-{uuid.uuid4()}"
  66. chunk_task = ChunkEmbeddingTask(doc_id=doc_id, resource=resource)
  67. doc_id = await chunk_task.deal(body)
  68. return jsonify({"doc_id": doc_id})
  69. @server_bp.route("/search", methods=["POST"])
  70. async def search():
  71. """
  72. filters: Dict[str, Any], # 条件过滤
  73. query_vec: List[float], # query 的向量
  74. anns_field: str = "vector_text", # query指定的向量空间
  75. search_params: Optional[Dict[str, Any]] = None, # 向量距离方式
  76. query_text: str = None, #是否通过 topic 倒排
  77. _source=False, # 是否返回元数据
  78. es_size: int = 10000, #es 第一层过滤数量
  79. sort_by: str = None, # 排序
  80. milvus_size: int = 10 # milvus粗排返回数量
  81. :return:
  82. """
  83. body = await request.get_json()
  84. # 解析数据
  85. search_type: str = body.get("search_type")
  86. filters: Dict[str, Any] = body.get("filters", {})
  87. anns_field: str = body.get("anns_field", "vector_text")
  88. search_params: Dict[str, Any] = body.get("search_params", BASE_MILVUS_SEARCH_PARAMS)
  89. query_text: str = body.get("query_text")
  90. _source: bool = body.get("_source", False)
  91. es_size: int = body.get("es_size", 10000)
  92. sort_by: str = body.get("sort_by")
  93. milvus_size: int = body.get("milvus", 20)
  94. limit: int = body.get("limit", 10)
  95. if not query_text:
  96. return jsonify({"error": "error query_text"})
  97. query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
  98. resource = get_resource_manager()
  99. search_engine = HybridSearch(
  100. milvus_pool=resource.milvus_client, es_pool=resource.es_client
  101. )
  102. try:
  103. match search_type:
  104. case "base":
  105. response = await search_engine.base_vector_search(
  106. query_vec=query_vector,
  107. anns_field=anns_field,
  108. search_params=search_params,
  109. limit=limit,
  110. )
  111. return jsonify(response), 200
  112. case "hybrid":
  113. response = await search_engine.hybrid_search(
  114. filters=filters,
  115. query_vec=query_vector,
  116. anns_field=anns_field,
  117. search_params=search_params,
  118. es_size=es_size,
  119. sort_by=sort_by,
  120. milvus_size=milvus_size,
  121. )
  122. return jsonify(response), 200
  123. case "strategy":
  124. return jsonify({"error": "strategy not implemented"}), 405
  125. case _:
  126. return jsonify({"error": "error search_type"}), 200
  127. except Exception as e:
  128. return jsonify({"error": str(e), "traceback": traceback.format_exc()}), 500
  129. @server_bp.route("/dataset/list", methods=["GET"])
  130. async def dataset_list():
  131. resource = get_resource_manager()
  132. datasets = await Dataset(resource.mysql_client).select_dataset()
  133. # 创建所有任务
  134. tasks = [
  135. Contents(resource.mysql_client).select_count(dataset["id"])
  136. for dataset in datasets
  137. ]
  138. counts = await asyncio.gather(*tasks)
  139. # 组装数据
  140. data_list = [
  141. {
  142. "dataset_id": dataset["id"],
  143. "name": dataset["name"],
  144. "count": count,
  145. "created_at": dataset["created_at"].strftime("%Y-%m-%d"),
  146. }
  147. for dataset, count in zip(datasets, counts)
  148. ]
  149. return jsonify({"status_code": 200, "detail": "success", "data": data_list})
  150. @server_bp.route("/dataset/add", methods=["POST"])
  151. async def add_dataset():
  152. resource = get_resource_manager()
  153. dataset = Dataset(resource.mysql_client)
  154. # 从请求体里取参数
  155. body = await request.get_json()
  156. name = body.get("name")
  157. if not name:
  158. return jsonify({"status_code": 400, "detail": "name is required"})
  159. # 执行新增
  160. await dataset.add_dataset(name)
  161. return jsonify({"status_code": 200, "detail": "success"})
  162. @server_bp.route("/content/get", methods=["GET"])
  163. async def get_content():
  164. resource = get_resource_manager()
  165. contents = Contents(resource.mysql_client)
  166. # 获取请求参数
  167. doc_id = request.args.get("docId")
  168. if not doc_id:
  169. return jsonify({"status_code": 400, "detail": "doc_id is required", "data": {}})
  170. # 查询内容
  171. rows = await contents.select_content_by_doc_id(doc_id)
  172. if not rows:
  173. return jsonify({"status_code": 404, "detail": "content not found", "data": {}})
  174. row = rows[0]
  175. return jsonify(
  176. {
  177. "status_code": 200,
  178. "detail": "success",
  179. "data": {
  180. "title": row.get("title", ""),
  181. "text": row.get("text", ""),
  182. "doc_id": row.get("doc_id", ""),
  183. },
  184. }
  185. )
  186. @server_bp.route("/content/list", methods=["GET"])
  187. async def content_list():
  188. resource = get_resource_manager()
  189. contents = Contents(resource.mysql_client)
  190. # 从 URL 查询参数获取分页和过滤参数
  191. page_num = int(request.args.get("page", 1))
  192. page_size = int(request.args.get("pageSize", 10))
  193. dataset_id = request.args.get("datasetId")
  194. doc_status = int(request.args.get("doc_status", 1))
  195. # order_by 可以用 JSON 字符串传递
  196. import json
  197. order_by_str = request.args.get("order_by", '{"id":"desc"}')
  198. try:
  199. order_by = json.loads(order_by_str)
  200. except Exception:
  201. order_by = {"id": "desc"}
  202. # 调用 select_contents,获取分页字典
  203. result = await contents.select_contents(
  204. page_num=page_num,
  205. page_size=page_size,
  206. dataset_id=dataset_id,
  207. doc_status=doc_status,
  208. order_by=order_by,
  209. )
  210. # 格式化 entities,只保留必要字段
  211. entities = [
  212. {
  213. "doc_id": row["doc_id"],
  214. "title": row.get("title") or "",
  215. "text": row.get("text") or "",
  216. }
  217. for row in result["entities"]
  218. ]
  219. return jsonify(
  220. {
  221. "status_code": 200,
  222. "detail": "success",
  223. "data": {
  224. "entities": entities,
  225. "total_count": result["total_count"],
  226. "page": result["page"],
  227. "page_size": result["page_size"],
  228. "total_pages": result["total_pages"],
  229. },
  230. }
  231. )
  232. async def query_search(
  233. query_text,
  234. filters=None,
  235. search_type="",
  236. anns_field="vector_text",
  237. search_params=BASE_MILVUS_SEARCH_PARAMS,
  238. _source=False,
  239. es_size=10000,
  240. sort_by=None,
  241. milvus_size=20,
  242. limit=10,
  243. ):
  244. if filters is None:
  245. filters = {}
  246. query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
  247. resource = get_resource_manager()
  248. search_engine = HybridSearch(
  249. milvus_pool=resource.milvus_client, es_pool=resource.es_client
  250. )
  251. try:
  252. match search_type:
  253. case "base":
  254. response = await search_engine.base_vector_search(
  255. query_vec=query_vector,
  256. anns_field=anns_field,
  257. search_params=search_params,
  258. limit=limit,
  259. )
  260. return response
  261. case "hybrid":
  262. response = await search_engine.hybrid_search(
  263. filters=filters,
  264. query_vec=query_vector,
  265. anns_field=anns_field,
  266. search_params=search_params,
  267. es_size=es_size,
  268. sort_by=sort_by,
  269. milvus_size=milvus_size,
  270. )
  271. return response
  272. case "strategy":
  273. return None
  274. case _:
  275. return None
  276. except Exception as e:
  277. return None
  278. @server_bp.route("/query", methods=["GET"])
  279. async def query():
  280. query_text = request.args.get("query")
  281. dataset_ids = request.args.get("datasetIds").split(",")
  282. search_type = request.args.get("search_type", "hybrid")
  283. query_results = await query_search(
  284. query_text=query_text,
  285. filters={"dataset_id": dataset_ids},
  286. search_type=search_type,
  287. )
  288. resource = get_resource_manager()
  289. content_chunk_mapper = ContentChunks(resource.mysql_client)
  290. dataset_mapper = Dataset(resource.mysql_client)
  291. res = []
  292. for result in query_results["results"]:
  293. content_chunks = await content_chunk_mapper.select_chunk_content(
  294. doc_id=result["doc_id"], chunk_id=result["chunk_id"]
  295. )
  296. if not content_chunks:
  297. return jsonify(
  298. {"status_code": 500, "detail": "content_chunk not found", "data": {}}
  299. )
  300. content_chunk = content_chunks[0]
  301. datasets = await dataset_mapper.select_dataset_by_id(
  302. content_chunk["dataset_id"]
  303. )
  304. if not datasets:
  305. return jsonify(
  306. {"status_code": 500, "detail": "dataset not found", "data": {}}
  307. )
  308. dataset = datasets[0]
  309. dataset_name = None
  310. if dataset:
  311. dataset_name = dataset["name"]
  312. res.append(
  313. {
  314. "docId": content_chunk["doc_id"],
  315. "content": content_chunk["text"],
  316. "contentSummary": content_chunk["summary"],
  317. "score": result["score"],
  318. "datasetName": dataset_name,
  319. }
  320. )
  321. data = {"results": res}
  322. return jsonify({"status_code": 200, "detail": "success", "data": data})
  323. @server_bp.route("/chat", methods=["GET"])
  324. async def chat():
  325. query_text = request.args.get("query")
  326. dataset_id_strs = request.args.get("datasetIds")
  327. dataset_ids = dataset_id_strs.split(",")
  328. search_type = request.args.get("search_type", "hybrid")
  329. query_results = await query_search(
  330. query_text=query_text,
  331. filters={"dataset_id": dataset_ids},
  332. search_type=search_type,
  333. )
  334. resource = get_resource_manager()
  335. content_chunk_mapper = ContentChunks(resource.mysql_client)
  336. dataset_mapper = Dataset(resource.mysql_client)
  337. chat_result_mapper = ChatResult(resource.mysql_client)
  338. res = []
  339. for result in query_results["results"]:
  340. content_chunks = await content_chunk_mapper.select_chunk_content(
  341. doc_id=result["doc_id"], chunk_id=result["chunk_id"]
  342. )
  343. if not content_chunks:
  344. return jsonify(
  345. {"status_code": 500, "detail": "content_chunk not found", "data": {}}
  346. )
  347. content_chunk = content_chunks[0]
  348. datasets = await dataset_mapper.select_dataset_by_id(
  349. content_chunk["dataset_id"]
  350. )
  351. if not datasets:
  352. return jsonify(
  353. {"status_code": 500, "detail": "dataset not found", "data": {}}
  354. )
  355. dataset = datasets[0]
  356. dataset_name = None
  357. if dataset:
  358. dataset_name = dataset["name"]
  359. res.append(
  360. {
  361. "docId": content_chunk["doc_id"],
  362. "content": content_chunk["text"],
  363. "contentSummary": content_chunk["summary"],
  364. "score": result["score"],
  365. "datasetName": dataset_name,
  366. }
  367. )
  368. chat_classifier = ChatClassifier()
  369. chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
  370. data = {"results": res, "chat_res": chat_res["summary"]}
  371. await chat_result_mapper.insert_chat_result(
  372. query_text,
  373. dataset_id_strs,
  374. json.dumps(data, ensure_ascii=False),
  375. chat_res["summary"],
  376. chat_res["relevance_score"],
  377. chat_res["status"],
  378. )
  379. return jsonify({"status_code": 200, "detail": "success", "data": data})
  380. @server_bp.route("/chunk/list", methods=["GET"])
  381. async def chunk_list():
  382. resource = get_resource_manager()
  383. content_chunk = ContentChunks(resource.mysql_client)
  384. # 从 URL 查询参数获取分页和过滤参数
  385. page_num = int(request.args.get("page", 1))
  386. page_size = int(request.args.get("pageSize", 10))
  387. doc_id = request.args.get("docId")
  388. if not doc_id:
  389. return jsonify({"status_code": 500, "detail": "docId not found", "data": {}})
  390. # 调用 select_contents,获取分页字典
  391. result = await content_chunk.select_chunk_contents(
  392. page_num=page_num, page_size=page_size, doc_id=doc_id
  393. )
  394. if not result:
  395. return jsonify({"status_code": 500, "detail": "chunk is empty", "data": {}})
  396. # 格式化 entities,只保留必要字段
  397. entities = [
  398. {
  399. "id": row["id"],
  400. "chunk_id": row["chunk_id"],
  401. "doc_id": row["doc_id"],
  402. "summary": row.get("summary") or "",
  403. "text": row.get("text") or "",
  404. }
  405. for row in result["entities"]
  406. ]
  407. return jsonify(
  408. {
  409. "status_code": 200,
  410. "detail": "success",
  411. "data": {
  412. "entities": entities,
  413. "total_count": result["total_count"],
  414. "page": result["page"],
  415. "page_size": result["page_size"],
  416. "total_pages": result["total_pages"],
  417. },
  418. }
  419. )
  420. @server_bp.route("/chat/detail", methods=["POST"])
  421. async def chat_detail():
  422. body = await request.get_json()
  423. query_text = body.get("query")
  424. dataset_id_strs = "11,12"
  425. dataset_ids = dataset_id_strs.split(",")
  426. search_type = "hybrid"
  427. query_results = await query_search(
  428. query_text=query_text,
  429. filters={"dataset_id": dataset_ids},
  430. search_type=search_type,
  431. )
  432. resource = get_resource_manager()
  433. content_chunk_mapper = ContentChunks(resource.mysql_client)
  434. contents_mapper = Contents(resource.mysql_client)
  435. chat_result_mapper = ChatResult(resource.mysql_client)
  436. res = []
  437. for result in query_results["results"]:
  438. content_chunks = await content_chunk_mapper.select_chunk_content(
  439. doc_id=result["doc_id"], chunk_id=result["chunk_id"]
  440. )
  441. contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
  442. if not content_chunks:
  443. return jsonify(
  444. {"status_code": 500, "detail": "content_chunk not found", "data": {}}
  445. )
  446. if not contents:
  447. return jsonify(
  448. {"status_code": 500, "detail": "contents not found", "data": {}}
  449. )
  450. content_chunk = content_chunks[0]
  451. content = contents[0]
  452. res.append(
  453. {
  454. "contentChunk": content_chunk["text"],
  455. "contentSummary": content_chunk["summary"],
  456. "content": content["text"],
  457. "score": result["score"],
  458. }
  459. )
  460. chat_classifier = ChatClassifier()
  461. chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
  462. data = {
  463. "result": chat_res["summary"],
  464. "status": chat_res["status"],
  465. "metaData": res,
  466. }
  467. await chat_result_mapper.insert_chat_result(
  468. query_text,
  469. dataset_id_strs,
  470. json.dumps(data, ensure_ascii=False),
  471. chat_res["summary"],
  472. chat_res["relevance_score"],
  473. chat_res["status"],
  474. )
  475. return jsonify({"status_code": 200, "detail": "success", "data": data})
  476. @server_bp.route("/auto_rechunk", methods=["GET"])
  477. async def auto_rechunk():
  478. resource = get_resource_manager()
  479. auto_rechunk_task = AutoRechunkTask(mysql_client=resource.mysql_client)
  480. process_cnt = await auto_rechunk_task.deal()
  481. return jsonify({"status_code": 200, "detail": "success", "cnt": process_cnt})
  482. @server_bp.route("/build_graph", methods=["POST"])
  483. async def delete_task():
  484. body = await request.get_json()
  485. doc_id: str = body.get("doc_id")
  486. if not doc_id:
  487. return jsonify({"status_code": 500, "detail": "docId not found", "data": {}})
  488. resource = get_resource_manager()
  489. build_graph_task = BuildGraph(neo4j=resource.graph_client, es_client=resource.es_client)
  490. await build_graph_task.deal(doc_id)
  491. return jsonify({"status_code": 200, "detail": "success", "data": {}})