# integrate_quality_check.py
  1. """
  2. 将质量评估集成到 Pipeline 中
  3. 在 Phase 1.5 (source.json 生成后) 自动执行质量检查,
  4. 对低质量内容进行二次筛选或触发重新调研。
  5. """
import json
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List

from evaluate_source_quality import SourceQualityEvaluator, generate_quality_report
  10. def check_and_filter_source(
  11. source_file: Path,
  12. min_score: float = 40.0,
  13. min_pass_rate: float = 0.6,
  14. auto_filter: bool = True,
  15. ) -> Dict[str, any]:
  16. """
  17. 检查 source.json 质量并决定是否需要重新调研
  18. Args:
  19. source_file: source.json 路径
  20. min_score: 单条内容的最低分数阈值(默认40分,C级)
  21. min_pass_rate: 整体通过率阈值(默认60%)
  22. auto_filter: 是否自动过滤低质量内容
  23. Returns:
  24. {
  25. "status": str, # "pass" | "filtered" | "failed"
  26. "message": str,
  27. "stats": dict,
  28. "action": str, # "continue" | "rerun_research" | "manual_review"
  29. }
  30. """
  31. evaluator = SourceQualityEvaluator()
  32. # 1. 生成质量报告
  33. print(f"\n{'='*60}")
  34. print(f"📊 质量评估:{source_file.name}")
  35. print(f"{'='*60}")
  36. eval_result = evaluator.evaluate_source_file(source_file)
  37. total = eval_result["total_sources"]
  38. low_quality_count = eval_result["low_quality_count"]
  39. pass_count = total - low_quality_count
  40. pass_rate = pass_count / total if total > 0 else 0.0
  41. avg_score = eval_result["avg_score"]
  42. print(f"\n总条目数:{total}")
  43. print(f"平均得分:{avg_score:.1f}/100")
  44. print(f"通过率:{pass_rate*100:.1f}% ({pass_count}/{total})")
  45. print(f"\n等级分布:")
  46. for grade in ["A", "B", "C", "D", "F"]:
  47. count = eval_result["grade_distribution"][grade]
  48. pct = count / total * 100 if total > 0 else 0
  49. print(f" {grade}: {count:3d} ({pct:5.1f}%)")
  50. # 2. 判断质量是否达标
  51. result = {
  52. "status": "unknown",
  53. "message": "",
  54. "stats": {
  55. "total": total,
  56. "pass_count": pass_count,
  57. "pass_rate": pass_rate,
  58. "avg_score": avg_score,
  59. "grade_distribution": eval_result["grade_distribution"],
  60. },
  61. "action": "continue",
  62. }
  63. # 3. 决策逻辑
  64. if pass_rate >= min_pass_rate and avg_score >= 50:
  65. # 质量良好,直接通过
  66. result["status"] = "pass"
  67. result["message"] = f"✅ 质量达标(通过率 {pass_rate*100:.1f}%)"
  68. result["action"] = "continue"
  69. print(f"\n{result['message']}")
  70. elif pass_rate >= 0.4 and auto_filter:
  71. # 质量一般,自动过滤低质量内容
  72. print(f"\n⚠️ 质量一般(通过率 {pass_rate*100:.1f}%),执行自动过滤...")
  73. filtered_file = source_file.parent / "source_filtered.json"
  74. filter_result = evaluator.filter_low_quality(
  75. source_file, filtered_file, min_score
  76. )
  77. # 备份原文件
  78. backup_file = source_file.parent / "source_original.json"
  79. source_file.rename(backup_file)
  80. filtered_file.rename(source_file)
  81. result["status"] = "filtered"
  82. result["message"] = (
  83. f"🔍 已过滤 {filter_result['removed_count']} 条低质量内容\n"
  84. f" 保留:{filter_result['filtered_count']}/{filter_result['original_count']}\n"
  85. f" 原始文件备份至:{backup_file.name}"
  86. )
  87. result["action"] = "continue"
  88. result["stats"]["filtered_count"] = filter_result["filtered_count"]
  89. result["stats"]["removed_count"] = filter_result["removed_count"]
  90. print(f"\n{result['message']}")
  91. else:
  92. # 质量太差,建议重新调研
  93. result["status"] = "failed"
  94. result["message"] = (
  95. f"❌ 质量不达标(通过率 {pass_rate*100:.1f}%,平均分 {avg_score:.1f})\n"
  96. f" 建议:重新调研或人工审核"
  97. )
  98. result["action"] = "manual_review"
  99. print(f"\n{result['message']}")
  100. # 显示主要问题
  101. print(f"\n主要问题:")
  102. issue_summary = _summarize_issues(eval_result["details"])
  103. for issue, count in issue_summary.items():
  104. print(f" - {issue}: {count} 条")
  105. return result
  106. def _summarize_issues(details: List[Dict]) -> Dict[str, int]:
  107. """汇总常见问题"""
  108. issue_counts = {}
  109. for detail in details:
  110. for issue in detail.get("issues", []):
  111. issue_counts[issue] = issue_counts.get(issue, 0) + 1
  112. # 按频率排序
  113. return dict(sorted(issue_counts.items(), key=lambda x: x[1], reverse=True)[:5])
  114. def integrate_into_pipeline(output_dir: Path, min_score: float = 40.0):
  115. """
  116. 集成到 pipeline 的入口函数
  117. 在 Phase 1.5 之后调用,检查 source.json 质量
  118. """
  119. source_file = output_dir / "raw_cases" / "source.json"
  120. if not source_file.exists():
  121. print(f"⚠️ source.json 不存在,跳过质量检查")
  122. return {"status": "skip", "action": "continue"}
  123. # 执行质量检查
  124. result = check_and_filter_source(
  125. source_file,
  126. min_score=min_score,
  127. min_pass_rate=0.6,
  128. auto_filter=True,
  129. )
  130. # 保存质量报告
  131. report_file = output_dir / "quality_report.txt"
  132. generate_quality_report(source_file, report_file)
  133. print(f"\n📄 详细报告已保存:{report_file}")
  134. return result
  135. if __name__ == "__main__":
  136. import argparse
  137. parser = argparse.ArgumentParser(description="集成质量检查到 pipeline")
  138. parser.add_argument("output_dir", type=Path, help="输出目录(如 output/001)")
  139. parser.add_argument("--min-score", type=float, default=40.0, help="最低分数阈值")
  140. args = parser.parse_args()
  141. result = integrate_into_pipeline(args.output_dir, args.min_score)
  142. # 根据结果决定是否继续
  143. if result["action"] == "manual_review":
  144. print(f"\n⚠️ 需要人工介入,pipeline 暂停")
  145. exit(1)
  146. else:
  147. print(f"\n✅ 质量检查完成,pipeline 继续")
  148. exit(0)