topic_aware_chunking.py

  1. """
  2. 主题感知分块
  3. """
  4. from __future__ import annotations
  5. import re
  6. from typing import List
  7. import numpy as np
  8. from sklearn.preprocessing import minmax_scale
  9. from applications.api import get_basic_embedding
  10. from applications.config import DEFAULT_MODEL, Chunk, ChunkerConfig
  11. from applications.utils.nlp import SplitTextIntoSentences, num_tokens
  12. # from .llm_classifier import LLMClassifier


# Sentence boundary detection strategy
class BoundaryDetector:
    def __init__(self, cfg: ChunkerConfig, debug: bool = False):
        self.cfg = cfg
        self.debug = debug
        # Signal boost factors for discourse-turn and figure/table cues
        self.signal_boost_turn = 0.20
        self.signal_boost_fig = 0.20
        # Minimum number of sentences between two consecutive boundaries
        self.min_gap = 1

    @staticmethod
    def cosine_sim(u: np.ndarray, v: np.ndarray) -> float:
        """Compute the cosine similarity between two vectors."""
        return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8))

    def detect_boundaries(
        self, sentence_list: List[str], embs: np.ndarray
    ) -> List[int]:
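        """
        Score every gap between adjacent sentences (1 - cosine similarity,
        min-max normalized, plus discourse/figure boosts) and return the
        indices where a topic boundary is detected. A boundary at index ``i``
        means "cut after sentence ``i``".
        """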
        # 1. Similarity between each pair of adjacent sentences
        sims = np.array(
            [self.cosine_sim(embs[i], embs[i + 1]) for i in range(len(embs) - 1)]
        )
        cut_scores = 1 - sims
        # 2. Normalize cut_scores to [0, 1]
        cut_scores = minmax_scale(cut_scores) if len(cut_scores) > 0 else []
        boundaries = []
        last_boundary = -999
        for index, base_score in enumerate(cut_scores):
            sent_to_check = (
                sentence_list[index]
                if index < len(sentence_list)
                else sentence_list[-1]
            )
            snippet = sent_to_check[-20:] if sent_to_check else ""
            # Boost the cut score when the sentence tail carries a discourse-turn marker
            turn = (
                self.signal_boost_turn
                if re.search(
                    r"(因此|但是|综上|然而|另一方面|In conclusion|However|Therefore)",
                    snippet,
                )
                else 0.0
            )
            # Boost the cut score when the sentence references a figure or table
            fig = (
                self.signal_boost_fig
                if re.search(
                    r"(见下图|如表|表\s*\d+|图\s*\d+|Figure|Table)", sent_to_check
                )
                else 0.0
            )
            adj_score = base_score + turn + fig
            if adj_score >= self.cfg.boundary_threshold and (
                index - last_boundary >= self.min_gap
            ):
                boundaries.append(index)
                last_boundary = index
            # Debug output
            if self.debug:
                print(
                    f"[{index}] sim={sims[index]:.3f}, cut={base_score:.3f}, "
                    f"adj={adj_score:.3f}, boundary={index in boundaries}"
                )
        return boundaries


class TopicAwareChunker(BoundaryDetector, SplitTextIntoSentences):
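    """
    Topic-aware chunker: embeds sentences, detects topic boundaries, and packs
    sentences into token-budgeted chunks tagged with ``doc_id``.
    """
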
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3

    def __init__(self, cfg: ChunkerConfig, doc_id: str):
        super().__init__(cfg)
        # self.classifier = LLMClassifier()
        self.doc_id = doc_id

    @staticmethod
    async def _encode_batch(texts: List[str]) -> np.ndarray:
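        """Embed each text via get_basic_embedding and stack the vectors into a 2-D array."""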
        embs = []
        for t in texts:
            e = await get_basic_embedding(t, model=DEFAULT_MODEL)
            embs.append(np.array(e, dtype=np.float32))
        return np.stack(embs)

    def _pack_by_boundaries(
        self, sentence_list: List[str], boundaries: List[int]
    ) -> List[Chunk]:
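        """
        Greedily pack sentences into chunks: a chunk is closed once it reaches
        ``target_tokens`` (snapping back to the nearest detected boundary when
        that still leaves at least ``min_sent_per_chunk`` sentences) or once it
        contains ``max_sent_per_chunk`` sentences.
        """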
        boundary_set = set(boundaries)
        chunks: List[Chunk] = []
        start = 0
        n = len(sentence_list)
        chunk_id = 0
        while start < n:
            end = start
            sent_count = 0
            while end < n and sent_count < self.cfg.max_sent_per_chunk:
                cur_tokens = num_tokens(" ".join(sentence_list[start : end + 1]))
                sent_count += 1
                if cur_tokens >= self.cfg.target_tokens:
                    # Snap back to the nearest detected boundary, provided the
                    # chunk keeps at least min_sent_per_chunk sentences.
                    cut = end
                    for b in range(end, start - 1, -1):
                        if b in boundary_set:
                            cut = b
                            break
                    if cut - start + 1 >= self.cfg.min_sent_per_chunk:
                        end = cut
                    break
                if sent_count >= self.cfg.max_sent_per_chunk:
                    # Sentence cap reached: keep `end` at the last counted sentence.
                    break
                end += 1
            text = " ".join(sentence_list[start : end + 1]).strip()
            tokens = num_tokens(text)
            chunk_id += 1
            chunk = Chunk(
                doc_id=self.doc_id, chunk_id=chunk_id, text=text, tokens=tokens
            )
            chunks.append(chunk)
            start = end + 1
        return chunks

    async def _refine_chunk_by_topic(self, chunk: Chunk) -> List[Chunk]:
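        """
        Re-split an oversized chunk with a slightly lowered boundary threshold
        and attach topic labels to every sub-chunk. Not called from ``chunk()``
        in this file.
        """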
        sentence_list = self.jieba_sent_tokenize(chunk.text)
        if len(sentence_list) <= self.cfg.min_sent_per_chunk * 2:
            return [chunk]
        embs = await self._encode_batch(sentence_list)
        orig = self.cfg.boundary_threshold
        try:
            # Temporarily lower the boundary threshold to get finer splits
            self.cfg.boundary_threshold = max(0.3, orig - 0.1)
            boundaries = self.detect_boundaries(sentence_list, embs)
            sub_chunks = self._pack_by_boundaries(sentence_list, boundaries)
            final = []
            for ch in sub_chunks:
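                # NOTE: `self.kg` is assumed to be a knowledge-graph topic
                # classifier injected elsewhere; it is not initialized in this
                # file (cf. the commented-out LLMClassifier above).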
                topics, purity = await self.kg.classify(ch.text, topk=self.cfg.kg_topk)
                ch.topics, ch.topic_purity = topics, purity
                final.append(ch)
            return final
        finally:
            self.cfg.boundary_threshold = orig

    async def chunk(self, text: str) -> List[Chunk]:
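        """Split text into sentences, detect topic boundaries, and pack them into chunks."""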
        sentence_list = self.jieba_sent_tokenize(text)
        if not sentence_list:
            return []
        sentences_embeddings = await self._encode_batch(sentence_list)
        boundaries = self.detect_boundaries(sentence_list, sentences_embeddings)
        raw_chunks = self._pack_by_boundaries(sentence_list, boundaries)
        return raw_chunks


# Example usage (kept commented out); "demo" below is a placeholder doc_id.
#
# import asyncio
#
#
# async def main():
#     cfg = ChunkerConfig()
#     sample_text = """
#     RAG(Retrieval-Augmented Generation)是一种增强生成的技术。
#     在复杂知识问答中,RAG 通过检索相关文档片段来改善答案质量。
#     然而,分块策略会显著影响检索召回与可引用性。
#     因此,我们提出一种主题感知的分块方法,结合 Transformer 边界探测与知识图谱层次分类。
#     然后,我们讲一个新的主题,篮球
#     这个也就是罚球动作。一般原地动作分为两种。
#     第一种原地投篮动作是先下蹲,做好投篮的发力前上举动作,然后竖直向上伸直身体,右臂顺势在身体向上的过程中竖直向上将球向上投出。这种原地投篮的好处是,发力轻松,可以借助身体向上竖直的这个力度的趋势,帮助投篮发力,会让投篮的力气减少很多。尤其是在比赛后半程体力不好的时候,依然可以做到很高的命中略。这种投篮的要领是:主动的竖直向上的意识。我们以前就经常强调竖直起跳和竖直的概念,但是,同样看起来是竖直,但是用出来的效果却很不同,这主要就是技巧的关系了。这个技巧的精髓就在于“主动意识”。在你练习这种投篮的时候,每一次,都要在下蹲以后,明确的在脑子里想着,要竖直向上发力。双腿要竖直向上用力,整个身体也是这样,而且,最为重要的是,你一定要在练习的时候每次都要主动的去想,然后刻意的去竖直向上。这样,长久下去,养成习惯,你的这种投篮才会稳定。这里我们要顺便强调之前的一篇文章,就是录像纠错法,我们这里之所以一再强调要主动意识的竖直上起,就是因为,在录像上,未必能看得出来这个问题。也就是说,你的录像虽然看起来你是竖直起跳的,但是你没有一个主动的也就是刻意的竖直起跳的意识的话,这个球也不是竖直起跳。另外,相反的,如果你在视频上看到自己不是竖直起跳,但是实际上这个球是你使用了竖直起跳的主动意识来发力的。那么,尽管看起来不是很竖直,却依然可以很稳定。也就是说,眼睛会欺骗你,一定要注重你的意识。
#     """
#     chunker = TopicAwareChunker(cfg, doc_id="demo")
#     chunks = await chunker.chunk(sample_text)
#
#     for c in chunks:
#         print(f"[{c.tokens} tokens] topics={c.topics} purity={c.topic_purity}")
#         print(c.text)
#
#
# if __name__ == "__main__":
#     asyncio.run(main())