Issue #27744: Add AF_ALG (Linux Kernel crypto) to socket module.

This commit is contained in:
Christian Heimes 2016-09-05 23:54:41 +02:00
parent 92a6c170e6
commit dffa3949c7
7 changed files with 647 additions and 73 deletions

View file

@ -5325,6 +5325,170 @@ class SendfileUsingSendfileTest(SendfileUsingSendTest):
def meth_from_sock(self, sock):
return getattr(sock, "_sendfile_use_sendfile")
@unittest.skipUnless(hasattr(socket, "AF_ALG"), 'AF_ALG required')
class LinuxKernelCryptoAPI(unittest.TestCase):
# tests for AF_ALG
def create_alg(self, typ, name):
sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
sock.bind((typ, name))
return sock
def test_sha256(self):
expected = bytes.fromhex("ba7816bf8f01cfea414140de5dae2223b00361a396"
"177a9cb410ff61f20015ad")
with self.create_alg('hash', 'sha256') as algo:
op, _ = algo.accept()
with op:
op.sendall(b"abc")
self.assertEqual(op.recv(512), expected)
op, _ = algo.accept()
with op:
op.send(b'a', socket.MSG_MORE)
op.send(b'b', socket.MSG_MORE)
op.send(b'c', socket.MSG_MORE)
op.send(b'')
self.assertEqual(op.recv(512), expected)
def test_hmac_sha1(self):
expected = bytes.fromhex("effcdf6ae5eb2fa2d27416d5f184df9c259a7c79")
with self.create_alg('hash', 'hmac(sha1)') as algo:
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, b"Jefe")
op, _ = algo.accept()
with op:
op.sendall(b"what do ya want for nothing?")
self.assertEqual(op.recv(512), expected)
def test_aes_cbc(self):
key = bytes.fromhex('06a9214036b8a15b512e03d534120006')
iv = bytes.fromhex('3dafba429d9eb430b422da802c9fac41')
msg = b"Single block msg"
ciphertext = bytes.fromhex('e353779c1079aeb82708942dbe77181a')
msglen = len(msg)
with self.create_alg('skcipher', 'cbc(aes)') as algo:
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
op, _ = algo.accept()
with op:
op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
flags=socket.MSG_MORE)
op.sendall(msg)
self.assertEqual(op.recv(msglen), ciphertext)
op, _ = algo.accept()
with op:
op.sendmsg_afalg([ciphertext],
op=socket.ALG_OP_DECRYPT, iv=iv)
self.assertEqual(op.recv(msglen), msg)
# long message
multiplier = 1024
longmsg = [msg] * multiplier
op, _ = algo.accept()
with op:
op.sendmsg_afalg(longmsg,
op=socket.ALG_OP_ENCRYPT, iv=iv)
enc = op.recv(msglen * multiplier)
self.assertEqual(len(enc), msglen * multiplier)
self.assertTrue(enc[:msglen], ciphertext)
op, _ = algo.accept()
with op:
op.sendmsg_afalg([enc],
op=socket.ALG_OP_DECRYPT, iv=iv)
dec = op.recv(msglen * multiplier)
self.assertEqual(len(dec), msglen * multiplier)
self.assertEqual(dec, msg * multiplier)
@support.requires_linux_version(3, 19)
def test_aead_aes_gcm(self):
key = bytes.fromhex('c939cc13397c1d37de6ae0e1cb7c423c')
iv = bytes.fromhex('b3d8cc017cbb89b39e0f67e2')
plain = bytes.fromhex('c3b3c41f113a31b73d9a5cd432103069')
assoc = bytes.fromhex('24825602bd12a984e0092d3e448eda5f')
expected_ct = bytes.fromhex('93fe7d9e9bfd10348a5606e5cafa7354')
expected_tag = bytes.fromhex('0032a1dc85f1c9786925a2e71d8272dd')
taglen = len(expected_tag)
assoclen = len(assoc)
with self.create_alg('aead', 'gcm(aes)') as algo:
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, key)
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_AEAD_AUTHSIZE,
None, taglen)
# send assoc, plain and tag buffer in separate steps
op, _ = algo.accept()
with op:
op.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, iv=iv,
assoclen=assoclen, flags=socket.MSG_MORE)
op.sendall(assoc, socket.MSG_MORE)
op.sendall(plain, socket.MSG_MORE)
op.sendall(b'\x00' * taglen)
res = op.recv(assoclen + len(plain) + taglen)
self.assertEqual(expected_ct, res[assoclen:-taglen])
self.assertEqual(expected_tag, res[-taglen:])
# now with msg
op, _ = algo.accept()
with op:
msg = assoc + plain + b'\x00' * taglen
op.sendmsg_afalg([msg], op=socket.ALG_OP_ENCRYPT, iv=iv,
assoclen=assoclen)
res = op.recv(assoclen + len(plain) + taglen)
self.assertEqual(expected_ct, res[assoclen:-taglen])
self.assertEqual(expected_tag, res[-taglen:])
# create anc data manually
pack_uint32 = struct.Struct('I').pack
op, _ = algo.accept()
with op:
msg = assoc + plain + b'\x00' * taglen
op.sendmsg(
[msg],
([socket.SOL_ALG, socket.ALG_SET_OP, pack_uint32(socket.ALG_OP_ENCRYPT)],
[socket.SOL_ALG, socket.ALG_SET_IV, pack_uint32(len(iv)) + iv],
[socket.SOL_ALG, socket.ALG_SET_AEAD_ASSOCLEN, pack_uint32(assoclen)],
)
)
res = op.recv(len(msg))
self.assertEqual(expected_ct, res[assoclen:-taglen])
self.assertEqual(expected_tag, res[-taglen:])
# decrypt and verify
op, _ = algo.accept()
with op:
msg = assoc + expected_ct + expected_tag
op.sendmsg_afalg([msg], op=socket.ALG_OP_DECRYPT, iv=iv,
assoclen=assoclen)
res = op.recv(len(msg))
self.assertEqual(plain, res[assoclen:-taglen])
def test_drbg_pr_sha256(self):
# deterministic random bit generator, prediction resistance, sha256
with self.create_alg('rng', 'drbg_pr_sha256') as algo:
extra_seed = os.urandom(32)
algo.setsockopt(socket.SOL_ALG, socket.ALG_SET_KEY, extra_seed)
op, _ = algo.accept()
with op:
rn = op.recv(32)
self.assertEqual(len(rn), 32)
def test_sendmsg_afalg_args(self):
sock = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
with self.assertRaises(TypeError):
sock.sendmsg_afalg()
with self.assertRaises(TypeError):
sock.sendmsg_afalg(op=None)
with self.assertRaises(TypeError):
sock.sendmsg_afalg(1)
with self.assertRaises(TypeError):
sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=None)
with self.assertRaises(TypeError):
sock.sendmsg_afalg(op=socket.ALG_OP_ENCRYPT, assoclen=-1)
def test_main():
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
@ -5352,6 +5516,7 @@ def test_main():
tests.extend([TIPCTest, TIPCThreadableTest])
tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest])
tests.append(LinuxKernelCryptoAPI)
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,