""" 将质量评估集成到 Pipeline 中 在 Phase 1.5 (source.json 生成后) 自动执行质量检查, 对低质量内容进行二次筛选或触发重新调研。 """ import json from pathlib import Path from typing import Dict, List from evaluate_source_quality import SourceQualityEvaluator, generate_quality_report def check_and_filter_source( source_file: Path, min_score: float = 40.0, min_pass_rate: float = 0.6, auto_filter: bool = True, ) -> Dict[str, any]: """ 检查 source.json 质量并决定是否需要重新调研 Args: source_file: source.json 路径 min_score: 单条内容的最低分数阈值(默认40分,C级) min_pass_rate: 整体通过率阈值(默认60%) auto_filter: 是否自动过滤低质量内容 Returns: { "status": str, # "pass" | "filtered" | "failed" "message": str, "stats": dict, "action": str, # "continue" | "rerun_research" | "manual_review" } """ evaluator = SourceQualityEvaluator() # 1. 生成质量报告 print(f"\n{'='*60}") print(f"📊 质量评估:{source_file.name}") print(f"{'='*60}") eval_result = evaluator.evaluate_source_file(source_file) total = eval_result["total_sources"] low_quality_count = eval_result["low_quality_count"] pass_count = total - low_quality_count pass_rate = pass_count / total if total > 0 else 0.0 avg_score = eval_result["avg_score"] print(f"\n总条目数:{total}") print(f"平均得分:{avg_score:.1f}/100") print(f"通过率:{pass_rate*100:.1f}% ({pass_count}/{total})") print(f"\n等级分布:") for grade in ["A", "B", "C", "D", "F"]: count = eval_result["grade_distribution"][grade] pct = count / total * 100 if total > 0 else 0 print(f" {grade}: {count:3d} ({pct:5.1f}%)") # 2. 判断质量是否达标 result = { "status": "unknown", "message": "", "stats": { "total": total, "pass_count": pass_count, "pass_rate": pass_rate, "avg_score": avg_score, "grade_distribution": eval_result["grade_distribution"], }, "action": "continue", } # 3. 决策逻辑 if pass_rate >= min_pass_rate and avg_score >= 50: # 质量良好,直接通过 result["status"] = "pass" result["message"] = f"✅ 质量达标(通过率 {pass_rate*100:.1f}%)" result["action"] = "continue" print(f"\n{result['message']}") elif pass_rate >= 0.4 and auto_filter: # 质量一般,自动过滤低质量内容 print(f"\n⚠️ 质量一般(通过率 {pass_rate*100:.1f}%),执行自动过滤...") filtered_file = source_file.parent / "source_filtered.json" filter_result = evaluator.filter_low_quality( source_file, filtered_file, min_score ) # 备份原文件 backup_file = source_file.parent / "source_original.json" source_file.rename(backup_file) filtered_file.rename(source_file) result["status"] = "filtered" result["message"] = ( f"🔍 已过滤 {filter_result['removed_count']} 条低质量内容\n" f" 保留:{filter_result['filtered_count']}/{filter_result['original_count']}\n" f" 原始文件备份至:{backup_file.name}" ) result["action"] = "continue" result["stats"]["filtered_count"] = filter_result["filtered_count"] result["stats"]["removed_count"] = filter_result["removed_count"] print(f"\n{result['message']}") else: # 质量太差,建议重新调研 result["status"] = "failed" result["message"] = ( f"❌ 质量不达标(通过率 {pass_rate*100:.1f}%,平均分 {avg_score:.1f})\n" f" 建议:重新调研或人工审核" ) result["action"] = "manual_review" print(f"\n{result['message']}") # 显示主要问题 print(f"\n主要问题:") issue_summary = _summarize_issues(eval_result["details"]) for issue, count in issue_summary.items(): print(f" - {issue}: {count} 条") return result def _summarize_issues(details: List[Dict]) -> Dict[str, int]: """汇总常见问题""" issue_counts = {} for detail in details: for issue in detail.get("issues", []): issue_counts[issue] = issue_counts.get(issue, 0) + 1 # 按频率排序 return dict(sorted(issue_counts.items(), key=lambda x: x[1], reverse=True)[:5]) def integrate_into_pipeline(output_dir: Path, min_score: float = 40.0): """ 集成到 pipeline 的入口函数 在 Phase 1.5 之后调用,检查 source.json 质量 """ source_file = output_dir / "raw_cases" / "source.json" if not source_file.exists(): print(f"⚠️ source.json 不存在,跳过质量检查") return {"status": "skip", "action": "continue"} # 执行质量检查 result = check_and_filter_source( source_file, min_score=min_score, min_pass_rate=0.6, auto_filter=True, ) # 保存质量报告 report_file = output_dir / "quality_report.txt" generate_quality_report(source_file, report_file) print(f"\n📄 详细报告已保存:{report_file}") return result if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="集成质量检查到 pipeline") parser.add_argument("output_dir", type=Path, help="输出目录(如 output/001)") parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值") args = parser.parse_args() result = integrate_into_pipeline(args.output_dir, args.min_score) # 根据结果决定是否继续 if result["action"] == "manual_review": print(f"\n⚠️ 需要人工介入,pipeline 暂停") exit(1) else: print(f"\n✅ 质量检查完成,pipeline 继续") exit(0)