mirror of
https://github.com/python/cpython.git
synced 2025-07-29 06:05:00 +00:00
Issue #1646: Make socket support TIPC. The socket module now has support
for TIPC under Linux, see http://tipc.sf.net/ for more information. Thanks to Alberto Bertogli for the patch
This commit is contained in:
parent
e28fa297e9
commit
fb2d25a154
8 changed files with 278 additions and 5 deletions
|
@ -1084,6 +1084,85 @@ class BufferIOTest(SocketConnectedTest):
|
|||
buf = buffer(MSG)
|
||||
self.serv_conn.send(buf)
|
||||
|
||||
|
||||
TIPC_STYPE = 2000
|
||||
TIPC_LOWER = 200
|
||||
TIPC_UPPER = 210
|
||||
|
||||
def isTipcAvailable():
|
||||
"""Check if the TIPC module is loaded
|
||||
|
||||
The TIPC module is not loaded automatically on Ubuntu and probably
|
||||
other Linux distros.
|
||||
"""
|
||||
if not hasattr(socket, "AF_TIPC"):
|
||||
return False
|
||||
if not os.path.isfile("/proc/modules"):
|
||||
return False
|
||||
with open("/proc/modules") as f:
|
||||
for line in f:
|
||||
if line.startswith("tipc "):
|
||||
return True
|
||||
if test_support.debug:
|
||||
print "TIPC module is not loaded, please 'sudo modprobe tipc'"
|
||||
return False
|
||||
|
||||
class TIPCTest (unittest.TestCase):
|
||||
def testRDM(self):
|
||||
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
|
||||
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
|
||||
|
||||
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
|
||||
TIPC_LOWER, TIPC_UPPER)
|
||||
srv.bind(srvaddr)
|
||||
|
||||
sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
|
||||
TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
|
||||
cli.sendto(MSG, sendaddr)
|
||||
|
||||
msg, recvaddr = srv.recvfrom(1024)
|
||||
|
||||
self.assertEqual(cli.getsockname(), recvaddr)
|
||||
self.assertEqual(msg, MSG)
|
||||
|
||||
|
||||
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
|
||||
def __init__(self, methodName = 'runTest'):
|
||||
unittest.TestCase.__init__(self, methodName = methodName)
|
||||
ThreadableTest.__init__(self)
|
||||
|
||||
def setUp(self):
|
||||
self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
|
||||
self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
|
||||
TIPC_LOWER, TIPC_UPPER)
|
||||
self.srv.bind(srvaddr)
|
||||
self.srv.listen(5)
|
||||
self.serverExplicitReady()
|
||||
self.conn, self.connaddr = self.srv.accept()
|
||||
|
||||
def clientSetUp(self):
|
||||
# The is a hittable race between serverExplicitReady() and the
|
||||
# accept() call; sleep a little while to avoid it, otherwise
|
||||
# we could get an exception
|
||||
time.sleep(0.1)
|
||||
self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
|
||||
addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
|
||||
TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
|
||||
self.cli.connect(addr)
|
||||
self.cliaddr = self.cli.getsockname()
|
||||
|
||||
def testStream(self):
|
||||
msg = self.conn.recv(1024)
|
||||
self.assertEqual(msg, MSG)
|
||||
self.assertEqual(self.cliaddr, self.connaddr)
|
||||
|
||||
def _testStream(self):
|
||||
self.cli.send(MSG)
|
||||
self.cli.close()
|
||||
|
||||
|
||||
def test_main():
|
||||
tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
|
||||
TestExceptions, BufferIOTest, BasicTCPTest2]
|
||||
|
@ -1105,6 +1184,9 @@ def test_main():
|
|||
tests.append(BasicSocketPairTest)
|
||||
if sys.platform == 'linux2':
|
||||
tests.append(TestLinuxAbstractNamespace)
|
||||
if isTipcAvailable():
|
||||
tests.append(TIPCTest)
|
||||
tests.append(TIPCThreadableTest)
|
||||
|
||||
thread_info = test_support.threading_setup()
|
||||
test_support.run_unittest(*tests)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue