dh.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  5. from cryptography.hazmat.primitives import serialization
  6. from cryptography.hazmat.primitives.asymmetric import dh
  7. def _dh_params_dup(dh_cdata, backend):
  8. lib = backend._lib
  9. ffi = backend._ffi
  10. param_cdata = lib.DHparams_dup(dh_cdata)
  11. backend.openssl_assert(param_cdata != ffi.NULL)
  12. param_cdata = ffi.gc(param_cdata, lib.DH_free)
  13. if lib.CRYPTOGRAPHY_IS_LIBRESSL:
  14. # In libressl DHparams_dup don't copy q
  15. q = ffi.new("BIGNUM **")
  16. lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
  17. q_dup = lib.BN_dup(q[0])
  18. res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
  19. backend.openssl_assert(res == 1)
  20. return param_cdata
  21. def _dh_cdata_to_parameters(dh_cdata, backend):
  22. param_cdata = _dh_params_dup(dh_cdata, backend)
  23. return _DHParameters(backend, param_cdata)
  24. class _DHParameters(dh.DHParameters):
  25. def __init__(self, backend, dh_cdata):
  26. self._backend = backend
  27. self._dh_cdata = dh_cdata
  28. def parameter_numbers(self) -> dh.DHParameterNumbers:
  29. p = self._backend._ffi.new("BIGNUM **")
  30. g = self._backend._ffi.new("BIGNUM **")
  31. q = self._backend._ffi.new("BIGNUM **")
  32. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  33. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  34. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  35. if q[0] == self._backend._ffi.NULL:
  36. q_val = None
  37. else:
  38. q_val = self._backend._bn_to_int(q[0])
  39. return dh.DHParameterNumbers(
  40. p=self._backend._bn_to_int(p[0]),
  41. g=self._backend._bn_to_int(g[0]),
  42. q=q_val,
  43. )
  44. def generate_private_key(self) -> dh.DHPrivateKey:
  45. return self._backend.generate_dh_private_key(self)
  46. def parameter_bytes(
  47. self,
  48. encoding: serialization.Encoding,
  49. format: serialization.ParameterFormat,
  50. ) -> bytes:
  51. if format is not serialization.ParameterFormat.PKCS3:
  52. raise ValueError("Only PKCS3 serialization is supported")
  53. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  54. q = self._backend._ffi.new("BIGNUM **")
  55. self._backend._lib.DH_get0_pqg(
  56. self._dh_cdata,
  57. self._backend._ffi.NULL,
  58. q,
  59. self._backend._ffi.NULL,
  60. )
  61. if q[0] != self._backend._ffi.NULL:
  62. raise UnsupportedAlgorithm(
  63. "DH X9.42 serialization is not supported",
  64. _Reasons.UNSUPPORTED_SERIALIZATION,
  65. )
  66. return self._backend._parameter_bytes(encoding, format, self._dh_cdata)
  67. def _get_dh_num_bits(backend, dh_cdata) -> int:
  68. p = backend._ffi.new("BIGNUM **")
  69. backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
  70. backend.openssl_assert(p[0] != backend._ffi.NULL)
  71. return backend._lib.BN_num_bits(p[0])
  72. class _DHPrivateKey(dh.DHPrivateKey):
  73. def __init__(self, backend, dh_cdata, evp_pkey):
  74. self._backend = backend
  75. self._dh_cdata = dh_cdata
  76. self._evp_pkey = evp_pkey
  77. self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)
  78. @property
  79. def key_size(self) -> int:
  80. return _get_dh_num_bits(self._backend, self._dh_cdata)
  81. def private_numbers(self) -> dh.DHPrivateNumbers:
  82. p = self._backend._ffi.new("BIGNUM **")
  83. g = self._backend._ffi.new("BIGNUM **")
  84. q = self._backend._ffi.new("BIGNUM **")
  85. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  86. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  87. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  88. if q[0] == self._backend._ffi.NULL:
  89. q_val = None
  90. else:
  91. q_val = self._backend._bn_to_int(q[0])
  92. pub_key = self._backend._ffi.new("BIGNUM **")
  93. priv_key = self._backend._ffi.new("BIGNUM **")
  94. self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
  95. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  96. self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
  97. return dh.DHPrivateNumbers(
  98. public_numbers=dh.DHPublicNumbers(
  99. parameter_numbers=dh.DHParameterNumbers(
  100. p=self._backend._bn_to_int(p[0]),
  101. g=self._backend._bn_to_int(g[0]),
  102. q=q_val,
  103. ),
  104. y=self._backend._bn_to_int(pub_key[0]),
  105. ),
  106. x=self._backend._bn_to_int(priv_key[0]),
  107. )
  108. def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
  109. buf = self._backend._ffi.new("unsigned char[]", self._key_size_bytes)
  110. pub_key = self._backend._ffi.new("BIGNUM **")
  111. self._backend._lib.DH_get0_key(
  112. peer_public_key._dh_cdata, # type: ignore[attr-defined]
  113. pub_key,
  114. self._backend._ffi.NULL,
  115. )
  116. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  117. res = self._backend._lib.DH_compute_key(
  118. buf, pub_key[0], self._dh_cdata
  119. )
  120. if res == -1:
  121. errors_with_text = self._backend._consume_errors_with_text()
  122. raise ValueError(
  123. "Error computing shared key. Public key is likely invalid "
  124. "for this exchange.",
  125. errors_with_text,
  126. )
  127. else:
  128. self._backend.openssl_assert(res >= 1)
  129. key = self._backend._ffi.buffer(buf)[:res]
  130. pad = self._key_size_bytes - len(key)
  131. if pad > 0:
  132. key = (b"\x00" * pad) + key
  133. return key
  134. def public_key(self) -> dh.DHPublicKey:
  135. dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
  136. pub_key = self._backend._ffi.new("BIGNUM **")
  137. self._backend._lib.DH_get0_key(
  138. self._dh_cdata, pub_key, self._backend._ffi.NULL
  139. )
  140. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  141. pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
  142. self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)
  143. res = self._backend._lib.DH_set0_key(
  144. dh_cdata, pub_key_dup, self._backend._ffi.NULL
  145. )
  146. self._backend.openssl_assert(res == 1)
  147. evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
  148. return _DHPublicKey(self._backend, dh_cdata, evp_pkey)
  149. def parameters(self) -> dh.DHParameters:
  150. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  151. def private_bytes(
  152. self,
  153. encoding: serialization.Encoding,
  154. format: serialization.PrivateFormat,
  155. encryption_algorithm: serialization.KeySerializationEncryption,
  156. ) -> bytes:
  157. if format is not serialization.PrivateFormat.PKCS8:
  158. raise ValueError(
  159. "DH private keys support only PKCS8 serialization"
  160. )
  161. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  162. q = self._backend._ffi.new("BIGNUM **")
  163. self._backend._lib.DH_get0_pqg(
  164. self._dh_cdata,
  165. self._backend._ffi.NULL,
  166. q,
  167. self._backend._ffi.NULL,
  168. )
  169. if q[0] != self._backend._ffi.NULL:
  170. raise UnsupportedAlgorithm(
  171. "DH X9.42 serialization is not supported",
  172. _Reasons.UNSUPPORTED_SERIALIZATION,
  173. )
  174. return self._backend._private_key_bytes(
  175. encoding,
  176. format,
  177. encryption_algorithm,
  178. self,
  179. self._evp_pkey,
  180. self._dh_cdata,
  181. )
  182. class _DHPublicKey(dh.DHPublicKey):
  183. def __init__(self, backend, dh_cdata, evp_pkey):
  184. self._backend = backend
  185. self._dh_cdata = dh_cdata
  186. self._evp_pkey = evp_pkey
  187. self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)
  188. @property
  189. def key_size(self) -> int:
  190. return self._key_size_bits
  191. def public_numbers(self) -> dh.DHPublicNumbers:
  192. p = self._backend._ffi.new("BIGNUM **")
  193. g = self._backend._ffi.new("BIGNUM **")
  194. q = self._backend._ffi.new("BIGNUM **")
  195. self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
  196. self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
  197. self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
  198. if q[0] == self._backend._ffi.NULL:
  199. q_val = None
  200. else:
  201. q_val = self._backend._bn_to_int(q[0])
  202. pub_key = self._backend._ffi.new("BIGNUM **")
  203. self._backend._lib.DH_get0_key(
  204. self._dh_cdata, pub_key, self._backend._ffi.NULL
  205. )
  206. self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
  207. return dh.DHPublicNumbers(
  208. parameter_numbers=dh.DHParameterNumbers(
  209. p=self._backend._bn_to_int(p[0]),
  210. g=self._backend._bn_to_int(g[0]),
  211. q=q_val,
  212. ),
  213. y=self._backend._bn_to_int(pub_key[0]),
  214. )
  215. def parameters(self) -> dh.DHParameters:
  216. return _dh_cdata_to_parameters(self._dh_cdata, self._backend)
  217. def public_bytes(
  218. self,
  219. encoding: serialization.Encoding,
  220. format: serialization.PublicFormat,
  221. ) -> bytes:
  222. if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
  223. raise ValueError(
  224. "DH public keys support only "
  225. "SubjectPublicKeyInfo serialization"
  226. )
  227. if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
  228. q = self._backend._ffi.new("BIGNUM **")
  229. self._backend._lib.DH_get0_pqg(
  230. self._dh_cdata,
  231. self._backend._ffi.NULL,
  232. q,
  233. self._backend._ffi.NULL,
  234. )
  235. if q[0] != self._backend._ffi.NULL:
  236. raise UnsupportedAlgorithm(
  237. "DH X9.42 serialization is not supported",
  238. _Reasons.UNSUPPORTED_SERIALIZATION,
  239. )
  240. return self._backend._public_key_bytes(
  241. encoding, format, self, self._evp_pkey, None
  242. )