"""Quality evaluation for ``source.json`` research results.

Scores every collected post on field completeness and amount of text so that
low-quality entries can be identified and optionally filtered out.

Scoring dimensions:
    1. Field completeness (40 points): weighted share of valid fields.
    2. Text quality (40 points): title length and ``body_text`` length.
    3. Engagement data (20 points): like count and publish timestamp.
"""

import argparse
import json
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Matches anything that looks like an HTML tag (e.g. <br>); compiled once
# because it is applied to every title and body that gets scored.
_HTML_TAG_RE = re.compile(r"<[^>]+>")


class SourceQualityEvaluator:
    """Score posts from a ``source.json`` file and filter low-quality ones."""

    # Weight of each post field in the completeness score. The weights are
    # normalised against their sum, so they do not need to add up to 40.
    FIELD_WEIGHTS = {
        "title": 5,
        "body_text": 15,
        "like_count": 5,
        "publish_timestamp": 5,
        "images": 3,
        "videos": 3,
        "link": 2,
        "content_type": 2,
    }

    # Body-length thresholds, in characters after HTML tags are removed.
    TEXT_LENGTH_THRESHOLDS = {
        "excellent": 200,  # excellent: 200 characters or more
        "good": 100,  # good: 100-200 characters
        "fair": 50,  # fair: 50-100 characters
        "poor": 20,  # poor: 20-50 characters
        # fewer than 20 characters: very poor
    }

    # How each field listed in FIELD_WEIGHTS is validated.
    _TEXT_FIELDS = frozenset({"title", "body_text", "link", "content_type"})
    _NUMERIC_FIELDS = frozenset({"like_count", "publish_timestamp"})
    _LIST_FIELDS = frozenset({"images", "videos"})

    # (minimum total score, grade), checked from best to worst.
    _GRADE_BOUNDARIES = ((80, "A"), (60, "B"), (40, "C"), (20, "D"))

    def __init__(self, time_window_days: int = 180):
        """Create an evaluator.

        Args:
            time_window_days: Freshness window in days; posts published
                earlier than this many days ago are flagged as outdated.
                Defaults to 180 (half a year).
        """
        self.time_window_days = time_window_days
        # NOTE(review): compared directly against ``publish_timestamp``, so
        # this presumably expects epoch *seconds* -- confirm against the
        # producer of source.json (millisecond values would never look stale).
        self.cutoff_timestamp = (
            datetime.now() - timedelta(days=time_window_days)
        ).timestamp()

    @staticmethod
    def _positive_number(value: Any) -> float:
        """Return ``value`` if it is a real number greater than zero, else 0.

        Bools are rejected even though ``bool`` subclasses ``int``, so a stray
        ``True`` is not counted as one like or as a timestamp.
        """
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 0
        return value if value > 0 else 0

    @staticmethod
    def _plain_text(value: Any) -> str:
        """Return ``value`` as stripped text with HTML tags removed.

        ``None`` and other falsy values become the empty string; non-string
        values are converted with ``str`` so the regex never sees a non-str.
        """
        if not value:
            return ""
        return _HTML_TAG_RE.sub("", str(value)).strip()

    def evaluate_post(self, post: dict) -> Dict[str, Any]:
        """Evaluate the quality of a single post.

        Args:
            post: The post dictionary. Anything that is not a dict (for
                example ``None``) is scored as an empty post.

        Returns:
            A dict with the keys:
                field_score (float): field completeness score (0-40)
                text_score (float): text quality score (0-40)
                engagement_score (float): engagement score (0-20)
                total_score (float): sum of the three scores (0-100)
                grade (str): letter grade A/B/C/D/F
                issues (List[str]): detected problems
                valid_fields (int): number of valid fields
                total_fields (int): number of fields that are checked
        """
        if not isinstance(post, dict):
            post = {}

        # 1. Field completeness (0-40 points)
        field_score, valid_count = self._evaluate_fields(post)
        # 2. Text quality (0-40 points)
        text_score, text_issues = self._evaluate_text(post)
        # 3. Engagement data (0-20 points)
        engagement_score, engagement_issues = self._evaluate_engagement(post)

        total_score = round(field_score + text_score + engagement_score, 2)
        return {
            "field_score": field_score,
            "text_score": text_score,
            "engagement_score": engagement_score,
            "total_score": total_score,
            "grade": self._calculate_grade(total_score),
            "issues": [*text_issues, *engagement_issues],
            "valid_fields": valid_count,
            "total_fields": len(self.FIELD_WEIGHTS),
        }

    def _is_field_valid(self, field: str, value: Any) -> bool:
        """Return whether ``value`` is a usable value for ``field``."""
        if field in self._TEXT_FIELDS:
            return bool(value) and bool(str(value).strip())
        if field in self._NUMERIC_FIELDS:
            return self._positive_number(value) > 0
        if field in self._LIST_FIELDS:
            return isinstance(value, list) and len(value) > 0
        # A field without a known validation rule never counts as valid.
        return False

    def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
        """Score field completeness.

        Returns:
            ``(score, valid_count)`` where ``score`` is the weighted share of
            valid fields scaled to 0-40 and ``valid_count`` is the number of
            valid fields.
        """
        total_weight = sum(self.FIELD_WEIGHTS.values())
        earned_weight = 0.0
        valid_count = 0

        for field, weight in self.FIELD_WEIGHTS.items():
            if self._is_field_valid(field, post.get(field)):
                earned_weight += weight
                valid_count += 1

        # Scale the earned share of the weights to 0-40 points.
        field_score = (earned_weight / total_weight) * 40
        return round(field_score, 2), valid_count

    def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]:
        """Score the title (0-10) and the body text (0-30).

        Lengths are measured after HTML tags and surrounding whitespace are
        removed. Missing, ``None`` or non-string values are handled instead
        of raising ``TypeError``.

        Returns:
            ``(score, issues)`` with ``score`` in the range 0-40.
        """
        issues: List[str] = []
        title_len = len(self._plain_text(post.get("title")))
        body_len = len(self._plain_text(post.get("body_text")))
        limits = self.TEXT_LENGTH_THRESHOLDS

        # Title score (0-10 points)
        if title_len == 0:
            title_score = 0
            issues.append("标题为空")
        elif title_len < 10:
            title_score = 3
            issues.append(f"标题过短 ({title_len}字)")
        elif title_len < 20:
            title_score = 6
        else:
            title_score = 10

        # Body score (0-30 points)
        if body_len == 0:
            body_score = 0
            issues.append("正文为空")
        elif body_len < limits["poor"]:
            body_score = 5
            issues.append(f"正文极短 ({body_len}字)")
        elif body_len < limits["fair"]:
            body_score = 12
            issues.append(f"正文较短 ({body_len}字)")
        elif body_len < limits["good"]:
            body_score = 20
        elif body_len < limits["excellent"]:
            body_score = 26
        else:
            body_score = 30

        return round(title_score + body_score, 2), issues

    def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]:
        """Score the like count (0-10) and the publish timestamp (0-10).

        Missing, non-numeric, zero or negative values count as "no data",
        which matches how ``_evaluate_fields`` validates the same fields.

        Returns:
            ``(score, issues)`` with ``score`` in the range 0-20.
        """
        issues: List[str] = []
        score = 0.0

        # Like count score (0-10 points)
        like_count = self._positive_number(post.get("like_count", 0))
        if like_count == 0:
            issues.append("无点赞数据")
        elif like_count < 10:
            score += 3
        elif like_count < 100:
            score += 6
        elif like_count < 1000:
            score += 8
        else:
            score += 10

        # Timestamp score (0-10 points)
        timestamp = self._positive_number(post.get("publish_timestamp", 0))
        if timestamp == 0:
            issues.append("无发布时间")
        elif timestamp < self.cutoff_timestamp:
            # Outdated content still earns a little for having a timestamp.
            issues.append(f"内容过时(超过{self.time_window_days}天)")
            score += 2
        else:
            score += 10

        return round(score, 2), issues

    def _calculate_grade(self, score: float) -> str:
        """Map a total score to a letter grade (A >= 80 ... F < 20)."""
        for minimum, grade in self._GRADE_BOUNDARIES:
            if score >= minimum:
                return grade
        return "F"

    def evaluate_source_file(self, source_file: Path) -> Dict[str, Any]:
        """Evaluate every entry of a ``source.json`` file.

        Args:
            source_file: Path of a JSON file whose top-level object has a
                ``sources`` list; each entry's ``post`` value is scored.

        Returns:
            A dict with the keys:
                total_sources (int): number of entries
                grade_distribution (Dict[str, int]): count per A/B/C/D/F
                avg_score (float): mean total score (0.0 for no entries)
                low_quality_count (int): number of C/D/F entries
                low_quality_indices (List[int]): indices of those entries
                details (List[Dict]): per-entry result of ``evaluate_post``
                    extended with ``index``, ``case_id`` and ``platform``
        """
        with open(source_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        sources = data.get("sources", [])
        total = len(sources)
        grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
        scores = []
        low_quality_indices = []
        details = []

        for idx, source in enumerate(sources):
            # ``or {}`` also covers an explicit ``"post": null`` entry.
            eval_result = self.evaluate_post(source.get("post") or {})
            eval_result["index"] = idx
            eval_result["case_id"] = source.get("case_id", "")
            eval_result["platform"] = source.get("platform", "")

            grade_dist[eval_result["grade"]] += 1
            scores.append(eval_result["total_score"])
            details.append(eval_result)

            # Grades C/D/F are treated as low quality.
            if eval_result["grade"] in ("C", "D", "F"):
                low_quality_indices.append(idx)

        avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0

        return {
            "total_sources": total,
            "grade_distribution": grade_dist,
            "avg_score": avg_score,
            "low_quality_count": len(low_quality_indices),
            "low_quality_indices": low_quality_indices,
            "details": details,
        }

    def filter_low_quality(
        self, source_file: Path, output_file: Path, min_score: float = 40.0
    ) -> Dict[str, Any]:
        """Drop low-scoring entries and write a new ``source.json``.

        Entries scoring below ``min_score`` are removed from the output and,
        when there are any, saved with a ``filter_reason`` to
        ``filtered_cases.json`` next to ``output_file``.

        Args:
            source_file: Path of the original ``source.json``.
            output_file: Path the filtered file is written to; its parent
                directory is created if it does not exist.
            min_score: Minimum total score an entry needs to be kept
                (default 40, i.e. grade C or better).

        Returns:
            A dict with the keys:
                original_count (int): entries before filtering
                filtered_count (int): entries that were kept
                removed_count (int): entries that were removed
                removed_cases (List[str]): ``case_id`` of removed entries
        """
        output_file = Path(output_file)
        with open(source_file, "r", encoding="utf-8") as f:
            data = json.load(f)

        sources = data.get("sources", [])
        original_count = len(sources)
        filtered_sources = []
        removed_sources = []
        removed_cases = []

        for source in sources:
            eval_result = self.evaluate_post(source.get("post") or {})
            if eval_result["total_score"] >= min_score:
                filtered_sources.append(source)
                continue
            source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})"
            removed_sources.append(source)
            removed_cases.append(source.get("case_id", "unknown"))

        output_file.parent.mkdir(parents=True, exist_ok=True)

        # Keep the removed entries in filtered_cases.json for later review.
        if removed_sources:
            filtered_cases_file = output_file.parent / "filtered_cases.json"
            filtered_data = {
                "total": len(removed_sources),
                "sources": removed_sources,
            }
            with open(filtered_cases_file, "w", encoding="utf-8") as f:
                json.dump(filtered_data, f, ensure_ascii=False, indent=2)

        # Update the payload and record how it was filtered.
        data["sources"] = filtered_sources
        data["total"] = len(filtered_sources)
        data["quality_filter"] = {
            "min_score": min_score,
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "filter_timestamp": datetime.now().isoformat(),
        }

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return {
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "removed_cases": removed_cases,
        }


def generate_quality_report(source_file: Path, output_report: Optional[Path] = None):
    """Build, print and optionally save a quality report for a source file.

    Args:
        source_file: Path of the ``source.json`` to evaluate.
        output_report: When given, the report text is also written here.

    Returns:
        The result dict of ``SourceQualityEvaluator.evaluate_source_file``.
    """
    evaluator = SourceQualityEvaluator()
    result = evaluator.evaluate_source_file(source_file)

    total = result["total_sources"]
    low_count = result["low_quality_count"]
    # Guard the percentage so an empty file does not divide by zero.
    low_pct = low_count / total * 100 if total > 0 else 0.0

    report_lines = [
        "=" * 60,
        "Source 质量评估报告",
        f"文件:{source_file}",
        f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "=" * 60,
        "",
        "📊 总体统计",
        f" 总条目数:{total}",
        f" 平均得分:{result['avg_score']}/100",
        f" 低质量数:{low_count} ({low_pct:.1f}%)",
        "",
        "📈 等级分布",
    ]

    for grade in ["A", "B", "C", "D", "F"]:
        count = result["grade_distribution"][grade]
        pct = count / total * 100 if total > 0 else 0
        bar = "█" * int(pct / 2)  # one bar segment per two percent
        report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}")

    report_lines.extend([
        "",
        "⚠️ 低质量条目详情 (C/D/F 级)",
        "",
    ])

    # List only the low-quality entries, worst score first.
    low_quality_details = [
        d for d in result["details"] if d["grade"] in ("C", "D", "F")
    ]
    low_quality_details.sort(key=lambda x: x["total_score"])

    for detail in low_quality_details[:20]:  # show at most 20 entries
        report_lines.extend([
            f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})",
            f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)",
            f" 文本: {detail['text_score']}/40",
            f" 互动: {detail['engagement_score']}/20",
        ])
        if detail["issues"]:
            report_lines.append(f" 问题: {', '.join(detail['issues'])}")
        report_lines.append("")

    if len(low_quality_details) > 20:
        report_lines.append(f" ... 还有 {len(low_quality_details) - 20} 条低质量条目未显示")

    report_text = "\n".join(report_lines)

    # Print to the console.
    print(report_text)

    # Save to a file when requested.
    if output_report:
        with open(output_report, "w", encoding="utf-8") as f:
            f.write(report_text)
        print(f"\n报告已保存到:{output_report}")

    return result


def main(argv: Optional[List[str]] = None) -> int:
    """Run the command-line interface.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The process exit code: 0 on success, 1 if the source file is missing.
    """
    parser = argparse.ArgumentParser(description="评估 source.json 的质量")
    parser.add_argument("source_file", type=Path, help="source.json 文件路径")
    parser.add_argument("--report", type=Path, help="输出报告文件路径")
    parser.add_argument("--filter", type=Path, help="过滤后输出文件路径")
    parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)")
    args = parser.parse_args(argv)

    if not args.source_file.exists():
        print(f"错误:文件不存在 {args.source_file}")
        return 1

    # Generate the evaluation report.
    generate_quality_report(args.source_file, args.report)

    # Filter only when an output path was given.
    if args.filter:
        evaluator = SourceQualityEvaluator()
        filter_result = evaluator.filter_low_quality(
            args.source_file, args.filter, args.min_score
        )
        print("\n🔍 质量过滤完成")
        print(f" 原始条目:{filter_result['original_count']}")
        print(f" 保留条目:{filter_result['filtered_count']}")
        print(f" 移除条目:{filter_result['removed_count']}")
        print(f" 输出文件:{args.filter}")

    return 0


if __name__ == "__main__":
    sys.exit(main())