Issue #7777: socket: Add Reliable Datagram Sockets (PF_RDS) support.

This commit is contained in:
Charles-François Natali 2011-11-10 19:21:37 +01:00
parent 0c929d9d39
commit 10b8cf4455
5 changed files with 260 additions and 5 deletions

View file

@ -47,8 +47,20 @@ def _have_socket_can():
s.close()
return True
def _have_socket_rds():
"""Check whether RDS sockets are supported on this host."""
try:
s = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
except (AttributeError, OSError):
return False
else:
s.close()
return True
HAVE_SOCKET_CAN = _have_socket_can()
HAVE_SOCKET_RDS = _have_socket_rds()
# Size in bytes of the int type
SIZEOF_INT = array.array("i").itemsize
@ -113,6 +125,23 @@ class SocketCANTest(unittest.TestCase):
self.skipTest('network interface `%s` does not exist' %
self.interface)
class SocketRDSTest(unittest.TestCase):
"""To be able to run this test, the `rds` kernel module must be loaded:
# modprobe rds
"""
bufsize = 8192
def setUp(self):
self.serv = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
self.addCleanup(self.serv.close)
try:
self.port = support.bind_port(self.serv)
except OSError:
self.skipTest('unable to bind RDS socket')
class ThreadableTest:
"""Threadable Test class
@ -271,6 +300,29 @@ class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
self.cli = None
ThreadableTest.clientTearDown(self)
class ThreadedRDSSocketTest(SocketRDSTest, ThreadableTest):
def __init__(self, methodName='runTest'):
SocketRDSTest.__init__(self, methodName=methodName)
ThreadableTest.__init__(self)
self.evt = threading.Event()
def clientSetUp(self):
self.cli = socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0)
try:
# RDS sockets must be bound explicitly to send or receive data
self.cli.bind((HOST, 0))
self.cli_addr = self.cli.getsockname()
except OSError:
# skipTest should not be called here, and will be called in the
# server instead
pass
def clientTearDown(self):
self.cli.close()
self.cli = None
ThreadableTest.clientTearDown(self)
class SocketConnectedTest(ThreadedTCPSocketTest):
"""Socket tests for client-server connection.
@ -1239,6 +1291,112 @@ class CANTest(ThreadedCANSocketTest):
self.cli.send(self.cf2)
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):
def testCrucialConstants(self):
socket.AF_RDS
socket.PF_RDS
def testCreateSocket(self):
with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
pass
def testSocketBufferSize(self):
bufsize = 16384
with socket.socket(socket.PF_RDS, socket.SOCK_SEQPACKET, 0) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, bufsize)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, bufsize)
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
@unittest.skipUnless(thread, 'Threading required for this test.')
class RDSTest(ThreadedRDSSocketTest):
def __init__(self, methodName='runTest'):
ThreadedRDSSocketTest.__init__(self, methodName=methodName)
def testSendAndRecv(self):
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
self.assertEqual(self.cli_addr, addr)
def _testSendAndRecv(self):
self.data = b'spam'
self.cli.sendto(self.data, 0, (HOST, self.port))
def testPeek(self):
data, addr = self.serv.recvfrom(self.bufsize, socket.MSG_PEEK)
self.assertEqual(self.data, data)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
def _testPeek(self):
self.data = b'spam'
self.cli.sendto(self.data, 0, (HOST, self.port))
@requireAttrs(socket.socket, 'recvmsg')
def testSendAndRecvMsg(self):
data, ancdata, msg_flags, addr = self.serv.recvmsg(self.bufsize)
self.assertEqual(self.data, data)
@requireAttrs(socket.socket, 'sendmsg')
def _testSendAndRecvMsg(self):
self.data = b'hello ' * 10
self.cli.sendmsg([self.data], (), 0, (HOST, self.port))
def testSendAndRecvMulti(self):
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data1, data)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data2, data)
def _testSendAndRecvMulti(self):
self.data1 = b'bacon'
self.cli.sendto(self.data1, 0, (HOST, self.port))
self.data2 = b'egg'
self.cli.sendto(self.data2, 0, (HOST, self.port))
def testSelect(self):
r, w, x = select.select([self.serv], [], [], 3.0)
self.assertIn(self.serv, r)
data, addr = self.serv.recvfrom(self.bufsize)
self.assertEqual(self.data, data)
def _testSelect(self):
self.data = b'select'
self.cli.sendto(self.data, 0, (HOST, self.port))
def testCongestion(self):
# wait until the sender is done
self.evt.wait()
def _testCongestion(self):
# test the behavior in case of congestion
self.data = b'fill'
self.cli.setblocking(False)
try:
# try to lower the receiver's socket buffer size
self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
except OSError:
pass
with self.assertRaises(OSError) as cm:
try:
# fill the receiver's socket buffer
while True:
self.cli.sendto(self.data, 0, (HOST, self.port))
finally:
# signal the receiver we're done
self.evt.set()
# sendto() should have failed with ENOBUFS
self.assertEqual(cm.exception.errno, errno.ENOBUFS)
# and we should have received a congestion notification through poll
r, w, x = select.select([self.serv], [], [], 3.0)
self.assertIn(self.serv, r)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
@ -4362,6 +4520,7 @@ def test_main():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest])
tests.extend([
CmsgMacroTests,
SendmsgUDPTest,