123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601 |
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
- import datetime
- import operator
- import typing
- from cryptography import utils, x509
- from cryptography.exceptions import UnsupportedAlgorithm
- from cryptography.hazmat.backends.openssl import dsa, ec, rsa
- from cryptography.hazmat.backends.openssl.decode_asn1 import (
- _asn1_integer_to_int,
- _asn1_string_to_bytes,
- _decode_x509_name,
- _obj2txt,
- _parse_asn1_time,
- )
- from cryptography.hazmat.backends.openssl.encode_asn1 import (
- _encode_asn1_int_gc,
- _txt2obj_gc,
- )
- from cryptography.hazmat.primitives import hashes, serialization
- from cryptography.x509.base import _PUBLIC_KEY_TYPES
- from cryptography.x509.name import _ASN1Type
class _Certificate(x509.Certificate):
    """Certificate implementation backed by an OpenSSL ``X509`` handle."""

    # Keep-alive reference used by OCSP
    _ocsp_resp_ref: typing.Any

    def __init__(self, backend, x509_cert):
        """Store the backend and handle, then resolve the version.

        Raises ``x509.InvalidVersion`` when the raw version reported by
        ``X509_get_version`` is neither 0 (v1) nor 2 (v3).
        """
        self._backend = backend
        self._x509 = x509_cert

        raw_version = self._backend._lib.X509_get_version(self._x509)
        if raw_version not in (0, 2):
            raise x509.InvalidVersion(
                "{} is not a valid X509 version".format(raw_version),
                raw_version,
            )
        self._version = (
            x509.Version.v1 if raw_version == 0 else x509.Version.v3
        )

    def __repr__(self):
        return "<Certificate(subject={}, ...)>".format(self.subject)

    def __eq__(self, other: object) -> bool:
        """Compare two certificates with ``X509_cmp``."""
        if isinstance(other, _Certificate):
            return self._backend._lib.X509_cmp(self._x509, other._x509) == 0
        return NotImplemented

    def __ne__(self, other: object) -> bool:
        return not (self == other)

    def __hash__(self) -> int:
        der = self.public_bytes(serialization.Encoding.DER)
        return hash(der)

    def __deepcopy__(self, memo):
        # Deep copies share this same object rather than duplicating it.
        return self

    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the digest of the DER encoding under ``algorithm``."""
        digest = hashes.Hash(algorithm, self._backend)
        digest.update(self.public_bytes(serialization.Encoding.DER))
        return digest.finalize()

    version = utils.read_only_property("_version")

    @property
    def serial_number(self) -> int:
        """Return the certificate serial number as a Python int."""
        backend = self._backend
        serial = backend._lib.X509_get_serialNumber(self._x509)
        backend.openssl_assert(serial != backend._ffi.NULL)
        return _asn1_integer_to_int(backend, serial)

    def public_key(self) -> _PUBLIC_KEY_TYPES:
        """Return the certificate's public key.

        Raises ``ValueError`` when ``X509_get_pubkey`` returns NULL.
        """
        backend = self._backend
        evp_pkey = backend._lib.X509_get_pubkey(self._x509)
        if evp_pkey != backend._ffi.NULL:
            evp_pkey = backend._ffi.gc(evp_pkey, backend._lib.EVP_PKEY_free)
            return backend._evp_pkey_to_public_key(evp_pkey)

        # Remove errors from the stack.
        backend._consume_errors()
        raise ValueError("Certificate public key is of an unknown type")

    @property
    def not_valid_before(self) -> datetime.datetime:
        """Return the parsed notBefore time."""
        not_before = self._backend._lib.X509_get0_notBefore(self._x509)
        return _parse_asn1_time(self._backend, not_before)

    @property
    def not_valid_after(self) -> datetime.datetime:
        """Return the parsed notAfter time."""
        not_after = self._backend._lib.X509_get0_notAfter(self._x509)
        return _parse_asn1_time(self._backend, not_after)

    @property
    def issuer(self) -> x509.Name:
        """Return the decoded issuer name."""
        backend = self._backend
        name = backend._lib.X509_get_issuer_name(self._x509)
        backend.openssl_assert(name != backend._ffi.NULL)
        return _decode_x509_name(backend, name)

    @property
    def subject(self) -> x509.Name:
        """Return the decoded subject name."""
        backend = self._backend
        name = backend._lib.X509_get_subject_name(self._x509)
        backend.openssl_assert(name != backend._ffi.NULL)
        return _decode_x509_name(backend, name)

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """Look up the hash algorithm registered for the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID is not present in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        sig_oid = self.signature_algorithm_oid
        try:
            hash_alg = x509._SIG_OIDS_TO_HASH[sig_oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(sig_oid)
            )
        return hash_alg

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        """Return the signature algorithm OID."""
        backend = self._backend
        null = backend._ffi.NULL
        alg_out = backend._ffi.new("X509_ALGOR **")
        backend._lib.X509_get0_signature(null, alg_out, self._x509)
        backend.openssl_assert(alg_out[0] != null)
        dotted = _obj2txt(backend, alg_out[0].algorithm)
        return x509.ObjectIdentifier(dotted)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        parser = self._backend._certificate_extension_parser
        return parser.parse(self._x509)

    @property
    def signature(self) -> bytes:
        """Return the raw signature bytes."""
        backend = self._backend
        null = backend._ffi.NULL
        sig_out = backend._ffi.new("ASN1_BIT_STRING **")
        backend._lib.X509_get0_signature(sig_out, null, self._x509)
        backend.openssl_assert(sig_out[0] != null)
        return _asn1_string_to_bytes(backend, sig_out[0])

    @property
    def tbs_certificate_bytes(self) -> bytes:
        """Return the bytes produced by ``i2d_re_X509_tbs``."""
        backend = self._backend
        out = backend._ffi.new("unsigned char **")
        length = backend._lib.i2d_re_X509_tbs(self._x509, out)
        backend.openssl_assert(length > 0)
        # OPENSSL_free is run on the written buffer once ``out`` is
        # garbage collected.
        out = backend._ffi.gc(
            out, lambda ptr: backend._lib.OPENSSL_free(ptr[0])
        )
        return backend._ffi.buffer(out[0], length)[:]

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the certificate as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        backend = self._backend
        bio = backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            write = backend._lib.PEM_write_bio_X509
        elif encoding is serialization.Encoding.DER:
            write = backend._lib.i2d_X509_bio
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        backend.openssl_assert(write(bio, self._x509) == 1)
        return backend._read_mem_bio(bio)
class _RevokedCertificate(x509.RevokedCertificate):
    """Single revoked-certificate entry backed by an ``X509_REVOKED``."""

    def __init__(self, backend, crl, x509_revoked):
        self._backend = backend
        # An X509_REVOKED * carries no reference count of its own: when
        # X509_CRL_free runs, the CRL and every X509_REVOKED * in it are
        # freed together. A caller may hold on to one revoked entry while
        # letting the CRL go out of scope, so a private reference to the
        # CRL is kept here to stop the gc from freeing it too early.
        self._crl = crl
        self._x509_revoked = x509_revoked

    @property
    def serial_number(self) -> int:
        """Return the revoked certificate's serial number as an int."""
        backend = self._backend
        serial = backend._lib.X509_REVOKED_get0_serialNumber(
            self._x509_revoked
        )
        backend.openssl_assert(serial != backend._ffi.NULL)
        return _asn1_integer_to_int(backend, serial)

    @property
    def revocation_date(self) -> datetime.datetime:
        """Return the parsed revocation date."""
        revoked_at = self._backend._lib.X509_REVOKED_get0_revocationDate(
            self._x509_revoked
        )
        return _parse_asn1_time(self._backend, revoked_at)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        parser = self._backend._revoked_cert_extension_parser
        return parser.parse(self._x509_revoked)
@utils.register_interface(x509.CertificateRevocationList)
class _CertificateRevocationList(object):
    """Certificate revocation list backed by an OpenSSL ``X509_CRL``."""

    def __init__(self, backend, x509_crl):
        self._backend = backend
        self._x509_crl = x509_crl

    def __eq__(self, other: object) -> bool:
        """Return True when both CRLs have identical DER encodings.

        OpenSSL's ``X509_CRL_cmp`` compares issuer names only, so it
        reports two different CRLs from the same issuer as equal. The
        full encodings are compared instead, the same way
        ``_CertificateSigningRequest.__eq__`` does.
        """
        if not isinstance(other, _CertificateRevocationList):
            return NotImplemented

        der = serialization.Encoding.DER
        return self.public_bytes(der) == other.public_bytes(der)

    def __ne__(self, other: object) -> bool:
        return not self == other

    def __hash__(self) -> int:
        # Hash the same DER bytes that __eq__ compares so that equal CRLs
        # always hash equally.
        return hash(self.public_bytes(serialization.Encoding.DER))

    def fingerprint(self, algorithm: hashes.HashAlgorithm) -> bytes:
        """Return the digest of the DER encoding under ``algorithm``."""
        h = hashes.Hash(algorithm, self._backend)
        h.update(self.public_bytes(serialization.Encoding.DER))
        return h.finalize()

    @utils.cached_property
    def _sorted_crl(self):
        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
        # things we don't want to break (like iteration and the signature).
        # Let's dupe it and sort that instead.
        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
        return dup

    def get_revoked_certificate_by_serial_number(
        self, serial_number: int
    ) -> typing.Optional[x509.RevokedCertificate]:
        """Return the revoked entry for ``serial_number``, or None.

        The lookup runs against the duplicated CRL from ``_sorted_crl``,
        and the returned entry keeps that duplicate alive.
        """
        revoked = self._backend._ffi.new("X509_REVOKED **")
        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
        res = self._backend._lib.X509_CRL_get0_by_serial(
            self._sorted_crl, revoked, asn1_int
        )
        if res == 0:
            return None

        self._backend.openssl_assert(revoked[0] != self._backend._ffi.NULL)
        return _RevokedCertificate(
            self._backend, self._sorted_crl, revoked[0]
        )

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """Look up the hash algorithm registered for the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID is not present in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        """Return the signature algorithm OID."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @property
    def issuer(self) -> x509.Name:
        """Return the decoded issuer name."""
        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def next_update(self) -> datetime.datetime:
        """Return the parsed nextUpdate time."""
        nu = self._backend._lib.X509_CRL_get0_nextUpdate(self._x509_crl)
        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, nu)

    @property
    def last_update(self) -> datetime.datetime:
        """Return the parsed lastUpdate time."""
        lu = self._backend._lib.X509_CRL_get0_lastUpdate(self._x509_crl)
        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, lu)

    @property
    def signature(self) -> bytes:
        """Return the raw signature bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certlist_bytes(self) -> bytes:
        """Return the bytes produced by ``i2d_re_X509_CRL_tbs``."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
        self._backend.openssl_assert(res > 0)
        # OPENSSL_free is run on the written buffer once ``pp`` is
        # garbage collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the CRL as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_CRL(
                bio, self._x509_crl
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    def _revoked_cert(self, idx):
        """Wrap the revoked entry at stack position ``idx``."""
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
        self._backend.openssl_assert(r != self._backend._ffi.NULL)
        # Passing ``self`` keeps this CRL alive for as long as the entry is.
        return _RevokedCertificate(self._backend, self, r)

    def __iter__(self):
        for i in range(len(self)):
            yield self._revoked_cert(i)

    def __getitem__(self, idx):
        """Return one revoked entry, or a list of entries for a slice.

        Negative integer indexes count from the end. Raises
        ``IndexError`` when an integer index is out of range.
        """
        if isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            return [self._revoked_cert(i) for i in range(start, stop, step)]

        idx = operator.index(idx)
        if idx < 0:
            idx += len(self)
        if not 0 <= idx < len(self):
            raise IndexError
        return self._revoked_cert(idx)

    def __len__(self) -> int:
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        # A NULL stack is reported as an empty CRL.
        if revoked == self._backend._ffi.NULL:
            return 0
        return self._backend._lib.sk_X509_REVOKED_num(revoked)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        return self._backend._crl_extension_parser.parse(self._x509_crl)

    def is_signature_valid(self, public_key: _PUBLIC_KEY_TYPES) -> bool:
        """Return whether ``public_key`` verifies this CRL's signature.

        Raises ``TypeError`` unless ``public_key`` is one of this
        backend's DSA, RSA or elliptic curve public key types.
        """
        if not isinstance(
            public_key,
            (
                dsa._DSAPublicKey,
                rsa._RSAPublicKey,
                ec._EllipticCurvePublicKey,
            ),
        ):
            raise TypeError(
                "Expecting one of DSAPublicKey, RSAPublicKey,"
                " or EllipticCurvePublicKey."
            )
        res = self._backend._lib.X509_CRL_verify(
            self._x509_crl, public_key._evp_pkey
        )

        if res != 1:
            # Clear whatever the failed verification left on the error stack.
            self._backend._consume_errors()
            return False

        return True
@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
    """Certificate signing request backed by an OpenSSL ``X509_REQ``."""

    def __init__(self, backend, x509_req):
        self._backend = backend
        self._x509_req = x509_req

    def __eq__(self, other: object) -> bool:
        """Return True when both requests have identical DER encodings."""
        if isinstance(other, _CertificateSigningRequest):
            mine = self.public_bytes(serialization.Encoding.DER)
            theirs = other.public_bytes(serialization.Encoding.DER)
            return mine == theirs
        return NotImplemented

    def __ne__(self, other: object) -> bool:
        return not (self == other)

    def __hash__(self) -> int:
        der = self.public_bytes(serialization.Encoding.DER)
        return hash(der)

    def public_key(self) -> _PUBLIC_KEY_TYPES:
        """Return the public key carried by the request."""
        backend = self._backend
        evp_pkey = backend._lib.X509_REQ_get_pubkey(self._x509_req)
        backend.openssl_assert(evp_pkey != backend._ffi.NULL)
        evp_pkey = backend._ffi.gc(evp_pkey, backend._lib.EVP_PKEY_free)
        return backend._evp_pkey_to_public_key(evp_pkey)

    @property
    def subject(self) -> x509.Name:
        """Return the decoded subject name."""
        backend = self._backend
        name = backend._lib.X509_REQ_get_subject_name(self._x509_req)
        backend.openssl_assert(name != backend._ffi.NULL)
        return _decode_x509_name(backend, name)

    @property
    def signature_hash_algorithm(
        self,
    ) -> typing.Optional[hashes.HashAlgorithm]:
        """Look up the hash algorithm registered for the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID is not present in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        sig_oid = self.signature_algorithm_oid
        try:
            hash_alg = x509._SIG_OIDS_TO_HASH[sig_oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(sig_oid)
            )
        return hash_alg

    @property
    def signature_algorithm_oid(self) -> x509.ObjectIdentifier:
        """Return the signature algorithm OID."""
        backend = self._backend
        null = backend._ffi.NULL
        alg_out = backend._ffi.new("X509_ALGOR **")
        backend._lib.X509_REQ_get0_signature(self._x509_req, null, alg_out)
        backend.openssl_assert(alg_out[0] != null)
        dotted = _obj2txt(backend, alg_out[0].algorithm)
        return x509.ObjectIdentifier(dotted)

    @utils.cached_property
    def extensions(self) -> x509.Extensions:
        """Parse the extensions returned by ``X509_REQ_get_extensions``."""
        backend = self._backend

        def _free_stack(stack):
            # Pop every entry, releasing each with X509_EXTENSION_free.
            return backend._lib.sk_X509_EXTENSION_pop_free(
                stack,
                backend._ffi.addressof(
                    backend._lib._original_lib, "X509_EXTENSION_free"
                ),
            )

        ext_stack = backend._lib.X509_REQ_get_extensions(self._x509_req)
        ext_stack = backend._ffi.gc(ext_stack, _free_stack)
        return backend._csr_extension_parser.parse(ext_stack)

    def public_bytes(self, encoding: serialization.Encoding) -> bytes:
        """Serialize the request as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        backend = self._backend
        bio = backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            write = backend._lib.PEM_write_bio_X509_REQ
        elif encoding is serialization.Encoding.DER:
            write = backend._lib.i2d_X509_REQ_bio
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        backend.openssl_assert(write(bio, self._x509_req) == 1)
        return backend._read_mem_bio(bio)

    @property
    def tbs_certrequest_bytes(self) -> bytes:
        """Return the bytes produced by ``i2d_re_X509_REQ_tbs``."""
        backend = self._backend
        out = backend._ffi.new("unsigned char **")
        length = backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, out)
        backend.openssl_assert(length > 0)
        # OPENSSL_free is run on the written buffer once ``out`` is
        # garbage collected.
        out = backend._ffi.gc(
            out, lambda ptr: backend._lib.OPENSSL_free(ptr[0])
        )
        return backend._ffi.buffer(out[0], length)[:]

    @property
    def signature(self) -> bytes:
        """Return the raw signature bytes."""
        backend = self._backend
        null = backend._ffi.NULL
        sig_out = backend._ffi.new("ASN1_BIT_STRING **")
        backend._lib.X509_REQ_get0_signature(self._x509_req, sig_out, null)
        backend.openssl_assert(sig_out[0] != null)
        return _asn1_string_to_bytes(backend, sig_out[0])

    @property
    def is_signature_valid(self) -> bool:
        """Return whether the request verifies against its own public key."""
        backend = self._backend
        evp_pkey = backend._lib.X509_REQ_get_pubkey(self._x509_req)
        backend.openssl_assert(evp_pkey != backend._ffi.NULL)
        evp_pkey = backend._ffi.gc(evp_pkey, backend._lib.EVP_PKEY_free)

        if backend._lib.X509_REQ_verify(self._x509_req, evp_pkey) == 1:
            return True

        # Clear whatever the failed verification left on the error stack.
        backend._consume_errors()
        return False

    def get_attribute_for_oid(self, oid: x509.ObjectIdentifier) -> bytes:
        """Return the bytes of the single-valued attribute named by ``oid``.

        Raises ``x509.AttributeNotFound`` when the request has no such
        attribute, and ``ValueError`` when the value is not a UTF8String,
        PrintableString or IA5String.
        """
        backend = self._backend
        lib = backend._lib
        null = backend._ffi.NULL

        asn1_obj = _txt2obj_gc(backend, oid.dotted_string)
        position = lib.X509_REQ_get_attr_by_OBJ(self._x509_req, asn1_obj, -1)
        if position == -1:
            raise x509.AttributeNotFound(
                "No {} attribute was found".format(oid), oid
            )

        attr = lib.X509_REQ_get_attr(self._x509_req, position)
        backend.openssl_assert(attr != null)
        # We don't support multiple valued attributes for now.
        backend.openssl_assert(lib.X509_ATTRIBUTE_count(attr) == 1)

        asn1_type = lib.X509_ATTRIBUTE_get0_type(attr, 0)
        backend.openssl_assert(asn1_type != null)

        # We need this to ensure that our C type cast is safe.
        # Also this should always be a sane string type, but we'll see if
        # that is true in the real world...
        allowed_types = (
            _ASN1Type.UTF8String.value,
            _ASN1Type.PrintableString.value,
            _ASN1Type.IA5String.value,
        )
        if asn1_type.type not in allowed_types:
            raise ValueError(
                "OID {} has a disallowed ASN.1 type: {}".format(
                    oid, asn1_type.type
                )
            )

        raw = lib.X509_ATTRIBUTE_get0_data(attr, 0, asn1_type.type, null)
        backend.openssl_assert(raw != null)
        # This cast is safe iff we assert on the type above to ensure
        # that it is always a type of ASN1_STRING
        asn1_string = backend._ffi.cast("ASN1_STRING *", raw)
        return _asn1_string_to_bytes(backend, asn1_string)
@utils.register_interface(
    x509.certificate_transparency.SignedCertificateTimestamp
)
class _SignedCertificateTimestamp(object):
    """Signed certificate timestamp backed by an OpenSSL ``SCT``."""

    def __init__(self, backend, sct_list, sct):
        self._backend = backend
        # Keep the SCT_LIST that this SCT came from alive.
        self._sct_list = sct_list
        self._sct = sct

    @property
    def version(self) -> x509.certificate_transparency.Version:
        """Return the SCT version; only v1 is expected here."""
        version = self._backend._lib.SCT_get_version(self._sct)
        assert version == self._backend._lib.SCT_VERSION_V1
        return x509.certificate_transparency.Version.v1

    @property
    def log_id(self) -> bytes:
        """Return a copy of the log ID bytes."""
        out = self._backend._ffi.new("unsigned char **")
        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
        assert log_id_length >= 0
        return self._backend._ffi.buffer(out[0], log_id_length)[:]

    @property
    def timestamp(self) -> datetime.datetime:
        """Return the SCT timestamp as a naive UTC datetime.

        ``SCT_get_timestamp`` is treated as milliseconds since the Unix
        epoch: the whole seconds are converted and the remainder becomes
        the microsecond field.
        """
        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
        seconds, milliseconds = divmod(timestamp, 1000)
        # datetime.utcfromtimestamp() is deprecated since Python 3.12.
        # Convert through an aware UTC datetime and drop the tzinfo so the
        # result is the same naive value as before.
        return datetime.datetime.fromtimestamp(
            seconds, datetime.timezone.utc
        ).replace(tzinfo=None, microsecond=milliseconds * 1000)

    @property
    def entry_type(self) -> x509.certificate_transparency.LogEntryType:
        """Return the log entry type; only precertificates are expected."""
        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
        # We currently only support loading SCTs from the X.509 extension, so
        # we only have precerts.
        assert entry_type == self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT
        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE

    @property
    def _signature(self):
        """Return a copy of the SCT signature bytes."""
        ptrptr = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
        self._backend.openssl_assert(res > 0)
        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
        return self._backend._ffi.buffer(ptrptr[0], res)[:]

    def __hash__(self) -> int:
        return hash(self._signature)

    def __eq__(self, other: object) -> bool:
        """Two SCTs are equal when their signature bytes are equal."""
        if not isinstance(other, _SignedCertificateTimestamp):
            return NotImplemented

        return self._signature == other._signature

    def __ne__(self, other: object) -> bool:
        return not self == other
|