from __future__ import annotations import re from typing import List import numpy as np from sklearn.preprocessing import minmax_scale from applications.config import ChunkerConfig class BoundaryDetector(ChunkerConfig): def __init__(self): self.signal_boost_turn = 0.20 self.signal_boost_fig = 0.20 self.min_gap = 1 @staticmethod def cosine_sim(u: np.ndarray, v: np.ndarray) -> float: """计算余弦相似度""" return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v) + 1e-8)) def turn_signal(self, text: str) -> float: pattern = r"(因此|但是|综上所述?|然而|另一方面|总之|结论是|In conclusion\b|To conclude\b|However\b|Therefore\b|Thus\b|On the other hand\b)" if re.search(pattern, text, flags=re.IGNORECASE): return self.signal_boost_turn return 0.0 def figure_signal(self, text: str) -> float: pattern = r"(见下图|如下图所示|如表所示|如下表所示|表\s*\d+[::]?|图\s*\d+[::]?|Figure\s*\d+|Table\s*\d+)" if re.search(pattern, text, flags=re.IGNORECASE): return self.signal_boost_fig return 0.0 def detect_boundaries( self, sentence_list: List[str], embs: np.ndarray, debug: bool = False ) -> List[int]: sims = np.array( [self.cosine_sim(embs[i], embs[i + 1]) for i in range(len(embs) - 1)] ) cut_scores = 1 - sims cut_scores = minmax_scale(cut_scores) if len(cut_scores) > 0 else [] boundaries = [] last_boundary = -999 for index, base_score in enumerate(cut_scores): sent_to_check = ( sentence_list[index] if index < len(sentence_list) else sentence_list[-1] ) snippet = sent_to_check[-20:] if sent_to_check else "" adj_score = ( base_score + self.turn_signal(snippet) + self.figure_signal(sent_to_check) ) if adj_score >= self.boundary_threshold and ( index - last_boundary >= self.min_gap ): boundaries.append(index) last_boundary = index # Debug 输出 if debug: print( f"[{index}] sim={sims[index]:.3f}, cut={base_score:.3f}, adj={adj_score:.3f}, boundary={index in boundaries}" ) return boundaries def detect_boundaries_v2( self, sentence_list: List[str], embs: np.ndarray, debug: bool = False ) -> List[int]: """ 约束:相邻 boundary(含开头到第一个 boundary)之间的句子数 ∈ [3, 10] boundary 的含义:作为“段落末句”的索引(与 pack 时的 b 含义一致) """ n = len(sentence_list) if n <= 1 or embs is None or len(embs) != n: return [] # --- 基础打分 --- sims = np.array([self.cosine_sim(embs[i], embs[i + 1]) for i in range(n - 1)]) cut_scores = 1 - sims cut_scores = minmax_scale(cut_scores) if len(cut_scores) > 0 else np.array([]) # 组合信号:内容转折/图片编号等 adj_scores = np.zeros_like(cut_scores) for i in range(len(cut_scores)): sent_to_check = sentence_list[i] if i < n else sentence_list[-1] snippet = (sent_to_check[-20:] if sent_to_check else "") adj_scores[i] = ( cut_scores[i] + self.turn_signal(snippet) + self.figure_signal(sent_to_check) ) # --- 3-10 句强约束切分 --- MIN_SIZE = self.min_sent_per_chunk MAX_SIZE = self.max_sent_per_chunk thr = getattr(self, "boundary_threshold", 0.5) boundaries: List[int] = [] last_boundary = -1 # 作为上一个“段末句”的索引(开头前为 -1) best_idx = None # 记录当前窗口内(已达 MIN_SIZE)的最高分切点 best_score = -1e9 for i in range(n - 1): # i 表示把 i 作为“段末句”的候选 seg_len = i - last_boundary # 若切在 i,本段包含的句数 = i - last_boundary # 更新当前窗口最佳候选(仅在达到最低长度后才可记为候选) if seg_len >= MIN_SIZE: if adj_scores[i] > best_score: best_score = float(adj_scores[i]) best_idx = i cut_now = False cut_at = None if seg_len < MIN_SIZE: # 不足 3 句,绝不切 pass elif adj_scores[i] >= thr and seg_len <= MAX_SIZE: # 在 [3,10] 区间且过阈值,直接切 cut_now = True cut_at = i elif seg_len == MAX_SIZE: # 已到 10 句必须切:优先用窗口内最高分位置 cut_now = True cut_at = best_idx if best_idx is not None else i if cut_now: boundaries.append(cut_at) last_boundary = cut_at best_idx = None best_score = -1e9 if debug: print( f"[{i}] sim={sims[i]:.3f}, cut={cut_scores[i]:.3f}, " f"adj={adj_scores[i]:.3f}, len={seg_len}, " f"cut={'Y@' + str(cut_at) if cut_now else 'N'}" ) # --- 收尾:避免最后一段 < 3 句 --- # pack 时会额外补上末尾 n-1 作为最终 boundary,因此尾段长度为 (n-1 - last_boundary) tail_len = (n - 1) - last_boundary if tail_len < MIN_SIZE and boundaries: # 需要把“最后一个 boundary”往前/后微调到一个可行区间内 prev_last = boundaries[-2] if len(boundaries) >= 2 else -1 # 新的最后切点需满足: # 1) 前一段长度在 [3,10] => j ∈ [prev_last+3, prev_last+10] # 2) 尾段长度在 [3,10] => j ∈ [n-1-10, n-1-3] lower = max(prev_last + MIN_SIZE, (n - 1) - MAX_SIZE) upper = min(prev_last + MAX_SIZE, (n - 1) - MIN_SIZE) if lower <= upper: # 在允许区间里找 adj_scores 最高的位置 window = adj_scores[lower: upper + 1] j = int(np.argmax(window)) + lower if j != boundaries[-1]: boundaries[-1] = j if debug: print(f"[fix-tail] move last boundary -> {j}, tail_len={n - 1 - j}") else: # 没有可行区间:退化为合并尾段(删掉最后一个 boundary) dropped = boundaries.pop() if debug: print(f"[fix-tail] drop last boundary {dropped} to avoid tiny tail") return boundaries