contribution.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
"""Extract high-contribution words and the points they match from a decode result."""
from __future__ import annotations

from typing import Any

# Keys of the decode result that are scanned for point entries, in scan order
# (literally: "key points", "inspiration points", "purpose points").
POINT_SOURCE_KEYS = ("关键点", "灵感点", "目的点")
  5. def build_contribution_points(
  6. decode_result: dict[str, Any],
  7. *,
  8. score_threshold: float,
  9. ) -> dict[str, Any]:
  10. high_words = extract_high_contribution_words(decode_result, score_threshold=score_threshold)
  11. return {
  12. "channelContentId": str(
  13. decode_result.get("帖子ID")
  14. or (decode_result.get("target_post") or {}).get("channel_content_id")
  15. or ""
  16. ),
  17. "高贡献词列表": high_words,
  18. "点列表": extract_matched_points(decode_result, high_words),
  19. }
  20. def extract_high_contribution_words(
  21. decode_result: dict[str, Any],
  22. *,
  23. score_threshold: float,
  24. ) -> list[dict[str, Any]]:
  25. rows = decode_result.get("contribution_results") or []
  26. if not isinstance(rows, list):
  27. return []
  28. words: list[dict[str, Any]] = []
  29. seen: set[str] = set()
  30. for row in rows:
  31. if not isinstance(row, dict):
  32. continue
  33. word = str(row.get("词") or "").strip()
  34. score = _to_float(row.get("贡献度"))
  35. if not word or score is None or score < score_threshold:
  36. continue
  37. if word in seen:
  38. continue
  39. seen.add(word)
  40. words.append({"词": word, "贡献度": score})
  41. return words
  42. def extract_matched_points(
  43. decode_result: dict[str, Any],
  44. high_words: list[dict[str, Any]],
  45. ) -> list[dict[str, Any]]:
  46. matched_points: list[dict[str, Any]] = []
  47. seen: set[tuple[str, str]] = set()
  48. for source_key in POINT_SOURCE_KEYS:
  49. points = decode_result.get(source_key) or []
  50. if not isinstance(points, list):
  51. continue
  52. for point_obj in points:
  53. if not isinstance(point_obj, dict):
  54. continue
  55. point_name = str(point_obj.get("点") or "").strip()
  56. if not point_name:
  57. continue
  58. token_words = collect_token_words(point_obj)
  59. if not token_words:
  60. continue
  61. hit_words = [
  62. {"词": word_item["词"], "贡献度": word_item["贡献度"]}
  63. for word_item in high_words
  64. if word_matches_tokens(word_item["词"], token_words)
  65. ]
  66. if not hit_words:
  67. continue
  68. dedup_key = (source_key, point_name)
  69. if dedup_key in seen:
  70. continue
  71. seen.add(dedup_key)
  72. matched_points.append(
  73. {
  74. "来源": source_key,
  75. "点": point_name,
  76. "点描述": str(point_obj.get("点描述") or ""),
  77. "匹配词列表": hit_words,
  78. "分词结果": token_words,
  79. }
  80. )
  81. return matched_points
  82. def collect_token_words(point_obj: dict[str, Any]) -> list[str]:
  83. token_rows = point_obj.get("分词结果") or []
  84. if not isinstance(token_rows, list):
  85. return []
  86. token_words: list[str] = []
  87. for token in token_rows:
  88. if isinstance(token, dict):
  89. word = str(token.get("词") or "").strip()
  90. else:
  91. word = str(token or "").strip()
  92. if word:
  93. token_words.append(word)
  94. return token_words
  95. def word_matches_tokens(word: str, token_words: list[str]) -> bool:
  96. for token in token_words:
  97. if word in token or token in word:
  98. return True
  99. return False
  100. def _to_float(value: Any) -> float | None:
  101. try:
  102. return float(value)
  103. except (TypeError, ValueError):
  104. return None