1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749 |
- # This file is dual licensed under the terms of the Apache License, Version
- # 2.0, and the BSD License. See the LICENSE file in the root of this repository
- # for complete details.
- import collections
- import contextlib
- import itertools
- import warnings
- from contextlib import contextmanager
- from cryptography import utils, x509
- from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
- from cryptography.hazmat._der import (
- INTEGER,
- NULL,
- SEQUENCE,
- encode_der,
- encode_der_integer,
- )
- from cryptography.hazmat.backends.interfaces import (
- CMACBackend,
- CipherBackend,
- DERSerializationBackend,
- DHBackend,
- DSABackend,
- EllipticCurveBackend,
- HMACBackend,
- HashBackend,
- PBKDF2HMACBackend,
- PEMSerializationBackend,
- RSABackend,
- ScryptBackend,
- X509Backend,
- )
- from cryptography.hazmat.backends.openssl import aead
- from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
- from cryptography.hazmat.backends.openssl.cmac import _CMACContext
- from cryptography.hazmat.backends.openssl.decode_asn1 import (
- _CRL_ENTRY_REASON_ENUM_TO_CODE,
- _CRL_EXTENSION_HANDLERS,
- _EXTENSION_HANDLERS_BASE,
- _EXTENSION_HANDLERS_SCT,
- _OCSP_BASICRESP_EXTENSION_HANDLERS,
- _OCSP_REQ_EXTENSION_HANDLERS,
- _OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT,
- _REVOKED_EXTENSION_HANDLERS,
- _X509ExtensionParser,
- )
- from cryptography.hazmat.backends.openssl.dh import (
- _DHParameters,
- _DHPrivateKey,
- _DHPublicKey,
- _dh_params_dup,
- )
- from cryptography.hazmat.backends.openssl.dsa import (
- _DSAParameters,
- _DSAPrivateKey,
- _DSAPublicKey,
- )
- from cryptography.hazmat.backends.openssl.ec import (
- _EllipticCurvePrivateKey,
- _EllipticCurvePublicKey,
- )
- from cryptography.hazmat.backends.openssl.ed25519 import (
- _Ed25519PrivateKey,
- _Ed25519PublicKey,
- )
- from cryptography.hazmat.backends.openssl.ed448 import (
- _ED448_KEY_SIZE,
- _Ed448PrivateKey,
- _Ed448PublicKey,
- )
- from cryptography.hazmat.backends.openssl.encode_asn1 import (
- _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
- _CRL_EXTENSION_ENCODE_HANDLERS,
- _EXTENSION_ENCODE_HANDLERS,
- _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS,
- _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS,
- _encode_asn1_int_gc,
- _encode_asn1_str_gc,
- _encode_name_gc,
- _txt2obj_gc,
- )
- from cryptography.hazmat.backends.openssl.hashes import _HashContext
- from cryptography.hazmat.backends.openssl.hmac import _HMACContext
- from cryptography.hazmat.backends.openssl.ocsp import (
- _OCSPRequest,
- _OCSPResponse,
- )
- from cryptography.hazmat.backends.openssl.poly1305 import (
- _POLY1305_KEY_SIZE,
- _Poly1305Context,
- )
- from cryptography.hazmat.backends.openssl.rsa import (
- _RSAPrivateKey,
- _RSAPublicKey,
- )
- from cryptography.hazmat.backends.openssl.x25519 import (
- _X25519PrivateKey,
- _X25519PublicKey,
- )
- from cryptography.hazmat.backends.openssl.x448 import (
- _X448PrivateKey,
- _X448PublicKey,
- )
- from cryptography.hazmat.backends.openssl.x509 import (
- _Certificate,
- _CertificateRevocationList,
- _CertificateSigningRequest,
- _RevokedCertificate,
- )
- from cryptography.hazmat.bindings.openssl import binding
- from cryptography.hazmat.primitives import hashes, serialization
- from cryptography.hazmat.primitives.asymmetric import (
- dh,
- dsa,
- ec,
- ed25519,
- ed448,
- rsa,
- )
- from cryptography.hazmat.primitives.asymmetric.padding import (
- MGF1,
- OAEP,
- PKCS1v15,
- PSS,
- )
- from cryptography.hazmat.primitives.ciphers.algorithms import (
- AES,
- ARC4,
- Blowfish,
- CAST5,
- Camellia,
- ChaCha20,
- IDEA,
- SEED,
- TripleDES,
- )
- from cryptography.hazmat.primitives.ciphers.modes import (
- CBC,
- CFB,
- CFB8,
- CTR,
- ECB,
- GCM,
- OFB,
- XTS,
- )
- from cryptography.hazmat.primitives.kdf import scrypt
- from cryptography.hazmat.primitives.serialization import pkcs7, ssh
- from cryptography.x509 import ocsp
- _MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
- # Not actually supported, just used as a marker for some serialization tests.
- class _RC2(object):
- pass
- @utils.register_interface(CipherBackend)
- @utils.register_interface(CMACBackend)
- @utils.register_interface(DERSerializationBackend)
- @utils.register_interface(DHBackend)
- @utils.register_interface(DSABackend)
- @utils.register_interface(EllipticCurveBackend)
- @utils.register_interface(HashBackend)
- @utils.register_interface(HMACBackend)
- @utils.register_interface(PBKDF2HMACBackend)
- @utils.register_interface(RSABackend)
- @utils.register_interface(PEMSerializationBackend)
- @utils.register_interface(X509Backend)
- @utils.register_interface_if(
- binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend
- )
- class Backend(object):
- """
- OpenSSL API binding interfaces.
- """
- name = "openssl"
- # FIPS has opinions about acceptable algorithms and key sizes, but the
- # disallowed algorithms are still present in OpenSSL. They just error if
- # you try to use them. To avoid that we allowlist the algorithms in
- # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
- _fips_aead = {
- b"aes-128-ccm",
- b"aes-192-ccm",
- b"aes-256-ccm",
- b"aes-128-gcm",
- b"aes-192-gcm",
- b"aes-256-gcm",
- }
- _fips_ciphers = (AES, TripleDES)
- _fips_hashes = (
- hashes.SHA1,
- hashes.SHA224,
- hashes.SHA256,
- hashes.SHA384,
- hashes.SHA512,
- hashes.SHA512_224,
- hashes.SHA512_256,
- hashes.SHA3_224,
- hashes.SHA3_256,
- hashes.SHA3_384,
- hashes.SHA3_512,
- hashes.SHAKE128,
- hashes.SHAKE256,
- )
- _fips_rsa_min_key_size = 2048
- _fips_rsa_min_public_exponent = 65537
- _fips_dsa_min_modulus = 1 << 2048
- _fips_dh_min_key_size = 2048
- _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
- def __init__(self):
- self._binding = binding.Binding()
- self._ffi = self._binding.ffi
- self._lib = self._binding.lib
- self._fips_enabled = self._is_fips_enabled()
- self._cipher_registry = {}
- self._register_default_ciphers()
- self._register_x509_ext_parsers()
- self._register_x509_encoders()
- if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
- warnings.warn(
- "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
- UserWarning,
- )
- else:
- self.activate_osrandom_engine()
- self._dh_types = [self._lib.EVP_PKEY_DH]
- if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
- self._dh_types.append(self._lib.EVP_PKEY_DHX)
- def openssl_assert(self, ok, errors=None):
- return binding._openssl_assert(self._lib, ok, errors=errors)
- def _is_fips_enabled(self):
- fips_mode = getattr(self._lib, "FIPS_mode", lambda: 0)
- mode = fips_mode()
- if mode == 0:
- # OpenSSL without FIPS pushes an error on the error stack
- self._lib.ERR_clear_error()
- return bool(mode)
- def activate_builtin_random(self):
- if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
- # Obtain a new structural reference.
- e = self._lib.ENGINE_get_default_RAND()
- if e != self._ffi.NULL:
- self._lib.ENGINE_unregister_RAND(e)
- # Reset the RNG to use the built-in.
- res = self._lib.RAND_set_rand_method(self._ffi.NULL)
- self.openssl_assert(res == 1)
- # decrement the structural reference from get_default_RAND
- res = self._lib.ENGINE_finish(e)
- self.openssl_assert(res == 1)
- @contextlib.contextmanager
- def _get_osurandom_engine(self):
- # Fetches an engine by id and returns it. This creates a structural
- # reference.
- e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
- self.openssl_assert(e != self._ffi.NULL)
- # Initialize the engine for use. This adds a functional reference.
- res = self._lib.ENGINE_init(e)
- self.openssl_assert(res == 1)
- try:
- yield e
- finally:
- # Decrement the structural ref incremented by ENGINE_by_id.
- res = self._lib.ENGINE_free(e)
- self.openssl_assert(res == 1)
- # Decrement the functional ref incremented by ENGINE_init.
- res = self._lib.ENGINE_finish(e)
- self.openssl_assert(res == 1)
- def activate_osrandom_engine(self):
- if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
- # Unregister and free the current engine.
- self.activate_builtin_random()
- with self._get_osurandom_engine() as e:
- # Set the engine as the default RAND provider.
- res = self._lib.ENGINE_set_default_RAND(e)
- self.openssl_assert(res == 1)
- # Reset the RNG to use the engine
- res = self._lib.RAND_set_rand_method(self._ffi.NULL)
- self.openssl_assert(res == 1)
- def osrandom_engine_implementation(self):
- buf = self._ffi.new("char[]", 64)
- with self._get_osurandom_engine() as e:
- res = self._lib.ENGINE_ctrl_cmd(
- e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
- )
- self.openssl_assert(res > 0)
- return self._ffi.string(buf).decode("ascii")
- def openssl_version_text(self):
- """
- Friendly string name of the loaded OpenSSL library. This is not
- necessarily the same version as it was compiled against.
- Example: OpenSSL 1.1.1d 10 Sep 2019
- """
- return self._ffi.string(
- self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
- ).decode("ascii")
- def openssl_version_number(self):
- return self._lib.OpenSSL_version_num()
- def create_hmac_ctx(self, key, algorithm):
- return _HMACContext(self, key, algorithm)
- def _evp_md_from_algorithm(self, algorithm):
- if algorithm.name == "blake2b" or algorithm.name == "blake2s":
- alg = "{}{}".format(
- algorithm.name, algorithm.digest_size * 8
- ).encode("ascii")
- else:
- alg = algorithm.name.encode("ascii")
- evp_md = self._lib.EVP_get_digestbyname(alg)
- return evp_md
- def _evp_md_non_null_from_algorithm(self, algorithm):
- evp_md = self._evp_md_from_algorithm(algorithm)
- self.openssl_assert(evp_md != self._ffi.NULL)
- return evp_md
- def hash_supported(self, algorithm):
- if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
- return False
- evp_md = self._evp_md_from_algorithm(algorithm)
- return evp_md != self._ffi.NULL
- def hmac_supported(self, algorithm):
- return self.hash_supported(algorithm)
- def create_hash_ctx(self, algorithm):
- return _HashContext(self, algorithm)
- def cipher_supported(self, cipher, mode):
- if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
- return False
- try:
- adapter = self._cipher_registry[type(cipher), type(mode)]
- except KeyError:
- return False
- evp_cipher = adapter(self, cipher, mode)
- return self._ffi.NULL != evp_cipher
- def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
- if (cipher_cls, mode_cls) in self._cipher_registry:
- raise ValueError(
- "Duplicate registration for: {} {}.".format(
- cipher_cls, mode_cls
- )
- )
- self._cipher_registry[cipher_cls, mode_cls] = adapter
- def _register_default_ciphers(self):
- for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
- self.register_cipher_adapter(
- AES,
- mode_cls,
- GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
- )
- for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
- self.register_cipher_adapter(
- Camellia,
- mode_cls,
- GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
- )
- for mode_cls in [CBC, CFB, CFB8, OFB]:
- self.register_cipher_adapter(
- TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
- )
- self.register_cipher_adapter(
- TripleDES, ECB, GetCipherByName("des-ede3")
- )
- for mode_cls in [CBC, CFB, OFB, ECB]:
- self.register_cipher_adapter(
- Blowfish, mode_cls, GetCipherByName("bf-{mode.name}")
- )
- for mode_cls in [CBC, CFB, OFB, ECB]:
- self.register_cipher_adapter(
- SEED, mode_cls, GetCipherByName("seed-{mode.name}")
- )
- for cipher_cls, mode_cls in itertools.product(
- [CAST5, IDEA],
- [CBC, OFB, CFB, ECB],
- ):
- self.register_cipher_adapter(
- cipher_cls,
- mode_cls,
- GetCipherByName("{cipher.name}-{mode.name}"),
- )
- self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4"))
- # We don't actually support RC2, this is just used by some tests.
- self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2"))
- self.register_cipher_adapter(
- ChaCha20, type(None), GetCipherByName("chacha20")
- )
- self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
- def _register_x509_ext_parsers(self):
- ext_handlers = _EXTENSION_HANDLERS_BASE.copy()
- # All revoked extensions are valid single response extensions, see:
- # https://tools.ietf.org/html/rfc6960#section-4.4.5
- singleresp_handlers = _REVOKED_EXTENSION_HANDLERS.copy()
- if self._lib.Cryptography_HAS_SCT:
- ext_handlers.update(_EXTENSION_HANDLERS_SCT)
- singleresp_handlers.update(_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT)
- self._certificate_extension_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.X509_get_ext_count,
- get_ext=self._lib.X509_get_ext,
- handlers=ext_handlers,
- )
- self._csr_extension_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.sk_X509_EXTENSION_num,
- get_ext=self._lib.sk_X509_EXTENSION_value,
- handlers=ext_handlers,
- )
- self._revoked_cert_extension_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.X509_REVOKED_get_ext_count,
- get_ext=self._lib.X509_REVOKED_get_ext,
- handlers=_REVOKED_EXTENSION_HANDLERS,
- )
- self._crl_extension_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.X509_CRL_get_ext_count,
- get_ext=self._lib.X509_CRL_get_ext,
- handlers=_CRL_EXTENSION_HANDLERS,
- )
- self._ocsp_req_ext_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.OCSP_REQUEST_get_ext_count,
- get_ext=self._lib.OCSP_REQUEST_get_ext,
- handlers=_OCSP_REQ_EXTENSION_HANDLERS,
- )
- self._ocsp_basicresp_ext_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.OCSP_BASICRESP_get_ext_count,
- get_ext=self._lib.OCSP_BASICRESP_get_ext,
- handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS,
- )
- self._ocsp_singleresp_ext_parser = _X509ExtensionParser(
- self,
- ext_count=self._lib.OCSP_SINGLERESP_get_ext_count,
- get_ext=self._lib.OCSP_SINGLERESP_get_ext,
- handlers=singleresp_handlers,
- )
- def _register_x509_encoders(self):
- self._extension_encode_handlers = _EXTENSION_ENCODE_HANDLERS.copy()
- self._crl_extension_encode_handlers = (
- _CRL_EXTENSION_ENCODE_HANDLERS.copy()
- )
- self._crl_entry_extension_encode_handlers = (
- _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS.copy()
- )
- self._ocsp_request_extension_encode_handlers = (
- _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS.copy()
- )
- self._ocsp_basicresp_extension_encode_handlers = (
- _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS.copy()
- )
- def create_symmetric_encryption_ctx(self, cipher, mode):
- return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
- def create_symmetric_decryption_ctx(self, cipher, mode):
- return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
- def pbkdf2_hmac_supported(self, algorithm):
- return self.hmac_supported(algorithm)
- def derive_pbkdf2_hmac(
- self, algorithm, length, salt, iterations, key_material
- ):
- buf = self._ffi.new("unsigned char[]", length)
- evp_md = self._evp_md_non_null_from_algorithm(algorithm)
- key_material_ptr = self._ffi.from_buffer(key_material)
- res = self._lib.PKCS5_PBKDF2_HMAC(
- key_material_ptr,
- len(key_material),
- salt,
- len(salt),
- iterations,
- evp_md,
- length,
- buf,
- )
- self.openssl_assert(res == 1)
- return self._ffi.buffer(buf)[:]
- def _consume_errors(self):
- return binding._consume_errors(self._lib)
- def _consume_errors_with_text(self):
- return binding._consume_errors_with_text(self._lib)
- def _bn_to_int(self, bn):
- assert bn != self._ffi.NULL
- bn_num_bytes = self._lib.BN_num_bytes(bn)
- bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
- bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
- # A zero length means the BN has value 0
- self.openssl_assert(bin_len >= 0)
- val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
- if self._lib.BN_is_negative(bn):
- val = -val
- return val
- def _int_to_bn(self, num, bn=None):
- """
- Converts a python integer to a BIGNUM. The returned BIGNUM will not
- be garbage collected (to support adding them to structs that take
- ownership of the object). Be sure to register it for GC if it will
- be discarded after use.
- """
- assert bn is None or bn != self._ffi.NULL
- if bn is None:
- bn = self._ffi.NULL
- binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
- bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
- self.openssl_assert(bn_ptr != self._ffi.NULL)
- return bn_ptr
- def generate_rsa_private_key(self, public_exponent, key_size):
- rsa._verify_rsa_parameters(public_exponent, key_size)
- rsa_cdata = self._lib.RSA_new()
- self.openssl_assert(rsa_cdata != self._ffi.NULL)
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- bn = self._int_to_bn(public_exponent)
- bn = self._ffi.gc(bn, self._lib.BN_free)
- res = self._lib.RSA_generate_key_ex(
- rsa_cdata, key_size, bn, self._ffi.NULL
- )
- self.openssl_assert(res == 1)
- evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
- return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
- def generate_rsa_parameters_supported(self, public_exponent, key_size):
- return (
- public_exponent >= 3
- and public_exponent & 1 != 0
- and key_size >= 512
- )
- def load_rsa_private_numbers(self, numbers):
- rsa._check_private_key_components(
- numbers.p,
- numbers.q,
- numbers.d,
- numbers.dmp1,
- numbers.dmq1,
- numbers.iqmp,
- numbers.public_numbers.e,
- numbers.public_numbers.n,
- )
- rsa_cdata = self._lib.RSA_new()
- self.openssl_assert(rsa_cdata != self._ffi.NULL)
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- p = self._int_to_bn(numbers.p)
- q = self._int_to_bn(numbers.q)
- d = self._int_to_bn(numbers.d)
- dmp1 = self._int_to_bn(numbers.dmp1)
- dmq1 = self._int_to_bn(numbers.dmq1)
- iqmp = self._int_to_bn(numbers.iqmp)
- e = self._int_to_bn(numbers.public_numbers.e)
- n = self._int_to_bn(numbers.public_numbers.n)
- res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
- self.openssl_assert(res == 1)
- res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
- self.openssl_assert(res == 1)
- res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
- self.openssl_assert(res == 1)
- evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
- return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
- def load_rsa_public_numbers(self, numbers):
- rsa._check_public_key_components(numbers.e, numbers.n)
- rsa_cdata = self._lib.RSA_new()
- self.openssl_assert(rsa_cdata != self._ffi.NULL)
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- e = self._int_to_bn(numbers.e)
- n = self._int_to_bn(numbers.n)
- res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
- self.openssl_assert(res == 1)
- evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
- return _RSAPublicKey(self, rsa_cdata, evp_pkey)
- def _create_evp_pkey_gc(self):
- evp_pkey = self._lib.EVP_PKEY_new()
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return evp_pkey
- def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
- evp_pkey = self._create_evp_pkey_gc()
- res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
- self.openssl_assert(res == 1)
- return evp_pkey
- def _bytes_to_bio(self, data):
- """
- Return a _MemoryBIO namedtuple of (BIO, char*).
- The char* is the storage for the BIO and it must stay alive until the
- BIO is finished with.
- """
- data_ptr = self._ffi.from_buffer(data)
- bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
- self.openssl_assert(bio != self._ffi.NULL)
- return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
- def _create_mem_bio_gc(self):
- """
- Creates an empty memory BIO.
- """
- bio_method = self._lib.BIO_s_mem()
- self.openssl_assert(bio_method != self._ffi.NULL)
- bio = self._lib.BIO_new(bio_method)
- self.openssl_assert(bio != self._ffi.NULL)
- bio = self._ffi.gc(bio, self._lib.BIO_free)
- return bio
- def _read_mem_bio(self, bio):
- """
- Reads a memory BIO. This only works on memory BIOs.
- """
- buf = self._ffi.new("char **")
- buf_len = self._lib.BIO_get_mem_data(bio, buf)
- self.openssl_assert(buf_len > 0)
- self.openssl_assert(buf[0] != self._ffi.NULL)
- bio_data = self._ffi.buffer(buf[0], buf_len)[:]
- return bio_data
- def _evp_pkey_to_private_key(self, evp_pkey):
- """
- Return the appropriate type of PrivateKey given an evp_pkey cdata
- pointer.
- """
- key_type = self._lib.EVP_PKEY_id(evp_pkey)
- if key_type == self._lib.EVP_PKEY_RSA:
- rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
- self.openssl_assert(rsa_cdata != self._ffi.NULL)
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
- elif key_type == self._lib.EVP_PKEY_DSA:
- dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
- self.openssl_assert(dsa_cdata != self._ffi.NULL)
- dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
- return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
- elif key_type == self._lib.EVP_PKEY_EC:
- ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
- self.openssl_assert(ec_cdata != self._ffi.NULL)
- ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
- return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
- elif key_type in self._dh_types:
- dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- return _DHPrivateKey(self, dh_cdata, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
- # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
- return _Ed25519PrivateKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
- # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
- return _X448PrivateKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
- # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
- return _X25519PrivateKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
- # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1
- return _Ed448PrivateKey(self, evp_pkey)
- else:
- raise UnsupportedAlgorithm("Unsupported key type.")
- def _evp_pkey_to_public_key(self, evp_pkey):
- """
- Return the appropriate type of PublicKey given an evp_pkey cdata
- pointer.
- """
- key_type = self._lib.EVP_PKEY_id(evp_pkey)
- if key_type == self._lib.EVP_PKEY_RSA:
- rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
- self.openssl_assert(rsa_cdata != self._ffi.NULL)
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- return _RSAPublicKey(self, rsa_cdata, evp_pkey)
- elif key_type == self._lib.EVP_PKEY_DSA:
- dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
- self.openssl_assert(dsa_cdata != self._ffi.NULL)
- dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
- return _DSAPublicKey(self, dsa_cdata, evp_pkey)
- elif key_type == self._lib.EVP_PKEY_EC:
- ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
- self.openssl_assert(ec_cdata != self._ffi.NULL)
- ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
- return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
- elif key_type in self._dh_types:
- dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- return _DHPublicKey(self, dh_cdata, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
- # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
- return _Ed25519PublicKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
- # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
- return _X448PublicKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
- # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
- return _X25519PublicKey(self, evp_pkey)
- elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
- # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1
- return _Ed448PublicKey(self, evp_pkey)
- else:
- raise UnsupportedAlgorithm("Unsupported key type.")
- def _oaep_hash_supported(self, algorithm):
- if self._lib.Cryptography_HAS_RSA_OAEP_MD:
- return isinstance(
- algorithm,
- (
- hashes.SHA1,
- hashes.SHA224,
- hashes.SHA256,
- hashes.SHA384,
- hashes.SHA512,
- ),
- )
- else:
- return isinstance(algorithm, hashes.SHA1)
- def rsa_padding_supported(self, padding):
- if isinstance(padding, PKCS1v15):
- return True
- elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
- return self.hash_supported(padding._mgf._algorithm)
- elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
- return (
- self._oaep_hash_supported(padding._mgf._algorithm)
- and self._oaep_hash_supported(padding._algorithm)
- and (
- (padding._label is None or len(padding._label) == 0)
- or self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1
- )
- )
- else:
- return False
- def generate_dsa_parameters(self, key_size):
- if key_size not in (1024, 2048, 3072, 4096):
- raise ValueError(
- "Key size must be 1024, 2048, 3072, or 4096 bits."
- )
- ctx = self._lib.DSA_new()
- self.openssl_assert(ctx != self._ffi.NULL)
- ctx = self._ffi.gc(ctx, self._lib.DSA_free)
- res = self._lib.DSA_generate_parameters_ex(
- ctx,
- key_size,
- self._ffi.NULL,
- 0,
- self._ffi.NULL,
- self._ffi.NULL,
- self._ffi.NULL,
- )
- self.openssl_assert(res == 1)
- return _DSAParameters(self, ctx)
- def generate_dsa_private_key(self, parameters):
- ctx = self._lib.DSAparams_dup(parameters._dsa_cdata)
- self.openssl_assert(ctx != self._ffi.NULL)
- ctx = self._ffi.gc(ctx, self._lib.DSA_free)
- self._lib.DSA_generate_key(ctx)
- evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
- return _DSAPrivateKey(self, ctx, evp_pkey)
- def generate_dsa_private_key_and_parameters(self, key_size):
- parameters = self.generate_dsa_parameters(key_size)
- return self.generate_dsa_private_key(parameters)
- def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
- res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
- self.openssl_assert(res == 1)
- res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
- self.openssl_assert(res == 1)
- def load_dsa_private_numbers(self, numbers):
- dsa._check_dsa_private_numbers(numbers)
- parameter_numbers = numbers.public_numbers.parameter_numbers
- dsa_cdata = self._lib.DSA_new()
- self.openssl_assert(dsa_cdata != self._ffi.NULL)
- dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
- p = self._int_to_bn(parameter_numbers.p)
- q = self._int_to_bn(parameter_numbers.q)
- g = self._int_to_bn(parameter_numbers.g)
- pub_key = self._int_to_bn(numbers.public_numbers.y)
- priv_key = self._int_to_bn(numbers.x)
- self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
- evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
- return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
- def load_dsa_public_numbers(self, numbers):
- dsa._check_dsa_parameters(numbers.parameter_numbers)
- dsa_cdata = self._lib.DSA_new()
- self.openssl_assert(dsa_cdata != self._ffi.NULL)
- dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
- p = self._int_to_bn(numbers.parameter_numbers.p)
- q = self._int_to_bn(numbers.parameter_numbers.q)
- g = self._int_to_bn(numbers.parameter_numbers.g)
- pub_key = self._int_to_bn(numbers.y)
- priv_key = self._ffi.NULL
- self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
- evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
- return _DSAPublicKey(self, dsa_cdata, evp_pkey)
- def load_dsa_parameter_numbers(self, numbers):
- dsa._check_dsa_parameters(numbers)
- dsa_cdata = self._lib.DSA_new()
- self.openssl_assert(dsa_cdata != self._ffi.NULL)
- dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
- p = self._int_to_bn(numbers.p)
- q = self._int_to_bn(numbers.q)
- g = self._int_to_bn(numbers.g)
- res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
- self.openssl_assert(res == 1)
- return _DSAParameters(self, dsa_cdata)
- def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
- evp_pkey = self._create_evp_pkey_gc()
- res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
- self.openssl_assert(res == 1)
- return evp_pkey
- def dsa_hash_supported(self, algorithm):
- return self.hash_supported(algorithm)
- def dsa_parameters_supported(self, p, q, g):
- return True
- def cmac_algorithm_supported(self, algorithm):
- return self.cipher_supported(
- algorithm, CBC(b"\x00" * algorithm.block_size)
- )
- def create_cmac_ctx(self, algorithm):
- return _CMACContext(self, algorithm)
- def _x509_check_signature_params(self, private_key, algorithm):
- if isinstance(
- private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
- ):
- if algorithm is not None:
- raise ValueError(
- "algorithm must be None when signing via ed25519 or ed448"
- )
- elif not isinstance(
- private_key,
- (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey),
- ):
- raise TypeError(
- "Key must be an rsa, dsa, ec, ed25519, or ed448 private key."
- )
- elif not isinstance(algorithm, hashes.HashAlgorithm):
- raise TypeError("Algorithm must be a registered hash algorithm.")
- elif isinstance(algorithm, hashes.MD5) and not isinstance(
- private_key, rsa.RSAPrivateKey
- ):
- raise ValueError(
- "MD5 hash algorithm is only supported with RSA keys"
- )
- def create_x509_csr(self, builder, private_key, algorithm):
- if not isinstance(builder, x509.CertificateSigningRequestBuilder):
- raise TypeError("Builder type mismatch.")
- self._x509_check_signature_params(private_key, algorithm)
- # Resolve the signature algorithm.
- evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
- # Create an empty request.
- x509_req = self._lib.X509_REQ_new()
- self.openssl_assert(x509_req != self._ffi.NULL)
- x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
- # Set x509 version.
- res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value)
- self.openssl_assert(res == 1)
- # Set subject name.
- res = self._lib.X509_REQ_set_subject_name(
- x509_req, _encode_name_gc(self, builder._subject_name)
- )
- self.openssl_assert(res == 1)
- # Set subject public key.
- public_key = private_key.public_key()
- res = self._lib.X509_REQ_set_pubkey(x509_req, public_key._evp_pkey)
- self.openssl_assert(res == 1)
- # Add extensions.
- sk_extension = self._lib.sk_X509_EXTENSION_new_null()
- self.openssl_assert(sk_extension != self._ffi.NULL)
- sk_extension = self._ffi.gc(
- sk_extension,
- lambda x: self._lib.sk_X509_EXTENSION_pop_free(
- x,
- self._ffi.addressof(
- self._lib._original_lib, "X509_EXTENSION_free"
- ),
- ),
- )
- # Don't GC individual extensions because the memory is owned by
- # sk_extensions and will be freed along with it.
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._extension_encode_handlers,
- x509_obj=sk_extension,
- add_func=self._lib.sk_X509_EXTENSION_insert,
- gc=False,
- )
- res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension)
- self.openssl_assert(res == 1)
- # Add attributes (all bytes encoded as ASN1 UTF8_STRING)
- for attr_oid, attr_val in builder._attributes:
- obj = _txt2obj_gc(self, attr_oid.dotted_string)
- res = self._lib.X509_REQ_add1_attr_by_OBJ(
- x509_req,
- obj,
- x509.name._ASN1Type.UTF8String.value,
- attr_val,
- len(attr_val),
- )
- self.openssl_assert(res == 1)
- # Sign the request using the requester's private key.
- res = self._lib.X509_REQ_sign(x509_req, private_key._evp_pkey, evp_md)
- if res == 0:
- errors = self._consume_errors_with_text()
- raise ValueError("Signing failed", errors)
- return _CertificateSigningRequest(self, x509_req)
- def create_x509_certificate(self, builder, private_key, algorithm):
- if not isinstance(builder, x509.CertificateBuilder):
- raise TypeError("Builder type mismatch.")
- self._x509_check_signature_params(private_key, algorithm)
- # Resolve the signature algorithm.
- evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
- # Create an empty certificate.
- x509_cert = self._lib.X509_new()
- x509_cert = self._ffi.gc(x509_cert, self._lib.X509_free)
- # Set the x509 version.
- res = self._lib.X509_set_version(x509_cert, builder._version.value)
- self.openssl_assert(res == 1)
- # Set the subject's name.
- res = self._lib.X509_set_subject_name(
- x509_cert, _encode_name_gc(self, builder._subject_name)
- )
- self.openssl_assert(res == 1)
- # Set the subject's public key.
- res = self._lib.X509_set_pubkey(
- x509_cert, builder._public_key._evp_pkey
- )
- self.openssl_assert(res == 1)
- # Set the certificate serial number.
- serial_number = _encode_asn1_int_gc(self, builder._serial_number)
- res = self._lib.X509_set_serialNumber(x509_cert, serial_number)
- self.openssl_assert(res == 1)
- # Set the "not before" time.
- self._set_asn1_time(
- self._lib.X509_getm_notBefore(x509_cert), builder._not_valid_before
- )
- # Set the "not after" time.
- self._set_asn1_time(
- self._lib.X509_getm_notAfter(x509_cert), builder._not_valid_after
- )
- # Add extensions.
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._extension_encode_handlers,
- x509_obj=x509_cert,
- add_func=self._lib.X509_add_ext,
- gc=True,
- )
- # Set the issuer name.
- res = self._lib.X509_set_issuer_name(
- x509_cert, _encode_name_gc(self, builder._issuer_name)
- )
- self.openssl_assert(res == 1)
- # Sign the certificate with the issuer's private key.
- res = self._lib.X509_sign(x509_cert, private_key._evp_pkey, evp_md)
- if res == 0:
- errors = self._consume_errors_with_text()
- raise ValueError("Signing failed", errors)
- return _Certificate(self, x509_cert)
- def _evp_md_x509_null_if_eddsa(self, private_key, algorithm):
- if isinstance(
- private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
- ):
- # OpenSSL requires us to pass NULL for EVP_MD for ed25519/ed448
- return self._ffi.NULL
- else:
- return self._evp_md_non_null_from_algorithm(algorithm)
- def _set_asn1_time(self, asn1_time, time):
- if time.year >= 2050:
- asn1_str = time.strftime("%Y%m%d%H%M%SZ").encode("ascii")
- else:
- asn1_str = time.strftime("%y%m%d%H%M%SZ").encode("ascii")
- res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str)
- self.openssl_assert(res == 1)
- def _create_asn1_time(self, time):
- asn1_time = self._lib.ASN1_TIME_new()
- self.openssl_assert(asn1_time != self._ffi.NULL)
- asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free)
- self._set_asn1_time(asn1_time, time)
- return asn1_time
- def create_x509_crl(self, builder, private_key, algorithm):
- if not isinstance(builder, x509.CertificateRevocationListBuilder):
- raise TypeError("Builder type mismatch.")
- self._x509_check_signature_params(private_key, algorithm)
- evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
- # Create an empty CRL.
- x509_crl = self._lib.X509_CRL_new()
- x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
- # Set the x509 CRL version. We only support v2 (integer value 1).
- res = self._lib.X509_CRL_set_version(x509_crl, 1)
- self.openssl_assert(res == 1)
- # Set the issuer name.
- res = self._lib.X509_CRL_set_issuer_name(
- x509_crl, _encode_name_gc(self, builder._issuer_name)
- )
- self.openssl_assert(res == 1)
- # Set the last update time.
- last_update = self._create_asn1_time(builder._last_update)
- res = self._lib.X509_CRL_set1_lastUpdate(x509_crl, last_update)
- self.openssl_assert(res == 1)
- # Set the next update time.
- next_update = self._create_asn1_time(builder._next_update)
- res = self._lib.X509_CRL_set1_nextUpdate(x509_crl, next_update)
- self.openssl_assert(res == 1)
- # Add extensions.
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._crl_extension_encode_handlers,
- x509_obj=x509_crl,
- add_func=self._lib.X509_CRL_add_ext,
- gc=True,
- )
- # add revoked certificates
- for revoked_cert in builder._revoked_certificates:
- # Duplicating because the X509_CRL takes ownership and will free
- # this memory when X509_CRL_free is called.
- revoked = self._lib.X509_REVOKED_dup(revoked_cert._x509_revoked)
- self.openssl_assert(revoked != self._ffi.NULL)
- res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked)
- self.openssl_assert(res == 1)
- res = self._lib.X509_CRL_sign(x509_crl, private_key._evp_pkey, evp_md)
- if res == 0:
- errors = self._consume_errors_with_text()
- raise ValueError("Signing failed", errors)
- return _CertificateRevocationList(self, x509_crl)
- def _create_x509_extensions(
- self, extensions, handlers, x509_obj, add_func, gc
- ):
- for i, extension in enumerate(extensions):
- x509_extension = self._create_x509_extension(handlers, extension)
- self.openssl_assert(x509_extension != self._ffi.NULL)
- if gc:
- x509_extension = self._ffi.gc(
- x509_extension, self._lib.X509_EXTENSION_free
- )
- res = add_func(x509_obj, x509_extension, i)
- self.openssl_assert(res >= 1)
- def _create_raw_x509_extension(self, extension, value):
- obj = _txt2obj_gc(self, extension.oid.dotted_string)
- return self._lib.X509_EXTENSION_create_by_OBJ(
- self._ffi.NULL, obj, 1 if extension.critical else 0, value
- )
- def _create_x509_extension(self, handlers, extension):
- if isinstance(extension.value, x509.UnrecognizedExtension):
- value = _encode_asn1_str_gc(self, extension.value.value)
- return self._create_raw_x509_extension(extension, value)
- elif isinstance(extension.value, x509.TLSFeature):
- asn1 = encode_der(
- SEQUENCE,
- *[
- encode_der(INTEGER, encode_der_integer(x.value))
- for x in extension.value
- ],
- )
- value = _encode_asn1_str_gc(self, asn1)
- return self._create_raw_x509_extension(extension, value)
- elif isinstance(extension.value, x509.PrecertPoison):
- value = _encode_asn1_str_gc(self, encode_der(NULL))
- return self._create_raw_x509_extension(extension, value)
- else:
- try:
- encode = handlers[extension.oid]
- except KeyError:
- raise NotImplementedError(
- "Extension not supported: {}".format(extension.oid)
- )
- ext_struct = encode(self, extension.value)
- nid = self._lib.OBJ_txt2nid(
- extension.oid.dotted_string.encode("ascii")
- )
- self.openssl_assert(nid != self._lib.NID_undef)
- return self._lib.X509V3_EXT_i2d(
- nid, 1 if extension.critical else 0, ext_struct
- )
- def create_x509_revoked_certificate(self, builder):
- if not isinstance(builder, x509.RevokedCertificateBuilder):
- raise TypeError("Builder type mismatch.")
- x509_revoked = self._lib.X509_REVOKED_new()
- self.openssl_assert(x509_revoked != self._ffi.NULL)
- x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free)
- serial_number = _encode_asn1_int_gc(self, builder._serial_number)
- res = self._lib.X509_REVOKED_set_serialNumber(
- x509_revoked, serial_number
- )
- self.openssl_assert(res == 1)
- rev_date = self._create_asn1_time(builder._revocation_date)
- res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date)
- self.openssl_assert(res == 1)
- # add CRL entry extensions
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._crl_entry_extension_encode_handlers,
- x509_obj=x509_revoked,
- add_func=self._lib.X509_REVOKED_add_ext,
- gc=True,
- )
- return _RevokedCertificate(self, None, x509_revoked)
- def load_pem_private_key(self, data, password):
- return self._load_key(
- self._lib.PEM_read_bio_PrivateKey,
- self._evp_pkey_to_private_key,
- data,
- password,
- )
- def load_pem_public_key(self, data):
- mem_bio = self._bytes_to_bio(data)
- evp_pkey = self._lib.PEM_read_bio_PUBKEY(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if evp_pkey != self._ffi.NULL:
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return self._evp_pkey_to_public_key(evp_pkey)
- else:
- # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
- # need to check to see if it is a pure PKCS1 RSA public key (not
- # embedded in a subjectPublicKeyInfo)
- self._consume_errors()
- res = self._lib.BIO_reset(mem_bio.bio)
- self.openssl_assert(res == 1)
- rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if rsa_cdata != self._ffi.NULL:
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
- return _RSAPublicKey(self, rsa_cdata, evp_pkey)
- else:
- self._handle_key_loading_error()
- def load_pem_parameters(self, data):
- mem_bio = self._bytes_to_bio(data)
- # only DH is supported currently
- dh_cdata = self._lib.PEM_read_bio_DHparams(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if dh_cdata != self._ffi.NULL:
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- return _DHParameters(self, dh_cdata)
- else:
- self._handle_key_loading_error()
- def load_der_private_key(self, data, password):
- # OpenSSL has a function called d2i_AutoPrivateKey that in theory
- # handles this automatically, however it doesn't handle encrypted
- # private keys. Instead we try to load the key two different ways.
- # First we'll try to load it as a traditional key.
- bio_data = self._bytes_to_bio(data)
- key = self._evp_pkey_from_der_traditional_key(bio_data, password)
- if key:
- return self._evp_pkey_to_private_key(key)
- else:
- # Finally we try to load it with the method that handles encrypted
- # PKCS8 properly.
- return self._load_key(
- self._lib.d2i_PKCS8PrivateKey_bio,
- self._evp_pkey_to_private_key,
- data,
- password,
- )
- def _evp_pkey_from_der_traditional_key(self, bio_data, password):
- key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
- if key != self._ffi.NULL:
- key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
- if password is not None:
- raise TypeError(
- "Password was given but private key is not encrypted."
- )
- return key
- else:
- self._consume_errors()
- return None
- def load_der_public_key(self, data):
- mem_bio = self._bytes_to_bio(data)
- evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
- if evp_pkey != self._ffi.NULL:
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return self._evp_pkey_to_public_key(evp_pkey)
- else:
- # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
- # need to check to see if it is a pure PKCS1 RSA public key (not
- # embedded in a subjectPublicKeyInfo)
- self._consume_errors()
- res = self._lib.BIO_reset(mem_bio.bio)
- self.openssl_assert(res == 1)
- rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
- mem_bio.bio, self._ffi.NULL
- )
- if rsa_cdata != self._ffi.NULL:
- rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
- evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
- return _RSAPublicKey(self, rsa_cdata, evp_pkey)
- else:
- self._handle_key_loading_error()
- def load_der_parameters(self, data):
- mem_bio = self._bytes_to_bio(data)
- dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
- if dh_cdata != self._ffi.NULL:
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- return _DHParameters(self, dh_cdata)
- elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
- # We check to see if the is dhx.
- self._consume_errors()
- res = self._lib.BIO_reset(mem_bio.bio)
- self.openssl_assert(res == 1)
- dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
- mem_bio.bio, self._ffi.NULL
- )
- if dh_cdata != self._ffi.NULL:
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- return _DHParameters(self, dh_cdata)
- self._handle_key_loading_error()
- def load_pem_x509_certificate(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509 = self._lib.PEM_read_bio_X509(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if x509 == self._ffi.NULL:
- self._consume_errors()
- raise ValueError(
- "Unable to load certificate. See https://cryptography.io/en/"
- "latest/faq.html#why-can-t-i-import-my-pem-file for more"
- " details."
- )
- x509 = self._ffi.gc(x509, self._lib.X509_free)
- return _Certificate(self, x509)
- def load_der_x509_certificate(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
- if x509 == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to load certificate")
- x509 = self._ffi.gc(x509, self._lib.X509_free)
- return _Certificate(self, x509)
- def load_pem_x509_crl(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509_crl = self._lib.PEM_read_bio_X509_CRL(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if x509_crl == self._ffi.NULL:
- self._consume_errors()
- raise ValueError(
- "Unable to load CRL. See https://cryptography.io/en/la"
- "test/faq.html#why-can-t-i-import-my-pem-file for more"
- " details."
- )
- x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
- return _CertificateRevocationList(self, x509_crl)
- def load_der_x509_crl(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
- if x509_crl == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to load CRL")
- x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
- return _CertificateRevocationList(self, x509_crl)
- def load_pem_x509_csr(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509_req = self._lib.PEM_read_bio_X509_REQ(
- mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if x509_req == self._ffi.NULL:
- self._consume_errors()
- raise ValueError(
- "Unable to load request. See https://cryptography.io/en/"
- "latest/faq.html#why-can-t-i-import-my-pem-file for more"
- " details."
- )
- x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
- return _CertificateSigningRequest(self, x509_req)
- def load_der_x509_csr(self, data):
- mem_bio = self._bytes_to_bio(data)
- x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
- if x509_req == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to load request")
- x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
- return _CertificateSigningRequest(self, x509_req)
- def _load_key(self, openssl_read_func, convert_func, data, password):
- mem_bio = self._bytes_to_bio(data)
- userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
- if password is not None:
- utils._check_byteslike("password", password)
- password_ptr = self._ffi.from_buffer(password)
- userdata.password = password_ptr
- userdata.length = len(password)
- evp_pkey = openssl_read_func(
- mem_bio.bio,
- self._ffi.NULL,
- self._ffi.addressof(
- self._lib._original_lib, "Cryptography_pem_password_cb"
- ),
- userdata,
- )
- if evp_pkey == self._ffi.NULL:
- if userdata.error != 0:
- self._consume_errors()
- if userdata.error == -1:
- raise TypeError(
- "Password was not given but private key is encrypted"
- )
- else:
- assert userdata.error == -2
- raise ValueError(
- "Passwords longer than {} bytes are not supported "
- "by this backend.".format(userdata.maxsize - 1)
- )
- else:
- self._handle_key_loading_error()
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- if password is not None and userdata.called == 0:
- raise TypeError(
- "Password was given but private key is not encrypted."
- )
- assert (
- password is not None and userdata.called == 1
- ) or password is None
- return convert_func(evp_pkey)
- def _handle_key_loading_error(self):
- errors = self._consume_errors()
- if not errors:
- raise ValueError(
- "Could not deserialize key data. The data may be in an "
- "incorrect format or it may be encrypted with an unsupported "
- "algorithm."
- )
- elif errors[0]._lib_reason_match(
- self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
- ) or errors[0]._lib_reason_match(
- self._lib.ERR_LIB_PKCS12,
- self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
- ):
- raise ValueError("Bad decrypt. Incorrect password?")
- elif any(
- error._lib_reason_match(
- self._lib.ERR_LIB_EVP,
- self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
- )
- for error in errors
- ):
- raise ValueError("Unsupported public key algorithm.")
- else:
- raise ValueError(
- "Could not deserialize key data. The data may be in an "
- "incorrect format or it may be encrypted with an unsupported "
- "algorithm."
- )
- def elliptic_curve_supported(self, curve):
- try:
- curve_nid = self._elliptic_curve_to_nid(curve)
- except UnsupportedAlgorithm:
- curve_nid = self._lib.NID_undef
- group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
- if group == self._ffi.NULL:
- self._consume_errors()
- return False
- else:
- self.openssl_assert(curve_nid != self._lib.NID_undef)
- self._lib.EC_GROUP_free(group)
- return True
- def elliptic_curve_signature_algorithm_supported(
- self, signature_algorithm, curve
- ):
- # We only support ECDSA right now.
- if not isinstance(signature_algorithm, ec.ECDSA):
- return False
- return self.elliptic_curve_supported(curve)
- def generate_elliptic_curve_private_key(self, curve):
- """
- Generate a new private key on the named curve.
- """
- if self.elliptic_curve_supported(curve):
- ec_cdata = self._ec_key_new_by_curve(curve)
- res = self._lib.EC_KEY_generate_key(ec_cdata)
- self.openssl_assert(res == 1)
- evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
- return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
- else:
- raise UnsupportedAlgorithm(
- "Backend object does not support {}.".format(curve.name),
- _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
- )
- def load_elliptic_curve_private_numbers(self, numbers):
- public = numbers.public_numbers
- ec_cdata = self._ec_key_new_by_curve(public.curve)
- private_value = self._ffi.gc(
- self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
- )
- res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
- self.openssl_assert(res == 1)
- ec_cdata = self._ec_key_set_public_key_affine_coordinates(
- ec_cdata, public.x, public.y
- )
- evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
- return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
- def load_elliptic_curve_public_numbers(self, numbers):
- ec_cdata = self._ec_key_new_by_curve(numbers.curve)
- ec_cdata = self._ec_key_set_public_key_affine_coordinates(
- ec_cdata, numbers.x, numbers.y
- )
- evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
- return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
- def load_elliptic_curve_public_bytes(self, curve, point_bytes):
- ec_cdata = self._ec_key_new_by_curve(curve)
- group = self._lib.EC_KEY_get0_group(ec_cdata)
- self.openssl_assert(group != self._ffi.NULL)
- point = self._lib.EC_POINT_new(group)
- self.openssl_assert(point != self._ffi.NULL)
- point = self._ffi.gc(point, self._lib.EC_POINT_free)
- with self._tmp_bn_ctx() as bn_ctx:
- res = self._lib.EC_POINT_oct2point(
- group, point, point_bytes, len(point_bytes), bn_ctx
- )
- if res != 1:
- self._consume_errors()
- raise ValueError("Invalid public bytes for the given curve")
- res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
- self.openssl_assert(res == 1)
- evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
- return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
- def derive_elliptic_curve_private_key(self, private_value, curve):
- ec_cdata = self._ec_key_new_by_curve(curve)
- get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
- point = self._lib.EC_POINT_new(group)
- self.openssl_assert(point != self._ffi.NULL)
- point = self._ffi.gc(point, self._lib.EC_POINT_free)
- value = self._int_to_bn(private_value)
- value = self._ffi.gc(value, self._lib.BN_clear_free)
- with self._tmp_bn_ctx() as bn_ctx:
- res = self._lib.EC_POINT_mul(
- group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
- )
- self.openssl_assert(res == 1)
- bn_x = self._lib.BN_CTX_get(bn_ctx)
- bn_y = self._lib.BN_CTX_get(bn_ctx)
- res = get_func(group, point, bn_x, bn_y, bn_ctx)
- self.openssl_assert(res == 1)
- res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
- self.openssl_assert(res == 1)
- private = self._int_to_bn(private_value)
- private = self._ffi.gc(private, self._lib.BN_clear_free)
- res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
- self.openssl_assert(res == 1)
- evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
- return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
- def _ec_key_new_by_curve(self, curve):
- curve_nid = self._elliptic_curve_to_nid(curve)
- return self._ec_key_new_by_curve_nid(curve_nid)
- def _ec_key_new_by_curve_nid(self, curve_nid):
- ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
- self.openssl_assert(ec_cdata != self._ffi.NULL)
- return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
- def load_der_ocsp_request(self, data):
- mem_bio = self._bytes_to_bio(data)
- request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL)
- if request == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to load OCSP request")
- request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free)
- return _OCSPRequest(self, request)
- def load_der_ocsp_response(self, data):
- mem_bio = self._bytes_to_bio(data)
- response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL)
- if response == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to load OCSP response")
- response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free)
- return _OCSPResponse(self, response)
- def create_ocsp_request(self, builder):
- ocsp_req = self._lib.OCSP_REQUEST_new()
- self.openssl_assert(ocsp_req != self._ffi.NULL)
- ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free)
- cert, issuer, algorithm = builder._request
- evp_md = self._evp_md_non_null_from_algorithm(algorithm)
- certid = self._lib.OCSP_cert_to_id(evp_md, cert._x509, issuer._x509)
- self.openssl_assert(certid != self._ffi.NULL)
- onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid)
- self.openssl_assert(onereq != self._ffi.NULL)
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._ocsp_request_extension_encode_handlers,
- x509_obj=ocsp_req,
- add_func=self._lib.OCSP_REQUEST_add_ext,
- gc=True,
- )
- return _OCSPRequest(self, ocsp_req)
- def _create_ocsp_basic_response(self, builder, private_key, algorithm):
- self._x509_check_signature_params(private_key, algorithm)
- basic = self._lib.OCSP_BASICRESP_new()
- self.openssl_assert(basic != self._ffi.NULL)
- basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free)
- evp_md = self._evp_md_non_null_from_algorithm(
- builder._response._algorithm
- )
- certid = self._lib.OCSP_cert_to_id(
- evp_md,
- builder._response._cert._x509,
- builder._response._issuer._x509,
- )
- self.openssl_assert(certid != self._ffi.NULL)
- certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free)
- if builder._response._revocation_reason is None:
- reason = -1
- else:
- reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[
- builder._response._revocation_reason
- ]
- if builder._response._revocation_time is None:
- rev_time = self._ffi.NULL
- else:
- rev_time = self._create_asn1_time(
- builder._response._revocation_time
- )
- next_update = self._ffi.NULL
- if builder._response._next_update is not None:
- next_update = self._create_asn1_time(
- builder._response._next_update
- )
- this_update = self._create_asn1_time(builder._response._this_update)
- res = self._lib.OCSP_basic_add1_status(
- basic,
- certid,
- builder._response._cert_status.value,
- reason,
- rev_time,
- this_update,
- next_update,
- )
- self.openssl_assert(res != self._ffi.NULL)
- # okay, now sign the basic structure
- evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
- responder_cert, responder_encoding = builder._responder_id
- flags = self._lib.OCSP_NOCERTS
- if responder_encoding is ocsp.OCSPResponderEncoding.HASH:
- flags |= self._lib.OCSP_RESPID_KEY
- if builder._certs is not None:
- for cert in builder._certs:
- res = self._lib.OCSP_basic_add1_cert(basic, cert._x509)
- self.openssl_assert(res == 1)
- self._create_x509_extensions(
- extensions=builder._extensions,
- handlers=self._ocsp_basicresp_extension_encode_handlers,
- x509_obj=basic,
- add_func=self._lib.OCSP_BASICRESP_add_ext,
- gc=True,
- )
- res = self._lib.OCSP_basic_sign(
- basic,
- responder_cert._x509,
- private_key._evp_pkey,
- evp_md,
- self._ffi.NULL,
- flags,
- )
- if res != 1:
- errors = self._consume_errors_with_text()
- raise ValueError(
- "Error while signing. responder_cert must be signed "
- "by private_key",
- errors,
- )
- return basic
- def create_ocsp_response(
- self, response_status, builder, private_key, algorithm
- ):
- if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL:
- basic = self._create_ocsp_basic_response(
- builder, private_key, algorithm
- )
- else:
- basic = self._ffi.NULL
- ocsp_resp = self._lib.OCSP_response_create(
- response_status.value, basic
- )
- self.openssl_assert(ocsp_resp != self._ffi.NULL)
- ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free)
- return _OCSPResponse(self, ocsp_resp)
- def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve):
- return self.elliptic_curve_supported(curve) and isinstance(
- algorithm, ec.ECDH
- )
- def _ec_cdata_to_evp_pkey(self, ec_cdata):
- evp_pkey = self._create_evp_pkey_gc()
- res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
- self.openssl_assert(res == 1)
- return evp_pkey
- def _elliptic_curve_to_nid(self, curve):
- """
- Get the NID for a curve name.
- """
- curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
- curve_name = curve_aliases.get(curve.name, curve.name)
- curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
- if curve_nid == self._lib.NID_undef:
- raise UnsupportedAlgorithm(
- "{} is not a supported elliptic curve".format(curve.name),
- _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
- )
- return curve_nid
- @contextmanager
- def _tmp_bn_ctx(self):
- bn_ctx = self._lib.BN_CTX_new()
- self.openssl_assert(bn_ctx != self._ffi.NULL)
- bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
- self._lib.BN_CTX_start(bn_ctx)
- try:
- yield bn_ctx
- finally:
- self._lib.BN_CTX_end(bn_ctx)
- def _ec_key_determine_group_get_func(self, ctx):
- """
- Given an EC_KEY determine the group and what function is required to
- get point coordinates.
- """
- self.openssl_assert(ctx != self._ffi.NULL)
- nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
- self.openssl_assert(nid_two_field != self._lib.NID_undef)
- group = self._lib.EC_KEY_get0_group(ctx)
- self.openssl_assert(group != self._ffi.NULL)
- method = self._lib.EC_GROUP_method_of(group)
- self.openssl_assert(method != self._ffi.NULL)
- nid = self._lib.EC_METHOD_get_field_type(method)
- self.openssl_assert(nid != self._lib.NID_undef)
- if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
- get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
- else:
- get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
- assert get_func
- return get_func, group
- def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y):
- """
- Sets the public key point in the EC_KEY context to the affine x and y
- values.
- """
- if x < 0 or y < 0:
- raise ValueError(
- "Invalid EC key. Both x and y must be non-negative."
- )
- x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
- y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
- res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
- if res != 1:
- self._consume_errors()
- raise ValueError("Invalid EC key.")
- return ctx
- def _private_key_bytes(
- self, encoding, format, encryption_algorithm, key, evp_pkey, cdata
- ):
- # validate argument types
- if not isinstance(encoding, serialization.Encoding):
- raise TypeError("encoding must be an item from the Encoding enum")
- if not isinstance(format, serialization.PrivateFormat):
- raise TypeError(
- "format must be an item from the PrivateFormat enum"
- )
- if not isinstance(
- encryption_algorithm, serialization.KeySerializationEncryption
- ):
- raise TypeError(
- "Encryption algorithm must be a KeySerializationEncryption "
- "instance"
- )
- # validate password
- if isinstance(encryption_algorithm, serialization.NoEncryption):
- password = b""
- elif isinstance(
- encryption_algorithm, serialization.BestAvailableEncryption
- ):
- password = encryption_algorithm.password
- if len(password) > 1023:
- raise ValueError(
- "Passwords longer than 1023 bytes are not supported by "
- "this backend"
- )
- else:
- raise ValueError("Unsupported encryption type")
- # PKCS8 + PEM/DER
- if format is serialization.PrivateFormat.PKCS8:
- if encoding is serialization.Encoding.PEM:
- write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
- elif encoding is serialization.Encoding.DER:
- write_bio = self._lib.i2d_PKCS8PrivateKey_bio
- else:
- raise ValueError("Unsupported encoding for PKCS8")
- return self._private_key_bytes_via_bio(
- write_bio, evp_pkey, password
- )
- # TraditionalOpenSSL + PEM/DER
- if format is serialization.PrivateFormat.TraditionalOpenSSL:
- if self._fips_enabled and not isinstance(
- encryption_algorithm, serialization.NoEncryption
- ):
- raise ValueError(
- "Encrypted traditional OpenSSL format is not "
- "supported in FIPS mode."
- )
- key_type = self._lib.EVP_PKEY_id(evp_pkey)
- if encoding is serialization.Encoding.PEM:
- if key_type == self._lib.EVP_PKEY_RSA:
- write_bio = self._lib.PEM_write_bio_RSAPrivateKey
- elif key_type == self._lib.EVP_PKEY_DSA:
- write_bio = self._lib.PEM_write_bio_DSAPrivateKey
- elif key_type == self._lib.EVP_PKEY_EC:
- write_bio = self._lib.PEM_write_bio_ECPrivateKey
- else:
- raise ValueError(
- "Unsupported key type for TraditionalOpenSSL"
- )
- return self._private_key_bytes_via_bio(
- write_bio, cdata, password
- )
- if encoding is serialization.Encoding.DER:
- if password:
- raise ValueError(
- "Encryption is not supported for DER encoded "
- "traditional OpenSSL keys"
- )
- if key_type == self._lib.EVP_PKEY_RSA:
- write_bio = self._lib.i2d_RSAPrivateKey_bio
- elif key_type == self._lib.EVP_PKEY_EC:
- write_bio = self._lib.i2d_ECPrivateKey_bio
- elif key_type == self._lib.EVP_PKEY_DSA:
- write_bio = self._lib.i2d_DSAPrivateKey_bio
- else:
- raise ValueError(
- "Unsupported key type for TraditionalOpenSSL"
- )
- return self._bio_func_output(write_bio, cdata)
- raise ValueError("Unsupported encoding for TraditionalOpenSSL")
- # OpenSSH + PEM
- if format is serialization.PrivateFormat.OpenSSH:
- if encoding is serialization.Encoding.PEM:
- return ssh.serialize_ssh_private_key(key, password)
- raise ValueError(
- "OpenSSH private key format can only be used"
- " with PEM encoding"
- )
- # Anything that key-specific code was supposed to handle earlier,
- # like Raw.
- raise ValueError("format is invalid with this key")
- def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
- if not password:
- evp_cipher = self._ffi.NULL
- else:
- # This is a curated value that we will update over time.
- evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
- return self._bio_func_output(
- write_bio,
- evp_pkey,
- evp_cipher,
- password,
- len(password),
- self._ffi.NULL,
- self._ffi.NULL,
- )
- def _bio_func_output(self, write_bio, *args):
- bio = self._create_mem_bio_gc()
- res = write_bio(bio, *args)
- self.openssl_assert(res == 1)
- return self._read_mem_bio(bio)
- def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata):
- if not isinstance(encoding, serialization.Encoding):
- raise TypeError("encoding must be an item from the Encoding enum")
- if not isinstance(format, serialization.PublicFormat):
- raise TypeError(
- "format must be an item from the PublicFormat enum"
- )
- # SubjectPublicKeyInfo + PEM/DER
- if format is serialization.PublicFormat.SubjectPublicKeyInfo:
- if encoding is serialization.Encoding.PEM:
- write_bio = self._lib.PEM_write_bio_PUBKEY
- elif encoding is serialization.Encoding.DER:
- write_bio = self._lib.i2d_PUBKEY_bio
- else:
- raise ValueError(
- "SubjectPublicKeyInfo works only with PEM or DER encoding"
- )
- return self._bio_func_output(write_bio, evp_pkey)
- # PKCS1 + PEM/DER
- if format is serialization.PublicFormat.PKCS1:
- # Only RSA is supported here.
- key_type = self._lib.EVP_PKEY_id(evp_pkey)
- if key_type != self._lib.EVP_PKEY_RSA:
- raise ValueError("PKCS1 format is supported only for RSA keys")
- if encoding is serialization.Encoding.PEM:
- write_bio = self._lib.PEM_write_bio_RSAPublicKey
- elif encoding is serialization.Encoding.DER:
- write_bio = self._lib.i2d_RSAPublicKey_bio
- else:
- raise ValueError("PKCS1 works only with PEM or DER encoding")
- return self._bio_func_output(write_bio, cdata)
- # OpenSSH + OpenSSH
- if format is serialization.PublicFormat.OpenSSH:
- if encoding is serialization.Encoding.OpenSSH:
- return ssh.serialize_ssh_public_key(key)
- raise ValueError(
- "OpenSSH format must be used with OpenSSH encoding"
- )
- # Anything that key-specific code was supposed to handle earlier,
- # like Raw, CompressedPoint, UncompressedPoint
- raise ValueError("format is invalid with this key")
- def _parameter_bytes(self, encoding, format, cdata):
- if encoding is serialization.Encoding.OpenSSH:
- raise TypeError("OpenSSH encoding is not supported")
- # Only DH is supported here currently.
- q = self._ffi.new("BIGNUM **")
- self._lib.DH_get0_pqg(cdata, self._ffi.NULL, q, self._ffi.NULL)
- if encoding is serialization.Encoding.PEM:
- if q[0] != self._ffi.NULL:
- write_bio = self._lib.PEM_write_bio_DHxparams
- else:
- write_bio = self._lib.PEM_write_bio_DHparams
- elif encoding is serialization.Encoding.DER:
- if q[0] != self._ffi.NULL:
- write_bio = self._lib.Cryptography_i2d_DHxparams_bio
- else:
- write_bio = self._lib.i2d_DHparams_bio
- else:
- raise TypeError("encoding must be an item from the Encoding enum")
- bio = self._create_mem_bio_gc()
- res = write_bio(bio, cdata)
- self.openssl_assert(res == 1)
- return self._read_mem_bio(bio)
- def generate_dh_parameters(self, generator, key_size):
- if key_size < dh._MIN_MODULUS_SIZE:
- raise ValueError(
- "DH key_size must be at least {} bits".format(
- dh._MIN_MODULUS_SIZE
- )
- )
- if generator not in (2, 5):
- raise ValueError("DH generator must be 2 or 5")
- dh_param_cdata = self._lib.DH_new()
- self.openssl_assert(dh_param_cdata != self._ffi.NULL)
- dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
- res = self._lib.DH_generate_parameters_ex(
- dh_param_cdata, key_size, generator, self._ffi.NULL
- )
- self.openssl_assert(res == 1)
- return _DHParameters(self, dh_param_cdata)
- def _dh_cdata_to_evp_pkey(self, dh_cdata):
- evp_pkey = self._create_evp_pkey_gc()
- res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
- self.openssl_assert(res == 1)
- return evp_pkey
- def generate_dh_private_key(self, parameters):
- dh_key_cdata = _dh_params_dup(parameters._dh_cdata, self)
- res = self._lib.DH_generate_key(dh_key_cdata)
- self.openssl_assert(res == 1)
- evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
- return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
- def generate_dh_private_key_and_parameters(self, generator, key_size):
- return self.generate_dh_private_key(
- self.generate_dh_parameters(generator, key_size)
- )
- def load_dh_private_numbers(self, numbers):
- parameter_numbers = numbers.public_numbers.parameter_numbers
- dh_cdata = self._lib.DH_new()
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- p = self._int_to_bn(parameter_numbers.p)
- g = self._int_to_bn(parameter_numbers.g)
- if parameter_numbers.q is not None:
- q = self._int_to_bn(parameter_numbers.q)
- else:
- q = self._ffi.NULL
- pub_key = self._int_to_bn(numbers.public_numbers.y)
- priv_key = self._int_to_bn(numbers.x)
- res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
- self.openssl_assert(res == 1)
- res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
- self.openssl_assert(res == 1)
- codes = self._ffi.new("int[]", 1)
- res = self._lib.Cryptography_DH_check(dh_cdata, codes)
- self.openssl_assert(res == 1)
- # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
- # equal 11 when the generator is 2 (a quadratic nonresidue).
- # We want to ignore that error because p % 24 == 23 is also fine.
- # Specifically, g is then a quadratic residue. Within the context of
- # Diffie-Hellman this means it can only generate half the possible
- # values. That sounds bad, but quadratic nonresidues leak a bit of
- # the key to the attacker in exchange for having the full key space
- # available. See: https://crypto.stackexchange.com/questions/12961
- if codes[0] != 0 and not (
- parameter_numbers.g == 2
- and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
- ):
- raise ValueError("DH private numbers did not pass safety checks.")
- evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
- return _DHPrivateKey(self, dh_cdata, evp_pkey)
- def load_dh_public_numbers(self, numbers):
- dh_cdata = self._lib.DH_new()
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- parameter_numbers = numbers.parameter_numbers
- p = self._int_to_bn(parameter_numbers.p)
- g = self._int_to_bn(parameter_numbers.g)
- if parameter_numbers.q is not None:
- q = self._int_to_bn(parameter_numbers.q)
- else:
- q = self._ffi.NULL
- pub_key = self._int_to_bn(numbers.y)
- res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
- self.openssl_assert(res == 1)
- res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
- self.openssl_assert(res == 1)
- evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
- return _DHPublicKey(self, dh_cdata, evp_pkey)
- def load_dh_parameter_numbers(self, numbers):
- dh_cdata = self._lib.DH_new()
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- p = self._int_to_bn(numbers.p)
- g = self._int_to_bn(numbers.g)
- if numbers.q is not None:
- q = self._int_to_bn(numbers.q)
- else:
- q = self._ffi.NULL
- res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
- self.openssl_assert(res == 1)
- return _DHParameters(self, dh_cdata)
- def dh_parameters_supported(self, p, g, q=None):
- dh_cdata = self._lib.DH_new()
- self.openssl_assert(dh_cdata != self._ffi.NULL)
- dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
- p = self._int_to_bn(p)
- g = self._int_to_bn(g)
- if q is not None:
- q = self._int_to_bn(q)
- else:
- q = self._ffi.NULL
- res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
- self.openssl_assert(res == 1)
- codes = self._ffi.new("int[]", 1)
- res = self._lib.Cryptography_DH_check(dh_cdata, codes)
- self.openssl_assert(res == 1)
- return codes[0] == 0
- def dh_x942_serialization_supported(self):
- return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
- def x509_name_bytes(self, name):
- x509_name = _encode_name_gc(self, name)
- pp = self._ffi.new("unsigned char **")
- res = self._lib.i2d_X509_NAME(x509_name, pp)
- self.openssl_assert(pp[0] != self._ffi.NULL)
- pp = self._ffi.gc(
- pp, lambda pointer: self._lib.OPENSSL_free(pointer[0])
- )
- self.openssl_assert(res > 0)
- return self._ffi.buffer(pp[0], res)[:]
- def x25519_load_public_bytes(self, data):
- # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
- # switch this to EVP_PKEY_new_raw_public_key
- if len(data) != 32:
- raise ValueError("An X25519 public key is 32 bytes long")
- evp_pkey = self._create_evp_pkey_gc()
- res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
- self.openssl_assert(res == 1)
- res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
- evp_pkey, data, len(data)
- )
- self.openssl_assert(res == 1)
- return _X25519PublicKey(self, evp_pkey)
- def x25519_load_private_bytes(self, data):
- # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
- # switch this to EVP_PKEY_new_raw_private_key and drop the
- # zeroed_bytearray garbage.
- # OpenSSL only has facilities for loading PKCS8 formatted private
- # keys using the algorithm identifiers specified in
- # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
- # This is the standard PKCS8 prefix for a 32 byte X25519 key.
- # The form is:
- # 0:d=0 hl=2 l= 46 cons: SEQUENCE
- # 2:d=1 hl=2 l= 1 prim: INTEGER :00
- # 5:d=1 hl=2 l= 5 cons: SEQUENCE
- # 7:d=2 hl=2 l= 3 prim: OBJECT :1.3.101.110
- # 12:d=1 hl=2 l= 34 prim: OCTET STRING (the key)
- # Of course there's a bit more complexity. In reality OCTET STRING
- # contains an OCTET STRING of length 32! So the last two bytes here
- # are \x04\x20, which is an OCTET STRING of length 32.
- if len(data) != 32:
- raise ValueError("An X25519 private key is 32 bytes long")
- pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
- with self._zeroed_bytearray(48) as ba:
- ba[0:16] = pkcs8_prefix
- ba[16:] = data
- bio = self._bytes_to_bio(ba)
- evp_pkey = self._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- self.openssl_assert(
- self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
- )
- return _X25519PrivateKey(self, evp_pkey)
- def _evp_pkey_keygen_gc(self, nid):
- evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
- self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
- evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
- res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
- self.openssl_assert(res == 1)
- evp_ppkey = self._ffi.new("EVP_PKEY **")
- res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
- self.openssl_assert(res == 1)
- self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
- return evp_pkey
- def x25519_generate_key(self):
- evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
- return _X25519PrivateKey(self, evp_pkey)
- def x25519_supported(self):
- if self._fips_enabled:
- return False
- return not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
- def x448_load_public_bytes(self, data):
- if len(data) != 56:
- raise ValueError("An X448 public key is 56 bytes long")
- evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
- self._lib.NID_X448, self._ffi.NULL, data, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _X448PublicKey(self, evp_pkey)
- def x448_load_private_bytes(self, data):
- if len(data) != 56:
- raise ValueError("An X448 private key is 56 bytes long")
- data_ptr = self._ffi.from_buffer(data)
- evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
- self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _X448PrivateKey(self, evp_pkey)
- def x448_generate_key(self):
- evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
- return _X448PrivateKey(self, evp_pkey)
- def x448_supported(self):
- if self._fips_enabled:
- return False
- return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
- def ed25519_supported(self):
- if self._fips_enabled:
- return False
- return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
- def ed25519_load_public_bytes(self, data):
- utils._check_bytes("data", data)
- if len(data) != ed25519._ED25519_KEY_SIZE:
- raise ValueError("An Ed25519 public key is 32 bytes long")
- evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
- self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _Ed25519PublicKey(self, evp_pkey)
- def ed25519_load_private_bytes(self, data):
- if len(data) != ed25519._ED25519_KEY_SIZE:
- raise ValueError("An Ed25519 private key is 32 bytes long")
- utils._check_byteslike("data", data)
- data_ptr = self._ffi.from_buffer(data)
- evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
- self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _Ed25519PrivateKey(self, evp_pkey)
- def ed25519_generate_key(self):
- evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
- return _Ed25519PrivateKey(self, evp_pkey)
- def ed448_supported(self):
- if self._fips_enabled:
- return False
- return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
- def ed448_load_public_bytes(self, data):
- utils._check_bytes("data", data)
- if len(data) != _ED448_KEY_SIZE:
- raise ValueError("An Ed448 public key is 57 bytes long")
- evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
- self._lib.NID_ED448, self._ffi.NULL, data, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _Ed448PublicKey(self, evp_pkey)
- def ed448_load_private_bytes(self, data):
- utils._check_byteslike("data", data)
- if len(data) != _ED448_KEY_SIZE:
- raise ValueError("An Ed448 private key is 57 bytes long")
- data_ptr = self._ffi.from_buffer(data)
- evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
- self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
- )
- self.openssl_assert(evp_pkey != self._ffi.NULL)
- evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
- return _Ed448PrivateKey(self, evp_pkey)
- def ed448_generate_key(self):
- evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
- return _Ed448PrivateKey(self, evp_pkey)
- def derive_scrypt(self, key_material, salt, length, n, r, p):
- buf = self._ffi.new("unsigned char[]", length)
- key_material_ptr = self._ffi.from_buffer(key_material)
- res = self._lib.EVP_PBE_scrypt(
- key_material_ptr,
- len(key_material),
- salt,
- len(salt),
- n,
- r,
- p,
- scrypt._MEM_LIMIT,
- buf,
- length,
- )
- if res != 1:
- errors = self._consume_errors_with_text()
- # memory required formula explained here:
- # https://blog.filippo.io/the-scrypt-parameters/
- min_memory = 128 * n * r // (1024 ** 2)
- raise MemoryError(
- "Not enough memory to derive key. These parameters require"
- " {} MB of memory.".format(min_memory),
- errors,
- )
- return self._ffi.buffer(buf)[:]
- def aead_cipher_supported(self, cipher):
- cipher_name = aead._aead_cipher_name(cipher)
- if self._fips_enabled and cipher_name not in self._fips_aead:
- return False
- return self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
- @contextlib.contextmanager
- def _zeroed_bytearray(self, length):
- """
- This method creates a bytearray, which we copy data into (hopefully
- also from a mutable buffer that can be dynamically erased!), and then
- zero when we're done.
- """
- ba = bytearray(length)
- try:
- yield ba
- finally:
- self._zero_data(ba, length)
- def _zero_data(self, data, length):
- # We clear things this way because at the moment we're not
- # sure of a better way that can guarantee it overwrites the
- # memory of a bytearray and doesn't just replace the underlying char *.
- for i in range(length):
- data[i] = 0
- @contextlib.contextmanager
- def _zeroed_null_terminated_buf(self, data):
- """
- This method takes bytes, which can be a bytestring or a mutable
- buffer like a bytearray, and yields a null-terminated version of that
- data. This is required because PKCS12_parse doesn't take a length with
- its password char * and ffi.from_buffer doesn't provide null
- termination. So, to support zeroing the data via bytearray we
- need to build this ridiculous construct that copies the memory, but
- zeroes it after use.
- """
- if data is None:
- yield self._ffi.NULL
- else:
- data_len = len(data)
- buf = self._ffi.new("char[]", data_len + 1)
- self._ffi.memmove(buf, data, data_len)
- try:
- yield buf
- finally:
- # Cast to a uint8_t * so we can assign by integer
- self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
- def load_key_and_certificates_from_pkcs12(self, data, password):
- if password is not None:
- utils._check_byteslike("password", password)
- bio = self._bytes_to_bio(data)
- p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
- if p12 == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Could not deserialize PKCS12 data")
- p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
- evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
- x509_ptr = self._ffi.new("X509 **")
- sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
- with self._zeroed_null_terminated_buf(password) as password_buf:
- res = self._lib.PKCS12_parse(
- p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
- )
- if res == 0:
- self._consume_errors()
- raise ValueError("Invalid password or PKCS12 data")
- cert = None
- key = None
- additional_certificates = []
- if evp_pkey_ptr[0] != self._ffi.NULL:
- evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
- key = self._evp_pkey_to_private_key(evp_pkey)
- if x509_ptr[0] != self._ffi.NULL:
- x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
- cert = _Certificate(self, x509)
- if sk_x509_ptr[0] != self._ffi.NULL:
- sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
- num = self._lib.sk_X509_num(sk_x509_ptr[0])
- for i in range(num):
- x509 = self._lib.sk_X509_value(sk_x509, i)
- self.openssl_assert(x509 != self._ffi.NULL)
- x509 = self._ffi.gc(x509, self._lib.X509_free)
- additional_certificates.append(_Certificate(self, x509))
- return (key, cert, additional_certificates)
- def serialize_key_and_certificates_to_pkcs12(
- self, name, key, cert, cas, encryption_algorithm
- ):
- password = None
- if name is not None:
- utils._check_bytes("name", name)
- if isinstance(encryption_algorithm, serialization.NoEncryption):
- nid_cert = -1
- nid_key = -1
- pkcs12_iter = 0
- mac_iter = 0
- elif isinstance(
- encryption_algorithm, serialization.BestAvailableEncryption
- ):
- # PKCS12 encryption is hopeless trash and can never be fixed.
- # This is the least terrible option.
- nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
- nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
- # At least we can set this higher than OpenSSL's default
- pkcs12_iter = 20000
- # mac_iter chosen for compatibility reasons, see:
- # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
- # Did we mention how lousy PKCS12 encryption is?
- mac_iter = 1
- password = encryption_algorithm.password
- else:
- raise ValueError("Unsupported key encryption type")
- if cas is None or len(cas) == 0:
- sk_x509 = self._ffi.NULL
- else:
- sk_x509 = self._lib.sk_X509_new_null()
- sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
- # reverse the list when building the stack so that they're encoded
- # in the order they were originally provided. it is a mystery
- for ca in reversed(cas):
- res = self._lib.sk_X509_push(sk_x509, ca._x509)
- backend.openssl_assert(res >= 1)
- with self._zeroed_null_terminated_buf(password) as password_buf:
- with self._zeroed_null_terminated_buf(name) as name_buf:
- p12 = self._lib.PKCS12_create(
- password_buf,
- name_buf,
- key._evp_pkey if key else self._ffi.NULL,
- cert._x509 if cert else self._ffi.NULL,
- sk_x509,
- nid_key,
- nid_cert,
- pkcs12_iter,
- mac_iter,
- 0,
- )
- self.openssl_assert(p12 != self._ffi.NULL)
- p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
- bio = self._create_mem_bio_gc()
- res = self._lib.i2d_PKCS12_bio(bio, p12)
- self.openssl_assert(res > 0)
- return self._read_mem_bio(bio)
- def poly1305_supported(self):
- if self._fips_enabled:
- return False
- return self._lib.Cryptography_HAS_POLY1305 == 1
- def create_poly1305_ctx(self, key):
- utils._check_byteslike("key", key)
- if len(key) != _POLY1305_KEY_SIZE:
- raise ValueError("A poly1305 key is 32 bytes long")
- return _Poly1305Context(self, key)
- def load_pem_pkcs7_certificates(self, data):
- utils._check_bytes("data", data)
- bio = self._bytes_to_bio(data)
- p7 = self._lib.PEM_read_bio_PKCS7(
- bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
- )
- if p7 == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to parse PKCS7 data")
- p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
- return self._load_pkcs7_certificates(p7)
- def load_der_pkcs7_certificates(self, data):
- utils._check_bytes("data", data)
- bio = self._bytes_to_bio(data)
- p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
- if p7 == self._ffi.NULL:
- self._consume_errors()
- raise ValueError("Unable to parse PKCS7 data")
- p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
- return self._load_pkcs7_certificates(p7)
- def _load_pkcs7_certificates(self, p7):
- nid = self._lib.OBJ_obj2nid(p7.type)
- self.openssl_assert(nid != self._lib.NID_undef)
- if nid != self._lib.NID_pkcs7_signed:
- raise UnsupportedAlgorithm(
- "Only basic signed structures are currently supported. NID"
- " for this data was {}".format(nid),
- _Reasons.UNSUPPORTED_SERIALIZATION,
- )
- sk_x509 = p7.d.sign.cert
- num = self._lib.sk_X509_num(sk_x509)
- certs = []
- for i in range(num):
- x509 = self._lib.sk_X509_value(sk_x509, i)
- self.openssl_assert(x509 != self._ffi.NULL)
- res = self._lib.X509_up_ref(x509)
- # When OpenSSL is less than 1.1.0 up_ref returns the current
- # refcount. On 1.1.0+ it returns 1 for success.
- self.openssl_assert(res >= 1)
- x509 = self._ffi.gc(x509, self._lib.X509_free)
- certs.append(_Certificate(self, x509))
- return certs
- def pkcs7_sign(self, builder, encoding, options):
- bio = self._bytes_to_bio(builder._data)
- init_flags = self._lib.PKCS7_PARTIAL
- final_flags = 0
- if len(builder._additional_certs) == 0:
- certs = self._ffi.NULL
- else:
- certs = self._lib.sk_X509_new_null()
- certs = self._ffi.gc(certs, self._lib.sk_X509_free)
- for cert in builder._additional_certs:
- res = self._lib.sk_X509_push(certs, cert._x509)
- self.openssl_assert(res >= 1)
- if pkcs7.PKCS7Options.DetachedSignature in options:
- # Don't embed the data in the PKCS7 structure
- init_flags |= self._lib.PKCS7_DETACHED
- final_flags |= self._lib.PKCS7_DETACHED
- # This just inits a structure for us. However, there
- # are flags we need to set, joy.
- p7 = self._lib.PKCS7_sign(
- self._ffi.NULL,
- self._ffi.NULL,
- certs,
- self._ffi.NULL,
- init_flags,
- )
- self.openssl_assert(p7 != self._ffi.NULL)
- p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
- signer_flags = 0
- # These flags are configurable on a per-signature basis
- # but we've deliberately chosen to make the API only allow
- # setting it across all signatures for now.
- if pkcs7.PKCS7Options.NoCapabilities in options:
- signer_flags |= self._lib.PKCS7_NOSMIMECAP
- elif pkcs7.PKCS7Options.NoAttributes in options:
- signer_flags |= self._lib.PKCS7_NOATTR
- if pkcs7.PKCS7Options.NoCerts in options:
- signer_flags |= self._lib.PKCS7_NOCERTS
- for certificate, private_key, hash_algorithm in builder._signers:
- md = self._evp_md_non_null_from_algorithm(hash_algorithm)
- p7signerinfo = self._lib.PKCS7_sign_add_signer(
- p7, certificate._x509, private_key._evp_pkey, md, signer_flags
- )
- self.openssl_assert(p7signerinfo != self._ffi.NULL)
- for option in options:
- # DetachedSignature, NoCapabilities, and NoAttributes are already
- # handled so we just need to check these last two options.
- if option is pkcs7.PKCS7Options.Text:
- final_flags |= self._lib.PKCS7_TEXT
- elif option is pkcs7.PKCS7Options.Binary:
- final_flags |= self._lib.PKCS7_BINARY
- bio_out = self._create_mem_bio_gc()
- if encoding is serialization.Encoding.SMIME:
- # This finalizes the structure
- res = self._lib.SMIME_write_PKCS7(
- bio_out, p7, bio.bio, final_flags
- )
- elif encoding is serialization.Encoding.PEM:
- res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
- self.openssl_assert(res == 1)
- res = self._lib.PEM_write_bio_PKCS7_stream(
- bio_out, p7, bio.bio, final_flags
- )
- else:
- assert encoding is serialization.Encoding.DER
- # We need to call finalize here becauase i2d_PKCS7_bio does not
- # finalize.
- res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
- self.openssl_assert(res == 1)
- res = self._lib.i2d_PKCS7_bio(bio_out, p7)
- self.openssl_assert(res == 1)
- return self._read_mem_bio(bio_out)
- class GetCipherByName(object):
- def __init__(self, fmt):
- self._fmt = fmt
- def __call__(self, backend, cipher, mode):
- cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
- return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
- def _get_xts_cipher(backend, cipher, mode):
- cipher_name = "aes-{}-xts".format(cipher.key_size // 2)
- return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
- backend = Backend()
|