""" Source.json 质量评估模块 基于字段完整性和文本量对调研结果进行评分,识别低质量内容并支持二次筛选。 评分维度: 1. 字段完整性(40分):有效字段占比 2. 文本质量(40分):body_text 长度和信息密度 3. 互动数据(20分):点赞数、时间戳等 """ import json import re from pathlib import Path from typing import Dict, List, Tuple from datetime import datetime, timedelta _HTML_TAG_RE = re.compile(r"<[^>]+>") def _strip_html(text) -> str: """Remove inline HTML tags (e.g. ) from search-result text.""" if not text: return "" return _HTML_TAG_RE.sub("", str(text)) class SourceQualityEvaluator: """Source 数据质量评估器""" # 字段权重配置 FIELD_WEIGHTS = { "title": 5, "body_text": 15, "like_count": 5, "publish_timestamp": 5, "images": 3, "videos": 3, "link": 2, "content_type": 2, } # 文本质量阈值 TEXT_LENGTH_THRESHOLDS = { "excellent": 200, # 优秀:200字以上 "good": 100, # 良好:100-200字 "fair": 50, # 一般:50-100字 "poor": 20, # 较差:20-50字 # < 20字:极差 } def __init__(self, time_window_days: int = 180): """ Args: time_window_days: 时效性窗口(天),默认180天(半年) """ self.time_window_days = time_window_days self.cutoff_timestamp = ( datetime.now() - timedelta(days=time_window_days) ).timestamp() def evaluate_post(self, post: dict) -> Dict[str, any]: """ 评估单个 post 的质量 Returns: { "field_score": float, # 字段完整性得分 (0-40) "text_score": float, # 文本质量得分 (0-40) "engagement_score": float, # 互动数据得分 (0-20) "total_score": float, # 总分 (0-100) "grade": str, # 等级 A/B/C/D/F "issues": List[str], # 问题列表 "valid_fields": int, # 有效字段数 "total_fields": int, # 总字段数 } """ # Video posts (content_type=="video" 或 videos 字段非空) 通常没有 body_text, # 仅靠 caption + 互动数据评分,避免被 body 长度一律打低分。 is_video = ( post.get("content_type") == "video" or bool(post.get("videos")) ) result = { "mode": "video" if is_video else "text", "field_score": 0.0, "text_score": 0.0, # video 模式下含义为 title-only (0-15) "engagement_score": 0.0, # video 模式下扩展为 (0-45) "total_score": 0.0, "grade": "F", "issues": [], "valid_fields": 0, "total_fields": len(self.FIELD_WEIGHTS), } # 1. 字段完整性评分 (0-40 分) field_score, valid_count = self._evaluate_fields(post) result["field_score"] = field_score result["valid_fields"] = valid_count # 2 & 3. 文本/互动评分(视频模式跳过 body 长度,重分权重到 title + 互动) if is_video: title_score, eng_score, issues = self._evaluate_video_signals(post) result["text_score"] = title_score result["engagement_score"] = eng_score result["issues"].extend(issues) else: text_score, text_issues = self._evaluate_text(post) engagement_score, engagement_issues = self._evaluate_engagement(post) result["text_score"] = text_score result["engagement_score"] = engagement_score result["issues"].extend(text_issues + engagement_issues) # 计算总分和等级 result["total_score"] = round( result["field_score"] + result["text_score"] + result["engagement_score"], 2 ) result["grade"] = self._calculate_grade(result["total_score"]) return result # ── video-mode 阈值(mirror body length tiers, but on seconds) ── DURATION_THRESHOLDS = { "very_short": 30, # <30s -> 5/30 "short": 60, # 30-60s -> 12/30 "fair": 120, # 60-120s -> 20/30 "good": 300, # 2-5 min -> 26/30 "long": 1800, # 5-30 min -> 30/30 (best) # >=1800s (>30 min) -> 22/30 (信息密度下降) } def _evaluate_video_signals(self, post: dict) -> Tuple[float, float, List[str]]: """For video posts: replaces body-length scoring with video-duration scoring. Composition: title (0-10) + duration (0-30) + engagement (0-20) = 0-60, mirroring the article-post weights but with duration as the content signal. Reads `duration_sec` from the post (populated by search() via transcription.probe_durations_for_posts before scoring). If absent (probe failed / no video URL), duration_score is 0 with an issue noted. """ issues: List[str] = [] # ── title 0-10 ── title = _strip_html(post.get("title", "")).strip() tlen = len(title) if tlen == 0: title_score = 0 issues.append("标题为空") elif tlen < 10: title_score = 3 issues.append(f"标题过短 ({tlen}字)") elif tlen < 20: title_score = 6 else: title_score = 10 # ── duration 0-30 (replaces body_text length) ── duration = post.get("duration_sec") if not isinstance(duration, (int, float)) or duration <= 0: dur_score = 0 issues.append("无视频时长") elif duration < self.DURATION_THRESHOLDS["very_short"]: dur_score = 5 issues.append(f"视频极短 ({duration:.0f}s)") elif duration < self.DURATION_THRESHOLDS["short"]: dur_score = 12 issues.append(f"视频较短 ({duration:.0f}s)") elif duration < self.DURATION_THRESHOLDS["fair"]: dur_score = 20 elif duration < self.DURATION_THRESHOLDS["good"]: dur_score = 26 elif duration < self.DURATION_THRESHOLDS["long"]: dur_score = 30 else: dur_score = 22 issues.append(f"视频较长 ({duration:.0f}s,>30 分钟密度可能下降)") # ── engagement 0-20 (与文章帖相同) ── like_count = post.get("like_count", 0) if not isinstance(like_count, (int, float)): like_count = 0 if like_count == 0: like_score = 0 issues.append("无点赞数据") elif like_count < 10: like_score = 3 elif like_count < 100: like_score = 6 elif like_count < 1000: like_score = 8 else: like_score = 10 timestamp = post.get("publish_timestamp", 0) if not isinstance(timestamp, (int, float)): timestamp = 0 if timestamp == 0: ts_score = 0 issues.append("无发布时间") elif timestamp < self.cutoff_timestamp: ts_score = 2 issues.append(f"内容过时(超过{self.time_window_days}天)") else: ts_score = 10 # text_score 字段在 video mode 下含义 = title + duration (0-40) return float(title_score + dur_score), float(like_score + ts_score), issues def _evaluate_fields(self, post: dict) -> Tuple[float, int]: """评估字段完整性""" total_weight = sum(self.FIELD_WEIGHTS.values()) earned_weight = 0.0 valid_count = 0 for field, weight in self.FIELD_WEIGHTS.items(): value = post.get(field) is_valid = False if field == "title": is_valid = bool(value and len(str(value).strip()) > 0) elif field == "body_text": is_valid = bool(value and len(str(value).strip()) > 0) elif field == "like_count": is_valid = isinstance(value, (int, float)) and value > 0 elif field == "publish_timestamp": is_valid = isinstance(value, (int, float)) and value > 0 elif field in ("images", "videos"): is_valid = isinstance(value, list) and len(value) > 0 elif field == "link": is_valid = bool(value and len(str(value).strip()) > 0) elif field == "content_type": is_valid = bool(value and len(str(value).strip()) > 0) if is_valid: earned_weight += weight valid_count += 1 # 转换为 0-40 分 field_score = (earned_weight / total_weight) * 40 return round(field_score, 2), valid_count def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]: """评估文本质量""" issues = [] body_text = post.get("body_text", "") title = post.get("title", "") # 清理 HTML 标签(如 ) import re body_text_clean = re.sub(r'<[^>]+>', '', body_text) title_clean = re.sub(r'<[^>]+>', '', title) body_len = len(body_text_clean.strip()) title_len = len(title_clean.strip()) # 标题评分 (0-10分) if title_len == 0: title_score = 0 issues.append("标题为空") elif title_len < 10: title_score = 3 issues.append(f"标题过短 ({title_len}字)") elif title_len < 20: title_score = 6 else: title_score = 10 # 正文评分 (0-30分) if body_len == 0: body_score = 0 issues.append("正文为空") elif body_len < self.TEXT_LENGTH_THRESHOLDS["poor"]: body_score = 5 issues.append(f"正文极短 ({body_len}字)") elif body_len < self.TEXT_LENGTH_THRESHOLDS["fair"]: body_score = 12 issues.append(f"正文较短 ({body_len}字)") elif body_len < self.TEXT_LENGTH_THRESHOLDS["good"]: body_score = 20 elif body_len < self.TEXT_LENGTH_THRESHOLDS["excellent"]: body_score = 26 else: body_score = 30 text_score = title_score + body_score return round(text_score, 2), issues def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]: """评估互动数据""" issues = [] score = 0.0 # 点赞数评分 (0-10分) like_count = post.get("like_count", 0) if not isinstance(like_count, (int, float)): like_count = 0 if like_count == 0: issues.append("无点赞数据") elif like_count < 10: score += 3 elif like_count < 100: score += 6 elif like_count < 1000: score += 8 else: score += 10 # 时间戳评分 (0-10分) timestamp = post.get("publish_timestamp", 0) if not isinstance(timestamp, (int, float)): timestamp = 0 if timestamp == 0: issues.append("无发布时间") elif timestamp < self.cutoff_timestamp: issues.append(f"内容过时(超过{self.time_window_days}天)") score += 2 else: score += 10 return round(score, 2), issues def _calculate_grade(self, score: float) -> str: """计算等级""" if score >= 80: return "A" elif score >= 60: return "B" elif score >= 40: return "C" elif score >= 20: return "D" else: return "F" def evaluate_source_file(self, source_file: Path) -> Dict[str, any]: """ 评估整个 source.json 文件 Returns: { "total_sources": int, "grade_distribution": Dict[str, int], # A/B/C/D/F 的数量分布 "avg_score": float, "low_quality_count": int, # C/D/F 的数量 "low_quality_indices": List[int], # 低质量条目的索引 "details": List[Dict], # 每条的详细评分 } """ with open(source_file, "r", encoding="utf-8") as f: data = json.load(f) sources = data.get("sources", []) total = len(sources) grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0} scores = [] low_quality_indices = [] details = [] for idx, source in enumerate(sources): post = source.get("post", {}) eval_result = self.evaluate_post(post) eval_result["index"] = idx eval_result["case_id"] = source.get("case_id", "") eval_result["platform"] = source.get("platform", "") grade_dist[eval_result["grade"]] += 1 scores.append(eval_result["total_score"]) details.append(eval_result) # C/D/F 视为低质量 if eval_result["grade"] in ("C", "D", "F"): low_quality_indices.append(idx) avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0 return { "total_sources": total, "grade_distribution": grade_dist, "avg_score": avg_score, "low_quality_count": len(low_quality_indices), "low_quality_indices": low_quality_indices, "details": details, } def filter_low_quality( self, source_file: Path, output_file: Path, min_score: float = 40.0 ) -> Dict[str, any]: """ 过滤低质量内容,生成新的 source.json Args: source_file: 原始 source.json 路径 output_file: 输出文件路径 min_score: 最低分数阈值(默认40分,即C级以上) Returns: { "original_count": int, "filtered_count": int, "removed_count": int, "removed_cases": List[str], # 被移除的 case_id } """ with open(source_file, "r", encoding="utf-8") as f: data = json.load(f) sources = data.get("sources", []) original_count = len(sources) filtered_sources = [] removed_sources = [] removed_cases = [] for source in sources: post = source.get("post", {}) eval_result = self.evaluate_post(post) if eval_result["total_score"] >= min_score: filtered_sources.append(source) else: source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})" removed_sources.append(source) removed_cases.append(source.get("case_id", "unknown")) # 将被过滤的数据保存到 filtered_cases.json if removed_sources: filtered_cases_file = output_file.parent / "filtered_cases.json" filtered_data = {"total": len(removed_sources), "sources": removed_sources} with open(filtered_cases_file, "w", encoding="utf-8") as f: json.dump(filtered_data, f, ensure_ascii=False, indent=2) # 更新数据 data["sources"] = filtered_sources data["total"] = len(filtered_sources) data["quality_filter"] = { "min_score": min_score, "original_count": original_count, "filtered_count": len(filtered_sources), "removed_count": len(removed_cases), "filter_timestamp": datetime.now().isoformat(), } # 写入新文件 with open(output_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) return { "original_count": original_count, "filtered_count": len(filtered_sources), "removed_count": len(removed_cases), "removed_cases": removed_cases, } def generate_quality_report(source_file: Path, output_report: Path = None): """生成质量评估报告""" evaluator = SourceQualityEvaluator() result = evaluator.evaluate_source_file(source_file) # 生成报告文本 report_lines = [ "=" * 60, f"Source 质量评估报告", f"文件:{source_file}", f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "=" * 60, "", f"📊 总体统计", f" 总条目数:{result['total_sources']}", f" 平均得分:{result['avg_score']}/100", f" 低质量数:{result['low_quality_count']} ({result['low_quality_count']/result['total_sources']*100:.1f}%)", "", f"📈 等级分布", ] for grade in ["A", "B", "C", "D", "F"]: count = result["grade_distribution"][grade] pct = count / result["total_sources"] * 100 if result["total_sources"] > 0 else 0 bar = "█" * int(pct / 2) report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}") report_lines.extend([ "", f"⚠️ 低质量条目详情 (C/D/F 级)", "", ]) # 只显示低质量条目 low_quality_details = [d for d in result["details"] if d["grade"] in ("C", "D", "F")] low_quality_details.sort(key=lambda x: x["total_score"]) for detail in low_quality_details[:20]: # 最多显示20条 report_lines.extend([ f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})", f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)", f" 文本: {detail['text_score']}/40", f" 互动: {detail['engagement_score']}/20", ]) if detail["issues"]: report_lines.append(f" 问题: {', '.join(detail['issues'])}") report_lines.append("") if len(low_quality_details) > 20: report_lines.append(f" ... 还有 {len(low_quality_details) - 20} 条低质量条目未显示") report_text = "\n".join(report_lines) # 输出到控制台 print(report_text) # 保存到文件 if output_report: with open(output_report, "w", encoding="utf-8") as f: f.write(report_text) print(f"\n报告已保存到:{output_report}") return result if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="评估 source.json 的质量") parser.add_argument("source_file", type=Path, help="source.json 文件路径") parser.add_argument("--report", type=Path, help="输出报告文件路径") parser.add_argument("--filter", type=Path, help="过滤后输出文件路径") parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)") args = parser.parse_args() if not args.source_file.exists(): print(f"错误:文件不存在 {args.source_file}") exit(1) # 生成评估报告 result = generate_quality_report(args.source_file, args.report) # 如果指定了过滤输出 if args.filter: evaluator = SourceQualityEvaluator() filter_result = evaluator.filter_low_quality( args.source_file, args.filter, args.min_score ) print(f"\n🔍 质量过滤完成") print(f" 原始条目:{filter_result['original_count']}") print(f" 保留条目:{filter_result['filtered_count']}") print(f" 移除条目:{filter_result['removed_count']}") print(f" 输出文件:{args.filter}")