""" 主题感知分块 """ from __future__ import annotations import re from typing import List import numpy as np from sklearn.preprocessing import minmax_scale from applications.api import get_basic_embedding from applications.config import DEFAULT_MODEL, Chunk, ChunkerConfig from applications.utils.nlp import SplitTextIntoSentences, num_tokens # from .llm_classifier import LLMClassifier # sentence boundary strategy class BoundaryDetector: def __init__(self, cfg: ChunkerConfig, debug: bool = False): self.cfg = cfg self.debug = debug # 信号增强因子 self.signal_boost_turn = 0.20 self.signal_boost_fig = 0.20 self.min_gap = 1 @staticmethod def cosine_sim(u: np.ndarray, v: np.ndarray) -> float: """计算余弦相似度""" return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8)) def detect_boundaries( self, sentence_list: List[str], embs: np.ndarray ) -> List[int]: # 1. 相邻句子相似度 sims = np.array( [self.cosine_sim(embs[i], embs[i + 1]) for i in range(len(embs) - 1)] ) cut_scores = 1 - sims # 2. 归一化 cut_scores 到 [0,1] cut_scores = minmax_scale(cut_scores) if len(cut_scores) > 0 else [] boundaries = [] last_boundary = -999 for index, base_score in enumerate(cut_scores): sent_to_check = ( sentence_list[index] if index < len(sentence_list) else sentence_list[-1] ) snippet = sent_to_check[-20:] if sent_to_check else "" turn = ( self.signal_boost_turn if re.search( r"(因此|但是|综上|然而|另一方面|In conclusion|However|Therefore)", snippet, ) else 0.0 ) fig = ( self.signal_boost_fig if re.search( r"(见下图|如表|表\s*\d+|图\s*\d+|Figure|Table)", sent_to_check ) else 0.0 ) adj_score = base_score + turn + fig if adj_score >= self.cfg.boundary_threshold and ( index - last_boundary >= self.min_gap ): boundaries.append(index) last_boundary = index # Debug 输出 if self.debug: print( f"[{index}] sim={sims[index]:.3f}, cut={base_score:.3f}, adj={adj_score:.3f}, boundary={index in boundaries}" ) return boundaries class TopicAwareChunker(BoundaryDetector, SplitTextIntoSentences): INIT_STATUS = 0 PROCESSING_STATUS = 1 FINISHED_STATUS = 2 FAILED_STATUS = 3 def __init__(self, cfg: ChunkerConfig, doc_id: str): super().__init__(cfg) # self.classifier = LLMClassifier() self.doc_id = doc_id @staticmethod async def _encode_batch(texts: List[str]) -> np.ndarray: embs = [] for t in texts: e = await get_basic_embedding(t, model=DEFAULT_MODEL) embs.append(np.array(e, dtype=np.float32)) return np.stack(embs) def _pack_by_boundaries( self, sentence_list: List[str], boundaries: List[int], text_type: int ) -> List[Chunk]: boundary_set = set(boundaries) chunks: List[Chunk] = [] start = 0 n = len(sentence_list) chunk_id = 0 while start < n: end = start sent_count = 0 while end < n and sent_count < self.cfg.max_sent_per_chunk: cur_tokens = num_tokens(" ".join(sentence_list[start : end + 1])) sent_count += 1 if cur_tokens >= self.cfg.target_tokens: cut = end for b in range(end, start - 1, -1): if b in boundary_set: cut = b break if cut - start + 1 >= self.cfg.min_sent_per_chunk: end = cut break end += 1 text = " ".join(sentence_list[start : end + 1]).strip() tokens = num_tokens(text) chunk_id += 1 chunk = Chunk( doc_id=self.doc_id, chunk_id=chunk_id, text=text, tokens=tokens, text_type=text_type, ) chunks.append(chunk) start = end + 1 return chunks async def _refine_chunk_by_topic(self, chunk: Chunk) -> List[Chunk]: sentence_list = self.jieba_sent_tokenize(chunk.text) if len(sentence_list) <= self.cfg.min_sent_per_chunk * 2: return [chunk] embs = await self._encode_batch(sentence_list) orig = self.cfg.boundary_threshold try: self.cfg.boundary_threshold = max(0.3, orig - 0.1) boundaries = self.detect_boundaries(sentence_list, embs) sub_chunks = self._pack_by_boundaries(sentence_list, boundaries) final = [] for ch in sub_chunks: topics, purity = await self.kg.classify(ch.text, topk=self.cfg.kg_topk) ch.topics, ch.topic_purity = topics, purity final.append(ch) return final finally: self.cfg.boundary_threshold = orig async def chunk(self, text: str, text_type: int) -> List[Chunk]: sentence_list = self.jieba_sent_tokenize(text) if not sentence_list: return [] sentences_embeddings = await self._encode_batch(sentence_list) boundaries = self.detect_boundaries(sentence_list, sentences_embeddings) raw_chunks = self._pack_by_boundaries(sentence_list, boundaries, text_type) return raw_chunks # async def main(): # cfg = ChunkerConfig() # sample_text = """ # RAG(Retrieval-Augmented Generation)是一种增强生成的技术。 # 在复杂知识问答中,RAG 通过检索相关文档片段来改善答案质量。 # 然而,分块策略会显著影响检索召回与可引用性。 # 因此,我们提出一种主题感知的分块方法,结合 Transformer 边界探测与知识图谱层次分类。 # 然后,我们讲一个新的主题,篮球 # 这个也就是罚球动作。一般原地动作分为两种。 # 第一种原地投篮动作是先下蹲,做好投篮的发力前上举动作,然后竖直向上伸直身体,右臂顺势在身体向上的过程中竖直向上将球向上投出。这种原地投篮的好处是,发力轻松,可以借助身体向上竖直的这个力度的趋势,帮助投篮发力,会让投篮的力气减少很多。尤其是在比赛后半程体力不好的时候,依然可以做到很高的命中略。这种投篮的要领是:主动的竖直向上的意识。我们以前就经常强调竖直起跳和竖直的概念,但是,同样看起来是竖直,但是用出来的效果却很不同,这主要就是技巧的关系了。这个技巧的精髓就在于“主动意识”。在你练习这种投篮的时候,每一次,都要在下蹲以后,明确的在脑子里想着,要竖直向上发力。双腿要竖直向上用力,整个身体也是这样,而且,最为重要的是,你一定要在练习的时候每次都要主动的去想,然后刻意的去竖直向上。这样,长久下去,养成习惯,你的这种投篮才会稳定。这里我们要顺便强调之前的一篇文章,就是录像纠错法,我们这里之所以一再强调要主动意识的竖直上起,就是因为,在录像上,未必能看得出来这个问题。也就是说,你的录像虽然看起来你是竖直起跳的,但是你没有一个主动的也就是刻意的竖直起跳的意识的话,这个球也不是竖直起跳。另外,相反的,如果你在视频上看到自己不是竖直起跳,但是实际上这个球是你使用了竖直起跳的主动意识来发力的。那么,尽管看起来不是很竖直,却依然可以很稳定。也就是说,眼睛会欺骗你,一定要注重你的意识。 # """ # chunker = TopicAwareChunker(cfg) # chunks = await chunker.chunk(sample_text) # # for c in chunks: # print(f"[{c.tokens} tokens] {c.topic} purity={c.topic_purity:.2f}") # print(c.text) # # # if __name__ == "__main__": # asyncio.run(main())