import json
import os
import uuid

from applications.api import get_basic_embedding
from applications.api.qwen import QwenClient
from applications.async_task import ChunkBooksTask
from applications.config import BASE_MILVUS_SEARCH_PARAMS, DEFAULT_MODEL
from applications.resource import get_resource_manager
from applications.search import HybridSearch
from applications.utils.mysql import Books, ChatResult, ContentChunks
from applications.utils.oss.oss_client import OSSClient
from applications.utils.pdf.book_extract import book_extract
from applications.utils.spider.study import study


async def _extract_book(book_id, books_mapper, oss_client):
    """Download (when not already on disk) and extract a single book.

    Marks the book as "processing" (status 1), fetches ``rag/pdfs/<book_id>``
    into ``/tmp/<book_id>`` if that file does not exist yet, runs
    ``book_extract`` and stores a non-empty ``content_list`` through
    ``update_book_extract_result``.

    Returns:
        True when the caller may go on with this book, False when the book
        has to be skipped (download or extraction failed).
    """
    # Flag the book as being processed before doing any work.
    await books_mapper.update_book_extract_status(book_id, 1)

    book_path = os.path.join("/tmp", book_id)
    if not os.path.exists(book_path):
        oss_path = f"rag/pdfs/{book_id}"
        try:
            await oss_client.download_file(oss_path, book_path)
        except Exception as e:
            # Best effort: skip this book but leave a trace of the failure.
            # NOTE(review): the status stays at 1 here, exactly as before —
            # confirm whether it should be reset or marked as failed.
            print(f"Failed to download book {book_id} from {oss_path}: {e}")
            return False

    try:
        res = await book_extract(book_path, book_id)
        if res:
            content_list = (
                res.get("results", {}).get(book_id, {}).get("content_list", [])
            )
            if content_list:
                await books_mapper.update_book_extract_result(book_id, content_list)
    except Exception as e:
        print(f"Failed to extract book {book_id}: {e}")
        # 99 is the status written when extraction fails.
        await books_mapper.update_book_extract_status(book_id, 99)
        return False

    return True


async def handle_books():
    """Extract and chunk every book returned by ``select_init_books``.

    For each book whose ``extract_status`` is 0 the file is downloaded and
    extracted first; books whose download or extraction fails are skipped.
    Afterwards a ``ChunkBooksTask`` with a fresh ``doc-<uuid4>`` id is run
    for the book. Any other exception aborts the batch and is printed; the
    function never raises and returns None.
    """
    try:
        # Shared resources and clients.
        resource = get_resource_manager()
        books_mapper = Books(resource.mysql_client)
        oss_client = OSSClient()

        # Books waiting to be processed.
        books = await books_mapper.select_init_books()

        for book in books:
            book_id = book.get("book_id")

            status_rows = await books_mapper.select_book_extract_status(book_id)
            if not status_rows:
                # Without this guard an empty result raised IndexError and
                # aborted processing of all remaining books.
                print(f"No extract status found for book {book_id}, skipping")
                continue

            if status_rows[0].get("extract_status") == 0:
                if not await _extract_book(book_id, books_mapper, oss_client):
                    continue

            # NOTE(review): chunking runs for every selected book, including
            # ones whose extract_status was already non-zero — confirm.
            doc_id = f"doc-{uuid.uuid4()}"
            chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
            await chunk_task.deal({"book_id": book_id})

    except Exception as e:  # top-level boundary: report and stop the batch
        print(f"处理请求失败,错误: {e}")


async def process_question(question, query_text, rag_chat_agent):
    """Answer one question via retrieval, chat, web search and a final decision.

    Steps: hybrid search over datasets "11,12"; ``chat_with_deepseek`` on the
    search results; when the chat status is 0 a ``study`` task is started;
    ``QwenClient.search_and_chat``; ``make_decision`` combining both answers;
    finally the whole exchange is stored with ``insert_chat_result``.

    Args:
        question: The user question; also used as the search query.
        query_text: Currently unused.
        rag_chat_agent: Object providing ``chat_with_deepseek`` and
            ``make_decision`` coroutines.

    Returns:
        ``{"query", "result", "status", "relevance_score"}`` on success, or
        ``{"query", "error"}`` when any step raised.
    """
    try:
        dataset_id_strs = "11,12"
        dataset_ids = dataset_id_strs.split(",")
        search_type = "hybrid"

        # NOTE(review): the search uses `question`, not the `query_text`
        # argument — confirm this is intended.
        query_results = await query_search(
            query_text=question,
            filters={"dataset_id": dataset_ids},
            search_type=search_type,
        )

        resource = get_resource_manager()
        chat_result_mapper = ChatResult(resource.mysql_client)

        # Chat with deepseek using the retrieved chunks.
        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)

        # Start a study task only when the chat status is 0.
        study_task_id = None
        if chat_result["status"] == 0:
            study_task_id = study(question)["task_id"]

        qwen_client = QwenClient()
        llm_search = qwen_client.search_and_chat(user_prompt=question)
        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)

        # Payload returned to the caller.
        data = {
            "query": question,
            "result": decision["result"],
            "status": decision["status"],
            "relevance_score": decision["relevance_score"],
            # "used_tools": decision["used_tools"],
        }

        # Persist the full exchange.
        await chat_result_mapper.insert_chat_result(
            question,
            dataset_id_strs,
            json.dumps(query_results, ensure_ascii=False),
            chat_result["summary"],
            chat_result["relevance_score"],
            chat_result["status"],
            llm_search["content"],
            json.dumps(llm_search["search_results"], ensure_ascii=False),
            1,
            decision["result"],
            study_task_id,
        )
        return data
    except Exception as e:
        print(f"Error processing question: {question}. Error: {str(e)}")
        return {"query": question, "error": str(e)}


async def query_search(
    query_text,
    filters=None,
    search_type="",
    anns_field="vector_text",
    search_params=BASE_MILVUS_SEARCH_PARAMS,
    _source=False,
    es_size=10000,
    sort_by=None,
    milvus_size=20,
    limit=10,
):
    """Embed ``query_text`` and run a vector or hybrid search.

    Args:
        query_text: Text to embed with ``DEFAULT_MODEL``.
        filters: Filters forwarded to the hybrid search; None means ``{}``.
        search_type: ``"base"`` or ``"hybrid"``. Any other value (including
            ``"strategy"`` and the default ``""``) yields None.
        anns_field: Vector field name forwarded to the search engine.
        search_params: Search parameters forwarded to the search engine.
        _source: Accepted for compatibility; not used by this function.
        es_size: Forwarded to the hybrid search.
        sort_by: Forwarded to the hybrid search.
        milvus_size: Forwarded to the hybrid search.
        limit: Result limit for the base search and maximum number of
            formatted items returned for the hybrid search.

    Returns:
        For ``"base"``: the raw ``base_vector_search`` response.
        For ``"hybrid"``: a list of dicts with ``docId``, ``content``,
        ``contentSummary``, ``score`` and ``datasetId``.
        None for unsupported search types, when the search raised, or when
        the hybrid search returned None.
    """
    # Unsupported types always produced None; bail out before paying for an
    # embedding request and a search-engine instance.
    if search_type not in ("base", "hybrid"):
        return None

    if filters is None:
        filters = {}

    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
    resource = get_resource_manager()
    search_engine = HybridSearch(
        milvus_pool=resource.milvus_client,
        es_pool=resource.es_client,
        graph_pool=resource.graph_client,
    )

    try:
        if search_type == "base":
            return await search_engine.base_vector_search(
                query_vec=query_vector,
                anns_field=anns_field,
                search_params=search_params,
                limit=limit,
            )
        response = await search_engine.hybrid_search(
            filters=filters,
            query_vec=query_vector,
            anns_field=anns_field,
            search_params=search_params,
            es_size=es_size,
            sort_by=sort_by,
            milvus_size=milvus_size,
        )
    except Exception as e:
        # Callers still get None, but the failure is no longer invisible.
        print(f"Search failed (search_type={search_type}): {e}")
        return None

    if response is None:
        return None

    content_chunk_mapper = ContentChunks(resource.mysql_client)

    # Only the first `limit` formatted items are returned, so stop looking up
    # chunks once enough were collected. For None / negative limits keep the
    # plain slice semantics of the final `[:limit]`.
    stop_early = isinstance(limit, int) and limit >= 0

    formatted = []
    for hit in response["results"]:
        if stop_early and len(formatted) >= limit:
            break
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=hit["doc_id"], chunk_id=hit["chunk_id"]
        )
        if not content_chunks:
            continue
        chunk = content_chunks[0]
        formatted.append(
            {
                "docId": chunk["doc_id"],
                "content": chunk["text"],
                "contentSummary": chunk["summary"],
                "score": hit["score"],
                "datasetId": chunk["dataset_id"],
            }
        )
    return formatted[:limit]