evaluate_source_quality.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. """
  2. Source.json 质量评估模块
  3. 基于字段完整性和文本量对调研结果进行评分,识别低质量内容并支持二次筛选。
  4. 评分维度:
  5. 1. 字段完整性(40分):有效字段占比
  6. 2. 文本质量(40分):body_text 长度和信息密度
  7. 3. 互动数据(20分):点赞数、时间戳等
  8. """
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Tuple
  13. class SourceQualityEvaluator:
  14. """Source 数据质量评估器"""
  15. # 字段权重配置
  16. FIELD_WEIGHTS = {
  17. "title": 5,
  18. "body_text": 15,
  19. "like_count": 5,
  20. "publish_timestamp": 5,
  21. "images": 3,
  22. "videos": 3,
  23. "link": 2,
  24. "content_type": 2,
  25. }
  26. # 文本质量阈值
  27. TEXT_LENGTH_THRESHOLDS = {
  28. "excellent": 200, # 优秀:200字以上
  29. "good": 100, # 良好:100-200字
  30. "fair": 50, # 一般:50-100字
  31. "poor": 20, # 较差:20-50字
  32. # < 20字:极差
  33. }
  34. def __init__(self, time_window_days: int = 180):
  35. """
  36. Args:
  37. time_window_days: 时效性窗口(天),默认180天(半年)
  38. """
  39. self.time_window_days = time_window_days
  40. self.cutoff_timestamp = (
  41. datetime.now() - timedelta(days=time_window_days)
  42. ).timestamp()
  43. def evaluate_post(self, post: dict) -> Dict[str, any]:
  44. """
  45. 评估单个 post 的质量
  46. Returns:
  47. {
  48. "field_score": float, # 字段完整性得分 (0-40)
  49. "text_score": float, # 文本质量得分 (0-40)
  50. "engagement_score": float, # 互动数据得分 (0-20)
  51. "total_score": float, # 总分 (0-100)
  52. "grade": str, # 等级 A/B/C/D/F
  53. "issues": List[str], # 问题列表
  54. "valid_fields": int, # 有效字段数
  55. "total_fields": int, # 总字段数
  56. }
  57. """
  58. result = {
  59. "field_score": 0.0,
  60. "text_score": 0.0,
  61. "engagement_score": 0.0,
  62. "total_score": 0.0,
  63. "grade": "F",
  64. "issues": [],
  65. "valid_fields": 0,
  66. "total_fields": len(self.FIELD_WEIGHTS),
  67. }
  68. # 1. 字段完整性评分 (0-40分)
  69. field_score, valid_count = self._evaluate_fields(post)
  70. result["field_score"] = field_score
  71. result["valid_fields"] = valid_count
  72. # 2. 文本质量评分 (0-40分)
  73. text_score, text_issues = self._evaluate_text(post)
  74. result["text_score"] = text_score
  75. result["issues"].extend(text_issues)
  76. # 3. 互动数据评分 (0-20分)
  77. engagement_score, engagement_issues = self._evaluate_engagement(post)
  78. result["engagement_score"] = engagement_score
  79. result["issues"].extend(engagement_issues)
  80. # 计算总分和等级
  81. result["total_score"] = round(
  82. result["field_score"] + result["text_score"] + result["engagement_score"], 2
  83. )
  84. result["grade"] = self._calculate_grade(result["total_score"])
  85. return result
  86. def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
  87. """评估字段完整性"""
  88. total_weight = sum(self.FIELD_WEIGHTS.values())
  89. earned_weight = 0.0
  90. valid_count = 0
  91. for field, weight in self.FIELD_WEIGHTS.items():
  92. value = post.get(field)
  93. is_valid = False
  94. if field == "title":
  95. is_valid = bool(value and len(str(value).strip()) > 0)
  96. elif field == "body_text":
  97. is_valid = bool(value and len(str(value).strip()) > 0)
  98. elif field == "like_count":
  99. is_valid = isinstance(value, (int, float)) and value > 0
  100. elif field == "publish_timestamp":
  101. is_valid = isinstance(value, (int, float)) and value > 0
  102. elif field in ("images", "videos"):
  103. is_valid = isinstance(value, list) and len(value) > 0
  104. elif field == "link":
  105. is_valid = bool(value and len(str(value).strip()) > 0)
  106. elif field == "content_type":
  107. is_valid = bool(value and len(str(value).strip()) > 0)
  108. if is_valid:
  109. earned_weight += weight
  110. valid_count += 1
  111. # 转换为 0-40 分
  112. field_score = (earned_weight / total_weight) * 40
  113. return round(field_score, 2), valid_count
  114. def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]:
  115. """评估文本质量"""
  116. issues = []
  117. body_text = post.get("body_text", "")
  118. title = post.get("title", "")
  119. # 清理 HTML 标签(如 <em class="keyword">)
  120. import re
  121. body_text_clean = re.sub(r'<[^>]+>', '', body_text)
  122. title_clean = re.sub(r'<[^>]+>', '', title)
  123. body_len = len(body_text_clean.strip())
  124. title_len = len(title_clean.strip())
  125. # 标题评分 (0-10分)
  126. if title_len == 0:
  127. title_score = 0
  128. issues.append("标题为空")
  129. elif title_len < 10:
  130. title_score = 3
  131. issues.append(f"标题过短 ({title_len}字)")
  132. elif title_len < 20:
  133. title_score = 6
  134. else:
  135. title_score = 10
  136. # 正文评分 (0-30分)
  137. if body_len == 0:
  138. body_score = 0
  139. issues.append("正文为空")
  140. elif body_len < self.TEXT_LENGTH_THRESHOLDS["poor"]:
  141. body_score = 5
  142. issues.append(f"正文极短 ({body_len}字)")
  143. elif body_len < self.TEXT_LENGTH_THRESHOLDS["fair"]:
  144. body_score = 12
  145. issues.append(f"正文较短 ({body_len}字)")
  146. elif body_len < self.TEXT_LENGTH_THRESHOLDS["good"]:
  147. body_score = 20
  148. elif body_len < self.TEXT_LENGTH_THRESHOLDS["excellent"]:
  149. body_score = 26
  150. else:
  151. body_score = 30
  152. text_score = title_score + body_score
  153. return round(text_score, 2), issues
  154. def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]:
  155. """评估互动数据"""
  156. issues = []
  157. score = 0.0
  158. # 点赞数评分 (0-10分)
  159. like_count = post.get("like_count", 0)
  160. if not isinstance(like_count, (int, float)):
  161. like_count = 0
  162. if like_count == 0:
  163. issues.append("无点赞数据")
  164. elif like_count < 10:
  165. score += 3
  166. elif like_count < 100:
  167. score += 6
  168. elif like_count < 1000:
  169. score += 8
  170. else:
  171. score += 10
  172. # 时间戳评分 (0-10分)
  173. timestamp = post.get("publish_timestamp", 0)
  174. if not isinstance(timestamp, (int, float)):
  175. timestamp = 0
  176. if timestamp == 0:
  177. issues.append("无发布时间")
  178. elif timestamp < self.cutoff_timestamp:
  179. issues.append(f"内容过时(超过{self.time_window_days}天)")
  180. score += 2
  181. else:
  182. score += 10
  183. return round(score, 2), issues
  184. def _calculate_grade(self, score: float) -> str:
  185. """计算等级"""
  186. if score >= 80:
  187. return "A"
  188. elif score >= 60:
  189. return "B"
  190. elif score >= 40:
  191. return "C"
  192. elif score >= 20:
  193. return "D"
  194. else:
  195. return "F"
  196. def evaluate_source_file(self, source_file: Path) -> Dict[str, any]:
  197. """
  198. 评估整个 source.json 文件
  199. Returns:
  200. {
  201. "total_sources": int,
  202. "grade_distribution": Dict[str, int], # A/B/C/D/F 的数量分布
  203. "avg_score": float,
  204. "low_quality_count": int, # C/D/F 的数量
  205. "low_quality_indices": List[int], # 低质量条目的索引
  206. "details": List[Dict], # 每条的详细评分
  207. }
  208. """
  209. with open(source_file, "r", encoding="utf-8") as f:
  210. data = json.load(f)
  211. sources = data.get("sources", [])
  212. total = len(sources)
  213. grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
  214. scores = []
  215. low_quality_indices = []
  216. details = []
  217. for idx, source in enumerate(sources):
  218. post = source.get("post", {})
  219. eval_result = self.evaluate_post(post)
  220. eval_result["index"] = idx
  221. eval_result["case_id"] = source.get("case_id", "")
  222. eval_result["platform"] = source.get("platform", "")
  223. grade_dist[eval_result["grade"]] += 1
  224. scores.append(eval_result["total_score"])
  225. details.append(eval_result)
  226. # C/D/F 视为低质量
  227. if eval_result["grade"] in ("C", "D", "F"):
  228. low_quality_indices.append(idx)
  229. avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0
  230. return {
  231. "total_sources": total,
  232. "grade_distribution": grade_dist,
  233. "avg_score": avg_score,
  234. "low_quality_count": len(low_quality_indices),
  235. "low_quality_indices": low_quality_indices,
  236. "details": details,
  237. }
  238. def filter_low_quality(
  239. self, source_file: Path, output_file: Path, min_score: float = 40.0
  240. ) -> Dict[str, any]:
  241. """
  242. 过滤低质量内容,生成新的 source.json
  243. Args:
  244. source_file: 原始 source.json 路径
  245. output_file: 输出文件路径
  246. min_score: 最低分数阈值(默认40分,即C级以上)
  247. Returns:
  248. {
  249. "original_count": int,
  250. "filtered_count": int,
  251. "removed_count": int,
  252. "removed_cases": List[str], # 被移除的 case_id
  253. }
  254. """
  255. with open(source_file, "r", encoding="utf-8") as f:
  256. data = json.load(f)
  257. sources = data.get("sources", [])
  258. original_count = len(sources)
  259. filtered_sources = []
  260. removed_sources = []
  261. removed_cases = []
  262. for source in sources:
  263. post = source.get("post", {})
  264. eval_result = self.evaluate_post(post)
  265. if eval_result["total_score"] >= min_score:
  266. filtered_sources.append(source)
  267. else:
  268. source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})"
  269. removed_sources.append(source)
  270. removed_cases.append(source.get("case_id", "unknown"))
  271. # 将被过滤的数据保存到 filtered_cases.json
  272. if removed_sources:
  273. filtered_cases_file = output_file.parent / "filtered_cases.json"
  274. filtered_data = {"total": len(removed_sources), "sources": removed_sources}
  275. with open(filtered_cases_file, "w", encoding="utf-8") as f:
  276. json.dump(filtered_data, f, ensure_ascii=False, indent=2)
  277. # 更新数据
  278. data["sources"] = filtered_sources
  279. data["total"] = len(filtered_sources)
  280. data["quality_filter"] = {
  281. "min_score": min_score,
  282. "original_count": original_count,
  283. "filtered_count": len(filtered_sources),
  284. "removed_count": len(removed_cases),
  285. "filter_timestamp": datetime.now().isoformat(),
  286. }
  287. # 写入新文件
  288. with open(output_file, "w", encoding="utf-8") as f:
  289. json.dump(data, f, ensure_ascii=False, indent=2)
  290. return {
  291. "original_count": original_count,
  292. "filtered_count": len(filtered_sources),
  293. "removed_count": len(removed_cases),
  294. "removed_cases": removed_cases,
  295. }
  296. def generate_quality_report(source_file: Path, output_report: Path = None):
  297. """生成质量评估报告"""
  298. evaluator = SourceQualityEvaluator()
  299. result = evaluator.evaluate_source_file(source_file)
  300. # 生成报告文本
  301. report_lines = [
  302. "=" * 60,
  303. f"Source 质量评估报告",
  304. f"文件:{source_file}",
  305. f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
  306. "=" * 60,
  307. "",
  308. f"📊 总体统计",
  309. f" 总条目数:{result['total_sources']}",
  310. f" 平均得分:{result['avg_score']}/100",
  311. f" 低质量数:{result['low_quality_count']} ({result['low_quality_count']/result['total_sources']*100:.1f}%)",
  312. "",
  313. f"📈 等级分布",
  314. ]
  315. for grade in ["A", "B", "C", "D", "F"]:
  316. count = result["grade_distribution"][grade]
  317. pct = count / result["total_sources"] * 100 if result["total_sources"] > 0 else 0
  318. bar = "█" * int(pct / 2)
  319. report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}")
  320. report_lines.extend([
  321. "",
  322. f"⚠️ 低质量条目详情 (C/D/F 级)",
  323. "",
  324. ])
  325. # 只显示低质量条目
  326. low_quality_details = [d for d in result["details"] if d["grade"] in ("C", "D", "F")]
  327. low_quality_details.sort(key=lambda x: x["total_score"])
  328. for detail in low_quality_details[:20]: # 最多显示20条
  329. report_lines.extend([
  330. f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})",
  331. f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)",
  332. f" 文本: {detail['text_score']}/40",
  333. f" 互动: {detail['engagement_score']}/20",
  334. ])
  335. if detail["issues"]:
  336. report_lines.append(f" 问题: {', '.join(detail['issues'])}")
  337. report_lines.append("")
  338. if len(low_quality_details) > 20:
  339. report_lines.append(f" ... 还有 {len(low_quality_details) - 20} 条低质量条目未显示")
  340. report_text = "\n".join(report_lines)
  341. # 输出到控制台
  342. print(report_text)
  343. # 保存到文件
  344. if output_report:
  345. with open(output_report, "w", encoding="utf-8") as f:
  346. f.write(report_text)
  347. print(f"\n报告已保存到:{output_report}")
  348. return result
  349. if __name__ == "__main__":
  350. import argparse
  351. parser = argparse.ArgumentParser(description="评估 source.json 的质量")
  352. parser.add_argument("source_file", type=Path, help="source.json 文件路径")
  353. parser.add_argument("--report", type=Path, help="输出报告文件路径")
  354. parser.add_argument("--filter", type=Path, help="过滤后输出文件路径")
  355. parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)")
  356. args = parser.parse_args()
  357. if not args.source_file.exists():
  358. print(f"错误:文件不存在 {args.source_file}")
  359. exit(1)
  360. # 生成评估报告
  361. result = generate_quality_report(args.source_file, args.report)
  362. # 如果指定了过滤输出
  363. if args.filter:
  364. evaluator = SourceQualityEvaluator()
  365. filter_result = evaluator.filter_low_quality(
  366. args.source_file, args.filter, args.min_score
  367. )
  368. print(f"\n🔍 质量过滤完成")
  369. print(f" 原始条目:{filter_result['original_count']}")
  370. print(f" 保留条目:{filter_result['filtered_count']}")
  371. print(f" 移除条目:{filter_result['removed_count']}")
  372. print(f" 输出文件:{args.filter}")