Parcourir la source

chunk_code 优化

luojunhui il y a 2 semaines
Parent
commit
2eefcbba4b
1 fichier modifié avec 24 ajouts et 24 suppressions
  1. 24 24
      applications/async_task/chunk_task.py

+ 24 - 24
applications/async_task/chunk_task.py

@@ -11,27 +11,27 @@ from applications.config import ELASTIC_SEARCH_INDEX
 
 
 class ChunkEmbeddingTask(TopicAwareChunker):
-    def __init__(self, mysql_pool, vector_pool, cfg: ChunkerConfig, doc_id, es_pool):
+    def __init__(self, cfg: ChunkerConfig, doc_id, resource):
         super().__init__(cfg, doc_id)
-        self.content_chunk_processor = None
-        self.contents_processor = None
-        self.mysql_pool = mysql_pool
-        self.vector_pool = vector_pool
+        self.chunk_manager = None
+        self.content_manager = None
+        self.mysql_client = resource.mysql_client
+        self.milvus_client = resource.milvus_client
+        self.es_client = resource.es_client
         self.classifier = LLMClassifier()
-        self.es_client = es_pool
 
     @staticmethod
     async def get_embedding_list(text: str) -> List:
         return await get_basic_embedding(text=text, model=DEFAULT_MODEL)
 
     def init_processer(self):
-        self.contents_processor = Contents(self.mysql_pool)
-        self.content_chunk_processor = ContentChunks(self.mysql_pool)
+        self.content_manager = Contents(self.mysql_client)
+        self.chunk_manager = ContentChunks(self.mysql_client)
 
     async def _chunk_each_content(
         self, doc_id: str, text: str, text_type: int, title: str, dataset_id: int
     ) -> List[Chunk]:
-        flag = await self.contents_processor.insert_content(
+        flag = await self.content_manager.insert_content(
             doc_id, text, text_type, title, dataset_id
         )
         if not flag:
@@ -39,14 +39,14 @@ class ChunkEmbeddingTask(TopicAwareChunker):
         else:
             raw_chunks = await self.chunk(text, text_type, dataset_id)
             if not raw_chunks:
-                await self.contents_processor.update_content_status(
+                await self.content_manager.update_content_status(
                     doc_id=doc_id,
                     ori_status=self.INIT_STATUS,
                     new_status=self.FAILED_STATUS,
                 )
                 return []
 
-            await self.contents_processor.update_content_status(
+            await self.content_manager.update_content_status(
                 doc_id=doc_id,
                 ori_status=self.INIT_STATUS,
                 new_status=self.PROCESSING_STATUS,
@@ -79,12 +79,12 @@ class ChunkEmbeddingTask(TopicAwareChunker):
 
     async def save_each_chunk(self, chunk: Chunk):
         # insert
-        flag = await self.content_chunk_processor.insert_chunk(chunk)
+        flag = await self.chunk_manager.insert_chunk(chunk)
         if not flag:
             print("插入文本失败")
             return
 
-        acquire_lock = await self.content_chunk_processor.update_chunk_status(
+        acquire_lock = await self.chunk_manager.update_chunk_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.INIT_STATUS,
@@ -96,7 +96,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
 
         completion = await self.classifier.classify_chunk(chunk)
         if not completion:
-            await self.content_chunk_processor.update_chunk_status(
+            await self.chunk_manager.update_chunk_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -105,13 +105,13 @@ class ChunkEmbeddingTask(TopicAwareChunker):
             print("从deepseek获取信息失败")
             return
 
-        update_flag = await self.content_chunk_processor.set_chunk_result(
+        update_flag = await self.chunk_manager.set_chunk_result(
             chunk=completion,
             ori_status=self.PROCESSING_STATUS,
             new_status=self.FINISHED_STATUS,
         )
         if not update_flag:
-            await self.content_chunk_processor.update_chunk_status(
+            await self.chunk_manager.update_chunk_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -125,7 +125,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
 
         # 存储到 es 中
         # acquire_lock
-        acquire_es_lock = await self.content_chunk_processor.update_es_status(
+        acquire_es_lock = await self.chunk_manager.update_es_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.INIT_STATUS,
@@ -137,7 +137,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
 
         insert_rows = await self.insert_into_es(milvus_id, completion)
         final_status = self.FINISHED_STATUS if insert_rows else self.FAILED_STATUS
-        await self.content_chunk_processor.update_es_status(
+        await self.chunk_manager.update_es_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.PROCESSING_STATUS,
@@ -150,7 +150,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
         :return:
         """
         # 抢锁
-        acquire_lock = await self.content_chunk_processor.update_embedding_status(
+        acquire_lock = await self.chunk_manager.update_embedding_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             new_status=self.PROCESSING_STATUS,
@@ -169,9 +169,9 @@ class ChunkEmbeddingTask(TopicAwareChunker):
                     ",".join(chunk.questions)
                 ),
             }
-            resp = await async_insert_chunk(self.vector_pool, data)
+            resp = await async_insert_chunk(self.milvus_client, data)
             if not resp:
-                await self.content_chunk_processor.update_embedding_status(
+                await self.chunk_manager.update_embedding_status(
                     doc_id=chunk.doc_id,
                     chunk_id=chunk.chunk_id,
                     ori_status=self.PROCESSING_STATUS,
@@ -179,7 +179,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
                 )
                 return None
 
-            await self.content_chunk_processor.update_embedding_status(
+            await self.chunk_manager.update_embedding_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -188,7 +188,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
             milvus_id = resp[0]
             return milvus_id
         except Exception as e:
-            await self.content_chunk_processor.update_embedding_status(
+            await self.chunk_manager.update_embedding_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -227,7 +227,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
                 max_concurrency=10,
             )
 
-            await self.contents_processor.update_content_status(
+            await self.content_manager.update_content_status(
                 doc_id=self.doc_id,
                 ori_status=self.PROCESSING_STATUS,
                 new_status=self.FINISHED_STATUS,