ocsp.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
  4. import datetime
  5. import typing
  6. from cryptography import utils, x509
  7. from cryptography.exceptions import UnsupportedAlgorithm
  8. from cryptography.hazmat.backends.openssl.decode_asn1 import (
  9. _CRL_ENTRY_REASON_CODE_TO_ENUM,
  10. _asn1_integer_to_int,
  11. _asn1_string_to_bytes,
  12. _decode_x509_name,
  13. _obj2txt,
  14. _parse_asn1_generalized_time,
  15. )
  16. from cryptography.hazmat.backends.openssl.x509 import _Certificate
  17. from cryptography.hazmat.primitives import hashes, serialization
  18. from cryptography.x509.ocsp import (
  19. OCSPCertStatus,
  20. OCSPRequest,
  21. OCSPResponse,
  22. OCSPResponseStatus,
  23. _CERT_STATUS_TO_ENUM,
  24. _OIDS_TO_HASH,
  25. _RESPONSE_STATUS_TO_ENUM,
  26. )
  27. def _issuer_key_hash(backend, cert_id):
  28. key_hash = backend._ffi.new("ASN1_OCTET_STRING **")
  29. res = backend._lib.OCSP_id_get0_info(
  30. backend._ffi.NULL,
  31. backend._ffi.NULL,
  32. key_hash,
  33. backend._ffi.NULL,
  34. cert_id,
  35. )
  36. backend.openssl_assert(res == 1)
  37. backend.openssl_assert(key_hash[0] != backend._ffi.NULL)
  38. return _asn1_string_to_bytes(backend, key_hash[0])
  39. def _issuer_name_hash(backend, cert_id):
  40. name_hash = backend._ffi.new("ASN1_OCTET_STRING **")
  41. res = backend._lib.OCSP_id_get0_info(
  42. name_hash,
  43. backend._ffi.NULL,
  44. backend._ffi.NULL,
  45. backend._ffi.NULL,
  46. cert_id,
  47. )
  48. backend.openssl_assert(res == 1)
  49. backend.openssl_assert(name_hash[0] != backend._ffi.NULL)
  50. return _asn1_string_to_bytes(backend, name_hash[0])
  51. def _serial_number(backend, cert_id):
  52. num = backend._ffi.new("ASN1_INTEGER **")
  53. res = backend._lib.OCSP_id_get0_info(
  54. backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id
  55. )
  56. backend.openssl_assert(res == 1)
  57. backend.openssl_assert(num[0] != backend._ffi.NULL)
  58. return _asn1_integer_to_int(backend, num[0])
  59. def _hash_algorithm(backend, cert_id):
  60. asn1obj = backend._ffi.new("ASN1_OBJECT **")
  61. res = backend._lib.OCSP_id_get0_info(
  62. backend._ffi.NULL,
  63. asn1obj,
  64. backend._ffi.NULL,
  65. backend._ffi.NULL,
  66. cert_id,
  67. )
  68. backend.openssl_assert(res == 1)
  69. backend.openssl_assert(asn1obj[0] != backend._ffi.NULL)
  70. oid = _obj2txt(backend, asn1obj[0])
  71. try:
  72. return _OIDS_TO_HASH[oid]
  73. except KeyError:
  74. raise UnsupportedAlgorithm(
  75. "Signature algorithm OID: {} not recognized".format(oid)
  76. )
  77. class _OCSPResponse(OCSPResponse):
  78. def __init__(self, backend, ocsp_response):
  79. self._backend = backend
  80. self._ocsp_response = ocsp_response
  81. status = self._backend._lib.OCSP_response_status(self._ocsp_response)
  82. self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
  83. self._status = _RESPONSE_STATUS_TO_ENUM[status]
  84. if self._status is OCSPResponseStatus.SUCCESSFUL:
  85. basic = self._backend._lib.OCSP_response_get1_basic(
  86. self._ocsp_response
  87. )
  88. self._backend.openssl_assert(basic != self._backend._ffi.NULL)
  89. self._basic = self._backend._ffi.gc(
  90. basic, self._backend._lib.OCSP_BASICRESP_free
  91. )
  92. num_resp = self._backend._lib.OCSP_resp_count(self._basic)
  93. if num_resp != 1:
  94. raise ValueError(
  95. "OCSP response contains more than one SINGLERESP structure"
  96. ", which this library does not support. "
  97. "{} found".format(num_resp)
  98. )
  99. self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
  100. self._backend.openssl_assert(
  101. self._single != self._backend._ffi.NULL
  102. )
  103. self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
  104. self._single
  105. )
  106. self._backend.openssl_assert(
  107. self._cert_id != self._backend._ffi.NULL
  108. )
  109. response_status = utils.read_only_property("_status")
  110. def _requires_successful_response(self) -> None:
  111. if self.response_status != OCSPResponseStatus.SUCCESSFUL:
  112. raise ValueError(
  113. "OCSP response status is not successful so the property "
  114. "has no value"
  115. )
  116. @property
  117. def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
  118. self._requires_successful_response()
  119. alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
  120. self._backend.openssl_assert(alg != self._backend._ffi.NULL)
  121. oid = _obj2txt(self._backend, alg.algorithm)
  122. return x509.ObjectIdentifier(oid)
  123. @property
  124. def signature_hash_algorithm(
  125. self,
  126. ) -> typing.Optional[hashes.HashAlgorithm]:
  127. self._requires_successful_response()
  128. oid = self.signature_algorithm_oid
  129. try:
  130. return x509._SIG_OIDS_TO_HASH[oid]
  131. except KeyError:
  132. raise UnsupportedAlgorithm(
  133. "Signature algorithm OID:{} not recognized".format(oid)
  134. )
  135. @property
  136. def signature(self) -> bytes:
  137. self._requires_successful_response()
  138. sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
  139. self._backend.openssl_assert(sig != self._backend._ffi.NULL)
  140. return _asn1_string_to_bytes(self._backend, sig)
  141. @property
  142. def tbs_response_bytes(self) -> bytes:
  143. self._requires_successful_response()
  144. respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
  145. self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
  146. pp = self._backend._ffi.new("unsigned char **")
  147. res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
  148. self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
  149. pp = self._backend._ffi.gc(
  150. pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
  151. )
  152. self._backend.openssl_assert(res > 0)
  153. return self._backend._ffi.buffer(pp[0], res)[:]
  154. @property
  155. def certificates(self) -> typing.List[x509.Certificate]:
  156. self._requires_successful_response()
  157. sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
  158. num = self._backend._lib.sk_X509_num(sk_x509)
  159. certs: typing.List[x509.Certificate] = []
  160. for i in range(num):
  161. x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i)
  162. self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL)
  163. cert = _Certificate(self._backend, x509_ptr)
  164. # We need to keep the OCSP response that the certificate came from
  165. # alive until the Certificate object itself goes out of scope, so
  166. # we give it a private reference.
  167. cert._ocsp_resp_ref = self
  168. certs.append(cert)
  169. return certs
  170. @property
  171. def responder_key_hash(self) -> typing.Optional[bytes]:
  172. self._requires_successful_response()
  173. _, asn1_string = self._responder_key_name()
  174. if asn1_string == self._backend._ffi.NULL:
  175. return None
  176. else:
  177. return _asn1_string_to_bytes(self._backend, asn1_string)
  178. @property
  179. def responder_name(self) -> typing.Optional[x509.Name]:
  180. self._requires_successful_response()
  181. x509_name, _ = self._responder_key_name()
  182. if x509_name == self._backend._ffi.NULL:
  183. return None
  184. else:
  185. return _decode_x509_name(self._backend, x509_name)
  186. def _responder_key_name(self):
  187. asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
  188. x509_name = self._backend._ffi.new("X509_NAME **")
  189. res = self._backend._lib.OCSP_resp_get0_id(
  190. self._basic, asn1_string, x509_name
  191. )
  192. self._backend.openssl_assert(res == 1)
  193. return x509_name[0], asn1_string[0]
  194. @property
  195. def produced_at(self) -> datetime.datetime:
  196. self._requires_successful_response()
  197. produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
  198. self._basic
  199. )
  200. return _parse_asn1_generalized_time(self._backend, produced_at)
  201. @property
  202. def certificate_status(self) -> OCSPCertStatus:
  203. self._requires_successful_response()
  204. status = self._backend._lib.OCSP_single_get0_status(
  205. self._single,
  206. self._backend._ffi.NULL,
  207. self._backend._ffi.NULL,
  208. self._backend._ffi.NULL,
  209. self._backend._ffi.NULL,
  210. )
  211. self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
  212. return _CERT_STATUS_TO_ENUM[status]
  213. @property
  214. def revocation_time(self) -> typing.Optional[datetime.datetime]:
  215. self._requires_successful_response()
  216. if self.certificate_status is not OCSPCertStatus.REVOKED:
  217. return None
  218. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  219. self._backend._lib.OCSP_single_get0_status(
  220. self._single,
  221. self._backend._ffi.NULL,
  222. asn1_time,
  223. self._backend._ffi.NULL,
  224. self._backend._ffi.NULL,
  225. )
  226. self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
  227. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  228. @property
  229. def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
  230. self._requires_successful_response()
  231. if self.certificate_status is not OCSPCertStatus.REVOKED:
  232. return None
  233. reason_ptr = self._backend._ffi.new("int *")
  234. self._backend._lib.OCSP_single_get0_status(
  235. self._single,
  236. reason_ptr,
  237. self._backend._ffi.NULL,
  238. self._backend._ffi.NULL,
  239. self._backend._ffi.NULL,
  240. )
  241. # If no reason is encoded OpenSSL returns -1
  242. if reason_ptr[0] == -1:
  243. return None
  244. else:
  245. self._backend.openssl_assert(
  246. reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
  247. )
  248. return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
  249. @property
  250. def this_update(self) -> datetime.datetime:
  251. self._requires_successful_response()
  252. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  253. self._backend._lib.OCSP_single_get0_status(
  254. self._single,
  255. self._backend._ffi.NULL,
  256. self._backend._ffi.NULL,
  257. asn1_time,
  258. self._backend._ffi.NULL,
  259. )
  260. self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
  261. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  262. @property
  263. def next_update(self) -> typing.Optional[datetime.datetime]:
  264. self._requires_successful_response()
  265. asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
  266. self._backend._lib.OCSP_single_get0_status(
  267. self._single,
  268. self._backend._ffi.NULL,
  269. self._backend._ffi.NULL,
  270. self._backend._ffi.NULL,
  271. asn1_time,
  272. )
  273. if asn1_time[0] != self._backend._ffi.NULL:
  274. return _parse_asn1_generalized_time(self._backend, asn1_time[0])
  275. else:
  276. return None
  277. @property
  278. def issuer_key_hash(self) -> bytes:
  279. self._requires_successful_response()
  280. return _issuer_key_hash(self._backend, self._cert_id)
  281. @property
  282. def issuer_name_hash(self) -> bytes:
  283. self._requires_successful_response()
  284. return _issuer_name_hash(self._backend, self._cert_id)
  285. @property
  286. def hash_algorithm(self) -> hashes.HashAlgorithm:
  287. self._requires_successful_response()
  288. return _hash_algorithm(self._backend, self._cert_id)
  289. @property
  290. def serial_number(self) -> int:
  291. self._requires_successful_response()
  292. return _serial_number(self._backend, self._cert_id)
  293. @utils.cached_property
  294. def extensions(self) -> x509.Extensions:
  295. self._requires_successful_response()
  296. return self._backend._ocsp_basicresp_ext_parser.parse(self._basic)
  297. @utils.cached_property
  298. def single_extensions(self) -> x509.Extensions:
  299. self._requires_successful_response()
  300. return self._backend._ocsp_singleresp_ext_parser.parse(self._single)
  301. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  302. if encoding is not serialization.Encoding.DER:
  303. raise ValueError("The only allowed encoding value is Encoding.DER")
  304. bio = self._backend._create_mem_bio_gc()
  305. res = self._backend._lib.i2d_OCSP_RESPONSE_bio(
  306. bio, self._ocsp_response
  307. )
  308. self._backend.openssl_assert(res > 0)
  309. return self._backend._read_mem_bio(bio)
  310. class _OCSPRequest(OCSPRequest):
  311. def __init__(self, backend, ocsp_request):
  312. if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
  313. raise NotImplementedError(
  314. "OCSP request contains more than one request"
  315. )
  316. self._backend = backend
  317. self._ocsp_request = ocsp_request
  318. self._request = self._backend._lib.OCSP_request_onereq_get0(
  319. self._ocsp_request, 0
  320. )
  321. self._backend.openssl_assert(self._request != self._backend._ffi.NULL)
  322. self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request)
  323. self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)
  324. @property
  325. def issuer_key_hash(self) -> bytes:
  326. return _issuer_key_hash(self._backend, self._cert_id)
  327. @property
  328. def issuer_name_hash(self) -> bytes:
  329. return _issuer_name_hash(self._backend, self._cert_id)
  330. @property
  331. def serial_number(self) -> int:
  332. return _serial_number(self._backend, self._cert_id)
  333. @property
  334. def hash_algorithm(self) -> hashes.HashAlgorithm:
  335. return _hash_algorithm(self._backend, self._cert_id)
  336. @utils.cached_property
  337. def extensions(self) -> x509.Extensions:
  338. return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request)
  339. def public_bytes(self, encoding: serialization.Encoding) -> bytes:
  340. if encoding is not serialization.Encoding.DER:
  341. raise ValueError("The only allowed encoding value is Encoding.DER")
  342. bio = self._backend._create_mem_bio_gc()
  343. res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request)
  344. self._backend.openssl_assert(res > 0)
  345. return self._backend._read_mem_bio(bio)