| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- """
- 将质量评估集成到 Pipeline 中
- 在 Phase 1.5 (source.json 生成后) 自动执行质量检查,
- 对低质量内容进行二次筛选或触发重新调研。
- """
import json
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List

from evaluate_source_quality import SourceQualityEvaluator, generate_quality_report
def check_and_filter_source(
    source_file: Path,
    min_score: float = 40.0,
    min_pass_rate: float = 0.6,
    auto_filter: bool = True,
    min_avg_score: float = 50.0,
    filter_pass_rate: float = 0.4,
) -> Dict[str, Any]:
    """Evaluate a source.json file and decide how the pipeline should proceed.

    The file is scored with ``SourceQualityEvaluator.evaluate_source_file``.
    The pass rate is ``(total_sources - low_quality_count) / total_sources``
    (``0.0`` for an empty file). One of three outcomes follows:

    * ``pass``     -- pass rate >= ``min_pass_rate`` and average score >=
      ``min_avg_score``; the file is left untouched.
    * ``filtered`` -- pass rate >= ``filter_pass_rate`` and ``auto_filter`` is
      true; low-quality entries are filtered out, the original is kept as
      ``source_original.json`` and the filtered result takes over the
      original path.
    * ``failed``   -- anything else; the most frequent issues are printed and
      manual review is requested.

    Args:
        source_file: Path to the source.json file to check.
        min_score: Per-entry score threshold handed to
            ``evaluator.filter_low_quality`` (default 40, grade C).
            NOTE(review): ``low_quality_count`` comes from the evaluator and
            may use its own threshold -- confirm they agree.
        min_pass_rate: Minimum overall pass rate for a direct pass.
        auto_filter: Whether mediocre files may be filtered automatically.
        min_avg_score: Minimum average score for a direct pass.
        filter_pass_rate: Minimum pass rate for the auto-filter branch.

    Returns:
        A dict with the keys ``status`` ("pass" | "filtered" | "failed"),
        ``message`` (human-readable summary), ``stats`` (counts, rates and
        grade distribution; plus ``filtered_count``/``removed_count`` after
        filtering) and ``action`` ("continue" | "manual_review").
    """
    evaluator = SourceQualityEvaluator()

    # 1. Score the file and print a summary.
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"📊 质量评估:{source_file.name}")
    print(banner)

    eval_result = evaluator.evaluate_source_file(source_file)
    total = eval_result["total_sources"]
    low_quality_count = eval_result["low_quality_count"]
    pass_count = total - low_quality_count
    # Guard against an empty file: no entries means nothing passed.
    pass_rate = pass_count / total if total > 0 else 0.0
    avg_score = eval_result["avg_score"]
    grade_distribution = eval_result["grade_distribution"]

    print(f"\n总条目数:{total}")
    print(f"平均得分:{avg_score:.1f}/100")
    print(f"通过率:{pass_rate*100:.1f}% ({pass_count}/{total})")
    print("\n等级分布:")
    for grade in ("A", "B", "C", "D", "F"):
        # A grade nobody received may be absent from the distribution.
        count = grade_distribution.get(grade, 0)
        pct = count / total * 100 if total > 0 else 0
        print(f" {grade}: {count:3d} ({pct:5.1f}%)")

    # 2. Result skeleton; the branches below fill in status/message/action.
    result: Dict[str, Any] = {
        "status": "unknown",
        "message": "",
        "stats": {
            "total": total,
            "pass_count": pass_count,
            "pass_rate": pass_rate,
            "avg_score": avg_score,
            "grade_distribution": grade_distribution,
        },
        "action": "continue",
    }

    # 3. Decide.
    if pass_rate >= min_pass_rate and avg_score >= min_avg_score:
        # Good enough: continue with the file as-is.
        result["status"] = "pass"
        result["message"] = f"✅ 质量达标(通过率 {pass_rate*100:.1f}%)"
        result["action"] = "continue"
        print(f"\n{result['message']}")
    elif pass_rate >= filter_pass_rate and auto_filter:
        # Mediocre: drop the low-quality entries and carry on.
        print(f"\n⚠️ 质量一般(通过率 {pass_rate*100:.1f}%),执行自动过滤...")
        filtered_file = source_file.parent / "source_filtered.json"
        # filter_low_quality is expected to write filtered_file; the swap
        # below relies on that file existing afterwards.
        filter_result = evaluator.filter_low_quality(
            source_file, filtered_file, min_score
        )

        # Keep the unfiltered file as a backup, then move the filtered one
        # into its place. Path.replace overwrites an existing target on every
        # platform, whereas Path.rename raises FileExistsError on Windows when
        # a backup from an earlier run is still present.
        backup_file = source_file.parent / "source_original.json"
        source_file.replace(backup_file)
        filtered_file.replace(source_file)

        result["status"] = "filtered"
        result["message"] = (
            f"🔍 已过滤 {filter_result['removed_count']} 条低质量内容\n"
            f" 保留:{filter_result['filtered_count']}/{filter_result['original_count']}\n"
            f" 原始文件备份至:{backup_file.name}"
        )
        result["action"] = "continue"
        result["stats"]["filtered_count"] = filter_result["filtered_count"]
        result["stats"]["removed_count"] = filter_result["removed_count"]
        print(f"\n{result['message']}")
    else:
        # Too poor to salvage automatically: ask for a human decision.
        result["status"] = "failed"
        result["message"] = (
            f"❌ 质量不达标(通过率 {pass_rate*100:.1f}%,平均分 {avg_score:.1f})\n"
            f" 建议:重新调研或人工审核"
        )
        result["action"] = "manual_review"
        print(f"\n{result['message']}")

        # Show the most frequent problems to guide the review.
        print("\n主要问题:")
        issue_summary = _summarize_issues(eval_result["details"])
        for issue, count in issue_summary.items():
            print(f" - {issue}: {count} 条")

    return result
- def _summarize_issues(details: List[Dict]) -> Dict[str, int]:
- """汇总常见问题"""
- issue_counts = {}
- for detail in details:
- for issue in detail.get("issues", []):
- issue_counts[issue] = issue_counts.get(issue, 0) + 1
- # 按频率排序
- return dict(sorted(issue_counts.items(), key=lambda x: x[1], reverse=True)[:5])
def integrate_into_pipeline(
    output_dir: Path,
    min_score: float = 40.0,
    min_pass_rate: float = 0.6,
    auto_filter: bool = True,
) -> Dict[str, Any]:
    """Pipeline entry point: quality-check ``raw_cases/source.json``.

    Intended to be called after Phase 1.5, once source.json has been
    generated.

    Args:
        output_dir: Pipeline output directory (e.g. ``output/001``); the file
            checked is ``output_dir / "raw_cases" / "source.json"``.
        min_score: Per-entry score threshold forwarded to
            ``check_and_filter_source``.
        min_pass_rate: Overall pass-rate threshold forwarded to
            ``check_and_filter_source``.
        auto_filter: Whether ``check_and_filter_source`` may filter
            low-quality entries automatically.

    Returns:
        The result dict of ``check_and_filter_source``. When source.json does
        not exist, a dict with ``status == "skip"`` and
        ``action == "continue"`` is returned instead; it carries the same
        ``message`` and ``stats`` keys (``stats`` is empty) so callers can
        handle every result uniformly.
    """
    source_file = output_dir / "raw_cases" / "source.json"
    if not source_file.exists():
        skip_message = "⚠️ source.json 不存在,跳过质量检查"
        print(skip_message)
        return {
            "status": "skip",
            "message": skip_message,
            "stats": {},
            "action": "continue",
        }

    # Run the quality check (may rewrite source.json when filtering applies).
    result = check_and_filter_source(
        source_file,
        min_score=min_score,
        min_pass_rate=min_pass_rate,
        auto_filter=auto_filter,
    )

    # Persist the detailed report next to the other pipeline outputs.
    # NOTE(review): this runs after the check, so if entries were filtered the
    # report is built from the file currently at source_file -- confirm that
    # is the intended input.
    report_file = output_dir / "quality_report.txt"
    generate_quality_report(source_file, report_file)
    print(f"\n📄 详细报告已保存:{report_file}")
    return result
if __name__ == "__main__":
    # Command-line entry: run the quality check for one pipeline output
    # directory and report the outcome through the process exit status
    # (0 = pipeline may continue, 1 = manual review required).
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="集成质量检查到 pipeline")
    parser.add_argument("output_dir", type=Path, help="输出目录(如 output/001)")
    parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值")
    args = parser.parse_args()

    result = integrate_into_pipeline(args.output_dir, args.min_score)

    # sys.exit is used instead of the bare exit() helper: exit() is injected
    # by the site module and is not available under `python -S` or in frozen
    # builds.
    if result["action"] == "manual_review":
        print("\n⚠️ 需要人工介入,pipeline 暂停")
        sys.exit(1)
    print("\n✅ 质量检查完成,pipeline 继续")
    sys.exit(0)
|