123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- # This file is dual licensed under the terms of the Apache License, Version
- # 2.0, and the BSD License. See the LICENSE file in the root of this repository
- # for complete details.
- import datetime
- import typing
- from cryptography import utils, x509
- from cryptography.exceptions import UnsupportedAlgorithm
- from cryptography.hazmat.backends.openssl.decode_asn1 import (
- _CRL_ENTRY_REASON_CODE_TO_ENUM,
- _asn1_integer_to_int,
- _asn1_string_to_bytes,
- _decode_x509_name,
- _obj2txt,
- _parse_asn1_generalized_time,
- )
- from cryptography.hazmat.backends.openssl.x509 import _Certificate
- from cryptography.hazmat.primitives import hashes, serialization
- from cryptography.x509.ocsp import (
- OCSPCertStatus,
- OCSPRequest,
- OCSPResponse,
- OCSPResponseStatus,
- _CERT_STATUS_TO_ENUM,
- _OIDS_TO_HASH,
- _RESPONSE_STATUS_TO_ENUM,
- )
- def _issuer_key_hash(backend, cert_id):
- key_hash = backend._ffi.new("ASN1_OCTET_STRING **")
- res = backend._lib.OCSP_id_get0_info(
- backend._ffi.NULL,
- backend._ffi.NULL,
- key_hash,
- backend._ffi.NULL,
- cert_id,
- )
- backend.openssl_assert(res == 1)
- backend.openssl_assert(key_hash[0] != backend._ffi.NULL)
- return _asn1_string_to_bytes(backend, key_hash[0])
- def _issuer_name_hash(backend, cert_id):
- name_hash = backend._ffi.new("ASN1_OCTET_STRING **")
- res = backend._lib.OCSP_id_get0_info(
- name_hash,
- backend._ffi.NULL,
- backend._ffi.NULL,
- backend._ffi.NULL,
- cert_id,
- )
- backend.openssl_assert(res == 1)
- backend.openssl_assert(name_hash[0] != backend._ffi.NULL)
- return _asn1_string_to_bytes(backend, name_hash[0])
- def _serial_number(backend, cert_id):
- num = backend._ffi.new("ASN1_INTEGER **")
- res = backend._lib.OCSP_id_get0_info(
- backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id
- )
- backend.openssl_assert(res == 1)
- backend.openssl_assert(num[0] != backend._ffi.NULL)
- return _asn1_integer_to_int(backend, num[0])
- def _hash_algorithm(backend, cert_id):
- asn1obj = backend._ffi.new("ASN1_OBJECT **")
- res = backend._lib.OCSP_id_get0_info(
- backend._ffi.NULL,
- asn1obj,
- backend._ffi.NULL,
- backend._ffi.NULL,
- cert_id,
- )
- backend.openssl_assert(res == 1)
- backend.openssl_assert(asn1obj[0] != backend._ffi.NULL)
- oid = _obj2txt(backend, asn1obj[0])
- try:
- return _OIDS_TO_HASH[oid]
- except KeyError:
- raise UnsupportedAlgorithm(
- "Signature algorithm OID: {} not recognized".format(oid)
- )
- class _OCSPResponse(OCSPResponse):
- def __init__(self, backend, ocsp_response):
- self._backend = backend
- self._ocsp_response = ocsp_response
- status = self._backend._lib.OCSP_response_status(self._ocsp_response)
- self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
- self._status = _RESPONSE_STATUS_TO_ENUM[status]
- if self._status is OCSPResponseStatus.SUCCESSFUL:
- basic = self._backend._lib.OCSP_response_get1_basic(
- self._ocsp_response
- )
- self._backend.openssl_assert(basic != self._backend._ffi.NULL)
- self._basic = self._backend._ffi.gc(
- basic, self._backend._lib.OCSP_BASICRESP_free
- )
- num_resp = self._backend._lib.OCSP_resp_count(self._basic)
- if num_resp != 1:
- raise ValueError(
- "OCSP response contains more than one SINGLERESP structure"
- ", which this library does not support. "
- "{} found".format(num_resp)
- )
- self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
- self._backend.openssl_assert(
- self._single != self._backend._ffi.NULL
- )
- self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
- self._single
- )
- self._backend.openssl_assert(
- self._cert_id != self._backend._ffi.NULL
- )
- response_status = utils.read_only_property("_status")
- def _requires_successful_response(self) -> None:
- if self.response_status != OCSPResponseStatus.SUCCESSFUL:
- raise ValueError(
- "OCSP response status is not successful so the property "
- "has no value"
- )
- @property
- def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
- self._requires_successful_response()
- alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
- self._backend.openssl_assert(alg != self._backend._ffi.NULL)
- oid = _obj2txt(self._backend, alg.algorithm)
- return x509.ObjectIdentifier(oid)
- @property
- def signature_hash_algorithm(
- self,
- ) -> typing.Optional[hashes.HashAlgorithm]:
- self._requires_successful_response()
- oid = self.signature_algorithm_oid
- try:
- return x509._SIG_OIDS_TO_HASH[oid]
- except KeyError:
- raise UnsupportedAlgorithm(
- "Signature algorithm OID:{} not recognized".format(oid)
- )
- @property
- def signature(self) -> bytes:
- self._requires_successful_response()
- sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
- self._backend.openssl_assert(sig != self._backend._ffi.NULL)
- return _asn1_string_to_bytes(self._backend, sig)
- @property
- def tbs_response_bytes(self) -> bytes:
- self._requires_successful_response()
- respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
- self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
- pp = self._backend._ffi.new("unsigned char **")
- res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
- self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
- pp = self._backend._ffi.gc(
- pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
- )
- self._backend.openssl_assert(res > 0)
- return self._backend._ffi.buffer(pp[0], res)[:]
- @property
- def certificates(self) -> typing.List[x509.Certificate]:
- self._requires_successful_response()
- sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
- num = self._backend._lib.sk_X509_num(sk_x509)
- certs: typing.List[x509.Certificate] = []
- for i in range(num):
- x509_ptr = self._backend._lib.sk_X509_value(sk_x509, i)
- self._backend.openssl_assert(x509_ptr != self._backend._ffi.NULL)
- cert = _Certificate(self._backend, x509_ptr)
- # We need to keep the OCSP response that the certificate came from
- # alive until the Certificate object itself goes out of scope, so
- # we give it a private reference.
- cert._ocsp_resp_ref = self
- certs.append(cert)
- return certs
- @property
- def responder_key_hash(self) -> typing.Optional[bytes]:
- self._requires_successful_response()
- _, asn1_string = self._responder_key_name()
- if asn1_string == self._backend._ffi.NULL:
- return None
- else:
- return _asn1_string_to_bytes(self._backend, asn1_string)
- @property
- def responder_name(self) -> typing.Optional[x509.Name]:
- self._requires_successful_response()
- x509_name, _ = self._responder_key_name()
- if x509_name == self._backend._ffi.NULL:
- return None
- else:
- return _decode_x509_name(self._backend, x509_name)
- def _responder_key_name(self):
- asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
- x509_name = self._backend._ffi.new("X509_NAME **")
- res = self._backend._lib.OCSP_resp_get0_id(
- self._basic, asn1_string, x509_name
- )
- self._backend.openssl_assert(res == 1)
- return x509_name[0], asn1_string[0]
- @property
- def produced_at(self) -> datetime.datetime:
- self._requires_successful_response()
- produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
- self._basic
- )
- return _parse_asn1_generalized_time(self._backend, produced_at)
- @property
- def certificate_status(self) -> OCSPCertStatus:
- self._requires_successful_response()
- status = self._backend._lib.OCSP_single_get0_status(
- self._single,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- )
- self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
- return _CERT_STATUS_TO_ENUM[status]
- @property
- def revocation_time(self) -> typing.Optional[datetime.datetime]:
- self._requires_successful_response()
- if self.certificate_status is not OCSPCertStatus.REVOKED:
- return None
- asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
- self._backend._lib.OCSP_single_get0_status(
- self._single,
- self._backend._ffi.NULL,
- asn1_time,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- )
- self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
- return _parse_asn1_generalized_time(self._backend, asn1_time[0])
- @property
- def revocation_reason(self) -> typing.Optional[x509.ReasonFlags]:
- self._requires_successful_response()
- if self.certificate_status is not OCSPCertStatus.REVOKED:
- return None
- reason_ptr = self._backend._ffi.new("int *")
- self._backend._lib.OCSP_single_get0_status(
- self._single,
- reason_ptr,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- )
- # If no reason is encoded OpenSSL returns -1
- if reason_ptr[0] == -1:
- return None
- else:
- self._backend.openssl_assert(
- reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
- )
- return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
- @property
- def this_update(self) -> datetime.datetime:
- self._requires_successful_response()
- asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
- self._backend._lib.OCSP_single_get0_status(
- self._single,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- asn1_time,
- self._backend._ffi.NULL,
- )
- self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
- return _parse_asn1_generalized_time(self._backend, asn1_time[0])
- @property
- def next_update(self) -> typing.Optional[datetime.datetime]:
- self._requires_successful_response()
- asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
- self._backend._lib.OCSP_single_get0_status(
- self._single,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- self._backend._ffi.NULL,
- asn1_time,
- )
- if asn1_time[0] != self._backend._ffi.NULL:
- return _parse_asn1_generalized_time(self._backend, asn1_time[0])
- else:
- return None
- @property
- def issuer_key_hash(self) -> bytes:
- self._requires_successful_response()
- return _issuer_key_hash(self._backend, self._cert_id)
- @property
- def issuer_name_hash(self) -> bytes:
- self._requires_successful_response()
- return _issuer_name_hash(self._backend, self._cert_id)
- @property
- def hash_algorithm(self) -> hashes.HashAlgorithm:
- self._requires_successful_response()
- return _hash_algorithm(self._backend, self._cert_id)
- @property
- def serial_number(self) -> int:
- self._requires_successful_response()
- return _serial_number(self._backend, self._cert_id)
- @utils.cached_property
- def extensions(self) -> x509.Extensions:
- self._requires_successful_response()
- return self._backend._ocsp_basicresp_ext_parser.parse(self._basic)
- @utils.cached_property
- def single_extensions(self) -> x509.Extensions:
- self._requires_successful_response()
- return self._backend._ocsp_singleresp_ext_parser.parse(self._single)
- def public_bytes(self, encoding: serialization.Encoding) -> bytes:
- if encoding is not serialization.Encoding.DER:
- raise ValueError("The only allowed encoding value is Encoding.DER")
- bio = self._backend._create_mem_bio_gc()
- res = self._backend._lib.i2d_OCSP_RESPONSE_bio(
- bio, self._ocsp_response
- )
- self._backend.openssl_assert(res > 0)
- return self._backend._read_mem_bio(bio)
- class _OCSPRequest(OCSPRequest):
- def __init__(self, backend, ocsp_request):
- if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
- raise NotImplementedError(
- "OCSP request contains more than one request"
- )
- self._backend = backend
- self._ocsp_request = ocsp_request
- self._request = self._backend._lib.OCSP_request_onereq_get0(
- self._ocsp_request, 0
- )
- self._backend.openssl_assert(self._request != self._backend._ffi.NULL)
- self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request)
- self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)
- @property
- def issuer_key_hash(self) -> bytes:
- return _issuer_key_hash(self._backend, self._cert_id)
- @property
- def issuer_name_hash(self) -> bytes:
- return _issuer_name_hash(self._backend, self._cert_id)
- @property
- def serial_number(self) -> int:
- return _serial_number(self._backend, self._cert_id)
- @property
- def hash_algorithm(self) -> hashes.HashAlgorithm:
- return _hash_algorithm(self._backend, self._cert_id)
- @utils.cached_property
- def extensions(self) -> x509.Extensions:
- return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request)
- def public_bytes(self, encoding: serialization.Encoding) -> bytes:
- if encoding is not serialization.Encoding.DER:
- raise ValueError("The only allowed encoding value is Encoding.DER")
- bio = self._backend._create_mem_bio_gc()
- res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request)
- self._backend.openssl_assert(res > 0)
- return self._backend._read_mem_bio(bio)
|