@@ -3,35 +3,35 @@ from typing import List

 from applications.api import get_basic_embedding
 from applications.utils.async_utils import run_tasks_with_asyncio_task_group
-from applications.utils.chunks import TopicAwareChunker, LLMClassifier
+from applications.utils.chunks import LLMClassifier, TopicAwarePackerV2
 from applications.utils.milvus import async_insert_chunk
 from applications.utils.mysql import ContentChunks, Contents
-from applications.config import Chunk, ChunkerConfig, DEFAULT_MODEL
+from applications.config import Chunk, DEFAULT_MODEL
 from applications.config import ELASTIC_SEARCH_INDEX


-class ChunkEmbeddingTask(TopicAwareChunker):
-    def __init__(self, mysql_pool, vector_pool, cfg: ChunkerConfig, doc_id, es_pool):
-        super().__init__(cfg, doc_id)
-        self.content_chunk_processor = None
-        self.contents_processor = None
-        self.mysql_pool = mysql_pool
-        self.vector_pool = vector_pool
+class ChunkEmbeddingTask(TopicAwarePackerV2):
+    def __init__(self, doc_id, resource):
+        super().__init__(doc_id)
+        self.chunk_manager = None
+        self.content_manager = None
+        self.mysql_client = resource.mysql_client
+        self.milvus_client = resource.milvus_client
+        self.es_client = resource.es_client
         self.classifier = LLMClassifier()
-        self.es_client = es_pool

     @staticmethod
     async def get_embedding_list(text: str) -> List:
         return await get_basic_embedding(text=text, model=DEFAULT_MODEL)

     def init_processer(self):
-        self.contents_processor = Contents(self.mysql_pool)
-        self.content_chunk_processor = ContentChunks(self.mysql_pool)
+        self.content_manager = Contents(self.mysql_client)
+        self.chunk_manager = ContentChunks(self.mysql_client)

     async def _chunk_each_content(
         self, doc_id: str, text: str, text_type: int, title: str, dataset_id: int
     ) -> List[Chunk]:
-        flag = await self.contents_processor.insert_content(
+        flag = await self.content_manager.insert_content(
             doc_id, text, text_type, title, dataset_id
         )
         if not flag:
@@ -39,14 +39,14 @@ class ChunkEmbeddingTask(TopicAwareChunker):
         else:
             raw_chunks = await self.chunk(text, text_type, dataset_id)
             if not raw_chunks:
-                await self.contents_processor.update_content_status(
+                await self.content_manager.update_content_status(
                     doc_id=doc_id,
                     ori_status=self.INIT_STATUS,
                     new_status=self.FAILED_STATUS,
                 )
                 return []

-            await self.contents_processor.update_content_status(
+            await self.content_manager.update_content_status(
                 doc_id=doc_id,
                 ori_status=self.INIT_STATUS,
                 new_status=self.PROCESSING_STATUS,
@@ -79,12 +79,12 @@ class ChunkEmbeddingTask(TopicAwareChunker):

     async def save_each_chunk(self, chunk: Chunk):
         # insert
-        flag = await self.content_chunk_processor.insert_chunk(chunk)
+        flag = await self.chunk_manager.insert_chunk(chunk)
         if not flag:
             print("插入文本失败")
             return

-        acquire_lock = await self.content_chunk_processor.update_chunk_status(
+        acquire_lock = await self.chunk_manager.update_chunk_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.INIT_STATUS,
@@ -96,7 +96,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):

         completion = await self.classifier.classify_chunk(chunk)
         if not completion:
-            await self.content_chunk_processor.update_chunk_status(
+            await self.chunk_manager.update_chunk_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -105,13 +105,13 @@ class ChunkEmbeddingTask(TopicAwareChunker):
             print("从deepseek获取信息失败")
             return

-        update_flag = await self.content_chunk_processor.set_chunk_result(
+        update_flag = await self.chunk_manager.set_chunk_result(
             chunk=completion,
             ori_status=self.PROCESSING_STATUS,
             new_status=self.FINISHED_STATUS,
         )
         if not update_flag:
-            await self.content_chunk_processor.update_chunk_status(
+            await self.chunk_manager.update_chunk_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -125,7 +125,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):

         # 存储到 es 中
         # acquire_lock
-        acquire_es_lock = await self.content_chunk_processor.update_es_status(
+        acquire_es_lock = await self.chunk_manager.update_es_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.INIT_STATUS,
@@ -137,7 +137,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):

         insert_rows = await self.insert_into_es(milvus_id, completion)
         final_status = self.FINISHED_STATUS if insert_rows else self.FAILED_STATUS
-        await self.content_chunk_processor.update_es_status(
+        await self.chunk_manager.update_es_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             ori_status=self.PROCESSING_STATUS,
@@ -150,7 +150,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
        :return:
        """
        # 抢锁
-        acquire_lock = await self.content_chunk_processor.update_embedding_status(
+        acquire_lock = await self.chunk_manager.update_embedding_status(
             doc_id=chunk.doc_id,
             chunk_id=chunk.chunk_id,
             new_status=self.PROCESSING_STATUS,
@@ -169,9 +169,9 @@ class ChunkEmbeddingTask(TopicAwareChunker):
                     ",".join(chunk.questions)
                 ),
             }
-            resp = await async_insert_chunk(self.vector_pool, data)
+            resp = await async_insert_chunk(self.milvus_client, data)
             if not resp:
-                await self.content_chunk_processor.update_embedding_status(
+                await self.chunk_manager.update_embedding_status(
                     doc_id=chunk.doc_id,
                     chunk_id=chunk.chunk_id,
                     ori_status=self.PROCESSING_STATUS,
@@ -179,7 +179,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
                 )
                 return None

-            await self.content_chunk_processor.update_embedding_status(
+            await self.chunk_manager.update_embedding_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -188,7 +188,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
             milvus_id = resp[0]
             return milvus_id
         except Exception as e:
-            await self.content_chunk_processor.update_embedding_status(
+            await self.chunk_manager.update_embedding_status(
                 doc_id=chunk.doc_id,
                 chunk_id=chunk.chunk_id,
                 ori_status=self.PROCESSING_STATUS,
@@ -227,7 +227,7 @@ class ChunkEmbeddingTask(TopicAwareChunker):
             max_concurrency=10,
         )

-        await self.contents_processor.update_content_status(
+        await self.content_manager.update_content_status(
             doc_id=self.doc_id,
             ori_status=self.PROCESSING_STATUS,
             new_status=self.FINISHED_STATUS,
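
Note on the refactor: the constructor now takes a single resource object exposing mysql_client, milvus_client, and es_client instead of separate mysql_pool / vector_pool / es_pool arguments and a ChunkerConfig. A minimal wiring sketch under that assumption follows; the Resource dataclass and build_task helper are illustrative only and not part of this diff.

# Illustrative wiring for the new ChunkEmbeddingTask constructor.
# The Resource container and build_task helper are assumptions for this sketch,
# not part of the diff; attribute names follow the new __init__.
from dataclasses import dataclass
from typing import Any


@dataclass
class Resource:
    mysql_client: Any   # MySQL pool consumed by Contents / ContentChunks
    milvus_client: Any  # Milvus handle passed to async_insert_chunk
    es_client: Any      # Elasticsearch client stored on the task as self.es_client


def build_task(doc_id: str, mysql_client: Any, milvus_client: Any, es_client: Any):
    resource = Resource(
        mysql_client=mysql_client,
        milvus_client=milvus_client,
        es_client=es_client,
    )
    task = ChunkEmbeddingTask(doc_id, resource)
    task.init_processer()  # binds the Contents / ContentChunks managers to resource.mysql_client
    return task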