| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229 |
- import json
- from pathlib import Path
def check_keys(data, expected_keys, path_context=""):
    """Report which of ``expected_keys`` are absent from the dict ``data``.

    Args:
        data: Parsed JSON value to inspect; must be a dict to pass.
        expected_keys: Iterable of key names that have to be present.
        path_context: Label for where ``data`` sits in the document (for
            example ``"cases[0]"``); it is only used to prefix the message.

    Returns:
        ``None`` when every expected key is present, otherwise an error
        string. A non-dict ``data`` yields an error string as well.
    """
    if not isinstance(data, dict):
        # ``k not in data`` raises TypeError for None/numbers and performs
        # substring/element matching for str/list, so reject non-dicts
        # explicitly instead of crashing or reporting misleading keys.
        label = path_context or "value"
        return f"{label} is not a dict (got {type(data).__name__})"

    missing = [key for key in expected_keys if key not in data]
    if missing:
        return f"{path_context} missing keys: {missing}"
    return None
def validate_case(data):
    """Validate the parsed content of a ``case_*.json`` file.

    The root must be a dict with ``requirement`` and ``cases``; ``cases`` must
    be a non-empty list of dicts that carry every required key, a list-valued
    ``images``, a non-blank string ``title`` and a non-empty
    ``workflow_process``.

    Args:
        data: The value returned by ``json.load`` for the file.

    Returns:
        ``None`` when the document is valid, otherwise a string describing the
        first problem found.
    """

    def text_issue(value):
        # Falsy values (None, "", 0, []) keep the historical "is empty"
        # wording; only truthy non-strings get the new type message, which
        # previously crashed on ``.strip()``.
        if value and not isinstance(value, str):
            return "must be a string"
        if not (value or "").strip():
            return "is empty"
        return None

    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["requirement", "cases"])
    if err:
        return err

    cases = data["cases"]
    if not isinstance(cases, list):
        return "'cases' is not a list"
    if not cases:
        return "'cases' array is empty"

    required = [
        "id", "title", "platform", "source_url", "metrics", "user_feedback",
        "images", "input_details", "output_details", "workflow_process",
    ]
    for i, case in enumerate(cases):
        where = f"cases[{i}]"
        # Guard before any key lookup: a null/str/number entry would
        # otherwise raise inside check_keys or on ``.get``.
        if not isinstance(case, dict):
            return f"{where} is not a dict"
        err = check_keys(case, required, where)
        if err:
            return err
        if not isinstance(case["images"], list):
            return f"{where}.images must be a list"

        # Key fields must not be blank.
        issue = text_issue(case["title"])
        if issue:
            return f"{where}.title {issue}"
        workflow = case["workflow_process"]
        # ``not workflow`` already covers None, "" and []; the extra clause
        # catches whitespace-only strings.
        if not workflow or (isinstance(workflow, str) and not workflow.strip()):
            return f"{where}.workflow_process is empty"
    return None
def validate_blueprint(data):
    """Validate the parsed content of ``blueprint.json``.

    The root must be a dict with ``requirement``, ``distilled_cases`` and
    ``blueprints``. ``blueprints`` is checked first (non-empty list of dicts
    with a non-blank ``name`` and a non-empty list ``phases``), then
    ``distilled_cases`` (non-empty list of dicts with a non-blank ``title``).

    Args:
        data: The value returned by ``json.load`` for the file.

    Returns:
        ``None`` when the document is valid, otherwise a string describing the
        first problem found.
    """

    def text_issue(value):
        # Falsy values keep the historical "is empty" wording; truthy
        # non-strings used to crash on ``.strip()`` and are now reported.
        if value and not isinstance(value, str):
            return "must be a string"
        if not (value or "").strip():
            return "is empty"
        return None

    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["requirement", "distilled_cases", "blueprints"])
    if err:
        return err

    blueprints = data["blueprints"]
    if not isinstance(blueprints, list):
        return "'blueprints' is not a list"
    if not blueprints:
        return "'blueprints' array is empty"
    for i, blueprint in enumerate(blueprints):
        where = f"blueprints[{i}]"
        # Reject non-dict entries before any key lookup can raise.
        if not isinstance(blueprint, dict):
            return f"{where} is not a dict"
        err = check_keys(blueprint, ["name", "phases", "reasoning"], where)
        if err:
            return err
        phases = blueprint["phases"]
        if not isinstance(phases, list):
            return f"{where}.phases must be a list"
        # Key fields must not be blank.
        issue = text_issue(blueprint["name"])
        if issue:
            return f"{where}.name {issue}"
        if not phases:
            return f"{where}.phases array is empty"

    distilled = data["distilled_cases"]
    if not isinstance(distilled, list):
        return "'distilled_cases' is not a list"
    if not distilled:
        return "'distilled_cases' array is empty"
    distilled_required = ["id", "title", "source_url", "user_feedback", "workflow_process"]
    for i, entry in enumerate(distilled):
        where = f"distilled_cases[{i}]"
        if not isinstance(entry, dict):
            return f"{where} is not a dict"
        err = check_keys(entry, distilled_required, where)
        if err:
            return err
        # Key fields must not be blank.
        issue = text_issue(entry["title"])
        if issue:
            return f"{where}.title {issue}"
    return None
def validate_capabilities(data):
    """Validate the parsed content of ``capabilities_extracted.json``.

    The root must be a dict with ``extracted_capabilities`` and
    ``requirement``; ``extracted_capabilities`` must be a non-empty list of
    dicts that carry every required key, list-valued ``effects`` and
    ``case_references``, and non-blank string ``name`` and ``description``.

    Args:
        data: The value returned by ``json.load`` for the file.

    Returns:
        ``None`` when the document is valid, otherwise a string describing the
        first problem found.
    """

    def text_issue(value):
        # Falsy values keep the historical "is empty" wording; truthy
        # non-strings used to crash on ``.strip()`` and are now reported.
        if value and not isinstance(value, str):
            return "must be a string"
        if not (value or "").strip():
            return "is empty"
        return None

    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["extracted_capabilities", "requirement"])
    if err:
        return err

    capabilities = data["extracted_capabilities"]
    if not isinstance(capabilities, list):
        return "'extracted_capabilities' is not a list"
    if not capabilities:
        return "'extracted_capabilities' array is empty"

    required = [
        "id", "name", "description", "criterion", "effects", "implements",
        "is_new", "case_references",
    ]
    for i, capability in enumerate(capabilities):
        where = f"extracted_capabilities[{i}]"
        # Reject non-dict entries before any key lookup can raise.
        if not isinstance(capability, dict):
            return f"{where} is not a dict"
        err = check_keys(capability, required, where)
        if err:
            return err
        for list_field in ("effects", "case_references"):
            if not isinstance(capability[list_field], list):
                return f"{where}.{list_field} must be a list"
        # Key fields must not be blank.
        for text_field in ("name", "description"):
            issue = text_issue(capability[text_field])
            if issue:
                return f"{where}.{text_field} {issue}"
    return None
def validate_strategy(data):
    """Validate the parsed content of ``strategy.json``.

    The root must be a dict with ``requirement``, ``strategies`` and
    ``uncovered_requirements``; ``strategies`` must be a non-empty list of
    dicts that carry every required key and a non-blank string ``name``.
    ``reasoning`` is only required to be non-blank for strategies whose
    ``is_selected`` is truthy. ``workflow_outline`` is inspected only when it
    is a list: it must then be non-empty and each step must be a dict with
    ``phase``, ``description`` and a list-valued ``capabilities``. Any other
    ``workflow_outline`` type is accepted without inspection.

    Args:
        data: The value returned by ``json.load`` for the file.

    Returns:
        ``None`` when the document is valid, otherwise a string describing the
        first problem found.
    """

    def text_issue(value):
        # Falsy values keep the historical "is empty" wording; truthy
        # non-strings used to crash on ``.strip()`` and are now reported.
        if value and not isinstance(value, str):
            return "must be a string"
        if not (value or "").strip():
            return "is empty"
        return None

    if not isinstance(data, dict):
        return "Root is not a dict"
    err = check_keys(data, ["requirement", "strategies", "uncovered_requirements"])
    if err:
        return err

    strategies = data["strategies"]
    if not isinstance(strategies, list):
        return "'strategies' is not a list"
    if not strategies:
        return "'strategies' array is empty"

    required = [
        "is_selected", "name", "source", "workflow_outline",
        "highlight_coverage", "baseline_coverage", "reasoning", "why_not",
        "could_switch_if", "coverage_score", "coverage_explanation",
    ]
    for i, strategy in enumerate(strategies):
        where = f"strategies[{i}]"
        # Reject non-dict entries before any key lookup can raise.
        if not isinstance(strategy, dict):
            return f"{where} is not a dict"
        err = check_keys(strategy, required, where)
        if err:
            return err

        # Key fields must not be blank.
        issue = text_issue(strategy["name"])
        if issue:
            return f"{where}.name {issue}"
        # Only the selected strategy needs reasoning; the others explain
        # themselves through ``why_not``.
        if strategy["is_selected"]:
            issue = text_issue(strategy["reasoning"])
            if issue:
                return f"{where}.reasoning {issue} (selected strategy must have reasoning)"

        outline = strategy["workflow_outline"]
        if not isinstance(outline, list):
            continue
        if not outline:
            return f"{where}.workflow_outline array is empty"
        for j, step in enumerate(outline):
            step_where = f"{where}.workflow_outline[{j}]"
            if not isinstance(step, dict):
                return f"{step_where} is not a dict"
            err = check_keys(step, ["phase", "description", "capabilities"], step_where)
            if err:
                return err
            if not isinstance(step["capabilities"], list):
                return f"{step_where}.capabilities must be a list"
            # Key fields must not be blank.
            for text_field in ("phase", "description"):
                issue = text_issue(step[text_field])
                if issue:
                    return f"{step_where}.{text_field} {issue}"
    return None
def check_missing_files(base_dir):
    """List the required artifacts that each requirement directory lacks.

    Only sub-directories of ``base_dir`` whose name consists of digits are
    inspected, in sorted order. For each one the ``raw_cases`` entry is
    checked first (it must exist and contain at least one ``case_*.json``),
    followed by ``blueprint.json``, ``capabilities_extracted.json`` and
    ``strategy.json``.

    Args:
        base_dir: ``pathlib.Path`` of the directory holding the requirement
            directories.

    Returns:
        A list of ``(requirement_dir_name, problem_description)`` tuples; the
        list is empty when nothing is missing.
    """
    required_json = ("blueprint.json", "capabilities_extracted.json", "strategy.json")
    problems = []

    numbered_dirs = sorted(
        entry for entry in base_dir.iterdir()
        if entry.is_dir() and entry.name.isdigit()
    )
    for req_dir in numbered_dirs:
        req_id = req_dir.name

        # raw_cases is a directory: it has to exist and hold a case file.
        raw_cases = req_dir / "raw_cases"
        if not raw_cases.exists():
            problems.append((req_id, "raw_cases directory missing"))
        elif not any(raw_cases.glob("case_*.json")):
            problems.append((req_id, "raw_cases directory exists but contains no case files"))

        # The remaining requirements are plain files.
        problems.extend(
            (req_id, f"{name} missing")
            for name in required_json
            if not (req_dir / name).exists()
        )
    return problems
def main():
    """Check the ``output`` tree for missing files and schema violations.

    The tree is looked up at ``<parent of this file's directory>/output``.
    First the missing-file check is printed, then every ``*.json`` file below
    the tree is parsed and, when its name is recognised, passed to the
    matching validator. The findings are printed and also written to
    ``schema_errors_report.txt`` next to this file.

    Returns:
        None. If the ``output`` directory does not exist, an error is printed
        and nothing else happens.
    """
    base_dir = Path(__file__).parent.parent / "output"
    if not base_dir.exists():
        print(f"Error: {base_dir} does not exist.")
        return

    separator = "-" * 50

    # Step 1: missing files.
    print("[Start] Checking for missing files...")
    missing_files = check_missing_files(base_dir)
    if missing_files:
        print(f"[WARNING] Found {len(missing_files)} missing files:")
        for req_id, issue in missing_files:
            print(f" - REQ_{req_id}: {issue}")
    else:
        print("[OK] All required files are present.")
    print(separator)

    # Step 2: schema validation.
    validators_by_name = {
        "blueprint.json": validate_blueprint,
        "capabilities_extracted.json": validate_capabilities,
        "strategy.json": validate_strategy,
    }
    # Sorted so the report order is stable between runs.
    json_files = sorted(base_dir.rglob("*.json"))
    total_files = len(json_files)
    format_errors = []
    print(f"[Start] Validating schema for {total_files} JSON files...")
    for file_path in json_files:
        # Computed up front so parse errors and schema errors are reported
        # with the same (relative) path style.
        rel_path = file_path.relative_to(base_dir.parent)
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError, RecursionError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            format_errors.append((rel_path, f"JSON Parsing Error: {e}"))
            continue

        filename = file_path.name
        if filename.startswith("case_"):
            validator = validate_case
        else:
            validator = validators_by_name.get(filename)
        if validator is None:
            # Unknown JSON file: nothing to validate.
            continue

        try:
            err = validator(data)
        except (AttributeError, TypeError) as e:
            # A value of an unexpected type must not abort the whole run
            # before the report is written; record it against this file.
            err = f"unexpected structure ({type(e).__name__}: {e})"
        if err:
            format_errors.append((rel_path, f"Schema mismatch: {err}"))

    # Step 3: print the findings and persist them.
    if format_errors:
        report_lines = [
            f"[ERROR] Found {len(format_errors)} files with incorrect schemas/formats:"
        ]
        report_lines.extend(f" - {path}: {error}" for path, error in format_errors)
    else:
        report_lines = [
            f"[OK] All {total_files} JSON files match their expected schemas perfectly!"
        ]
    report_text = "\n".join(report_lines)

    report_path = Path(__file__).parent / "schema_errors_report.txt"
    print(separator)
    print(report_text)
    with open(report_path, "w", encoding="utf-8") as out_f:
        out_f.write(report_text + "\n")
    print(separator)
    print(f"Schema error details saved to {report_path}")
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|