# validate_schema.py

  1. import json
  2. from pathlib import Path
  3. def check_keys(data, expected_keys, path_context=""):
  4. missing = [k for k in expected_keys if k not in data]
  5. if missing:
  6. return f"{path_context} missing keys: {missing}"
  7. return None
  8. def validate_case(data):
  9. if not isinstance(data, dict): return "Root is not a dict"
  10. err = check_keys(data, ["requirement", "cases"])
  11. if err: return err
  12. if not isinstance(data["cases"], list): return "'cases' is not a list"
  13. if len(data["cases"]) == 0:
  14. return "'cases' array is empty"
  15. for i, c in enumerate(data["cases"]):
  16. err = check_keys(c, ["id", "title", "platform", "source_url", "metrics", "user_feedback", "images", "input_details", "output_details", "workflow_process"], f"cases[{i}]")
  17. if err: return err
  18. if not isinstance(c.get("images", []), list): return f"cases[{i}].images must be a list"
  19. # 检查关键字段是否为空
  20. if not (c.get("title") or "").strip():
  21. return f"cases[{i}].title is empty"
  22. wp = c.get("workflow_process")
  23. if not wp or (isinstance(wp, str) and not wp.strip()) or (isinstance(wp, list) and len(wp) == 0):
  24. return f"cases[{i}].workflow_process is empty"
  25. return None
  26. def validate_blueprint(data):
  27. if not isinstance(data, dict): return "Root is not a dict"
  28. err = check_keys(data, ["requirement", "distilled_cases", "blueprints"])
  29. if err: return err
  30. if not isinstance(data["blueprints"], list): return "'blueprints' is not a list"
  31. if len(data["blueprints"]) == 0:
  32. return "'blueprints' array is empty"
  33. for i, bp in enumerate(data["blueprints"]):
  34. err = check_keys(bp, ["name", "phases", "reasoning"], f"blueprints[{i}]")
  35. if err: return err
  36. if not isinstance(bp.get("phases", []), list): return f"blueprints[{i}].phases must be a list"
  37. # 检查关键字段是否为空
  38. if not (bp.get("name") or "").strip():
  39. return f"blueprints[{i}].name is empty"
  40. if len(bp.get("phases", [])) == 0:
  41. return f"blueprints[{i}].phases array is empty"
  42. if not isinstance(data["distilled_cases"], list): return "'distilled_cases' is not a list"
  43. if len(data["distilled_cases"]) == 0:
  44. return "'distilled_cases' array is empty"
  45. for i, dc in enumerate(data["distilled_cases"]):
  46. err = check_keys(dc, ["id", "title", "source_url", "user_feedback", "workflow_process"], f"distilled_cases[{i}]")
  47. if err: return err
  48. # 检查关键字段是否为空
  49. if not (dc.get("title") or "").strip():
  50. return f"distilled_cases[{i}].title is empty"
  51. return None
  52. def validate_capabilities(data):
  53. if not isinstance(data, dict): return "Root is not a dict"
  54. err = check_keys(data, ["extracted_capabilities", "requirement"])
  55. if err: return err
  56. if not isinstance(data["extracted_capabilities"], list): return "'extracted_capabilities' is not a list"
  57. if len(data["extracted_capabilities"]) == 0:
  58. return "'extracted_capabilities' array is empty"
  59. for i, cap in enumerate(data["extracted_capabilities"]):
  60. err = check_keys(cap, ["id", "name", "description", "criterion", "effects", "implements", "is_new", "case_references"], f"extracted_capabilities[{i}]")
  61. if err: return err
  62. if not isinstance(cap.get("effects", []), list): return f"extracted_capabilities[{i}].effects must be a list"
  63. if not isinstance(cap.get("case_references", []), list): return f"extracted_capabilities[{i}].case_references must be a list"
  64. # 检查关键字段是否为空
  65. if not (cap.get("name") or "").strip():
  66. return f"extracted_capabilities[{i}].name is empty"
  67. if not (cap.get("description") or "").strip():
  68. return f"extracted_capabilities[{i}].description is empty"
  69. return None
  70. def validate_strategy(data):
  71. if not isinstance(data, dict): return "Root is not a dict"
  72. err = check_keys(data, ["requirement", "strategies", "uncovered_requirements"])
  73. if err: return err
  74. if not isinstance(data["strategies"], list): return "'strategies' is not a list"
  75. if len(data["strategies"]) == 0:
  76. return "'strategies' array is empty"
  77. for i, strat in enumerate(data["strategies"]):
  78. err = check_keys(strat, ["is_selected", "name", "source", "workflow_outline", "highlight_coverage", "baseline_coverage", "reasoning", "why_not", "could_switch_if", "coverage_score", "coverage_explanation"], f"strategies[{i}]")
  79. if err: return err
  80. # 检查关键字段是否为空
  81. if not (strat.get("name") or "").strip():
  82. return f"strategies[{i}].name is empty"
  83. if not (strat.get("reasoning") or "").strip():
  84. return f"strategies[{i}].reasoning is empty"
  85. if isinstance(strat.get("workflow_outline"), list):
  86. if len(strat["workflow_outline"]) == 0:
  87. return f"strategies[{i}].workflow_outline array is empty"
  88. for j, wo in enumerate(strat["workflow_outline"]):
  89. err = check_keys(wo, ["phase", "description", "capabilities"], f"strategies[{i}].workflow_outline[{j}]")
  90. if err: return err
  91. if not isinstance(wo.get("capabilities", []), list): return f"strategies[{i}].workflow_outline[{j}].capabilities must be a list"
  92. # 检查关键字段是否为空
  93. if not (wo.get("phase") or "").strip():
  94. return f"strategies[{i}].workflow_outline[{j}].phase is empty"
  95. if not (wo.get("description") or "").strip():
  96. return f"strategies[{i}].workflow_outline[{j}].description is empty"
  97. return None
  98. def check_missing_files(base_dir):
  99. """检查每个需求目录是否缺少必需的文件"""
  100. missing_files = []
  101. # 获取所有需求目录(格式为 001, 002, ...)
  102. req_dirs = sorted([d for d in base_dir.iterdir() if d.is_dir() and d.name.isdigit()])
  103. for req_dir in req_dirs:
  104. req_id = req_dir.name
  105. # 检查必需的文件
  106. required_files = {
  107. "raw_cases": req_dir / "raw_cases",
  108. "blueprint.json": req_dir / "blueprint.json",
  109. "capabilities_extracted.json": req_dir / "capabilities_extracted.json",
  110. "strategy.json": req_dir / "strategy.json"
  111. }
  112. for file_name, file_path in required_files.items():
  113. if file_name == "raw_cases":
  114. # raw_cases 是目录,检查是否存在且至少有一个 case 文件
  115. if not file_path.exists():
  116. missing_files.append((req_id, f"raw_cases directory missing"))
  117. elif not list(file_path.glob("case_*.json")):
  118. missing_files.append((req_id, f"raw_cases directory exists but contains no case files"))
  119. else:
  120. # 其他是文件
  121. if not file_path.exists():
  122. missing_files.append((req_id, f"{file_name} missing"))
  123. return missing_files
  124. def main():
  125. base_dir = Path(__file__).parent.parent / "output"
  126. if not base_dir.exists():
  127. print(f"Error: {base_dir} does not exist.")
  128. return
  129. # 检查文件缺失
  130. print(f"[Start] Checking for missing files...")
  131. missing_files = check_missing_files(base_dir)
  132. if missing_files:
  133. print(f"[WARNING] Found {len(missing_files)} missing files:")
  134. for req_id, issue in missing_files:
  135. print(f" - REQ_{req_id}: {issue}")
  136. print("-" * 50)
  137. else:
  138. print("[OK] All required files are present.")
  139. print("-" * 50)
  140. # 检查 schema
  141. json_files = list(base_dir.rglob("*.json"))
  142. total_files = len(json_files)
  143. format_errors = []
  144. print(f"[Start] Validating schema for {total_files} JSON files...")
  145. for file_path in json_files:
  146. try:
  147. with open(file_path, "r", encoding="utf-8") as f:
  148. data = json.load(f)
  149. except Exception as e:
  150. format_errors.append((file_path, f"JSON Parsing Error: {e}"))
  151. continue
  152. filename = file_path.name
  153. rel_path = file_path.relative_to(base_dir.parent)
  154. err = None
  155. if filename.startswith("case_"):
  156. err = validate_case(data)
  157. elif filename == "blueprint.json":
  158. err = validate_blueprint(data)
  159. elif filename == "capabilities_extracted.json":
  160. err = validate_capabilities(data)
  161. elif filename == "strategy.json":
  162. err = validate_strategy(data)
  163. else:
  164. # Unknown json file
  165. pass
  166. if err:
  167. format_errors.append((rel_path, f"Schema mismatch: {err}"))
  168. report_path = Path(__file__).parent / "schema_errors_report.txt"
  169. print("-" * 50)
  170. with open(report_path, "w", encoding="utf-8") as out_f:
  171. if not format_errors:
  172. msg = f"[OK] All {total_files} JSON files match their expected schemas perfectly!"
  173. print(msg)
  174. out_f.write(msg + "\n")
  175. else:
  176. msg = f"[ERROR] Found {len(format_errors)} files with incorrect schemas/formats:"
  177. print(msg)
  178. out_f.write(msg + "\n")
  179. for path, error in format_errors:
  180. print(f" - {path}: {error}")
  181. out_f.write(f" - {path}: {error}\n")
  182. print("-" * 50)
  183. print(f"Schema error details saved to {report_path}")
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()