|
@@ -6,7 +6,7 @@ from applications.api import get_basic_embedding
|
|
|
from applications.utils.async_utils import run_tasks_with_asyncio_task_group
|
|
|
from applications.utils.chunks import LLMClassifier, TopicAwarePackerV2
|
|
|
from applications.utils.milvus import async_insert_chunk
|
|
|
-from applications.utils.mysql import ContentChunks, Contents
|
|
|
+from applications.utils.mysql import Books, ContentChunks, Contents
|
|
|
from applications.utils.nlp import num_tokens
|
|
|
from applications.config import Chunk, DEFAULT_MODEL
|
|
|
from applications.config import ELASTIC_SEARCH_INDEX
|
|
@@ -17,6 +17,7 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
super().__init__(doc_id)
|
|
|
self.chunk_manager = None
|
|
|
self.content_manager = None
|
|
|
+ self.book_manager = None
|
|
|
self.mysql_client = resource.mysql_client
|
|
|
self.milvus_client = resource.milvus_client
|
|
|
self.es_client = resource.es_client
|
|
@@ -29,6 +30,7 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
def init_processer(self):
|
|
|
self.content_manager = Contents(self.mysql_client)
|
|
|
self.chunk_manager = ContentChunks(self.mysql_client)
|
|
|
+ self.book_manager = Books(self.mysql_client)
|
|
|
|
|
|
async def _chunk_each_content(self, doc_id: str, data: dict) -> List[Chunk]:
|
|
|
title, text = data.get("title", "").strip(), data["text"].strip()
|
|
@@ -260,3 +262,102 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
|
|
|
asyncio.create_task(_process())
|
|
|
return self.doc_id
|
|
|
+
|
|
|
+
|
|
|
+class ChunkBooksTask(ChunkEmbeddingTask):
|
|
|
+ """图书类型分块任务"""
|
|
|
+
|
|
|
+ BOOK_PDF_DATASET_ID = 17
|
|
|
+ BOOK_PDF_TYPE = 3
|
|
|
+
|
|
|
+ async def _process_each_book(self, book_id):
|
|
|
+ result = await self.book_manager.get_book_extract_detail(book_id=book_id)
|
|
|
+ extract_result = result[0]["extract_result"]
|
|
|
+ book_name = result[0]["book_name"]
|
|
|
+ book_oss_path = result[0]["book_oss_path"]
|
|
|
+ book_texts = [
|
|
|
+ i["text"] for i in json.loads(extract_result) if i["type"] == "text"
|
|
|
+ ]
|
|
|
+
|
|
|
+ # first insert into contents
|
|
|
+ flag = await self.content_manager.insert_content(
|
|
|
+ self.doc_id,
|
|
|
+ book_oss_path,
|
|
|
+ self.BOOK_PDF_TYPE,
|
|
|
+ book_name,
|
|
|
+ self.BOOK_PDF_DATASET_ID,
|
|
|
+ ext=None,
|
|
|
+ )
|
|
|
+ if not flag:
|
|
|
+ return []
|
|
|
+ else:
|
|
|
+ raw_chunks = await self.chunk_books(sentence_list=book_texts, text_type=self.BOOK_PDF_TYPE, dataset_id=self.BOOK_PDF_DATASET_ID)
|
|
|
+ if not raw_chunks:
|
|
|
+ await self.content_manager.update_content_status(
|
|
|
+ doc_id=self.doc_id,
|
|
|
+ ori_status=self.INIT_STATUS,
|
|
|
+ new_status=self.FAILED_STATUS,
|
|
|
+ )
|
|
|
+ return []
|
|
|
+
|
|
|
+ await self.content_manager.update_content_status(
|
|
|
+ doc_id=self.doc_id,
|
|
|
+ ori_status=self.INIT_STATUS,
|
|
|
+ new_status=self.PROCESSING_STATUS,
|
|
|
+ )
|
|
|
+ return raw_chunks
|
|
|
+
|
|
|
+ async def deal(self, data):
|
|
|
+ book_id = data.get("book_id", None)
|
|
|
+ if not book_id:
|
|
|
+ return {"error": "Book id should not be None"}
|
|
|
+
|
|
|
+ self.init_processer()
|
|
|
+
|
|
|
+ # LOCK
|
|
|
+ acquire_lock = await self.book_manager.update_book_chunk_status(
|
|
|
+ book_id=book_id,
|
|
|
+ ori_status=self.INIT_STATUS,
|
|
|
+ new_status=self.PROCESSING_STATUS
|
|
|
+ )
|
|
|
+ print(acquire_lock)
|
|
|
+
|
|
|
+ if not acquire_lock:
|
|
|
+ return {
|
|
|
+ "info": "book is processing or processed"
|
|
|
+ }
|
|
|
+
|
|
|
+ async def _process():
|
|
|
+ chunks = await self._process_each_book(book_id)
|
|
|
+ if not chunks:
|
|
|
+ return
|
|
|
+
|
|
|
+ # # dev
|
|
|
+ # for chunk in tqdm(chunks):
|
|
|
+ # await self.save_each_chunk(chunk)
|
|
|
+
|
|
|
+ await run_tasks_with_asyncio_task_group(
|
|
|
+ task_list=chunks,
|
|
|
+ handler=self.save_each_chunk,
|
|
|
+ description="处理单篇文章分块",
|
|
|
+ unit="chunk",
|
|
|
+ max_concurrency=10,
|
|
|
+ )
|
|
|
+
|
|
|
+ await self.content_manager.update_content_status(
|
|
|
+ doc_id=self.doc_id,
|
|
|
+ ori_status=self.PROCESSING_STATUS,
|
|
|
+ new_status=self.FINISHED_STATUS,
|
|
|
+ )
|
|
|
+
|
|
|
+ await self.book_manager.update_book_chunk_status(
|
|
|
+ book_id=book_id,
|
|
|
+ ori_status=self.PROCESSING_STATUS,
|
|
|
+ new_status=self.FINISHED_STATUS
|
|
|
+ )
|
|
|
+
|
|
|
+ asyncio.create_task(_process())
|
|
|
+ return self.doc_id
|
|
|
+
|
|
|
+
|
|
|
+__all__ = ["ChunkEmbeddingTask", "ChunkBooksTask"]
|