|
@@ -68,3 +68,107 @@ class BoundaryDetector(ChunkerConfig):
|
|
|
)
|
|
|
|
|
|
return boundaries
|
|
|
+
|
|
|
def detect_boundaries_v2(
    self, sentence_list: List[str], embs: np.ndarray, debug: bool = False
) -> List[int]:
    """Pick paragraph boundaries under a sentences-per-chunk size constraint.

    A boundary is the index of the sentence that *ends* a paragraph. The
    number of sentences between consecutive boundaries (and from the start
    of the text to the first boundary) is kept within
    ``[self.min_sent_per_chunk, self.max_sent_per_chunk]`` whenever a
    feasible cut position exists.

    Args:
        sentence_list: Sentences in document order.
        embs: One embedding per sentence; ``len(embs)`` must equal
            ``len(sentence_list)``.
        debug: When true, print per-position scores and tail adjustments.

    Returns:
        Ascending sentence indices, each marking the last sentence of a
        paragraph. The closing index ``n - 1`` is not part of the result
        (per the original in-code note, the packing step appends it).
        An empty list is returned when there are fewer than two sentences
        or when the embeddings are missing or differ in length from
        ``sentence_list``.
    """
    n = len(sentence_list)
    if n <= 1 or embs is None or len(embs) != n:
        return []

    # --- Base scores: dissimilarity between neighbouring sentences ---
    sims = np.array(
        [self.cosine_sim(embs[i], embs[i + 1]) for i in range(n - 1)]
    )
    # n >= 2 at this point, so there is always at least one pair to scale.
    cut_scores = minmax_scale(1 - sims)

    # Combined signal: add the discourse-turn and figure-number cues on top
    # of the embedding score. adj_scores[i] rates a cut right after
    # sentence i.
    adj_scores = np.zeros_like(cut_scores)
    for i, base_score in enumerate(cut_scores):
        sentence = sentence_list[i]
        snippet = sentence[-20:] if sentence else ""
        adj_scores[i] = (
            base_score
            + self.turn_signal(snippet)
            + self.figure_signal(sentence)
        )

    # --- Size-constrained segmentation ---
    min_size = self.min_sent_per_chunk
    max_size = self.max_sent_per_chunk
    thr = getattr(self, "boundary_threshold", 0.5)

    boundaries: List[int] = []
    last_boundary = -1  # index of the previous paragraph-final sentence

    # Highest-scoring eligible cut position seen in the current segment.
    best_idx = None
    best_score = -1e9

    for i in range(n - 1):  # i is the candidate paragraph-final sentence
        # Number of sentences the segment would hold if we cut after i.
        seg_len = i - last_boundary

        # Only positions that already satisfy the minimum length can
        # become the fallback cut for a forced split.
        if seg_len >= min_size and adj_scores[i] > best_score:
            best_score = float(adj_scores[i])
            best_idx = i

        cut_at = None
        if seg_len < min_size:
            # Too short: never cut here.
            pass
        elif adj_scores[i] >= thr and seg_len <= max_size:
            # Within the allowed size range and above threshold: cut now.
            cut_at = i
        elif seg_len == max_size:
            # The segment is full, so a cut is mandatory; prefer the best
            # eligible position seen in this segment.
            cut_at = best_idx if best_idx is not None else i

        if cut_at is not None:
            boundaries.append(cut_at)
            last_boundary = cut_at
            best_idx = None
            best_score = -1e9
            # A forced cut may land before i. The positions between the
            # new boundary and i have already been scanned, but those that
            # meet the minimum length for the *new* segment are still
            # valid fallback cuts, so re-seed the tracker with them
            # instead of silently discarding them.
            first_eligible = cut_at + max(min_size, 1)
            for k in range(first_eligible, i + 1):
                if adj_scores[k] > best_score:
                    best_score = float(adj_scores[k])
                    best_idx = k

        if debug:
            decision = "Y@" + str(cut_at) if cut_at is not None else "N"
            print(
                f"[{i}] sim={sims[i]:.3f}, cut={cut_scores[i]:.3f}, "
                f"adj={adj_scores[i]:.3f}, len={seg_len}, "
                f"cut={decision}"
            )

    # --- Tail fix-up: avoid a final segment shorter than min_size ---
    # The tail runs from the last boundary to sentence n - 1.
    tail_len = (n - 1) - last_boundary
    if tail_len < min_size and boundaries:
        prev_last = boundaries[-2] if len(boundaries) >= 2 else -1
        # A replacement cut j for the last boundary must keep both
        # neighbouring segments within [min_size, max_size]:
        #   1) segment before j: j in [prev_last + min, prev_last + max]
        #   2) tail after j:     j in [n - 1 - max,     n - 1 - min]
        lower = max(prev_last + min_size, (n - 1) - max_size)
        upper = min(prev_last + max_size, (n - 1) - min_size)

        if lower <= upper:
            # Choose the highest-scoring position inside the feasible range.
            window = adj_scores[lower: upper + 1]
            j = int(np.argmax(window)) + lower
            if j != boundaries[-1]:
                boundaries[-1] = j
                if debug:
                    print(f"[fix-tail] move last boundary -> {j}, tail_len={n - 1 - j}")
        else:
            # No feasible position: merge the tail into the previous
            # segment by dropping the last boundary.
            dropped = boundaries.pop()
            if debug:
                print(f"[fix-tail] drop last boundary {dropped} to avoid tiny tail")

    return boundaries
|
|
|
+
|