validate_schema.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. """
  2. Pipeline 输出验证脚本(Schema 驱动)
  3. 所有结构验证通过 .schema.json 文件驱动,不硬编码字段名。
  4. 非结构性检查(去重、计数一致性、引用完整性)拆到 validate_invariants 中。
  5. 文件名 → schema 映射:
  6. case_*.json → researcher.schema.json
  7. source.json → source.schema.json
  8. case_detailed.json → case_detailed.schema.json
  9. blueprint_temp.json → process_cluster.schema.json
  10. process.json → process.schema.json
  11. capabilities_temp.json → extract_capabilities.schema.json
  12. capabilities.json → capabilities.schema.json
  13. capabilities_extracted.json → capabilities_extracted.schema.json
  14. strategy.json → assemble_strategy.schema.json
  15. """
  16. import json
  17. from pathlib import Path
  18. import re
  19. import argparse
  20. from typing import Dict, List, Optional, Set, Tuple
  21. from .schema_manager import validate_with_schema, get_schema_manager
# Platform prefixes accepted in {platform}_{content_id} style case ids.
VALID_PLATFORMS = {"xhs", "youtube", "bili", "x", "zhihu", "gzh"}

# ── Filename → schema name mapping ──────────────────────────────
# Exact-name lookups; case_*.json researcher files are handled separately
# by _resolve_schema_name because their names are dynamic.
FILENAME_SCHEMA_MAP = {
    "source.json": "source",
    "case_detailed.json": "case_detailed",
    "blueprint_temp.json": "process_cluster",
    "process.json": "process",
    "capabilities_temp.json": "extract_capabilities",
    "capabilities.json": "capabilities",
    "capabilities_extracted.json": "capabilities_extracted",
    "strategy.json": "assemble_strategy",
}
  34. def _resolve_schema_name(filename: str) -> Optional[str]:
  35. """根据文件名解析对应的 schema 名称"""
  36. if filename in FILENAME_SCHEMA_MAP:
  37. return FILENAME_SCHEMA_MAP[filename]
  38. if filename.startswith("case_") and filename.endswith(".json"):
  39. return "researcher"
  40. return None
  41. # ── Schema 驱动的验证函数 ──────────────────────────────
  42. # 每个函数只做一件事:调用 validate_with_schema
  43. # 公共签名保持不变:validate_X(data) -> Optional[str]
def validate_case(data):
    """Validate case_*.json (Phase 1 output) against the researcher schema."""
    return validate_with_schema(data, "researcher")
def validate_source(data):
    """Validate source.json (Phase 1.5 output)."""
    return validate_with_schema(data, "source")
def validate_case_detailed(data):
    """Validate case_detailed.json (Phase 1.6 output)."""
    return validate_with_schema(data, "case_detailed")
def validate_blueprint_temp(data):
    """Validate blueprint_temp.json (Phase 2.1.1 output)."""
    return validate_with_schema(data, "process_cluster")
def validate_process(data):
    """Validate process.json (Phase 2.1.2 output)."""
    return validate_with_schema(data, "process")
def validate_blueprint(data):
    """[Legacy] blueprint.json is deprecated; signature kept for old call sites."""
    return None
def validate_capabilities_temp(data):
    """Validate capabilities_temp.json (Phase 2.2.1 output)."""
    return validate_with_schema(data, "extract_capabilities")
def validate_capabilities_enriched(data):
    """Validate capabilities.json (Phase 2.2.2 output)."""
    return validate_with_schema(data, "capabilities")
def validate_capabilities(data):
    """Validate capabilities_extracted.json (Phase 2 output)."""
    return validate_with_schema(data, "capabilities_extracted")
def validate_strategy(data):
    """Validate strategy.json (Phase 3 output)."""
    return validate_with_schema(data, "assemble_strategy")
  74. # ── 非结构性检查(invariants) ──────────────────────────
  75. def is_valid_case_id(case_id: str) -> bool:
  76. """检查 case_id 是否为 {platform}_{content_id} 格式"""
  77. if not case_id or "_" not in case_id:
  78. return False
  79. platform = case_id.split("_", 1)[0]
  80. return platform in VALID_PLATFORMS
  81. def validate_invariants_source(data) -> Optional[str]:
  82. """source.json 的非结构性检查:去重、total 一致性"""
  83. sources = data.get("sources", [])
  84. seen_ids: Set[str] = set()
  85. for i, src in enumerate(sources):
  86. p = src.get("platform", "")
  87. cid = src.get("channel_content_id", "")
  88. if p and cid:
  89. dedup_key = f"{p}_{cid}"
  90. if dedup_key in seen_ids:
  91. return f"sources[{i}] duplicate: {dedup_key}"
  92. seen_ids.add(dedup_key)
  93. total = data.get("total")
  94. if total is not None and total != len(sources):
  95. return f"total ({total}) != len(sources) ({len(sources)})"
  96. return None
  97. def validate_invariants_case_detailed(data) -> Optional[str]:
  98. """case_detailed.json 的非结构性检查:去重、计数一致性"""
  99. cases = data.get("cases", [])
  100. seen_ids: Set[str] = set()
  101. success_count = 0
  102. for i, c in enumerate(cases):
  103. p = c.get("platform", "")
  104. cid = c.get("channel_content_id", "")
  105. if p and cid:
  106. dedup_key = f"{p}_{cid}"
  107. if dedup_key in seen_ids:
  108. return f"cases[{i}] duplicate: {dedup_key}"
  109. seen_ids.add(dedup_key)
  110. if c.get("workflow") is not None:
  111. success_count += 1
  112. total = data.get("total")
  113. if total is not None and total != len(cases):
  114. return f"total ({total}) != len(cases) ({len(cases)})"
  115. success = data.get("success")
  116. if success is not None and success != success_count:
  117. return f"success ({success}) != actual success count ({success_count})"
  118. return None
  119. # ── 跨文件引用检查 ──────────────────────────────────────
  120. def collect_valid_case_ids(raw_cases_dir: Path) -> Set[str]:
  121. """从 source.json 和 case_*.json 收集所有有效的 case_id"""
  122. valid_ids = set()
  123. source_file = raw_cases_dir / "source.json"
  124. if source_file.exists():
  125. try:
  126. with open(source_file, "r", encoding="utf-8") as f:
  127. data = json.load(f)
  128. for src in data.get("sources", []):
  129. p = src.get("platform")
  130. cid = src.get("channel_content_id")
  131. if p and cid:
  132. valid_ids.add(f"{p}_{cid}")
  133. case_id = src.get("case_id")
  134. if case_id:
  135. valid_ids.add(case_id)
  136. except Exception:
  137. pass
  138. for case_file in raw_cases_dir.glob("case_*.json"):
  139. if case_file.name in ("case_detailed.json",):
  140. continue
  141. try:
  142. with open(case_file, "r", encoding="utf-8") as f:
  143. data = json.load(f)
  144. for c in data.get("cases", []):
  145. case_id = c.get("case_id")
  146. if case_id:
  147. valid_ids.add(case_id)
  148. except Exception:
  149. pass
  150. return valid_ids
  151. def check_referential_integrity(req_dir: Path) -> List[Tuple[Path, str]]:
  152. """检查跨文件的引用一致性"""
  153. errors = []
  154. for filename in ["blueprint.json", "capabilities_extracted.json", "strategy.json"]:
  155. file_path = req_dir / filename
  156. if not file_path.exists():
  157. continue
  158. try:
  159. with open(file_path, "r", encoding="utf-8") as f:
  160. content = f.read()
  161. legacy_refs = re.findall(r'\bcase_\d{3}\b', content)
  162. for ref in set(legacy_refs):
  163. errors.append((file_path, f"Legacy reference: {ref} (should use {{platform}}_{{content_id}} format)"))
  164. except Exception:
  165. pass
  166. return errors
  167. # ── 文件缺失检查 ──────────────────────────────────────
  168. def check_missing_files(base_dir: Path) -> List[Tuple[str, str]]:
  169. """检查每个需求目录是否缺少必需的文件"""
  170. missing_files = []
  171. req_dirs = sorted([d for d in base_dir.iterdir() if d.is_dir() and d.name.isdigit()])
  172. for req_dir in req_dirs:
  173. req_id = req_dir.name
  174. required = {
  175. "raw_cases": req_dir / "raw_cases",
  176. "blueprint.json": req_dir / "blueprint.json",
  177. "capabilities_extracted.json": req_dir / "capabilities_extracted.json",
  178. "strategy.json": req_dir / "strategy.json"
  179. }
  180. for file_name, file_path in required.items():
  181. if file_name == "raw_cases":
  182. if not file_path.exists():
  183. missing_files.append((req_id, "raw_cases directory missing"))
  184. elif not list(file_path.glob("case_*.json")):
  185. missing_files.append((req_id, "raw_cases has no case files"))
  186. else:
  187. if not file_path.exists():
  188. missing_files.append((req_id, f"{file_name} missing"))
  189. return missing_files
  190. # ── 主入口 ──────────────────────────────────────
  191. def main():
  192. parser = argparse.ArgumentParser()
  193. parser.add_argument("--dir", default="output", help="Directory to validate")
  194. args = parser.parse_args()
  195. base_dir = Path(__file__).parent.parent / args.dir
  196. if not base_dir.exists():
  197. print(f"Error: {base_dir} does not exist.")
  198. return
  199. print(f"[Start] Checking for missing files...")
  200. missing_files = check_missing_files(base_dir)
  201. if missing_files:
  202. print(f"[WARNING] Found {len(missing_files)} missing files:")
  203. for req_id, issue in missing_files:
  204. print(f" - REQ_{req_id}: {issue}")
  205. else:
  206. print("[OK] All required files are present.")
  207. print("-" * 50)
  208. json_files = list(base_dir.rglob("*.json"))
  209. total_files = len(json_files)
  210. format_errors = []
  211. # 引用完整性检查
  212. req_dirs = sorted([d for d in base_dir.iterdir() if d.is_dir() and d.name.isdigit()])
  213. for req_dir in req_dirs:
  214. ref_errors = check_referential_integrity(req_dir)
  215. for path, err in ref_errors:
  216. rel_path = path.relative_to(base_dir.parent)
  217. format_errors.append((rel_path, f"Referential Integrity: {err}"))
  218. print(f"[Start] Validating schema for {total_files} JSON files...")
  219. for file_path in json_files:
  220. try:
  221. with open(file_path, "r", encoding="utf-8") as f:
  222. data = json.load(f)
  223. except Exception as e:
  224. format_errors.append((file_path, f"JSON Parsing Error: {e}"))
  225. continue
  226. filename = file_path.name
  227. rel_path = file_path.relative_to(base_dir.parent)
  228. # Schema 结构验证
  229. schema_name = _resolve_schema_name(filename)
  230. if schema_name:
  231. err = validate_with_schema(data, schema_name)
  232. if err:
  233. format_errors.append((rel_path, f"Schema mismatch: {err}"))
  234. # Invariant 检查
  235. if filename == "source.json":
  236. err = validate_invariants_source(data)
  237. if err:
  238. format_errors.append((rel_path, f"Invariant: {err}"))
  239. elif filename == "case_detailed.json":
  240. err = validate_invariants_case_detailed(data)
  241. if err:
  242. format_errors.append((rel_path, f"Invariant: {err}"))
  243. report_path = Path(__file__).parent / "schema_errors_report.txt"
  244. print("-" * 50)
  245. with open(report_path, "w", encoding="utf-8") as out_f:
  246. if not format_errors:
  247. msg = f"[OK] All {total_files} JSON files match their expected schemas!"
  248. print(msg)
  249. out_f.write(msg + "\n")
  250. else:
  251. msg = f"[ERROR] Found {len(format_errors)} files with issues:"
  252. print(msg)
  253. out_f.write(msg + "\n")
  254. for path, error in format_errors:
  255. print(f" - {path}: {error}")
  256. out_f.write(f" - {path}: {error}\n")
  257. print("-" * 50)
  258. print(f"Report saved to {report_path}")
  259. if __name__ == "__main__":
  260. main()