# evaluate_source_quality.py
  1. """
  2. Source.json 质量评估模块
  3. 基于字段完整性和文本量对调研结果进行评分,识别低质量内容并支持二次筛选。
  4. 评分维度:
  5. 1. 字段完整性(40分):有效字段占比
  6. 2. 文本质量(40分):body_text 长度和信息密度
  7. 3. 互动数据(20分):点赞数、时间戳等
  8. """
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Tuple
  14. _HTML_TAG_RE = re.compile(r"<[^>]+>")
  15. def _strip_html(text) -> str:
  16. """Remove inline HTML tags (e.g. <em class="highlight">) from search-result text."""
  17. if not text:
  18. return ""
  19. return _HTML_TAG_RE.sub("", str(text))
  20. class SourceQualityEvaluator:
  21. """Source 数据质量评估器"""
  22. # 字段权重配置
  23. FIELD_WEIGHTS = {
  24. "title": 5,
  25. "body_text": 15,
  26. "like_count": 5,
  27. "publish_timestamp": 5,
  28. "images": 3,
  29. "videos": 3,
  30. "link": 2,
  31. "content_type": 2,
  32. }
  33. # 文本质量阈值
  34. TEXT_LENGTH_THRESHOLDS = {
  35. "excellent": 200, # 优秀:200字以上
  36. "good": 100, # 良好:100-200字
  37. "fair": 50, # 一般:50-100字
  38. "poor": 20, # 较差:20-50字
  39. # < 20字:极差
  40. }
  41. def __init__(self, time_window_days: int = 180):
  42. """
  43. Args:
  44. time_window_days: 时效性窗口(天),默认180天(半年)
  45. """
  46. self.time_window_days = time_window_days
  47. self.cutoff_timestamp = (
  48. datetime.now() - timedelta(days=time_window_days)
  49. ).timestamp()
  50. def evaluate_post(self, post: dict) -> Dict[str, any]:
  51. """
  52. 评估单个 post 的质量
  53. Returns:
  54. {
  55. "field_score": float, # 字段完整性得分 (0-40)
  56. "text_score": float, # 文本质量得分 (0-40)
  57. "engagement_score": float, # 互动数据得分 (0-20)
  58. "total_score": float, # 总分 (0-100)
  59. "grade": str, # 等级 A/B/C/D/F
  60. "issues": List[str], # 问题列表
  61. "valid_fields": int, # 有效字段数
  62. "total_fields": int, # 总字段数
  63. }
  64. """
  65. # Video posts (content_type=="video" 或 videos 字段非空) 通常没有 body_text,
  66. # 仅靠 caption + 互动数据评分,避免被 body 长度一律打低分。
  67. is_video = (
  68. post.get("content_type") == "video"
  69. or bool(post.get("videos"))
  70. )
  71. result = {
  72. "mode": "video" if is_video else "text",
  73. "field_score": 0.0,
  74. "text_score": 0.0, # video 模式下含义为 title-only (0-15)
  75. "engagement_score": 0.0, # video 模式下扩展为 (0-45)
  76. "total_score": 0.0,
  77. "grade": "F",
  78. "issues": [],
  79. "valid_fields": 0,
  80. "total_fields": len(self.FIELD_WEIGHTS),
  81. }
  82. # 1. 字段完整性评分 (0-40 分)
  83. field_score, valid_count = self._evaluate_fields(post)
  84. result["field_score"] = field_score
  85. result["valid_fields"] = valid_count
  86. # 2 & 3. 文本/互动评分(视频模式跳过 body 长度,重分权重到 title + 互动)
  87. if is_video:
  88. title_score, eng_score, issues = self._evaluate_video_signals(post)
  89. result["text_score"] = title_score
  90. result["engagement_score"] = eng_score
  91. result["issues"].extend(issues)
  92. else:
  93. text_score, text_issues = self._evaluate_text(post)
  94. engagement_score, engagement_issues = self._evaluate_engagement(post)
  95. result["text_score"] = text_score
  96. result["engagement_score"] = engagement_score
  97. result["issues"].extend(text_issues + engagement_issues)
  98. # 计算总分和等级
  99. result["total_score"] = round(
  100. result["field_score"] + result["text_score"] + result["engagement_score"], 2
  101. )
  102. result["grade"] = self._calculate_grade(result["total_score"])
  103. return result
  104. # ── video-mode 阈值(mirror body length tiers, but on seconds) ──
  105. DURATION_THRESHOLDS = {
  106. "very_short": 30, # <30s -> 5/30
  107. "short": 60, # 30-60s -> 12/30
  108. "fair": 120, # 60-120s -> 20/30
  109. "good": 300, # 2-5 min -> 26/30
  110. "long": 1800, # 5-30 min -> 30/30 (best)
  111. # >=1800s (>30 min) -> 22/30 (信息密度下降)
  112. }
  113. def _evaluate_video_signals(self, post: dict) -> Tuple[float, float, List[str]]:
  114. """For video posts: replaces body-length scoring with video-duration scoring.
  115. Composition: title (0-10) + duration (0-30) + engagement (0-20) = 0-60,
  116. mirroring the article-post weights but with duration as the content signal.
  117. Reads `duration_sec` from the post (populated by search() via
  118. transcription.probe_durations_for_posts before scoring). If absent
  119. (probe failed / no video URL), duration_score is 0 with an issue noted.
  120. """
  121. issues: List[str] = []
  122. # ── title 0-10 ──
  123. title = _strip_html(post.get("title", "")).strip()
  124. tlen = len(title)
  125. if tlen == 0:
  126. title_score = 0
  127. issues.append("标题为空")
  128. elif tlen < 10:
  129. title_score = 3
  130. issues.append(f"标题过短 ({tlen}字)")
  131. elif tlen < 20:
  132. title_score = 6
  133. else:
  134. title_score = 10
  135. # ── duration 0-30 (replaces body_text length) ──
  136. duration = post.get("duration_sec")
  137. if not isinstance(duration, (int, float)) or duration <= 0:
  138. dur_score = 0
  139. issues.append("无视频时长")
  140. elif duration < self.DURATION_THRESHOLDS["very_short"]:
  141. dur_score = 5
  142. issues.append(f"视频极短 ({duration:.0f}s)")
  143. elif duration < self.DURATION_THRESHOLDS["short"]:
  144. dur_score = 12
  145. issues.append(f"视频较短 ({duration:.0f}s)")
  146. elif duration < self.DURATION_THRESHOLDS["fair"]:
  147. dur_score = 20
  148. elif duration < self.DURATION_THRESHOLDS["good"]:
  149. dur_score = 26
  150. elif duration < self.DURATION_THRESHOLDS["long"]:
  151. dur_score = 30
  152. else:
  153. dur_score = 22
  154. issues.append(f"视频较长 ({duration:.0f}s,>30 分钟密度可能下降)")
  155. # ── engagement 0-20 (与文章帖相同) ──
  156. like_count = post.get("like_count", 0)
  157. if not isinstance(like_count, (int, float)):
  158. like_count = 0
  159. if like_count == 0:
  160. like_score = 0
  161. issues.append("无点赞数据")
  162. elif like_count < 10:
  163. like_score = 3
  164. elif like_count < 100:
  165. like_score = 6
  166. elif like_count < 1000:
  167. like_score = 8
  168. else:
  169. like_score = 10
  170. timestamp = post.get("publish_timestamp", 0)
  171. if not isinstance(timestamp, (int, float)):
  172. timestamp = 0
  173. if timestamp == 0:
  174. ts_score = 0
  175. issues.append("无发布时间")
  176. elif timestamp < self.cutoff_timestamp:
  177. ts_score = 2
  178. issues.append(f"内容过时(超过{self.time_window_days}天)")
  179. else:
  180. ts_score = 10
  181. # text_score 字段在 video mode 下含义 = title + duration (0-40)
  182. return float(title_score + dur_score), float(like_score + ts_score), issues
  183. def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
  184. """评估字段完整性"""
  185. total_weight = sum(self.FIELD_WEIGHTS.values())
  186. earned_weight = 0.0
  187. valid_count = 0
  188. for field, weight in self.FIELD_WEIGHTS.items():
  189. value = post.get(field)
  190. is_valid = False
  191. if field == "title":
  192. is_valid = bool(value and len(str(value).strip()) > 0)
  193. elif field == "body_text":
  194. is_valid = bool(value and len(str(value).strip()) > 0)
  195. elif field == "like_count":
  196. is_valid = isinstance(value, (int, float)) and value > 0
  197. elif field == "publish_timestamp":
  198. is_valid = isinstance(value, (int, float)) and value > 0
  199. elif field in ("images", "videos"):
  200. is_valid = isinstance(value, list) and len(value) > 0
  201. elif field == "link":
  202. is_valid = bool(value and len(str(value).strip()) > 0)
  203. elif field == "content_type":
  204. is_valid = bool(value and len(str(value).strip()) > 0)
  205. if is_valid:
  206. earned_weight += weight
  207. valid_count += 1
  208. # 转换为 0-40 分
  209. field_score = (earned_weight / total_weight) * 40
  210. return round(field_score, 2), valid_count
  211. def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]:
  212. """评估文本质量"""
  213. issues = []
  214. body_text = post.get("body_text", "")
  215. title = post.get("title", "")
  216. # 清理 HTML 标签(如 <em class="keyword">)
  217. import re
  218. body_text_clean = re.sub(r'<[^>]+>', '', body_text)
  219. title_clean = re.sub(r'<[^>]+>', '', title)
  220. body_len = len(body_text_clean.strip())
  221. title_len = len(title_clean.strip())
  222. # 标题评分 (0-10分)
  223. if title_len == 0:
  224. title_score = 0
  225. issues.append("标题为空")
  226. elif title_len < 10:
  227. title_score = 3
  228. issues.append(f"标题过短 ({title_len}字)")
  229. elif title_len < 20:
  230. title_score = 6
  231. else:
  232. title_score = 10
  233. # 正文评分 (0-30分)
  234. if body_len == 0:
  235. body_score = 0
  236. issues.append("正文为空")
  237. elif body_len < self.TEXT_LENGTH_THRESHOLDS["poor"]:
  238. body_score = 5
  239. issues.append(f"正文极短 ({body_len}字)")
  240. elif body_len < self.TEXT_LENGTH_THRESHOLDS["fair"]:
  241. body_score = 12
  242. issues.append(f"正文较短 ({body_len}字)")
  243. elif body_len < self.TEXT_LENGTH_THRESHOLDS["good"]:
  244. body_score = 20
  245. elif body_len < self.TEXT_LENGTH_THRESHOLDS["excellent"]:
  246. body_score = 26
  247. else:
  248. body_score = 30
  249. text_score = title_score + body_score
  250. return round(text_score, 2), issues
  251. def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]:
  252. """评估互动数据"""
  253. issues = []
  254. score = 0.0
  255. # 点赞数评分 (0-10分)
  256. like_count = post.get("like_count", 0)
  257. if not isinstance(like_count, (int, float)):
  258. like_count = 0
  259. if like_count == 0:
  260. issues.append("无点赞数据")
  261. elif like_count < 10:
  262. score += 3
  263. elif like_count < 100:
  264. score += 6
  265. elif like_count < 1000:
  266. score += 8
  267. else:
  268. score += 10
  269. # 时间戳评分 (0-10分)
  270. timestamp = post.get("publish_timestamp", 0)
  271. if not isinstance(timestamp, (int, float)):
  272. timestamp = 0
  273. if timestamp == 0:
  274. issues.append("无发布时间")
  275. elif timestamp < self.cutoff_timestamp:
  276. issues.append(f"内容过时(超过{self.time_window_days}天)")
  277. score += 2
  278. else:
  279. score += 10
  280. return round(score, 2), issues
  281. def _calculate_grade(self, score: float) -> str:
  282. """计算等级"""
  283. if score >= 80:
  284. return "A"
  285. elif score >= 60:
  286. return "B"
  287. elif score >= 40:
  288. return "C"
  289. elif score >= 20:
  290. return "D"
  291. else:
  292. return "F"
  293. def evaluate_source_file(self, source_file: Path) -> Dict[str, any]:
  294. """
  295. 评估整个 source.json 文件
  296. Returns:
  297. {
  298. "total_sources": int,
  299. "grade_distribution": Dict[str, int], # A/B/C/D/F 的数量分布
  300. "avg_score": float,
  301. "low_quality_count": int, # C/D/F 的数量
  302. "low_quality_indices": List[int], # 低质量条目的索引
  303. "details": List[Dict], # 每条的详细评分
  304. }
  305. """
  306. with open(source_file, "r", encoding="utf-8") as f:
  307. data = json.load(f)
  308. sources = data.get("sources", [])
  309. total = len(sources)
  310. grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
  311. scores = []
  312. low_quality_indices = []
  313. details = []
  314. for idx, source in enumerate(sources):
  315. post = source.get("post", {})
  316. eval_result = self.evaluate_post(post)
  317. eval_result["index"] = idx
  318. eval_result["case_id"] = source.get("case_id", "")
  319. eval_result["platform"] = source.get("platform", "")
  320. grade_dist[eval_result["grade"]] += 1
  321. scores.append(eval_result["total_score"])
  322. details.append(eval_result)
  323. # C/D/F 视为低质量
  324. if eval_result["grade"] in ("C", "D", "F"):
  325. low_quality_indices.append(idx)
  326. avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0
  327. return {
  328. "total_sources": total,
  329. "grade_distribution": grade_dist,
  330. "avg_score": avg_score,
  331. "low_quality_count": len(low_quality_indices),
  332. "low_quality_indices": low_quality_indices,
  333. "details": details,
  334. }
  335. def filter_low_quality(
  336. self, source_file: Path, output_file: Path, min_score: float = 40.0
  337. ) -> Dict[str, any]:
  338. """
  339. 过滤低质量内容,生成新的 source.json
  340. Args:
  341. source_file: 原始 source.json 路径
  342. output_file: 输出文件路径
  343. min_score: 最低分数阈值(默认40分,即C级以上)
  344. Returns:
  345. {
  346. "original_count": int,
  347. "filtered_count": int,
  348. "removed_count": int,
  349. "removed_cases": List[str], # 被移除的 case_id
  350. }
  351. """
  352. with open(source_file, "r", encoding="utf-8") as f:
  353. data = json.load(f)
  354. sources = data.get("sources", [])
  355. original_count = len(sources)
  356. filtered_sources = []
  357. removed_sources = []
  358. removed_cases = []
  359. for source in sources:
  360. post = source.get("post", {})
  361. eval_result = self.evaluate_post(post)
  362. if eval_result["total_score"] >= min_score:
  363. filtered_sources.append(source)
  364. else:
  365. source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})"
  366. removed_sources.append(source)
  367. removed_cases.append(source.get("case_id", "unknown"))
  368. # 将被过滤的数据保存到 filtered_cases.json
  369. if removed_sources:
  370. filtered_cases_file = output_file.parent / "filtered_cases.json"
  371. filtered_data = {"total": len(removed_sources), "sources": removed_sources}
  372. with open(filtered_cases_file, "w", encoding="utf-8") as f:
  373. json.dump(filtered_data, f, ensure_ascii=False, indent=2)
  374. # 更新数据
  375. data["sources"] = filtered_sources
  376. data["total"] = len(filtered_sources)
  377. data["quality_filter"] = {
  378. "min_score": min_score,
  379. "original_count": original_count,
  380. "filtered_count": len(filtered_sources),
  381. "removed_count": len(removed_cases),
  382. "filter_timestamp": datetime.now().isoformat(),
  383. }
  384. # 写入新文件
  385. with open(output_file, "w", encoding="utf-8") as f:
  386. json.dump(data, f, ensure_ascii=False, indent=2)
  387. return {
  388. "original_count": original_count,
  389. "filtered_count": len(filtered_sources),
  390. "removed_count": len(removed_cases),
  391. "removed_cases": removed_cases,
  392. }
  393. def generate_quality_report(source_file: Path, output_report: Path = None):
  394. """生成质量评估报告"""
  395. evaluator = SourceQualityEvaluator()
  396. result = evaluator.evaluate_source_file(source_file)
  397. # 生成报告文本
  398. report_lines = [
  399. "=" * 60,
  400. f"Source 质量评估报告",
  401. f"文件:{source_file}",
  402. f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
  403. "=" * 60,
  404. "",
  405. f"📊 总体统计",
  406. f" 总条目数:{result['total_sources']}",
  407. f" 平均得分:{result['avg_score']}/100",
  408. f" 低质量数:{result['low_quality_count']} ({result['low_quality_count']/result['total_sources']*100:.1f}%)",
  409. "",
  410. f"📈 等级分布",
  411. ]
  412. for grade in ["A", "B", "C", "D", "F"]:
  413. count = result["grade_distribution"][grade]
  414. pct = count / result["total_sources"] * 100 if result["total_sources"] > 0 else 0
  415. bar = "█" * int(pct / 2)
  416. report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}")
  417. report_lines.extend([
  418. "",
  419. f"⚠️ 低质量条目详情 (C/D/F 级)",
  420. "",
  421. ])
  422. # 只显示低质量条目
  423. low_quality_details = [d for d in result["details"] if d["grade"] in ("C", "D", "F")]
  424. low_quality_details.sort(key=lambda x: x["total_score"])
  425. for detail in low_quality_details[:20]: # 最多显示20条
  426. report_lines.extend([
  427. f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})",
  428. f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)",
  429. f" 文本: {detail['text_score']}/40",
  430. f" 互动: {detail['engagement_score']}/20",
  431. ])
  432. if detail["issues"]:
  433. report_lines.append(f" 问题: {', '.join(detail['issues'])}")
  434. report_lines.append("")
  435. if len(low_quality_details) > 20:
  436. report_lines.append(f" ... 还有 {len(low_quality_details) - 20} 条低质量条目未显示")
  437. report_text = "\n".join(report_lines)
  438. # 输出到控制台
  439. print(report_text)
  440. # 保存到文件
  441. if output_report:
  442. with open(output_report, "w", encoding="utf-8") as f:
  443. f.write(report_text)
  444. print(f"\n报告已保存到:{output_report}")
  445. return result
  446. if __name__ == "__main__":
  447. import argparse
  448. parser = argparse.ArgumentParser(description="评估 source.json 的质量")
  449. parser.add_argument("source_file", type=Path, help="source.json 文件路径")
  450. parser.add_argument("--report", type=Path, help="输出报告文件路径")
  451. parser.add_argument("--filter", type=Path, help="过滤后输出文件路径")
  452. parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)")
  453. args = parser.parse_args()
  454. if not args.source_file.exists():
  455. print(f"错误:文件不存在 {args.source_file}")
  456. exit(1)
  457. # 生成评估报告
  458. result = generate_quality_report(args.source_file, args.report)
  459. # 如果指定了过滤输出
  460. if args.filter:
  461. evaluator = SourceQualityEvaluator()
  462. filter_result = evaluator.filter_low_quality(
  463. args.source_file, args.filter, args.min_score
  464. )
  465. print(f"\n🔍 质量过滤完成")
  466. print(f" 原始条目:{filter_result['original_count']}")
  467. print(f" 保留条目:{filter_result['filtered_count']}")
  468. print(f" 移除条目:{filter_result['removed_count']}")
  469. print(f" 输出文件:{args.filter}")