import json
import os
import uuid

from applications.api import get_basic_embedding
from applications.api.qwen import QwenClient
from applications.async_task import ChunkBooksTask
from applications.config import BASE_MILVUS_SEARCH_PARAMS, DEFAULT_MODEL
from applications.resource import get_resource_manager
from applications.search import HybridSearch
from applications.utils.mysql import Books, ChatResult, ContentChunks
from applications.utils.oss.oss_client import OSSClient
from applications.utils.pdf.book_extract import book_extract
from applications.utils.spider.study import study


async def handle_books():
    try:
        # Get the resource manager and the clients it backs
        resource = get_resource_manager()
        books_mapper = Books(resource.mysql_client)
        oss_client = OSSClient()

        # Fetch the list of books pending processing
        books = await books_mapper.select_init_books()
        for book in books:
            book_id = book.get("book_id")
            # Look up the extraction status (0 = pending)
            extract_status = (
                await books_mapper.select_book_extract_status(book_id)
            )[0].get("extract_status")
            if extract_status == 0:
                # Mark the book as processing
                await books_mapper.update_book_extract_status(book_id, 1)
                book_path = os.path.join("/tmp", book_id)
                if not os.path.exists(book_path):
                    oss_path = f"rag/pdfs/{book_id}"
                    try:
                        # Download the book file from OSS
                        await oss_client.download_file(oss_path, book_path)
                    except Exception:
                        continue  # Skip this book if the download fails
                try:
                    # Extract the book content
                    res = await book_extract(book_path, book_id)
                    if res:
                        content_list = (
                            res.get("results", {})
                            .get(book_id, {})
                            .get("content_list", [])
                        )
                        if content_list:
                            # Persist the extraction result
                            await books_mapper.update_book_extract_result(
                                book_id, content_list
                            )
                            # Create a document ID and chunk the book
                            doc_id = f"doc-{uuid.uuid4()}"
                            chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
                            body = {"book_id": book_id}
                            await chunk_task.deal(body)  # run the chunking task asynchronously
                except Exception:
                    continue  # Skip this book if extraction fails
    except Exception as e:
        # Catch any top-level failure
        print(f"Failed to process books, error: {e}")


async def process_question(question, query_text, rag_chat_agent):
    # NOTE: query_text is currently unused; retrieval below queries with `question`.
    try:
        dataset_id_strs = "11,12"
        dataset_ids = dataset_id_strs.split(",")
        search_type = "hybrid"

        # Run the retrieval query
        query_results = await query_search(
            query_text=question,
            filters={"dataset_id": dataset_ids},
            search_type=search_type,
        )
        resource = get_resource_manager()
        chat_result_mapper = ChatResult(resource.mysql_client)

        # Chat with DeepSeek over the retrieved context
        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)

        # Decide whether a study task needs to run
        study_task_id = None
        if chat_result["status"] == 0:
            study_task_id = study(question)["task_id"]

        qwen_client = QwenClient()
        llm_search = qwen_client.search_and_chat(user_prompt=question)
        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)

        # Build the response payload
        data = {
            "query": question,
            "result": decision["result"],
            "status": decision["status"],
            "relevance_score": decision["relevance_score"],
            # "used_tools": decision["used_tools"],
        }

        # Persist the full interaction
        await chat_result_mapper.insert_chat_result(
            question,
            dataset_id_strs,
            json.dumps(query_results, ensure_ascii=False),
            chat_result["summary"],
            chat_result["relevance_score"],
            chat_result["status"],
            llm_search["content"],
            json.dumps(llm_search["search_results"], ensure_ascii=False),
            1,
            decision["result"],
            study_task_id,
        )
        return data
    except Exception as e:
        print(f"Error processing question: {question}. Error: {e}")
        return {"query": question, "error": str(e)}


async def query_search(
    query_text,
    filters=None,
    search_type="",
    anns_field="vector_text",
    search_params=BASE_MILVUS_SEARCH_PARAMS,
    _source=False,
    es_size=10000,
    sort_by=None,
    milvus_size=20,
    limit=10,
):
    if filters is None:
        filters = {}

    # Embed the query text, then dispatch to the requested search strategy
    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
    resource = get_resource_manager()
    search_engine = HybridSearch(
        milvus_pool=resource.milvus_client,
        es_pool=resource.es_client,
        graph_pool=resource.graph_client,
    )
    try:
        match search_type:
            case "base":
                response = await search_engine.base_vector_search(
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    limit=limit,
                )
                return response
            case "hybrid":
                response = await search_engine.hybrid_search(
                    filters=filters,
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    es_size=es_size,
                    sort_by=sort_by,
                    milvus_size=milvus_size,
                )
            case "strategy":
                return None
            case _:
                return None
    except Exception:
        return None

    if response is None:
        return None

    # Hydrate the raw hits with chunk content from MySQL
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    res = []
    for result in response["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        if content_chunks:
            content_chunk = content_chunks[0]
            res.append(
                {
                    "docId": content_chunk["doc_id"],
                    "content": content_chunk["text"],
                    "contentSummary": content_chunk["summary"],
                    "score": result["score"],
                    "datasetId": content_chunk["dataset_id"],
                }
            )
    return res[:limit]
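

# Standalone demo (assumption: run manually against a configured environment;
# the dataset IDs "11" and "12" mirror process_question above, and the query
# string is illustrative).
if __name__ == "__main__":
    import asyncio

    async def _demo():
        hits = await query_search(
            query_text="What is hybrid search?",
            filters={"dataset_id": ["11", "12"]},
            search_type="hybrid",
        )
        print(json.dumps(hits, ensure_ascii=False, indent=2))

    asyncio.run(_demo())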