|
|
@@ -0,0 +1,443 @@
|
|
|
+"""
|
|
|
+Source.json 质量评估模块
|
|
|
+
|
|
|
+基于字段完整性和文本量对调研结果进行评分,识别低质量内容并支持二次筛选。
|
|
|
+
|
|
|
+评分维度:
|
|
|
+1. 字段完整性(40分):有效字段占比
|
|
|
+2. 文本质量(40分):body_text 长度和信息密度
|
|
|
+3. 互动数据(20分):点赞数、时间戳等
|
|
|
+"""
|
|
|
+
|
|
|
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
|
|
|
+
|
|
|
+
|
|
|
class SourceQualityEvaluator:
    """Score scraped source posts and filter out the low-quality ones.

    Every post receives three component scores that add up to 0-100:

    * field completeness (0-40): weighted share of usable fields,
    * text quality (0-40): title length (0-10) plus body length (0-30),
      both measured after HTML-like tags are stripped,
    * engagement (0-20): like count (0-10) plus freshness (0-10).
    """

    # Weight of each post field in the completeness score.
    FIELD_WEIGHTS = {
        "title": 5,
        "body_text": 15,
        "like_count": 5,
        "publish_timestamp": 5,
        "images": 3,
        "videos": 3,
        "link": 2,
        "content_type": 2,
    }

    # Lower bounds (in characters) of the body-length tiers.
    TEXT_LENGTH_THRESHOLDS = {
        "excellent": 200,  # excellent: 200 characters or more
        "good": 100,  # good: 100-200 characters
        "fair": 50,  # fair: 50-100 characters
        "poor": 20,  # poor: 20-50 characters
        # fewer than 20 characters: very poor
    }

    # How each known field is validated by _evaluate_fields.
    _TEXT_FIELDS = ("title", "body_text", "link", "content_type")
    _NUMERIC_FIELDS = ("like_count", "publish_timestamp")
    _LIST_FIELDS = ("images", "videos")

    # Matches HTML-like tags such as <em class="keyword">.
    _HTML_TAG_RE = re.compile(r"<[^>]+>")

    def __init__(self, time_window_days: int = 180):
        """Create an evaluator.

        Args:
            time_window_days: Freshness window in days (default 180, i.e.
                half a year). Posts published before "now minus this window"
                are reported as outdated.
        """
        self.time_window_days = time_window_days
        # The cutoff is fixed once, at construction time, from the local clock.
        self.cutoff_timestamp = (
            datetime.now() - timedelta(days=time_window_days)
        ).timestamp()

    @staticmethod
    def _has_text(value) -> bool:
        """Return True when *value* is truthy and not blank once stringified."""
        return bool(value) and bool(str(value).strip())

    @staticmethod
    def _is_positive_number(value) -> bool:
        """Return True for a real int/float strictly greater than zero.

        ``bool`` is rejected explicitly because it is a subclass of ``int``;
        NaN is rejected implicitly because ``nan > 0`` is False.
        """
        if isinstance(value, bool):
            return False
        return isinstance(value, (int, float)) and value > 0

    def _clean_text(self, value) -> str:
        """Return *value* as a string without HTML-like tags or outer blanks.

        Falsy values (None, "", 0, empty containers) yield "", so a JSON
        ``null`` is handled the same way as a missing key.
        """
        if not value:
            return ""
        return self._HTML_TAG_RE.sub("", str(value)).strip()

    def evaluate_post(self, post: dict) -> Dict[str, Any]:
        """Evaluate the quality of a single post.

        Args:
            post: The post mapping. Anything that is not a dict (for example
                a JSON ``null``) is scored as an empty post.

        Returns:
            A dict with the keys:
                "field_score": float, field completeness score (0-40)
                "text_score": float, text quality score (0-40)
                "engagement_score": float, engagement score (0-20)
                "total_score": float, sum of the three scores (0-100)
                "grade": str, one of A/B/C/D/F
                "issues": List[str], detected problems
                "valid_fields": int, number of usable fields
                "total_fields": int, number of fields that were checked
        """
        if not isinstance(post, dict):
            post = {}

        field_score, valid_count = self._evaluate_fields(post)
        text_score, text_issues = self._evaluate_text(post)
        engagement_score, engagement_issues = self._evaluate_engagement(post)
        total_score = round(field_score + text_score + engagement_score, 2)

        return {
            "field_score": field_score,
            "text_score": text_score,
            "engagement_score": engagement_score,
            "total_score": total_score,
            "grade": self._calculate_grade(total_score),
            "issues": text_issues + engagement_issues,
            "valid_fields": valid_count,
            "total_fields": len(self.FIELD_WEIGHTS),
        }

    def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
        """Score field completeness.

        Returns:
            ``(score, valid_count)`` where score is the earned share of
            FIELD_WEIGHTS scaled to 0-40 and valid_count is the number of
            fields that passed validation.
        """
        total_weight = sum(self.FIELD_WEIGHTS.values())
        earned_weight = 0.0
        valid_count = 0

        for field, weight in self.FIELD_WEIGHTS.items():
            value = post.get(field)
            if field in self._TEXT_FIELDS:
                is_valid = self._has_text(value)
            elif field in self._NUMERIC_FIELDS:
                is_valid = self._is_positive_number(value)
            elif field in self._LIST_FIELDS:
                is_valid = isinstance(value, list) and len(value) > 0
            else:
                # Fields without a validation rule never count as valid.
                is_valid = False

            if is_valid:
                earned_weight += weight
                valid_count += 1

        # Scale to 0-40; an empty weight table scores 0 instead of dividing by 0.
        field_score = (earned_weight / total_weight) * 40 if total_weight else 0.0
        return round(field_score, 2), valid_count

    def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]:
        """Score text quality from the title and body lengths.

        Lengths are measured after removing HTML-like tags and surrounding
        whitespace. Missing, null and non-string values are tolerated.

        Returns:
            ``(score, issues)`` with score in 0-40.
        """
        issues = []
        title_len = len(self._clean_text(post.get("title")))
        body_len = len(self._clean_text(post.get("body_text")))

        # Title: 0-10 points.
        if title_len == 0:
            title_score = 0
            issues.append("标题为空")
        elif title_len < 10:
            title_score = 3
            issues.append(f"标题过短 ({title_len}字)")
        elif title_len < 20:
            title_score = 6
        else:
            title_score = 10

        # Body: 0-30 points, tiered by TEXT_LENGTH_THRESHOLDS.
        limits = self.TEXT_LENGTH_THRESHOLDS
        if body_len == 0:
            body_score = 0
            issues.append("正文为空")
        elif body_len < limits["poor"]:
            body_score = 5
            issues.append(f"正文极短 ({body_len}字)")
        elif body_len < limits["fair"]:
            body_score = 12
            issues.append(f"正文较短 ({body_len}字)")
        elif body_len < limits["good"]:
            body_score = 20
        elif body_len < limits["excellent"]:
            body_score = 26
        else:
            body_score = 30

        return round(title_score + body_score, 2), issues

    def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]:
        """Score engagement from the like count and the publish time.

        Values that are not positive numbers (missing, null, wrong type,
        zero, negative, NaN) are treated as "no data", which matches the
        validity rule used by the field completeness score.

        Returns:
            ``(score, issues)`` with score in 0-20.
        """
        issues = []
        score = 0.0

        # Like count: 0-10 points.
        like_count = post.get("like_count", 0)
        if not self._is_positive_number(like_count):
            issues.append("无点赞数据")
        elif like_count < 10:
            score += 3
        elif like_count < 100:
            score += 6
        elif like_count < 1000:
            score += 8
        else:
            score += 10

        # Publish time: 0-10 points.
        # NOTE(review): the value is compared directly with an epoch-seconds
        # cutoff, so a millisecond timestamp would always look fresh —
        # confirm the unit used by the data producer.
        timestamp = post.get("publish_timestamp", 0)
        if not self._is_positive_number(timestamp):
            issues.append("无发布时间")
        elif timestamp < self.cutoff_timestamp:
            issues.append(f"内容过时(超过{self.time_window_days}天)")
            score += 2
        else:
            score += 10

        return round(score, 2), issues

    def _calculate_grade(self, score: float) -> str:
        """Map a 0-100 score to a letter grade in 20-point bands."""
        if score >= 80:
            return "A"
        elif score >= 60:
            return "B"
        elif score >= 40:
            return "C"
        elif score >= 20:
            return "D"
        else:
            return "F"

    @staticmethod
    def _load_json(source_file) -> dict:
        """Read and parse a UTF-8 encoded JSON file."""
        with open(source_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def evaluate_source_file(self, source_file: Path) -> Dict[str, Any]:
        """Evaluate every entry of a source.json file.

        The file is expected to hold an object whose "sources" list contains
        entries with "post", "case_id" and "platform" keys. A missing or
        null "sources" list is treated as empty and a missing or null "post"
        is scored as an empty post.

        Returns:
            A dict with the keys:
                "total_sources": int
                "grade_distribution": Dict[str, int], count per A/B/C/D/F
                "avg_score": float, 0.0 when there are no entries
                "low_quality_count": int, number of C/D/F entries
                "low_quality_indices": List[int], indices of C/D/F entries
                "details": List[Dict], per-entry result of evaluate_post plus
                    "index", "case_id" and "platform"
        """
        data = self._load_json(source_file)
        sources = data.get("sources") or []
        total = len(sources)

        grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
        scores = []
        low_quality_indices = []
        details = []

        for idx, source in enumerate(sources):
            eval_result = self.evaluate_post(source.get("post") or {})
            eval_result["index"] = idx
            eval_result["case_id"] = source.get("case_id", "")
            eval_result["platform"] = source.get("platform", "")

            grade_dist[eval_result["grade"]] += 1
            scores.append(eval_result["total_score"])
            details.append(eval_result)

            # Grades C, D and F count as low quality.
            if eval_result["grade"] in ("C", "D", "F"):
                low_quality_indices.append(idx)

        avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0

        return {
            "total_sources": total,
            "grade_distribution": grade_dist,
            "avg_score": avg_score,
            "low_quality_count": len(low_quality_indices),
            "low_quality_indices": low_quality_indices,
            "details": details,
        }

    def filter_low_quality(
        self, source_file: Path, output_file: Path, min_score: float = 40.0
    ) -> Dict[str, Any]:
        """Drop low-scoring entries and write a new source.json.

        Entries scoring below *min_score* are removed, tagged with a
        "filter_reason" and written to "filtered_cases.json" next to
        *output_file*; that file is only written when something was removed.
        The remaining data, with an added "quality_filter" summary, is
        written to *output_file*.

        Args:
            source_file: Path of the original source.json.
            output_file: Path of the filtered output (Path or str).
            min_score: Minimum total score to keep an entry. The default of
                40 keeps grade C and above, although evaluate_source_file
                reports grade C as low quality.

        Returns:
            A dict with the keys:
                "original_count": int
                "filtered_count": int, number of kept entries
                "removed_count": int
                "removed_cases": List[str], case_id of each removed entry
        """
        # Accept plain strings as well, since .parent is needed below.
        output_file = Path(output_file)
        data = self._load_json(source_file)
        sources = data.get("sources") or []
        original_count = len(sources)

        filtered_sources = []
        removed_sources = []
        removed_cases = []

        for source in sources:
            eval_result = self.evaluate_post(source.get("post") or {})

            if eval_result["total_score"] >= min_score:
                filtered_sources.append(source)
            else:
                source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})"
                removed_sources.append(source)
                removed_cases.append(source.get("case_id", "unknown"))

        # Keep the removed entries for later inspection.
        if removed_sources:
            filtered_cases_file = output_file.parent / "filtered_cases.json"
            filtered_data = {"total": len(removed_sources), "sources": removed_sources}
            with open(filtered_cases_file, "w", encoding="utf-8") as f:
                json.dump(filtered_data, f, ensure_ascii=False, indent=2)

        data["sources"] = filtered_sources
        data["total"] = len(filtered_sources)
        data["quality_filter"] = {
            "min_score": min_score,
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "filter_timestamp": datetime.now().isoformat(),
        }

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return {
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "removed_cases": removed_cases,
        }
|
|
|
+
|
|
|
+
|
|
|
def generate_quality_report(
    source_file: Path, output_report: Optional[Path] = None
) -> Dict[str, Any]:
    """Build, print and optionally save a quality report for a source file.

    The report contains overall statistics, the grade distribution with a
    bar chart, and details for at most 20 of the lowest-scoring C/D/F
    entries. A file without any entries yields a report with 0.0%
    everywhere.

    Args:
        source_file: Path of the source.json file to evaluate.
        output_report: When given, the report text is also written there.

    Returns:
        The result of ``SourceQualityEvaluator.evaluate_source_file``.
    """
    evaluator = SourceQualityEvaluator()
    result = evaluator.evaluate_source_file(source_file)
    total = result["total_sources"]

    def percent(count):
        # Guard every share against an empty source list.
        return count / total * 100 if total > 0 else 0.0

    report_lines = [
        "=" * 60,
        "Source 质量评估报告",
        f"文件:{source_file}",
        f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "=" * 60,
        "",
        "📊 总体统计",
        f" 总条目数:{total}",
        f" 平均得分:{result['avg_score']}/100",
        f" 低质量数:{result['low_quality_count']} ({percent(result['low_quality_count']):.1f}%)",
        "",
        "📈 等级分布",
    ]

    for grade in ["A", "B", "C", "D", "F"]:
        count = result["grade_distribution"][grade]
        pct = percent(count)
        # One bar character per two percentage points.
        bar = "█" * int(pct / 2)
        report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}")

    report_lines.extend([
        "",
        "⚠️ 低质量条目详情 (C/D/F 级)",
        "",
    ])

    # Only low-quality entries are listed, worst score first.
    low_quality_details = sorted(
        (d for d in result["details"] if d["grade"] in ("C", "D", "F")),
        key=lambda d: d["total_score"],
    )

    for detail in low_quality_details[:20]:  # show at most 20 entries
        report_lines.extend([
            f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})",
            f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)",
            f" 文本: {detail['text_score']}/40",
            f" 互动: {detail['engagement_score']}/20",
        ])
        if detail["issues"]:
            report_lines.append(f" 问题: {', '.join(detail['issues'])}")
        report_lines.append("")

    if len(low_quality_details) > 20:
        report_lines.append(f" ... 还有 {len(low_quality_details) - 20} 条低质量条目未显示")

    report_text = "\n".join(report_lines)

    print(report_text)

    if output_report:
        with open(output_report, "w", encoding="utf-8") as f:
            f.write(report_text)
        print(f"\n报告已保存到:{output_report}")

    return result
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # Command-line entry point: print a quality report and, when --filter is
    # given, also write a filtered copy of the source file.
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="评估 source.json 的质量")
    parser.add_argument("source_file", type=Path, help="source.json 文件路径")
    parser.add_argument("--report", type=Path, help="输出报告文件路径")
    parser.add_argument("--filter", type=Path, help="过滤后输出文件路径")
    parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)")
    args = parser.parse_args()

    if not args.source_file.exists():
        print(f"错误:文件不存在 {args.source_file}", file=sys.stderr)
        # sys.exit is used because the bare exit() helper is injected by the
        # site module and is missing under "python -S" or in frozen builds.
        sys.exit(1)

    # Always produce the evaluation report first.
    result = generate_quality_report(args.source_file, args.report)

    # Filtering only runs when an output path was requested.
    if args.filter:
        evaluator = SourceQualityEvaluator()
        filter_result = evaluator.filter_low_quality(
            args.source_file, args.filter, args.min_score
        )
        print("\n🔍 质量过滤完成")
        print(f" 原始条目:{filter_result['original_count']}")
        print(f" 保留条目:{filter_result['filtered_count']}")
        print(f" 移除条目:{filter_result['removed_count']}")
        print(f" 输出文件:{args.filter}")
|