validate_schema.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. import json
  2. from pathlib import Path
  3. def check_keys(data, expected_keys, path_context=""):
  4. missing = [k for k in expected_keys if k not in data]
  5. if missing:
  6. return f"{path_context} missing keys: {missing}"
  7. return None
  8. def validate_case(data):
  9. if not isinstance(data, dict): return "Root is not a dict"
  10. err = check_keys(data, ["requirement", "cases"])
  11. if err: return err
  12. if not isinstance(data["cases"], list): return "'cases' is not a list"
  13. if len(data["cases"]) == 0:
  14. return "'cases' array is empty"
  15. for i, c in enumerate(data["cases"]):
  16. err = check_keys(c, ["id", "title", "platform", "source_url", "metrics", "user_feedback", "images", "input_details", "output_details", "workflow_process"], f"cases[{i}]")
  17. if err: return err
  18. if not isinstance(c.get("images", []), list): return f"cases[{i}].images must be a list"
  19. # 检查关键字段是否为空
  20. if not (c.get("title") or "").strip():
  21. return f"cases[{i}].title is empty"
  22. wp = c.get("workflow_process")
  23. if not wp or (isinstance(wp, str) and not wp.strip()) or (isinstance(wp, list) and len(wp) == 0):
  24. return f"cases[{i}].workflow_process is empty"
  25. return None
  26. def validate_blueprint(data):
  27. if not isinstance(data, dict): return "Root is not a dict"
  28. err = check_keys(data, ["requirement", "distilled_cases", "blueprints"])
  29. if err: return err
  30. if not isinstance(data["blueprints"], list): return "'blueprints' is not a list"
  31. if len(data["blueprints"]) == 0:
  32. return "'blueprints' array is empty"
  33. for i, bp in enumerate(data["blueprints"]):
  34. err = check_keys(bp, ["name", "phases", "reasoning"], f"blueprints[{i}]")
  35. if err: return err
  36. if not isinstance(bp.get("phases", []), list): return f"blueprints[{i}].phases must be a list"
  37. # 检查关键字段是否为空
  38. if not (bp.get("name") or "").strip():
  39. return f"blueprints[{i}].name is empty"
  40. if len(bp.get("phases", [])) == 0:
  41. return f"blueprints[{i}].phases array is empty"
  42. if not isinstance(data["distilled_cases"], list): return "'distilled_cases' is not a list"
  43. if len(data["distilled_cases"]) == 0:
  44. return "'distilled_cases' array is empty"
  45. for i, dc in enumerate(data["distilled_cases"]):
  46. err = check_keys(dc, ["id", "title", "source_url", "user_feedback", "workflow_process"], f"distilled_cases[{i}]")
  47. if err: return err
  48. # 检查关键字段是否为空
  49. if not (dc.get("title") or "").strip():
  50. return f"distilled_cases[{i}].title is empty"
  51. return None
  52. def validate_capabilities(data):
  53. if not isinstance(data, dict): return "Root is not a dict"
  54. err = check_keys(data, ["extracted_capabilities", "requirement"])
  55. if err: return err
  56. if not isinstance(data["extracted_capabilities"], list): return "'extracted_capabilities' is not a list"
  57. if len(data["extracted_capabilities"]) == 0:
  58. return "'extracted_capabilities' array is empty"
  59. for i, cap in enumerate(data["extracted_capabilities"]):
  60. err = check_keys(cap, ["id", "name", "description", "criterion", "effects", "implements", "is_new", "case_references"], f"extracted_capabilities[{i}]")
  61. if err: return err
  62. if not isinstance(cap.get("effects", []), list): return f"extracted_capabilities[{i}].effects must be a list"
  63. if not isinstance(cap.get("case_references", []), list): return f"extracted_capabilities[{i}].case_references must be a list"
  64. # 检查关键字段是否为空
  65. if not (cap.get("name") or "").strip():
  66. return f"extracted_capabilities[{i}].name is empty"
  67. if not (cap.get("description") or "").strip():
  68. return f"extracted_capabilities[{i}].description is empty"
  69. return None
  70. def validate_strategy(data):
  71. if not isinstance(data, dict): return "Root is not a dict"
  72. err = check_keys(data, ["requirement", "strategies", "uncovered_requirements"])
  73. if err: return err
  74. if not isinstance(data["strategies"], list): return "'strategies' is not a list"
  75. if len(data["strategies"]) == 0:
  76. return "'strategies' array is empty"
  77. for i, strat in enumerate(data["strategies"]):
  78. err = check_keys(strat, ["is_selected", "name", "source", "workflow_outline", "highlight_coverage", "baseline_coverage", "reasoning", "why_not", "could_switch_if", "coverage_score", "coverage_explanation"], f"strategies[{i}]")
  79. if err: return err
  80. # 检查关键字段是否为空
  81. if not (strat.get("name") or "").strip():
  82. return f"strategies[{i}].name is empty"
  83. # 只检查选中策略的 reasoning(非选中策略用 why_not 解释)
  84. if strat.get("is_selected") and not (strat.get("reasoning") or "").strip():
  85. return f"strategies[{i}].reasoning is empty (selected strategy must have reasoning)"
  86. if isinstance(strat.get("workflow_outline"), list):
  87. if len(strat["workflow_outline"]) == 0:
  88. return f"strategies[{i}].workflow_outline array is empty"
  89. for j, wo in enumerate(strat["workflow_outline"]):
  90. err = check_keys(wo, ["phase", "description", "capabilities"], f"strategies[{i}].workflow_outline[{j}]")
  91. if err: return err
  92. if not isinstance(wo.get("capabilities", []), list): return f"strategies[{i}].workflow_outline[{j}].capabilities must be a list"
  93. # 检查关键字段是否为空
  94. if not (wo.get("phase") or "").strip():
  95. return f"strategies[{i}].workflow_outline[{j}].phase is empty"
  96. if not (wo.get("description") or "").strip():
  97. return f"strategies[{i}].workflow_outline[{j}].description is empty"
  98. return None
  99. def check_missing_files(base_dir):
  100. """检查每个需求目录是否缺少必需的文件"""
  101. missing_files = []
  102. # 获取所有需求目录(格式为 001, 002, ...)
  103. req_dirs = sorted([d for d in base_dir.iterdir() if d.is_dir() and d.name.isdigit()])
  104. for req_dir in req_dirs:
  105. req_id = req_dir.name
  106. # 检查必需的文件
  107. required_files = {
  108. "raw_cases": req_dir / "raw_cases",
  109. "blueprint.json": req_dir / "blueprint.json",
  110. "capabilities_extracted.json": req_dir / "capabilities_extracted.json",
  111. "strategy.json": req_dir / "strategy.json"
  112. }
  113. for file_name, file_path in required_files.items():
  114. if file_name == "raw_cases":
  115. # raw_cases 是目录,检查是否存在且至少有一个 case 文件
  116. if not file_path.exists():
  117. missing_files.append((req_id, f"raw_cases directory missing"))
  118. elif not list(file_path.glob("case_*.json")):
  119. missing_files.append((req_id, f"raw_cases directory exists but contains no case files"))
  120. else:
  121. # 其他是文件
  122. if not file_path.exists():
  123. missing_files.append((req_id, f"{file_name} missing"))
  124. return missing_files
  125. def main():
  126. base_dir = Path(__file__).parent.parent / "output"
  127. if not base_dir.exists():
  128. print(f"Error: {base_dir} does not exist.")
  129. return
  130. # 检查文件缺失
  131. print(f"[Start] Checking for missing files...")
  132. missing_files = check_missing_files(base_dir)
  133. if missing_files:
  134. print(f"[WARNING] Found {len(missing_files)} missing files:")
  135. for req_id, issue in missing_files:
  136. print(f" - REQ_{req_id}: {issue}")
  137. print("-" * 50)
  138. else:
  139. print("[OK] All required files are present.")
  140. print("-" * 50)
  141. # 检查 schema
  142. json_files = list(base_dir.rglob("*.json"))
  143. total_files = len(json_files)
  144. format_errors = []
  145. print(f"[Start] Validating schema for {total_files} JSON files...")
  146. for file_path in json_files:
  147. try:
  148. with open(file_path, "r", encoding="utf-8") as f:
  149. data = json.load(f)
  150. except Exception as e:
  151. format_errors.append((file_path, f"JSON Parsing Error: {e}"))
  152. continue
  153. filename = file_path.name
  154. rel_path = file_path.relative_to(base_dir.parent)
  155. err = None
  156. if filename.startswith("case_"):
  157. err = validate_case(data)
  158. elif filename == "blueprint.json":
  159. err = validate_blueprint(data)
  160. elif filename == "capabilities_extracted.json":
  161. err = validate_capabilities(data)
  162. elif filename == "strategy.json":
  163. err = validate_strategy(data)
  164. else:
  165. # Unknown json file
  166. pass
  167. if err:
  168. format_errors.append((rel_path, f"Schema mismatch: {err}"))
  169. report_path = Path(__file__).parent / "schema_errors_report.txt"
  170. print("-" * 50)
  171. with open(report_path, "w", encoding="utf-8") as out_f:
  172. if not format_errors:
  173. msg = f"[OK] All {total_files} JSON files match their expected schemas perfectly!"
  174. print(msg)
  175. out_f.write(msg + "\n")
  176. else:
  177. msg = f"[ERROR] Found {len(format_errors)} files with incorrect schemas/formats:"
  178. print(msg)
  179. out_f.write(msg + "\n")
  180. for path, error in format_errors:
  181. print(f" - {path}: {error}")
  182. out_f.write(f" - {path}: {error}\n")
  183. print("-" * 50)
  184. print(f"Schema error details saved to {report_path}")
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()