import asyncio import json from typing import List from applications.api import get_basic_embedding from applications.utils.async_utils import run_tasks_with_asyncio_task_group from applications.utils.chunks import LLMClassifier, TopicAwarePackerV2 from applications.utils.milvus import async_insert_chunk from applications.utils.mysql import Books, ContentChunks, Contents from applications.utils.nlp import num_tokens from applications.config import Chunk, DEFAULT_MODEL from applications.config import ELASTIC_SEARCH_INDEX class ChunkEmbeddingTask(TopicAwarePackerV2): def __init__(self, doc_id, resource): super().__init__(doc_id) self.chunk_manager = None self.content_manager = None self.book_manager = None self.mysql_client = resource.mysql_client self.milvus_client = resource.milvus_client self.es_client = resource.es_client self.classifier = LLMClassifier() @staticmethod async def get_embedding_list(text: str) -> List: return await get_basic_embedding(text=text, model=DEFAULT_MODEL) def init_processer(self): self.content_manager = Contents(self.mysql_client) self.chunk_manager = ContentChunks(self.mysql_client) self.book_manager = Books(self.mysql_client) async def _chunk_each_content(self, doc_id: str, data: dict) -> List[Chunk]: title, text = data.get("title", "").strip(), data["text"].strip() text_type = data.get("text_type", 1) dataset_id = data.get("dataset_id", 0) # 默认知识库 id 为 0 re_chunk = data.get("re_chunk", False) dont_chunk = data.get("dont_chunk", False) ext = data.get("ext", None) if title is None: if ext and isinstance(ext, str): try: ext_dict = json.loads(ext) title = ext_dict.get("query", None) except json.JSONDecodeError: title = None else: title = None if re_chunk: await self.content_manager.update_content_info( doc_id=doc_id, text=text, text_type=text_type, title=title, dataset_id=dataset_id, ) flag = True else: flag = await self.content_manager.insert_content( doc_id, text, text_type, title, dataset_id, ext ) if not flag: return [] else: raw_chunks = await self.chunk(text, text_type, dataset_id, dont_chunk) if not raw_chunks: await self.content_manager.update_content_status( doc_id=doc_id, ori_status=self.INIT_STATUS, new_status=self.FAILED_STATUS, ) return [] await self.content_manager.update_content_status( doc_id=doc_id, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) return raw_chunks async def insert_into_es(self, milvus_id, chunk: Chunk) -> int: docs = [ { "_index": ELASTIC_SEARCH_INDEX, "_id": milvus_id, "_source": { "milvus_id": milvus_id, "doc_id": chunk.doc_id, "dataset_id": chunk.dataset_id, "chunk_id": chunk.chunk_id, "topic": chunk.topic, "domain": chunk.domain, "task_type": chunk.task_type, "text_type": chunk.text_type, "keywords": chunk.keywords, "concepts": chunk.concepts, "entities": chunk.entities, "status": chunk.status, }, } ] resp = await self.es_client.bulk_insert(docs) return resp["success"] async def save_each_chunk(self, chunk: Chunk): # insert flag = await self.chunk_manager.insert_chunk(chunk) if not flag: print("插入文本失败") return acquire_lock = await self.chunk_manager.update_chunk_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) if not acquire_lock: print("抢占文本分块锁失败") return completion = await self.classifier.classify_chunk(chunk) if not completion: await self.chunk_manager.update_chunk_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, ) print("从deepseek获取信息失败") return update_flag = await self.chunk_manager.set_chunk_result( chunk=completion, ori_status=self.PROCESSING_STATUS, new_status=self.FINISHED_STATUS, ) if not update_flag: await self.chunk_manager.update_chunk_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, ) return milvus_id = await self.save_to_milvus(completion) if not milvus_id: return # 存储到 es 中 # acquire_lock acquire_es_lock = await self.chunk_manager.update_es_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) if not acquire_es_lock: print(f"获取 es Lock Fail: {chunk.doc_id}--{chunk.chunk_id}") return insert_rows = await self.insert_into_es(milvus_id, completion) final_status = self.FINISHED_STATUS if insert_rows else self.FAILED_STATUS await self.chunk_manager.update_es_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=final_status, ) async def save_to_milvus(self, chunk: Chunk): """ :param chunk: each single chunk :return: """ # 抢锁 acquire_lock = await self.chunk_manager.update_embedding_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, new_status=self.PROCESSING_STATUS, ori_status=self.INIT_STATUS, ) if not acquire_lock: print(f"抢占-{chunk.doc_id}-{chunk.chunk_id}分块-embedding处理锁失败") return None try: data = { "doc_id": chunk.doc_id, "chunk_id": chunk.chunk_id, "vector_text": await self.get_embedding_list(chunk.text), "vector_summary": await self.get_embedding_list(chunk.summary), "vector_questions": await self.get_embedding_list( ",".join(chunk.questions) ), } resp = await async_insert_chunk(self.milvus_client, data) if not resp: await self.chunk_manager.update_embedding_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, ) return None await self.chunk_manager.update_embedding_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=self.FINISHED_STATUS, ) milvus_id = resp[0] return milvus_id except Exception as e: await self.chunk_manager.update_embedding_status( doc_id=chunk.doc_id, chunk_id=chunk.chunk_id, ori_status=self.PROCESSING_STATUS, new_status=self.FAILED_STATUS, ) print(f"存入向量数据库失败", e) return None async def deal(self, data): text = data.get("text", "") dont_chunk = data.get("dont_chunk", False) dataset_id = data.get("dataset_id", 0) if dataset_id == 12: data["dont_chunk"] = True # 如果无需分块,判断text 长度 if dont_chunk and num_tokens(text) >= self.max_tokens: data["dont_chunk"] = False # return {"error": "文档超多模型支持的最大吞吐量"} self.init_processer() async def _process(): chunks = await self._chunk_each_content(self.doc_id, data) if not chunks: return # # dev # for chunk in tqdm(chunks): # await self.save_each_chunk(chunk) await run_tasks_with_asyncio_task_group( task_list=chunks, handler=self.save_each_chunk, description="处理单篇文章分块", unit="chunk", max_concurrency=10, ) await self.content_manager.update_content_status( doc_id=self.doc_id, ori_status=self.PROCESSING_STATUS, new_status=self.FINISHED_STATUS, ) asyncio.create_task(_process()) return self.doc_id class ChunkBooksTask(ChunkEmbeddingTask): """图书类型分块任务""" BOOK_PDF_DATASET_ID = 21 BOOK_PDF_TYPE = 3 async def _process_each_book(self, book_id): result = await self.book_manager.get_book_extract_detail(book_id=book_id) extract_result = result[0]["extract_result"] book_name = result[0]["book_name"] book_oss_path = result[0]["book_oss_path"] book_texts = [ i["text"] for i in json.loads(extract_result) if i["type"] == "text" ] # first insert into contents flag = await self.content_manager.insert_content( self.doc_id, book_oss_path, self.BOOK_PDF_TYPE, book_name, self.BOOK_PDF_DATASET_ID, ext=None, ) if not flag: return [] else: raw_chunks = await self.chunk_books( sentence_list=book_texts, text_type=self.BOOK_PDF_TYPE, dataset_id=self.BOOK_PDF_DATASET_ID, ) if not raw_chunks: await self.content_manager.update_content_status( doc_id=self.doc_id, ori_status=self.INIT_STATUS, new_status=self.FAILED_STATUS, ) return [] await self.content_manager.update_content_status( doc_id=self.doc_id, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) return raw_chunks async def deal(self, data): book_id = data.get("book_id", None) if not book_id: return {"error": "Book id should not be None"} self.init_processer() # LOCK acquire_lock = await self.book_manager.update_book_chunk_status( book_id=book_id, ori_status=self.INIT_STATUS, new_status=self.PROCESSING_STATUS, ) if not acquire_lock: return {"info": "book is processing or processed"} async def _process(): chunks = await self._process_each_book(book_id) if not chunks: return # # dev # for chunk in tqdm(chunks): # await self.save_each_chunk(chunk) await run_tasks_with_asyncio_task_group( task_list=chunks, handler=self.save_each_chunk, description="处理单篇文章分块", unit="chunk", max_concurrency=10, ) await self.content_manager.update_content_status( doc_id=self.doc_id, ori_status=self.PROCESSING_STATUS, new_status=self.FINISHED_STATUS, ) await self.book_manager.update_book_chunk_status( book_id=book_id, ori_status=self.PROCESSING_STATUS, new_status=self.FINISHED_STATUS, ) asyncio.create_task(_process()) return self.doc_id __all__ = ["ChunkEmbeddingTask", "ChunkBooksTask"]