# topic_aware_chunking.py

  1. """
  2. 主题感知分块
  3. """
  4. from __future__ import annotations
  5. import re, uuid
  6. import time
  7. from typing import List
  8. import numpy as np
  9. from sklearn.preprocessing import minmax_scale
  10. from applications.api import get_basic_embedding
  11. from applications.config import DEFAULT_MODEL, Chunk, ChunkerConfig
  12. from applications.utils.nlp import SplitTextIntoSentences, num_tokens
  13. # from .llm_classifier import LLMClassifier


# Sentence-boundary detection strategy
class BoundaryDetector:
    def __init__(self, cfg: ChunkerConfig, debug: bool = False):
        self.cfg = cfg
        self.debug = debug
        # Signal-boost factors for discourse turns and figure/table references
        self.signal_boost_turn = 0.20
        self.signal_boost_fig = 0.20
        self.min_gap = 1

    @staticmethod
    def cosine_sim(u: np.ndarray, v: np.ndarray) -> float:
        """Compute the cosine similarity between two vectors."""
        return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8))

    def detect_boundaries(
        self, sentence_list: List[str], embs: np.ndarray
    ) -> List[int]:
        # 1. Similarity between adjacent sentences
        sims = np.array(
            [self.cosine_sim(embs[i], embs[i + 1]) for i in range(len(embs) - 1)]
        )
        cut_scores = 1 - sims
        # 2. Normalize cut_scores to [0, 1]
        cut_scores = minmax_scale(cut_scores) if len(cut_scores) > 0 else []

        boundaries = []
        last_boundary = -999
        for index, base_score in enumerate(cut_scores):
            sent_to_check = (
                sentence_list[index]
                if index < len(sentence_list)
                else sentence_list[-1]
            )
            snippet = sent_to_check[-20:] if sent_to_check else ""
            # Boost the cut score when the tail of the sentence carries a discourse-turn marker
            turn = (
                self.signal_boost_turn
                if re.search(
                    r"(因此|但是|综上|然而|另一方面|In conclusion|However|Therefore)",
                    snippet,
                )
                else 0.0
            )
            # Boost the cut score when the sentence references a figure or table
            fig = (
                self.signal_boost_fig
                if re.search(
                    r"(见下图|如表|表\s*\d+|图\s*\d+|Figure|Table)", sent_to_check
                )
                else 0.0
            )
            adj_score = base_score + turn + fig
            if adj_score >= self.cfg.boundary_threshold and (
                index - last_boundary >= self.min_gap
            ):
                boundaries.append(index)
                last_boundary = index
            # Debug output
            if self.debug:
                print(
                    f"[{index}] sim={sims[index]:.3f}, cut={base_score:.3f}, adj={adj_score:.3f}, boundary={index in boundaries}"
                )
        return boundaries
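

# Minimal usage sketch for BoundaryDetector on its own (assumes ChunkerConfig
# exposes a `boundary_threshold` field and that sentence embeddings are already
# available; the embeddings below are random stand-ins, not real model output):
#
#     detector = BoundaryDetector(ChunkerConfig(), debug=True)
#     sentences = ["First sentence.", "Second sentence.", "A new topic starts here."]
#     embs = np.stack([np.random.rand(768).astype(np.float32) for _ in sentences])
#     print(detector.detect_boundaries(sentences, embs))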


class TopicAwareChunker(BoundaryDetector, SplitTextIntoSentences):
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3

    def __init__(self, cfg: ChunkerConfig):
        super().__init__(cfg)
        # self.classifier = LLMClassifier()
        self.doc_id = f"doc-{uuid.uuid4()}"

    @staticmethod
    async def _encode_batch(texts: List[str]) -> np.ndarray:
        embs = []
        for t in texts:
            e = await get_basic_embedding(t, model=DEFAULT_MODEL, dev=True)
            embs.append(np.array(e, dtype=np.float32))
        return np.stack(embs)
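
    # Note: embeddings are requested one sentence at a time above. If
    # get_basic_embedding accepts a list of texts (not verified here), a single
    # batched call per document would reduce the number of API round-trips.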

    def _pack_by_boundaries(
        self, sentence_list: List[str], boundaries: List[int]
    ) -> List[Chunk]:
        boundary_set = set(boundaries)
        chunks: List[Chunk] = []
        start = 0
        n = len(sentence_list)
        chunk_id = 0
        while start < n:
            end = start
            sent_count = 0
            while end < n and sent_count < self.cfg.max_sent_per_chunk:
                cur_tokens = num_tokens(" ".join(sentence_list[start : end + 1]))
                sent_count += 1
                if cur_tokens >= self.cfg.target_tokens:
                    # Once the token budget is reached, back up to the nearest
                    # preceding detected boundary, provided the chunk still keeps
                    # at least min_sent_per_chunk sentences.
                    cut = end
                    for b in range(end, start - 1, -1):
                        if b in boundary_set:
                            cut = b
                            break
                    if cut - start + 1 >= self.cfg.min_sent_per_chunk:
                        end = cut
                    break
                end += 1
            text = " ".join(sentence_list[start : end + 1]).strip()
            tokens = num_tokens(text)
            chunk_id += 1
            chunk = Chunk(
                doc_id=self.doc_id, chunk_id=chunk_id, text=text, tokens=tokens
            )
            chunks.append(chunk)
            start = end + 1
        return chunks
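
    # Illustrative packing behaviour (numbers are hypothetical): with six
    # sentences, detected boundaries at indices [1, 3], min_sent_per_chunk of 2,
    # and target_tokens reached around every third sentence, the loop above ends
    # each chunk at the nearest preceding boundary, yielding roughly
    # [s0, s1], [s2, s3], [s4, s5].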

    async def _refine_chunk_by_topic(self, chunk: Chunk) -> List[Chunk]:
        sentence_list = self.jieba_sent_tokenize(chunk.text)
        if len(sentence_list) <= self.cfg.min_sent_per_chunk * 2:
            return [chunk]
        embs = await self._encode_batch(sentence_list)
        orig = self.cfg.boundary_threshold
        try:
            # Temporarily lower the threshold so the refinement pass splits more eagerly
            self.cfg.boundary_threshold = max(0.3, orig - 0.1)
            boundaries = self.detect_boundaries(sentence_list, embs)
            sub_chunks = self._pack_by_boundaries(sentence_list, boundaries)
            final = []
            for ch in sub_chunks:
                # NOTE: self.kg is not defined in this class; a knowledge-graph
                # classifier is expected to be attached before this method is called.
                topics, purity = await self.kg.classify(ch.text, topk=self.cfg.kg_topk)
                ch.topics, ch.topic_purity = topics, purity
                final.append(ch)
            return final
        finally:
            self.cfg.boundary_threshold = orig

    async def chunk(self, text: str) -> List[Chunk]:
        sentence_list = self.jieba_sent_tokenize(text)
        if not sentence_list:
            return []
        sentences_embeddings = await self._encode_batch(sentence_list)
        boundaries = self.detect_boundaries(sentence_list, sentences_embeddings)
        raw_chunks = self._pack_by_boundaries(sentence_list, boundaries)
        return raw_chunks


# async def main():
#     cfg = ChunkerConfig()
#     sample_text = """
#     RAG(Retrieval-Augmented Generation)是一种增强生成的技术。
#     在复杂知识问答中,RAG 通过检索相关文档片段来改善答案质量。
#     然而,分块策略会显著影响检索召回与可引用性。
#     因此,我们提出一种主题感知的分块方法,结合 Transformer 边界探测与知识图谱层次分类。
#     然后,我们讲一个新的主题,篮球
#     这个也就是罚球动作。一般原地动作分为两种。
#     第一种原地投篮动作是先下蹲,做好投篮的发力前上举动作,然后竖直向上伸直身体,右臂顺势在身体向上的过程中竖直向上将球向上投出。这种原地投篮的好处是,发力轻松,可以借助身体向上竖直的这个力度的趋势,帮助投篮发力,会让投篮的力气减少很多。尤其是在比赛后半程体力不好的时候,依然可以做到很高的命中略。这种投篮的要领是:主动的竖直向上的意识。我们以前就经常强调竖直起跳和竖直的概念,但是,同样看起来是竖直,但是用出来的效果却很不同,这主要就是技巧的关系了。这个技巧的精髓就在于“主动意识”。在你练习这种投篮的时候,每一次,都要在下蹲以后,明确的在脑子里想着,要竖直向上发力。双腿要竖直向上用力,整个身体也是这样,而且,最为重要的是,你一定要在练习的时候每次都要主动的去想,然后刻意的去竖直向上。这样,长久下去,养成习惯,你的这种投篮才会稳定。这里我们要顺便强调之前的一篇文章,就是录像纠错法,我们这里之所以一再强调要主动意识的竖直上起,就是因为,在录像上,未必能看得出来这个问题。也就是说,你的录像虽然看起来你是竖直起跳的,但是你没有一个主动的也就是刻意的竖直起跳的意识的话,这个球也不是竖直起跳。另外,相反的,如果你在视频上看到自己不是竖直起跳,但是实际上这个球是你使用了竖直起跳的主动意识来发力的。那么,尽管看起来不是很竖直,却依然可以很稳定。也就是说,眼睛会欺骗你,一定要注重你的意识。
#     """
#     chunker = TopicAwareChunker(cfg)
#     chunks = await chunker.chunk(sample_text)
#
#     for c in chunks:
#         print(f"[{c.tokens} tokens] {c.topics} purity={c.topic_purity:.2f}")
#         print(c.text)
#
#
# if __name__ == "__main__":
#     import asyncio
#
#     asyncio.run(main())