base.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. import abc
  5. import typing
  6. from cryptography import utils
  7. from cryptography.exceptions import (
  8. AlreadyFinalized,
  9. AlreadyUpdated,
  10. NotYetFinalized,
  11. UnsupportedAlgorithm,
  12. _Reasons,
  13. )
  14. from cryptography.hazmat.backends import _get_backend
  15. from cryptography.hazmat.backends.interfaces import CipherBackend
  16. from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
  17. from cryptography.hazmat.primitives.ciphers import modes
  18. class BlockCipherAlgorithm(metaclass=abc.ABCMeta):
  19. @abc.abstractproperty
  20. def block_size(self) -> int:
  21. """
  22. The size of a block as an integer in bits (e.g. 64, 128).
  23. """
  24. class CipherContext(metaclass=abc.ABCMeta):
  25. @abc.abstractmethod
  26. def update(self, data: bytes) -> bytes:
  27. """
  28. Processes the provided bytes through the cipher and returns the results
  29. as bytes.
  30. """
  31. @abc.abstractmethod
  32. def update_into(self, data: bytes, buf) -> int:
  33. """
  34. Processes the provided bytes and writes the resulting data into the
  35. provided buffer. Returns the number of bytes written.
  36. """
  37. @abc.abstractmethod
  38. def finalize(self) -> bytes:
  39. """
  40. Returns the results of processing the final block as bytes.
  41. """
  42. class AEADCipherContext(metaclass=abc.ABCMeta):
  43. @abc.abstractmethod
  44. def authenticate_additional_data(self, data: bytes) -> None:
  45. """
  46. Authenticates the provided bytes.
  47. """
  48. class AEADDecryptionContext(metaclass=abc.ABCMeta):
  49. @abc.abstractmethod
  50. def finalize_with_tag(self, tag: bytes) -> bytes:
  51. """
  52. Returns the results of processing the final block as bytes and allows
  53. delayed passing of the authentication tag.
  54. """
  55. class AEADEncryptionContext(metaclass=abc.ABCMeta):
  56. @abc.abstractproperty
  57. def tag(self) -> bytes:
  58. """
  59. Returns tag bytes. This is only available after encryption is
  60. finalized.
  61. """
  62. class Cipher(object):
  63. def __init__(
  64. self,
  65. algorithm: CipherAlgorithm,
  66. mode: typing.Optional[modes.Mode],
  67. backend=None,
  68. ):
  69. backend = _get_backend(backend)
  70. if not isinstance(backend, CipherBackend):
  71. raise UnsupportedAlgorithm(
  72. "Backend object does not implement CipherBackend.",
  73. _Reasons.BACKEND_MISSING_INTERFACE,
  74. )
  75. if not isinstance(algorithm, CipherAlgorithm):
  76. raise TypeError("Expected interface of CipherAlgorithm.")
  77. if mode is not None:
  78. mode.validate_for_algorithm(algorithm)
  79. self.algorithm = algorithm
  80. self.mode = mode
  81. self._backend = backend
  82. def encryptor(self):
  83. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  84. if self.mode.tag is not None:
  85. raise ValueError(
  86. "Authentication tag must be None when encrypting."
  87. )
  88. ctx = self._backend.create_symmetric_encryption_ctx(
  89. self.algorithm, self.mode
  90. )
  91. return self._wrap_ctx(ctx, encrypt=True)
  92. def decryptor(self):
  93. ctx = self._backend.create_symmetric_decryption_ctx(
  94. self.algorithm, self.mode
  95. )
  96. return self._wrap_ctx(ctx, encrypt=False)
  97. def _wrap_ctx(self, ctx, encrypt):
  98. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  99. if encrypt:
  100. return _AEADEncryptionContext(ctx)
  101. else:
  102. return _AEADCipherContext(ctx)
  103. else:
  104. return _CipherContext(ctx)
  105. @utils.register_interface(CipherContext)
  106. class _CipherContext(object):
  107. def __init__(self, ctx):
  108. self._ctx = ctx
  109. def update(self, data: bytes) -> bytes:
  110. if self._ctx is None:
  111. raise AlreadyFinalized("Context was already finalized.")
  112. return self._ctx.update(data)
  113. def update_into(self, data: bytes, buf) -> int:
  114. if self._ctx is None:
  115. raise AlreadyFinalized("Context was already finalized.")
  116. return self._ctx.update_into(data, buf)
  117. def finalize(self) -> bytes:
  118. if self._ctx is None:
  119. raise AlreadyFinalized("Context was already finalized.")
  120. data = self._ctx.finalize()
  121. self._ctx = None
  122. return data
  123. @utils.register_interface(AEADCipherContext)
  124. @utils.register_interface(CipherContext)
  125. @utils.register_interface(AEADDecryptionContext)
  126. class _AEADCipherContext(object):
  127. def __init__(self, ctx):
  128. self._ctx = ctx
  129. self._bytes_processed = 0
  130. self._aad_bytes_processed = 0
  131. self._tag = None
  132. self._updated = False
  133. def _check_limit(self, data_size: int):
  134. if self._ctx is None:
  135. raise AlreadyFinalized("Context was already finalized.")
  136. self._updated = True
  137. self._bytes_processed += data_size
  138. if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
  139. raise ValueError(
  140. "{} has a maximum encrypted byte limit of {}".format(
  141. self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
  142. )
  143. )
  144. def update(self, data: bytes) -> bytes:
  145. self._check_limit(len(data))
  146. return self._ctx.update(data)
  147. def update_into(self, data: bytes, buf) -> int:
  148. self._check_limit(len(data))
  149. return self._ctx.update_into(data, buf)
  150. def finalize(self) -> bytes:
  151. if self._ctx is None:
  152. raise AlreadyFinalized("Context was already finalized.")
  153. data = self._ctx.finalize()
  154. self._tag = self._ctx.tag
  155. self._ctx = None
  156. return data
  157. def finalize_with_tag(self, tag: bytes) -> bytes:
  158. if self._ctx is None:
  159. raise AlreadyFinalized("Context was already finalized.")
  160. data = self._ctx.finalize_with_tag(tag)
  161. self._tag = self._ctx.tag
  162. self._ctx = None
  163. return data
  164. def authenticate_additional_data(self, data: bytes) -> None:
  165. if self._ctx is None:
  166. raise AlreadyFinalized("Context was already finalized.")
  167. if self._updated:
  168. raise AlreadyUpdated("Update has been called on this context.")
  169. self._aad_bytes_processed += len(data)
  170. if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
  171. raise ValueError(
  172. "{} has a maximum AAD byte limit of {}".format(
  173. self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
  174. )
  175. )
  176. self._ctx.authenticate_additional_data(data)
  177. @utils.register_interface(AEADEncryptionContext)
  178. class _AEADEncryptionContext(_AEADCipherContext):
  179. @property
  180. def tag(self) -> bytes:
  181. if self._ctx is not None:
  182. raise NotYetFinalized(
  183. "You must finalize encryption before " "getting the tag."
  184. )
  185. assert self._tag is not None
  186. return self._tag