"""
Quality evaluation for source.json.

Scores collected research results by field completeness and text volume,
flags low-quality entries, and supports a second filtering pass.

Scoring dimensions:
1. Field completeness (40 points): weighted share of valid fields
2. Text quality (40 points): title length (10) and body_text length (30)
3. Engagement data (20 points): like count (10) and publish-time recency (10)
"""
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
class SourceQualityEvaluator:
    """Score the posts of a ``source.json`` file for data quality.

    Every post receives up to 100 points, split into field completeness
    (0-40), text quality (0-40) and engagement/recency (0-20).  The issue
    messages produced at runtime are in Chinese, like the rest of the tool's
    output.
    """

    # Weight of each post field in the completeness score.  The earned share
    # of the total weight is scaled onto the 0-40 range.
    FIELD_WEIGHTS = {
        "title": 5,
        "body_text": 15,
        "like_count": 5,
        "publish_timestamp": 5,
        "images": 3,
        "videos": 3,
        "link": 2,
        "content_type": 2,
    }

    # Body-length boundaries, in characters after HTML tags are stripped.
    TEXT_LENGTH_THRESHOLDS = {
        "excellent": 200,  # excellent: 200 characters or more
        "good": 100,  # good: 100-199 characters
        "fair": 50,  # fair: 50-99 characters
        "poor": 20,  # poor: 20-49 characters; anything shorter is very poor
    }

    # How each known field is validated in _evaluate_fields.
    _TEXT_FIELDS = frozenset({"title", "body_text", "link", "content_type"})
    _NUMERIC_FIELDS = frozenset({"like_count", "publish_timestamp"})
    _LIST_FIELDS = frozenset({"images", "videos"})

    # Matches inline markup such as <em class="keyword"> left in scraped text.
    _HTML_TAG_RE = re.compile(r"<[^>]+>")

    # An epoch value above this would lie beyond the year 5000 if read as
    # seconds, so it is interpreted as milliseconds instead.
    _MS_TIMESTAMP_FLOOR = 1e11

    def __init__(self, time_window_days: int = 180):
        """
        Args:
            time_window_days: Recency window in days (default 180, i.e. half
                a year).  Posts published before ``now - window`` are
                reported as outdated.
        """
        self.time_window_days = time_window_days
        self.cutoff_timestamp = (
            datetime.now() - timedelta(days=time_window_days)
        ).timestamp()

    @staticmethod
    def _has_text(value) -> bool:
        """Return True when *value* is truthy and not blank once stringified."""
        return bool(value) and bool(str(value).strip())

    @staticmethod
    def _is_positive_number(value) -> bool:
        """Return True for an int/float greater than zero.

        ``bool`` is rejected explicitly: it is an ``int`` subclass, but
        ``True`` is not a meaningful like count or timestamp.
        """
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and value > 0
        )

    @classmethod
    def _clean_text(cls, value) -> str:
        """Stringify *value*, drop HTML tags and trim surrounding whitespace.

        ``None`` becomes the empty string, so a JSON ``null`` is scored as
        missing text instead of raising ``TypeError`` inside the regex.
        """
        if value is None:
            return ""
        return cls._HTML_TAG_RE.sub("", str(value)).strip()

    def evaluate_post(self, post: dict) -> Dict[str, Any]:
        """Evaluate the quality of a single post.

        Args:
            post: The post dictionary.  Anything that is not a dict (for
                example a JSON ``null``) is scored as an empty post.

        Returns:
            A dict with these keys:
                "field_score": field completeness score (0-40)
                "text_score": text quality score (0-40)
                "engagement_score": engagement score (0-20)
                "total_score": sum of the three, rounded to 2 places (0-100)
                "grade": letter grade A/B/C/D/F
                "issues": list of issue messages
                "valid_fields": number of valid fields
                "total_fields": number of fields in FIELD_WEIGHTS
        """
        if not isinstance(post, dict):
            post = {}

        field_score, valid_count = self._evaluate_fields(post)
        text_score, text_issues = self._evaluate_text(post)
        engagement_score, engagement_issues = self._evaluate_engagement(post)
        total_score = round(field_score + text_score + engagement_score, 2)

        return {
            "field_score": field_score,
            "text_score": text_score,
            "engagement_score": engagement_score,
            "total_score": total_score,
            "grade": self._calculate_grade(total_score),
            "issues": [*text_issues, *engagement_issues],
            "valid_fields": valid_count,
            "total_fields": len(self.FIELD_WEIGHTS),
        }

    def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
        """Score field completeness.

        Returns:
            ``(score, valid_count)`` where ``score`` is the earned share of
            the total field weight scaled to 0-40 and rounded to 2 places,
            and ``valid_count`` is the number of valid fields.
        """
        total_weight = sum(self.FIELD_WEIGHTS.values())
        earned_weight = 0.0
        valid_count = 0

        for field, weight in self.FIELD_WEIGHTS.items():
            value = post.get(field)
            if field in self._TEXT_FIELDS:
                is_valid = self._has_text(value)
            elif field in self._NUMERIC_FIELDS:
                is_valid = self._is_positive_number(value)
            elif field in self._LIST_FIELDS:
                is_valid = isinstance(value, list) and len(value) > 0
            else:
                # A field without a validation rule never counts as valid.
                is_valid = False

            if is_valid:
                earned_weight += weight
                valid_count += 1

        # Scale onto 0-40; an empty weight table yields 0 rather than a
        # division by zero.
        field_score = (earned_weight / total_weight) * 40 if total_weight else 0.0
        return round(field_score, 2), valid_count

    def _evaluate_text(self, post: dict) -> Tuple[float, List[str]]:
        """Score text quality from the title and body lengths.

        Lengths are measured after HTML tags are removed and whitespace is
        trimmed.  The title contributes 0-10 points and the body 0-30.

        Returns:
            ``(score, issues)``.
        """
        issues: List[str] = []
        title_len = len(self._clean_text(post.get("title")))
        body_len = len(self._clean_text(post.get("body_text")))

        # Title: 0-10 points.
        if title_len == 0:
            title_score = 0
            issues.append("标题为空")
        elif title_len < 10:
            title_score = 3
            issues.append(f"标题过短 ({title_len}字)")
        elif title_len < 20:
            title_score = 6
        else:
            title_score = 10

        # Body: 0-30 points, bucketed by TEXT_LENGTH_THRESHOLDS.
        thresholds = self.TEXT_LENGTH_THRESHOLDS
        if body_len == 0:
            body_score = 0
            issues.append("正文为空")
        elif body_len < thresholds["poor"]:
            body_score = 5
            issues.append(f"正文极短 ({body_len}字)")
        elif body_len < thresholds["fair"]:
            body_score = 12
            issues.append(f"正文较短 ({body_len}字)")
        elif body_len < thresholds["good"]:
            body_score = 20
        elif body_len < thresholds["excellent"]:
            body_score = 26
        else:
            body_score = 30

        return title_score + body_score, issues

    def _evaluate_engagement(self, post: dict) -> Tuple[float, List[str]]:
        """Score engagement from the like count and the publish time.

        A like count or timestamp that is missing, non-numeric or not
        positive earns no points and is reported as missing, matching the
        validity rule used by ``_evaluate_fields``.

        Returns:
            ``(score, issues)`` with ``score`` in 0-20.
        """
        issues: List[str] = []
        score = 0.0

        # Like count: 0-10 points.
        like_count = post.get("like_count")
        if not self._is_positive_number(like_count):
            issues.append("无点赞数据")
        elif like_count < 10:
            score += 3
        elif like_count < 100:
            score += 6
        elif like_count < 1000:
            score += 8
        else:
            score += 10

        # Publish time: 0-10 points.
        timestamp = post.get("publish_timestamp")
        if not self._is_positive_number(timestamp):
            issues.append("无发布时间")
        else:
            if timestamp > self._MS_TIMESTAMP_FLOOR:
                # Millisecond epoch: convert to seconds so the comparison
                # with cutoff_timestamp is meaningful.
                timestamp = timestamp / 1000
            if timestamp < self.cutoff_timestamp:
                issues.append(f"内容过时(超过{self.time_window_days}天)")
                score += 2
            else:
                score += 10

        return round(score, 2), issues

    def _calculate_grade(self, score: float) -> str:
        """Map a total score to a letter grade (A >= 80, B >= 60, C >= 40, D >= 20, else F)."""
        if score >= 80:
            return "A"
        if score >= 60:
            return "B"
        if score >= 40:
            return "C"
        if score >= 20:
            return "D"
        return "F"

    @staticmethod
    def _load_source_data(source_file: Path) -> Tuple[dict, list]:
        """Read *source_file* as UTF-8 JSON and return ``(data, sources)``.

        ``sources`` is the value of the top-level "sources" key, or an empty
        list when that key is missing or null.
        """
        with open(source_file, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data, data.get("sources") or []

    def evaluate_source_file(self, source_file: Path) -> Dict[str, Any]:
        """Evaluate every entry of a source.json file.

        Args:
            source_file: Path of the JSON file to read.

        Returns:
            A dict with these keys:
                "total_sources": number of entries
                "grade_distribution": count per grade A/B/C/D/F
                "avg_score": mean total score (0.0 when there are no entries)
                "low_quality_count": number of entries graded C/D/F
                "low_quality_indices": indices of those entries
                "details": per-entry result of evaluate_post, extended with
                    "index", "case_id" and "platform"
        """
        _, sources = self._load_source_data(source_file)
        total = len(sources)

        grade_dist = {"A": 0, "B": 0, "C": 0, "D": 0, "F": 0}
        scores = []
        low_quality_indices = []
        details = []

        for idx, source in enumerate(sources):
            # "post" may be present but null, so fall back to an empty dict.
            eval_result = self.evaluate_post(source.get("post") or {})
            eval_result["index"] = idx
            eval_result["case_id"] = source.get("case_id", "")
            eval_result["platform"] = source.get("platform", "")

            grade_dist[eval_result["grade"]] += 1
            scores.append(eval_result["total_score"])
            details.append(eval_result)

            # Grades C, D and F count as low quality.
            if eval_result["grade"] in ("C", "D", "F"):
                low_quality_indices.append(idx)

        avg_score = round(sum(scores) / total, 2) if total > 0 else 0.0

        return {
            "total_sources": total,
            "grade_distribution": grade_dist,
            "avg_score": avg_score,
            "low_quality_count": len(low_quality_indices),
            "low_quality_indices": low_quality_indices,
            "details": details,
        }

    def filter_low_quality(
        self, source_file: Path, output_file: Path, min_score: float = 40.0
    ) -> Dict[str, Any]:
        """Drop low-scoring entries and write a new source.json.

        Entries scoring below *min_score* are tagged with a "filter_reason"
        and, when there are any, saved to ``filtered_cases.json`` in the
        directory of *output_file*.  The remaining entries are written to
        *output_file* together with a "quality_filter" summary.

        Args:
            source_file: Path of the original source.json.
            output_file: Path of the filtered file to write.
            min_score: Minimum total score to keep an entry (default 40,
                i.e. grade C or better).

        Returns:
            A dict with these keys:
                "original_count": number of entries read
                "filtered_count": number of entries kept
                "removed_count": number of entries removed
                "removed_cases": case_id of each removed entry
        """
        output_file = Path(output_file)
        data, sources = self._load_source_data(source_file)
        original_count = len(sources)

        filtered_sources = []
        removed_sources = []
        removed_cases = []

        for source in sources:
            eval_result = self.evaluate_post(source.get("post") or {})
            if eval_result["total_score"] >= min_score:
                filtered_sources.append(source)
                continue
            source["filter_reason"] = f"完备性评分不足 (得分: {eval_result['total_score']} < {min_score})"
            removed_sources.append(source)
            removed_cases.append(source.get("case_id", "unknown"))

        # Keep the rejected entries next to the output for later inspection.
        if removed_sources:
            filtered_cases_file = output_file.parent / "filtered_cases.json"
            filtered_data = {"total": len(removed_sources), "sources": removed_sources}
            with open(filtered_cases_file, "w", encoding="utf-8") as f:
                json.dump(filtered_data, f, ensure_ascii=False, indent=2)

        # Update the loaded document in place before writing it back out.
        data["sources"] = filtered_sources
        data["total"] = len(filtered_sources)
        data["quality_filter"] = {
            "min_score": min_score,
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "filter_timestamp": datetime.now().isoformat(),
        }

        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return {
            "original_count": original_count,
            "filtered_count": len(filtered_sources),
            "removed_count": len(removed_cases),
            "removed_cases": removed_cases,
        }
def generate_quality_report(
    source_file: Path, output_report: Optional[Path] = None
) -> Dict[str, Any]:
    """Build, print and optionally save a quality report for a source file.

    The report contains overall statistics, the grade distribution with a
    bar per grade, and details for at most the 20 lowest-scoring entries
    graded C, D or F.

    Args:
        source_file: Path of the source.json file to evaluate.
        output_report: When given, the report text is also written to this
            path as UTF-8.

    Returns:
        The evaluation result of ``SourceQualityEvaluator.evaluate_source_file``.
    """
    evaluator = SourceQualityEvaluator()
    result = evaluator.evaluate_source_file(source_file)
    total = result["total_sources"]

    def percent(count: int) -> float:
        # A file without entries must report 0% instead of dividing by zero.
        return count / total * 100 if total > 0 else 0.0

    separator = "=" * 60
    report_lines = [
        separator,
        "Source 质量评估报告",
        f"文件:{source_file}",
        f"评估时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        separator,
        "",
        "📊 总体统计",
        f" 总条目数:{total}",
        f" 平均得分:{result['avg_score']}/100",
        f" 低质量数:{result['low_quality_count']} ({percent(result['low_quality_count']):.1f}%)",
        "",
        "📈 等级分布",
    ]

    for grade in ["A", "B", "C", "D", "F"]:
        count = result["grade_distribution"][grade]
        pct = percent(count)
        bar = "█" * int(pct / 2)  # one block per two percentage points
        report_lines.append(f" {grade}: {count:3d} ({pct:5.1f}%) {bar}")

    report_lines.extend([
        "",
        "⚠️ 低质量条目详情 (C/D/F 级)",
        "",
    ])

    # Only low-quality entries are listed, worst score first.
    low_quality_details = [d for d in result["details"] if d["grade"] in ("C", "D", "F")]
    low_quality_details.sort(key=lambda x: x["total_score"])

    max_shown = 20
    for detail in low_quality_details[:max_shown]:
        report_lines.extend([
            f"[{detail['index']:3d}] {detail['case_id']} | 得分: {detail['total_score']}/100 ({detail['grade']})",
            f" 字段: {detail['field_score']}/40 ({detail['valid_fields']}/{detail['total_fields']}个有效)",
            f" 文本: {detail['text_score']}/40",
            f" 互动: {detail['engagement_score']}/20",
        ])
        if detail["issues"]:
            report_lines.append(f" 问题: {', '.join(detail['issues'])}")
        report_lines.append("")

    if len(low_quality_details) > max_shown:
        report_lines.append(f" ... 还有 {len(low_quality_details) - max_shown} 条低质量条目未显示")

    report_text = "\n".join(report_lines)

    # Always print to the console.
    print(report_text)

    # Save to a file only when a path was supplied.
    if output_report:
        with open(output_report, "w", encoding="utf-8") as f:
            f.write(report_text)
        print(f"\n报告已保存到:{output_report}")

    return result
if __name__ == "__main__":
    # Command-line entry point: print a quality report and optionally write
    # a filtered copy of the source file.
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="评估 source.json 的质量")
    parser.add_argument("source_file", type=Path, help="source.json 文件路径")
    parser.add_argument("--report", type=Path, help="输出报告文件路径")
    parser.add_argument("--filter", type=Path, help="过滤后输出文件路径")
    parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值(默认40)")
    args = parser.parse_args()

    if not args.source_file.exists():
        print(f"错误:文件不存在 {args.source_file}")
        # sys.exit is always available; the bare exit() helper is injected by
        # the site module and is missing under "python -S" or frozen builds.
        sys.exit(1)

    # The report is always generated (and saved when --report is given).
    result = generate_quality_report(args.source_file, args.report)

    # Filtering runs only when an output path was requested.
    if args.filter:
        evaluator = SourceQualityEvaluator()
        filter_result = evaluator.filter_low_quality(
            args.source_file, args.filter, args.min_score
        )
        print("\n🔍 质量过滤完成")
        print(f" 原始条目:{filter_result['original_count']}")
        print(f" 保留条目:{filter_result['filtered_count']}")
        print(f" 移除条目:{filter_result['removed_count']}")
        print(f" 输出文件:{args.filter}")
|