hashes.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from cryptography import utils
  5. from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
  6. from cryptography.hazmat.primitives import hashes
  7. class _HashContext(hashes.HashContext):
  8. def __init__(self, backend, algorithm: hashes.HashAlgorithm, ctx=None):
  9. self._algorithm = algorithm
  10. self._backend = backend
  11. if ctx is None:
  12. ctx = self._backend._lib.EVP_MD_CTX_new()
  13. ctx = self._backend._ffi.gc(
  14. ctx, self._backend._lib.EVP_MD_CTX_free
  15. )
  16. evp_md = self._backend._evp_md_from_algorithm(algorithm)
  17. if evp_md == self._backend._ffi.NULL:
  18. raise UnsupportedAlgorithm(
  19. "{} is not a supported hash on this backend.".format(
  20. algorithm.name
  21. ),
  22. _Reasons.UNSUPPORTED_HASH,
  23. )
  24. res = self._backend._lib.EVP_DigestInit_ex(
  25. ctx, evp_md, self._backend._ffi.NULL
  26. )
  27. self._backend.openssl_assert(res != 0)
  28. self._ctx = ctx
  29. algorithm = utils.read_only_property("_algorithm")
  30. def copy(self) -> "_HashContext":
  31. copied_ctx = self._backend._lib.EVP_MD_CTX_new()
  32. copied_ctx = self._backend._ffi.gc(
  33. copied_ctx, self._backend._lib.EVP_MD_CTX_free
  34. )
  35. res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
  36. self._backend.openssl_assert(res != 0)
  37. return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
  38. def update(self, data: bytes) -> None:
  39. data_ptr = self._backend._ffi.from_buffer(data)
  40. res = self._backend._lib.EVP_DigestUpdate(
  41. self._ctx, data_ptr, len(data)
  42. )
  43. self._backend.openssl_assert(res != 0)
  44. def finalize(self) -> bytes:
  45. if isinstance(self.algorithm, hashes.ExtendableOutputFunction):
  46. # extendable output functions use a different finalize
  47. return self._finalize_xof()
  48. else:
  49. buf = self._backend._ffi.new(
  50. "unsigned char[]", self._backend._lib.EVP_MAX_MD_SIZE
  51. )
  52. outlen = self._backend._ffi.new("unsigned int *")
  53. res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, outlen)
  54. self._backend.openssl_assert(res != 0)
  55. self._backend.openssl_assert(
  56. outlen[0] == self.algorithm.digest_size
  57. )
  58. return self._backend._ffi.buffer(buf)[: outlen[0]]
  59. def _finalize_xof(self) -> bytes:
  60. buf = self._backend._ffi.new(
  61. "unsigned char[]", self.algorithm.digest_size
  62. )
  63. res = self._backend._lib.EVP_DigestFinalXOF(
  64. self._ctx, buf, self.algorithm.digest_size
  65. )
  66. self._backend.openssl_assert(res != 0)
  67. return self._backend._ffi.buffer(buf)[: self.algorithm.digest_size]