"""Check the generated JSON artifacts under ``output/`` for presence and shape.

Each numeric sub-directory of ``output/`` is expected to hold a ``raw_cases``
directory plus ``blueprint.json``, ``capabilities_extracted.json`` and
``strategy.json``.  Every ``*.json`` file found below ``output/`` is then
parsed and, when its name is recognised, validated against the matching
structural rules.  Results are printed and written to
``schema_errors_report.txt`` next to this script.
"""

import json
from pathlib import Path


def _is_blank(value):
    """Return True when *value* is falsy or a whitespace-only string.

    Truthy non-string values (numbers, non-empty lists/dicts) count as
    non-blank instead of raising ``AttributeError`` on ``.strip()``.
    """
    if isinstance(value, str):
        return not value.strip()
    return not value


def _check_nonempty_list(data, key):
    """Return an error message unless ``data[key]`` is a non-empty list."""
    if not isinstance(data[key], list):
        return f"'{key}' is not a list"
    if not data[key]:
        return f"'{key}' array is empty"
    return None


def check_keys(data, expected_keys, path_context=""):
    """Report which of *expected_keys* are absent from *data*.

    Args:
        data: The JSON node to inspect; must be a dict to pass.
        expected_keys: Keys that have to be present in *data*.
        path_context: Location prefix for the message (e.g. ``cases[0]``).

    Returns:
        An error string when *data* is not a dict or lacks keys, else None.
    """
    # Guard first: ``k not in data`` raises TypeError for ints/None and does
    # substring matching for strings, neither of which is a key check.
    if not isinstance(data, dict):
        return f"{path_context} is not a dict"
    missing = [k for k in expected_keys if k not in data]
    if missing:
        return f"{path_context} missing keys: {missing}"
    return None


def validate_case(data):
    """Validate a ``case_*.json`` document.

    Returns:
        The first problem found as a string, or None when the document passes.
    """
    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["requirement", "cases"])
    if err:
        return err
    err = _check_nonempty_list(data, "cases")
    if err:
        return err

    case_keys = [
        "id", "title", "platform", "source_url", "metrics", "user_feedback",
        "images", "input_details", "output_details", "workflow_process",
    ]
    for i, case in enumerate(data["cases"]):
        err = check_keys(case, case_keys, f"cases[{i}]")
        if err:
            return err
        if not isinstance(case["images"], list):
            return f"cases[{i}].images must be a list"
        # Key fields must not be empty.
        if _is_blank(case["title"]):
            return f"cases[{i}].title is empty"
        if _is_blank(case["workflow_process"]):
            return f"cases[{i}].workflow_process is empty"
    return None


def validate_blueprint(data):
    """Validate a ``blueprint.json`` document.

    Blueprints are checked before distilled cases, so a blueprint problem is
    reported first when both sections are broken.

    Returns:
        The first problem found as a string, or None when the document passes.
    """
    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["requirement", "distilled_cases", "blueprints"])
    if err:
        return err

    err = _check_nonempty_list(data, "blueprints")
    if err:
        return err
    for i, blueprint in enumerate(data["blueprints"]):
        err = check_keys(
            blueprint, ["name", "phases", "reasoning"], f"blueprints[{i}]"
        )
        if err:
            return err
        if not isinstance(blueprint["phases"], list):
            return f"blueprints[{i}].phases must be a list"
        # Key fields must not be empty.
        if _is_blank(blueprint["name"]):
            return f"blueprints[{i}].name is empty"
        if not blueprint["phases"]:
            return f"blueprints[{i}].phases array is empty"

    err = _check_nonempty_list(data, "distilled_cases")
    if err:
        return err
    distilled_keys = [
        "id", "title", "source_url", "user_feedback", "workflow_process",
    ]
    for i, distilled in enumerate(data["distilled_cases"]):
        err = check_keys(distilled, distilled_keys, f"distilled_cases[{i}]")
        if err:
            return err
        # Key fields must not be empty.
        if _is_blank(distilled["title"]):
            return f"distilled_cases[{i}].title is empty"
    return None


def validate_capabilities(data):
    """Validate a ``capabilities_extracted.json`` document.

    Returns:
        The first problem found as a string, or None when the document passes.
    """
    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["extracted_capabilities", "requirement"])
    if err:
        return err
    err = _check_nonempty_list(data, "extracted_capabilities")
    if err:
        return err

    capability_keys = [
        "id", "name", "description", "criterion", "effects", "implements",
        "is_new", "case_references",
    ]
    for i, cap in enumerate(data["extracted_capabilities"]):
        where = f"extracted_capabilities[{i}]"
        err = check_keys(cap, capability_keys, where)
        if err:
            return err
        if not isinstance(cap["effects"], list):
            return f"{where}.effects must be a list"
        if not isinstance(cap["case_references"], list):
            return f"{where}.case_references must be a list"
        # Key fields must not be empty.
        if _is_blank(cap["name"]):
            return f"{where}.name is empty"
        if _is_blank(cap["description"]):
            return f"{where}.description is empty"
    return None


def validate_strategy(data):
    """Validate a ``strategy.json`` document.

    ``workflow_outline`` entries are only inspected when the outline is a
    list; any other type is accepted as-is.

    Returns:
        The first problem found as a string, or None when the document passes.
    """
    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(
        data, ["requirement", "strategies", "uncovered_requirements"]
    )
    if err:
        return err
    err = _check_nonempty_list(data, "strategies")
    if err:
        return err

    strategy_keys = [
        "is_selected", "name", "source", "workflow_outline",
        "highlight_coverage", "baseline_coverage", "reasoning", "why_not",
        "could_switch_if", "coverage_score", "coverage_explanation",
    ]
    for i, strat in enumerate(data["strategies"]):
        err = check_keys(strat, strategy_keys, f"strategies[{i}]")
        if err:
            return err
        # Key fields must not be empty.
        if _is_blank(strat["name"]):
            return f"strategies[{i}].name is empty"
        # Only the selected strategy needs reasoning; the others explain
        # themselves through why_not.
        if strat["is_selected"] and _is_blank(strat["reasoning"]):
            return (
                f"strategies[{i}].reasoning is empty "
                "(selected strategy must have reasoning)"
            )

        outline = strat["workflow_outline"]
        if not isinstance(outline, list):
            continue
        if not outline:
            return f"strategies[{i}].workflow_outline array is empty"
        for j, step in enumerate(outline):
            where = f"strategies[{i}].workflow_outline[{j}]"
            err = check_keys(
                step, ["phase", "description", "capabilities"], where
            )
            if err:
                return err
            if not isinstance(step["capabilities"], list):
                return f"{where}.capabilities must be a list"
            # Key fields must not be empty.
            if _is_blank(step["phase"]):
                return f"{where}.phase is empty"
            if _is_blank(step["description"]):
                return f"{where}.description is empty"
    return None


def check_missing_files(base_dir):
    """Check every requirement directory for its required files.

    Args:
        base_dir: ``Path`` whose all-digit sub-directories (001, 002, ...)
            are the requirement directories.

    Returns:
        A list of ``(requirement_id, issue)`` tuples, ordered by directory.
    """
    required_json = (
        "blueprint.json",
        "capabilities_extracted.json",
        "strategy.json",
    )
    missing_files = []

    # Requirement directories are the ones named with digits only.
    req_dirs = sorted(
        d for d in base_dir.iterdir() if d.is_dir() and d.name.isdigit()
    )
    for req_dir in req_dirs:
        req_id = req_dir.name

        # raw_cases is a directory: it must exist and hold at least one case.
        raw_cases = req_dir / "raw_cases"
        if not raw_cases.exists():
            missing_files.append((req_id, "raw_cases directory missing"))
        elif not any(raw_cases.glob("case_*.json")):
            missing_files.append(
                (req_id, "raw_cases directory exists but contains no case files")
            )

        # Everything else is a plain file.
        for file_name in required_json:
            if not (req_dir / file_name).exists():
                missing_files.append((req_id, f"{file_name} missing"))

    return missing_files


# Exact file names mapped to their validator; ``case_*`` is matched by prefix.
_VALIDATORS_BY_NAME = {
    "blueprint.json": validate_blueprint,
    "capabilities_extracted.json": validate_capabilities,
    "strategy.json": validate_strategy,
}


def _pick_validator(filename):
    """Return the validator for *filename*, or None for unknown JSON files."""
    if filename.startswith("case_"):
        return validate_case
    return _VALIDATORS_BY_NAME.get(filename)


def main():
    """Run the presence check and the schema check, then write the report."""
    base_dir = Path(__file__).parent.parent / "output"
    if not base_dir.exists():
        print(f"Error: {base_dir} does not exist.")
        return

    # Missing-file check.
    print("[Start] Checking for missing files...")
    missing_files = check_missing_files(base_dir)
    if missing_files:
        print(f"[WARNING] Found {len(missing_files)} missing files:")
        for req_id, issue in missing_files:
            print(f" - REQ_{req_id}: {issue}")
    else:
        print("[OK] All required files are present.")
    print("-" * 50)

    # Schema check. Sorted so the report order is stable between runs.
    json_files = sorted(base_dir.rglob("*.json"))
    total_files = len(json_files)
    format_errors = []

    print(f"[Start] Validating schema for {total_files} JSON files...")
    for file_path in json_files:
        # Every entry in the report uses the same relative path form.
        rel_path = file_path.relative_to(base_dir.parent)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except Exception as e:
            # Boundary handler: one unreadable file is recorded and must not
            # abort the sweep over the remaining files.
            format_errors.append((rel_path, f"JSON Parsing Error: {e}"))
            continue

        validator = _pick_validator(file_path.name)
        if validator is None:
            # Unknown JSON file: parsed successfully, nothing more to check.
            continue
        err = validator(data)
        if err:
            format_errors.append((rel_path, f"Schema mismatch: {err}"))

    report_path = Path(__file__).parent / "schema_errors_report.txt"
    print("-" * 50)
    with open(report_path, "w", encoding="utf-8") as out_f:
        if not format_errors:
            msg = (
                f"[OK] All {total_files} JSON files match their expected "
                "schemas perfectly!"
            )
            print(msg)
            out_f.write(msg + "\n")
        else:
            msg = (
                f"[ERROR] Found {len(format_errors)} files with incorrect "
                "schemas/formats:"
            )
            print(msg)
            out_f.write(msg + "\n")
            for path, error in format_errors:
                print(f" - {path}: {error}")
                out_f.write(f" - {path}: {error}\n")

    print("-" * 50)
    print(f"Schema error details saved to {report_path}")


if __name__ == "__main__":
    main()