updated pycrypto

This commit is contained in:
shim_
2018-05-10 16:56:32 +02:00
parent fb89f1946b
commit 26579a25f1
92 changed files with 2518 additions and 5288 deletions

View File

@@ -24,19 +24,12 @@
"""Self-testing for PyCrypto hash modules"""
from __future__ import nested_scopes
__revision__ = "$Id$"
import sys
if sys.version_info[0] == 2 and sys.version_info[1] == 1:
from Crypto.Util.py21compat import *
import unittest
from binascii import a2b_hex, b2a_hex, hexlify
from binascii import a2b_hex, b2a_hex
from Crypto.Util.py3compat import *
from Crypto.Util.strxor import strxor_c
# For compatibility with Python 2.1 and Python 2.2
if sys.hexversion < 0x02030000:
@@ -73,22 +66,14 @@ class CipherSelfTest(unittest.TestCase):
self.plaintext = b(_extract(params, 'plaintext'))
self.ciphertext = b(_extract(params, 'ciphertext'))
self.module_name = _extract(params, 'module_name', None)
self.assoc_data = _extract(params, 'assoc_data', None)
self.mac = _extract(params, 'mac', None)
if self.assoc_data:
self.mac = b(self.mac)
mode = _extract(params, 'mode', None)
self.mode_name = str(mode)
if mode is not None:
# Block cipher
self.mode = getattr(self.module, "MODE_" + mode)
self.iv = _extract(params, 'iv', None)
if self.iv is None:
self.iv = _extract(params, 'nonce', None)
if self.iv is not None:
self.iv = b(self.iv)
if self.iv is not None: self.iv = b(self.iv)
# Only relevant for OPENPGP mode
self.encrypted_iv = _extract(params, 'encrypted_iv', None)
@@ -132,60 +117,29 @@ class CipherSelfTest(unittest.TestCase):
else:
return self.module.new(a2b_hex(self.key), self.mode, a2b_hex(self.iv), **params)
def isMode(self, name):
if not hasattr(self.module, "MODE_"+name):
return False
return self.mode == getattr(self.module, "MODE_"+name)
def runTest(self):
plaintext = a2b_hex(self.plaintext)
ciphertext = a2b_hex(self.ciphertext)
assoc_data = []
if self.assoc_data:
assoc_data = [ a2b_hex(b(x)) for x in self.assoc_data]
ct = None
pt = None
ct1 = b2a_hex(self._new().encrypt(plaintext))
pt1 = b2a_hex(self._new(1).decrypt(ciphertext))
ct2 = b2a_hex(self._new().encrypt(plaintext))
pt2 = b2a_hex(self._new(1).decrypt(ciphertext))
#
# Repeat the same encryption or decryption twice and verify
# that the result is always the same
#
for i in xrange(2):
cipher = self._new()
decipher = self._new(1)
# Only AEAD modes
for comp in assoc_data:
cipher.update(comp)
decipher.update(comp)
ctX = b2a_hex(cipher.encrypt(plaintext))
if self.isMode("SIV"):
ptX = b2a_hex(decipher.decrypt_and_verify(ciphertext, a2b_hex(self.mac)))
else:
ptX = b2a_hex(decipher.decrypt(ciphertext))
if ct:
self.assertEqual(ct, ctX)
self.assertEqual(pt, ptX)
ct, pt = ctX, ptX
if self.isMode("OPENPGP"):
if hasattr(self.module, "MODE_OPENPGP") and self.mode == self.module.MODE_OPENPGP:
# In PGP mode, data returned by the first encrypt()
# is prefixed with the encrypted IV.
# Here we check it and then remove it from the ciphertexts.
eilen = len(self.encrypted_iv)
self.assertEqual(self.encrypted_iv, ct[:eilen])
ct = ct[eilen:]
self.assertEqual(self.encrypted_iv, ct1[:eilen])
self.assertEqual(self.encrypted_iv, ct2[:eilen])
ct1 = ct1[eilen:]
ct2 = ct2[eilen:]
self.assertEqual(self.ciphertext, ct) # encrypt
self.assertEqual(self.plaintext, pt) # decrypt
if self.mac:
mac = b2a_hex(cipher.digest())
self.assertEqual(self.mac, mac)
decipher.verify(a2b_hex(self.mac))
self.assertEqual(self.ciphertext, ct1) # encrypt
self.assertEqual(self.ciphertext, ct2) # encrypt (second time)
self.assertEqual(self.plaintext, pt1) # decrypt
self.assertEqual(self.plaintext, pt2) # decrypt (second time)
class CipherStreamingSelfTest(CipherSelfTest):
@@ -241,43 +195,18 @@ class CTRWraparoundTest(unittest.TestCase):
self.module_name = params.get('module_name', None)
def shortDescription(self):
return """Regression test: %s with MODE_CTR raising OverflowError on wraparound""" % (self.module_name,)
return """Regression test: %s with MODE_CTR should raise OverflowError on wraparound when shortcut used""" % (self.module_name,)
def runTest(self):
from Crypto.Util import Counter
def pythonCounter():
state = [0]
def ctr():
# First block succeeds; Second and subsequent blocks raise OverflowError
if state[0] == 0:
state[0] = 1
return b("\xff") * self.module.block_size
else:
raise OverflowError
return ctr
for little_endian in (0, 1): # (False, True) Test both endiannesses
block = b("\x00") * self.module.block_size
# Test PyObject_CallObject code path: if the counter raises OverflowError
cipher = self.module.new(a2b_hex(self.key), self.module.MODE_CTR, counter=pythonCounter())
cipher.encrypt(block)
self.assertRaises(OverflowError, cipher.encrypt, block)
self.assertRaises(OverflowError, cipher.encrypt, block)
# Test PyObject_CallObject code path: counter object should raise OverflowError
ctr = Counter.new(8*self.module.block_size, initial_value=2L**(8*self.module.block_size)-1, little_endian=little_endian)
ctr()
self.assertRaises(OverflowError, ctr)
self.assertRaises(OverflowError, ctr)
# Test the CTR-mode shortcut
ctr = Counter.new(8*self.module.block_size, initial_value=2L**(8*self.module.block_size)-1, little_endian=little_endian)
cipher = self.module.new(a2b_hex(self.key), self.module.MODE_CTR, counter=ctr)
cipher.encrypt(block)
self.assertRaises(OverflowError, cipher.encrypt, block)
self.assertRaises(OverflowError, cipher.encrypt, block)
for disable_shortcut in (0, 1): # (False, True) Test CTR-mode shortcut and PyObject_CallObject code paths
for little_endian in (0, 1): # (False, True) Test both endiannesses
ctr = Counter.new(8*self.module.block_size, initial_value=2L**(8*self.module.block_size)-1, little_endian=little_endian, disable_shortcut=disable_shortcut)
cipher = self.module.new(a2b_hex(self.key), self.module.MODE_CTR, counter=ctr)
block = b("\x00") * self.module.block_size
cipher.encrypt(block)
self.assertRaises(OverflowError, cipher.encrypt, block)
class CFBSegmentSizeTest(unittest.TestCase):
@@ -296,301 +225,6 @@ class CFBSegmentSizeTest(unittest.TestCase):
self.assertRaises(ValueError, self.module.new, a2b_hex(self.key), self.module.MODE_CFB, segment_size=i)
self.module.new(a2b_hex(self.key), self.module.MODE_CFB, "\0"*self.module.block_size, segment_size=8) # should succeed
class CCMMACLengthTest(unittest.TestCase):
"""CCM specific tests about MAC"""
def __init__(self, module):
unittest.TestCase.__init__(self)
self.module = module
self.key = b('\xFF')*16
self.iv = b('\x00')*10
def shortDescription(self):
return self.description
def runTest(self):
"""Verify that MAC can only be 4,6,8,..,16 bytes long."""
for i in range(3,16,2):
self.description = "CCM MAC length check (%d bytes)" % i
self.assertRaises(ValueError, self.module.new, self.key,
self.module.MODE_CCM, self.iv, msg_len=10, mac_len=i)
"""Verify that default MAC length is 16."""
self.description = "CCM default MAC length check"
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv, msg_len=4)
cipher.encrypt(b('z')*4)
self.assertEqual(len(cipher.digest()), 16)
class CCMSplitEncryptionTest(unittest.TestCase):
"""CCM specific tests to validate how encrypt()
decrypt() can be called multiple times on the
same object."""
def __init__(self, module):
unittest.TestCase.__init__(self)
self.module = module
self.key = b('\xFF')*16
self.iv = b('\x00')*10
self.description = "CCM Split Encryption Test"
def shortDescription(self):
return self.description
def runTest(self):
"""Verify that CCM update()/encrypt() can be called multiple times,
provided that lengths are declared beforehand"""
data = b("AUTH DATA")
pt1 = b("PLAINTEXT1") # Short
pt2 = b("PLAINTEXT2") # Long
pt_ref = pt1+pt2
# REFERENCE: Run with 1 update() and 1 encrypt()
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv)
cipher.update(data)
ct_ref = cipher.encrypt(pt_ref)
mac_ref = cipher.digest()
# Verify that calling CCM encrypt()/decrypt() twice is not
# possible without the 'msg_len' parameter and regardless
# of the 'assoc_len' parameter
for ad_len in None, len(data):
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv, assoc_len=ad_len)
cipher.update(data)
cipher.encrypt(pt1)
self.assertRaises(TypeError, cipher.encrypt, pt2)
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv, assoc_len=ad_len)
cipher.update(data)
cipher.decrypt(ct_ref[:len(pt1)])
self.assertRaises(TypeError, cipher.decrypt, ct_ref[len(pt1):])
# Run with 2 encrypt()/decrypt(). Results must be the same
# regardless of the 'assoc_len' parameter
for ad_len in None, len(data):
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv, assoc_len=ad_len, msg_len=len(pt_ref))
cipher.update(data)
ct = cipher.encrypt(pt1)
ct += cipher.encrypt(pt2)
mac = cipher.digest()
self.assertEqual(ct_ref, ct)
self.assertEqual(mac_ref, mac)
cipher = self.module.new(self.key, self.module.MODE_CCM,
self.iv, msg_len=len(pt1+pt2))
cipher.update(data)
pt = cipher.decrypt(ct[:len(pt1)])
pt += cipher.decrypt(ct[len(pt1):])
mac = cipher.verify(mac_ref)
self.assertEqual(pt_ref, pt)
class AEADTests(unittest.TestCase):
"""Tests generic to all AEAD modes"""
def __init__(self, module, mode_name, key_size):
unittest.TestCase.__init__(self)
self.module = module
self.mode_name = mode_name
self.mode = getattr(module, mode_name)
if not self.isMode("SIV"):
self.key = b('\xFF')*key_size
else:
self.key = b('\xFF')*key_size*2
self.iv = b('\x00')*10
self.description = "AEAD Test"
def isMode(self, name):
if not hasattr(self.module, "MODE_"+name):
return False
return self.mode == getattr(self.module, "MODE_"+name)
def right_mac_test(self):
"""Positive tests for MAC"""
self.description = "Test for right MAC in %s of %s" % \
(self.mode_name, self.module.__name__)
ad_ref = b("Reference AD")
pt_ref = b("Reference plaintext")
# Encrypt and create the reference MAC
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(ad_ref)
ct_ref = cipher.encrypt(pt_ref)
mac_ref = cipher.digest()
# Decrypt and verify that MAC is accepted
decipher = self.module.new(self.key, self.mode, self.iv)
decipher.update(ad_ref)
pt = decipher.decrypt_and_verify(ct_ref, mac_ref)
self.assertEqual(pt, pt_ref)
# Verify that hexverify work
decipher.hexverify(hexlify(mac_ref))
def wrong_mac_test(self):
"""Negative tests for MAC"""
self.description = "Test for wrong MAC in %s of %s" % \
(self.mode_name, self.module.__name__)
ad_ref = b("Reference AD")
pt_ref = b("Reference plaintext")
# Encrypt and create the reference MAC
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(ad_ref)
ct_ref = cipher.encrypt(pt_ref)
mac_ref = cipher.digest()
# Modify the MAC and verify it is NOT ACCEPTED
wrong_mac = strxor_c(mac_ref, 255)
decipher = self.module.new(self.key, self.mode, self.iv)
decipher.update(ad_ref)
self.assertRaises(ValueError, decipher.decrypt_and_verify,
ct_ref, wrong_mac)
def zero_data(self):
"""Verify transition from INITIALIZED to FINISHED"""
self.description = "Test for zero data in %s of %s" % \
(self.mode_name, self.module.__name__)
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.digest()
def multiple_updates(self):
"""Verify that update() can be called multiple times"""
self.description = "Test for multiple updates in %s of %s" % \
(self.mode_name, self.module.__name__)
# In all modes other than SIV, the associated data is a single
# component that can be arbitrarilly split and submitted to update().
#
# In SIV, associated data is instead organized in a vector or multiple
# components. Each component is passed to update() as a whole.
# This test is therefore not meaningful to SIV.
if self.isMode("SIV"):
return
ad = b("").join([bchr(x) for x in xrange(0,128)])
mac1, mac2, mac3 = (None,)*3
for chunk_length in 1,10,40,80,128:
chunks = [ad[i:i+chunk_length] for i in range(0, len(ad), chunk_length)]
# No encryption/decryption
cipher = self.module.new(self.key, self.mode, self.iv)
for c in chunks:
cipher.update(c)
if mac1:
cipher.verify(mac1)
else:
mac1 = cipher.digest()
# Encryption
cipher = self.module.new(self.key, self.mode, self.iv)
for c in chunks:
cipher.update(c)
ct = cipher.encrypt(b("PT"))
mac2 = cipher.digest()
# Decryption
cipher = self.module.new(self.key, self.mode, self.iv)
for c in chunks:
cipher.update(c)
cipher.decrypt(ct)
cipher.verify(mac2)
def no_mix_encrypt_decrypt(self):
"""Verify that encrypt and decrypt cannot be mixed up"""
self.description = "Test for mix of encrypt and decrypt in %s of %s" % \
(self.mode_name, self.module.__name__)
# Calling decrypt after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.encrypt(b("PT")*40)
self.assertRaises(TypeError, cipher.decrypt, b("XYZ")*40)
# Calling encrypt() after decrypt() raises an exception
# (excluded for SIV, since decrypt() is not valid)
if not self.isMode("SIV"):
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.decrypt(b("CT")*40)
self.assertRaises(TypeError, cipher.encrypt, b("XYZ")*40)
# Calling verify after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.encrypt(b("PT")*40)
self.assertRaises(TypeError, cipher.verify, b("XYZ"))
self.assertRaises(TypeError, cipher.hexverify, "12")
# Calling digest() after decrypt() raises an exception
# (excluded for SIV, since decrypt() is not valid)
if not self.isMode("SIV"):
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.decrypt(b("CT")*40)
self.assertRaises(TypeError, cipher.digest)
self.assertRaises(TypeError, cipher.hexdigest)
def no_late_update(self):
"""Verify that update cannot be called after encrypt or decrypt"""
self.description = "Test for late update in %s of %s" % \
(self.mode_name, self.module.__name__)
# Calling update after encrypt raises an exception
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(b("XX"))
cipher.encrypt(b("PT")*40)
self.assertRaises(TypeError, cipher.update, b("XYZ"))
# Calling update() after decrypt() raises an exception
# (excluded for SIV, since decrypt() is not valid)
if not self.isMode("SIV"):
cipher = self.module.new(self.key, self.mode, self.iv)
cipher.update(b("XX"))
cipher.decrypt(b("CT")*40)
self.assertRaises(TypeError, cipher.update, b("XYZ"))
def loopback(self):
"""Verify composition of encrypt_and_digest() and decrypt_and_verify()
is the identity function."""
self.description = "Lookback test decrypt_and_verify(encrypt_and_digest)"\
"for %s in %s" % (self.mode_name,
self.module.__name__)
enc_cipher = self.module.new(self.key, self.mode, self.iv)
dec_cipher = self.module.new(self.key, self.mode, self.iv)
enc_cipher.update(b("XXX"))
dec_cipher.update(b("XXX"))
plaintext = b("Reference") * 10
ct, mac = enc_cipher.encrypt_and_digest(plaintext)
pt = dec_cipher.decrypt_and_verify(ct, mac)
self.assertEqual(plaintext, pt)
def runTest(self):
self.right_mac_test()
self.wrong_mac_test()
self.zero_data()
self.multiple_updates()
self.no_mix_encrypt_decrypt()
self.no_late_update()
self.loopback()
def shortDescription(self):
return self.description
class RoundtripTest(unittest.TestCase):
def __init__(self, module, params):
from Crypto import Random
@@ -608,7 +242,7 @@ class RoundtripTest(unittest.TestCase):
for mode in (self.module.MODE_ECB, self.module.MODE_CBC, self.module.MODE_CFB, self.module.MODE_OFB, self.module.MODE_OPENPGP):
encryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv)
ciphertext = encryption_cipher.encrypt(self.plaintext)
if mode != self.module.MODE_OPENPGP:
decryption_cipher = self.module.new(a2b_hex(self.key), mode, self.iv)
else:
@@ -649,17 +283,13 @@ class IVLengthTest(unittest.TestCase):
self.module.MODE_OFB, "")
self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
self.module.MODE_OPENPGP, "")
if hasattr(self.module, "MODE_CCM"):
for ivlen in (0,6,14):
self.assertRaises(ValueError, self.module.new, a2b_hex(self.key),
self.module.MODE_CCM, bchr(0)*ivlen, msg_len=10)
self.module.new(a2b_hex(self.key), self.module.MODE_ECB, "")
self.module.new(a2b_hex(self.key), self.module.MODE_CTR, "", counter=self._dummy_counter)
def _dummy_counter(self):
return "\0" * self.module.block_size
def make_block_tests(module, module_name, test_data, additional_params=dict()):
def make_block_tests(module, module_name, test_data):
tests = []
extra_tests_added = 0
for i in range(len(test_data)):
@@ -696,7 +326,6 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
name = "%s #%d: %s" % (module_name, i+1, description)
params['description'] = name
params['module_name'] = module_name
params.update(additional_params)
# Add extra test(s) to the test suite before the current test
if not extra_tests_added:
@@ -710,18 +339,11 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
]
extra_tests_added = 1
# Extract associated data and MAC for AEAD modes
if p_mode in ('CCM', 'EAX', 'SIV', 'GCM'):
assoc_data, params['plaintext'] = params['plaintext'].split('|')
assoc_data2, params['ciphertext'], params['mac'] = params['ciphertext'].split('|')
params['assoc_data'] = assoc_data.split("-")
params['mac_len'] = len(params['mac'])>>1
# Add the current test to the test suite
tests.append(CipherSelfTest(module, params))
# When using CTR mode, test that the interface behaves like a stream cipher
if p_mode in ('OFB', 'CTR'):
if p_mode == 'CTR':
tests.append(CipherStreamingSelfTest(module, params))
# When using CTR mode, test the non-shortcut code path.
@@ -733,25 +355,6 @@ def make_block_tests(module, module_name, test_data, additional_params=dict()):
if not params2['ctr_params'].has_key('disable_shortcut'):
params2['ctr_params']['disable_shortcut'] = 1
tests.append(CipherSelfTest(module, params2))
# Add tests that don't use test vectors
if hasattr(module, "MODE_CCM"):
tests += [
CCMMACLengthTest(module),
CCMSplitEncryptionTest(module),
]
for aead_mode in ("MODE_CCM","MODE_EAX", "MODE_SIV", "MODE_GCM"):
if hasattr(module, aead_mode):
key_sizes = []
try:
key_sizes += module.key_size
except TypeError:
key_sizes = [ module.key_size ]
for ks in key_sizes:
tests += [
AEADTests(module, aead_mode, ks),
]
return tests
def make_stream_tests(module, module_name, test_data):