"""解构结果中的高贡献词和点提取。""" from __future__ import annotations from typing import Any POINT_SOURCE_KEYS = ("关键点", "灵感点", "目的点") def build_contribution_points( decode_result: dict[str, Any], *, score_threshold: float, ) -> dict[str, Any]: high_words = extract_high_contribution_words(decode_result, score_threshold=score_threshold) return { "channelContentId": str( decode_result.get("帖子ID") or (decode_result.get("target_post") or {}).get("channel_content_id") or "" ), "高贡献词列表": high_words, "点列表": extract_matched_points(decode_result, high_words), } def extract_high_contribution_words( decode_result: dict[str, Any], *, score_threshold: float, ) -> list[dict[str, Any]]: rows = decode_result.get("contribution_results") or [] if not isinstance(rows, list): return [] words: list[dict[str, Any]] = [] seen: set[str] = set() for row in rows: if not isinstance(row, dict): continue word = str(row.get("词") or "").strip() score = _to_float(row.get("贡献度")) if not word or score is None or score < score_threshold: continue if word in seen: continue seen.add(word) words.append({"词": word, "贡献度": score}) return words def extract_matched_points( decode_result: dict[str, Any], high_words: list[dict[str, Any]], ) -> list[dict[str, Any]]: matched_points: list[dict[str, Any]] = [] seen: set[tuple[str, str]] = set() for source_key in POINT_SOURCE_KEYS: points = decode_result.get(source_key) or [] if not isinstance(points, list): continue for point_obj in points: if not isinstance(point_obj, dict): continue point_name = str(point_obj.get("点") or "").strip() if not point_name: continue token_words = collect_token_words(point_obj) if not token_words: continue hit_words = [ {"词": word_item["词"], "贡献度": word_item["贡献度"]} for word_item in high_words if word_matches_tokens(word_item["词"], token_words) ] if not hit_words: continue dedup_key = (source_key, point_name) if dedup_key in seen: continue seen.add(dedup_key) matched_points.append( { "来源": source_key, "点": point_name, "点描述": str(point_obj.get("点描述") or ""), "匹配词列表": hit_words, "分词结果": token_words, } ) return matched_points def collect_token_words(point_obj: dict[str, Any]) -> list[str]: token_rows = point_obj.get("分词结果") or [] if not isinstance(token_rows, list): return [] token_words: list[str] = [] for token in token_rows: if isinstance(token, dict): word = str(token.get("词") or "").strip() else: word = str(token or "").strip() if word: token_words.append(word) return token_words def word_matches_tokens(word: str, token_words: list[str]) -> bool: for token in token_words: if word in token or token in word: return True return False def _to_float(value: Any) -> float | None: try: return float(value) except (TypeError, ValueError): return None