|
@@ -6,6 +6,7 @@ from applications.utils.async_utils import run_tasks_with_asyncio_task_group
|
|
|
from applications.utils.chunks import LLMClassifier, TopicAwarePackerV2
|
|
|
from applications.utils.milvus import async_insert_chunk
|
|
|
from applications.utils.mysql import ContentChunks, Contents
|
|
|
+from applications.utils.nlp import num_tokens
|
|
|
from applications.config import Chunk, DEFAULT_MODEL
|
|
|
from applications.config import ELASTIC_SEARCH_INDEX
|
|
|
|
|
@@ -31,12 +32,13 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
async def _chunk_each_content(
|
|
|
self,
|
|
|
doc_id: str,
|
|
|
- text: str,
|
|
|
- text_type: int,
|
|
|
- title: str,
|
|
|
- dataset_id: int,
|
|
|
- re_chunk: bool,
|
|
|
+ data: dict
|
|
|
) -> List[Chunk]:
|
|
|
+ title, text = data.get("title", "").strip(), data["text"].strip()
|
|
|
+ text_type = data.get("text_type", 1)
|
|
|
+ dataset_id = data.get("dataset_id", 0) # 默认知识库 id 为 0
|
|
|
+ re_chunk = data.get("re_chunk", False)
|
|
|
+ dont_chunk = data.get("dont_chunk", False)
|
|
|
if re_chunk:
|
|
|
await self.content_manager.update_content_info(
|
|
|
doc_id=doc_id,
|
|
@@ -53,7 +55,7 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
if not flag:
|
|
|
return []
|
|
|
else:
|
|
|
- raw_chunks = await self.chunk(text, text_type, dataset_id)
|
|
|
+ raw_chunks = await self.chunk(text, text_type, dataset_id, dont_chunk)
|
|
|
if not raw_chunks:
|
|
|
await self.content_manager.update_content_status(
|
|
|
doc_id=doc_id,
|
|
@@ -215,21 +217,18 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
|
|
|
async def deal(self, data):
|
|
|
text = data.get("text", "")
|
|
|
- title = data.get("title", "")
|
|
|
- text, title = text.strip(), title.strip()
|
|
|
- text_type = data.get("text_type", 1)
|
|
|
- dataset_id = data.get("dataset_id", 0) # 默认知识库 id 为 0
|
|
|
- re_chunk = data.get("re_chunk", False)
|
|
|
-
|
|
|
- if not text:
|
|
|
- return None
|
|
|
+ dont_chunk = data.get("dont_chunk", False)
|
|
|
+ # 如果无需分块,判断text 长度
|
|
|
+ if dont_chunk and num_tokens(text) >= self.max_tokens:
|
|
|
+ return {
|
|
|
+                "error": "文档超过模型支持的最大吞吐量"
|
|
|
+ }
|
|
|
|
|
|
self.init_processer()
|
|
|
|
|
|
async def _process():
|
|
|
- chunks = await self._chunk_each_content(
|
|
|
- self.doc_id, text, text_type, title, dataset_id, re_chunk
|
|
|
- )
|
|
|
+
|
|
|
+ chunks = await self._chunk_each_content(self.doc_id, data)
|
|
|
if not chunks:
|
|
|
return
|
|
|
|
|
@@ -242,7 +241,7 @@ class ChunkEmbeddingTask(TopicAwarePackerV2):
|
|
|
handler=self.save_each_chunk,
|
|
|
description="处理单篇文章分块",
|
|
|
unit="chunk",
|
|
|
- max_concurrency=10,
|
|
|
+ max_concurrency=20,
|
|
|
)
|
|
|
|
|
|
await self.content_manager.update_content_status(
|