mirror of
https://github.com/python/cpython.git
synced 2025-11-25 12:44:13 +00:00
Issue #10141: socket: add SocketCAN (PF_CAN) support. Initial patch by Matthias
Fuchs, updated by Tiago Gonçalves.
This commit is contained in:
parent
90c30e87be
commit
47413c1171
10 changed files with 683 additions and 321 deletions
|
|
@ -21,6 +21,7 @@ from weakref import proxy
|
|||
import signal
|
||||
import math
|
||||
import pickle
|
||||
import struct
|
||||
try:
|
||||
import fcntl
|
||||
except ImportError:
|
||||
|
|
@ -36,6 +37,18 @@ except ImportError:
|
|||
thread = None
|
||||
threading = None
|
||||
|
||||
def _have_socket_can():
|
||||
"""Check whether CAN sockets are supported on this host."""
|
||||
try:
|
||||
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
|
||||
except (AttributeError, socket.error, OSError):
|
||||
return False
|
||||
else:
|
||||
s.close()
|
||||
return True
|
||||
|
||||
HAVE_SOCKET_CAN = _have_socket_can()
|
||||
|
||||
# Size in bytes of the int type
|
||||
SIZEOF_INT = array.array("i").itemsize
|
||||
|
||||
|
|
@ -80,6 +93,30 @@ class ThreadSafeCleanupTestCase(unittest.TestCase):
|
|||
with self._cleanup_lock:
|
||||
return super().doCleanups(*args, **kwargs)
|
||||
|
||||
class SocketCANTest(unittest.TestCase):
|
||||
|
||||
"""To be able to run this test, a `vcan0` CAN interface can be created with
|
||||
the following commands:
|
||||
# modprobe vcan
|
||||
# ip link add dev vcan0 type vcan
|
||||
# ifconfig vcan0 up
|
||||
"""
|
||||
interface = 'vcan0'
|
||||
bufsize = 128
|
||||
|
||||
def setUp(self):
|
||||
self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
|
||||
try:
|
||||
self.s.bind((self.interface,))
|
||||
except socket.error:
|
||||
self.skipTest('network interface `%s` does not exist' %
|
||||
self.interface)
|
||||
self.s.close()
|
||||
|
||||
def tearDown(self):
|
||||
self.s.close()
|
||||
self.s = None
|
||||
|
||||
class ThreadableTest:
|
||||
"""Threadable Test class
|
||||
|
||||
|
|
@ -210,6 +247,26 @@ class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
|
|||
self.cli = None
|
||||
ThreadableTest.clientTearDown(self)
|
||||
|
||||
class ThreadedCANSocketTest(SocketCANTest, ThreadableTest):
|
||||
|
||||
def __init__(self, methodName='runTest'):
|
||||
SocketCANTest.__init__(self, methodName=methodName)
|
||||
ThreadableTest.__init__(self)
|
||||
|
||||
def clientSetUp(self):
|
||||
self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
|
||||
try:
|
||||
self.cli.bind((self.interface,))
|
||||
except socket.error:
|
||||
self.skipTest('network interface `%s` does not exist' %
|
||||
self.interface)
|
||||
self.cli.close()
|
||||
|
||||
def clientTearDown(self):
|
||||
self.cli.close()
|
||||
self.cli = None
|
||||
ThreadableTest.clientTearDown(self)
|
||||
|
||||
class SocketConnectedTest(ThreadedTCPSocketTest):
|
||||
"""Socket tests for client-server connection.
|
||||
|
||||
|
|
@ -1072,6 +1129,112 @@ class GeneralModuleTests(unittest.TestCase):
|
|||
srv.close()
|
||||
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
|
||||
class BasicCANTest(unittest.TestCase):
|
||||
|
||||
def testCrucialConstants(self):
|
||||
socket.AF_CAN
|
||||
socket.PF_CAN
|
||||
socket.CAN_RAW
|
||||
|
||||
def testCreateSocket(self):
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
pass
|
||||
|
||||
def testBindAny(self):
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
s.bind(('', ))
|
||||
|
||||
def testTooLongInterfaceName(self):
|
||||
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
self.assertRaisesRegexp(socket.error, 'interface name too long',
|
||||
s.bind, ('x' * 1024,))
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, "CAN_RAW_LOOPBACK"),
|
||||
'socket.CAN_RAW_LOOPBACK required for this test.')
|
||||
def testLoopback(self):
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
for loopback in (0, 1):
|
||||
s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK,
|
||||
loopback)
|
||||
self.assertEqual(loopback,
|
||||
s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, "CAN_RAW_FILTER"),
|
||||
'socket.CAN_RAW_FILTER required for this test.')
|
||||
def testFilter(self):
|
||||
can_id, can_mask = 0x200, 0x700
|
||||
can_filter = struct.pack("=II", can_id, can_mask)
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter)
|
||||
self.assertEqual(can_filter,
|
||||
s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
|
||||
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
|
||||
@unittest.skipUnless(thread, 'Threading required for this test.')
|
||||
class CANTest(ThreadedCANSocketTest):
|
||||
|
||||
"""The CAN frame structure is defined in <linux/can.h>:
|
||||
|
||||
struct can_frame {
|
||||
canid_t can_id; /* 32 bit CAN_ID + EFF/RTR/ERR flags */
|
||||
__u8 can_dlc; /* data length code: 0 .. 8 */
|
||||
__u8 data[8] __attribute__((aligned(8)));
|
||||
};
|
||||
"""
|
||||
can_frame_fmt = "=IB3x8s"
|
||||
|
||||
def __init__(self, methodName='runTest'):
|
||||
ThreadedCANSocketTest.__init__(self, methodName=methodName)
|
||||
|
||||
@classmethod
|
||||
def build_can_frame(cls, can_id, data):
|
||||
"""Build a CAN frame."""
|
||||
can_dlc = len(data)
|
||||
data = data.ljust(8, b'\x00')
|
||||
return struct.pack(cls.can_frame_fmt, can_id, can_dlc, data)
|
||||
|
||||
@classmethod
|
||||
def dissect_can_frame(cls, frame):
|
||||
"""Dissect a CAN frame."""
|
||||
can_id, can_dlc, data = struct.unpack(cls.can_frame_fmt, frame)
|
||||
return (can_id, can_dlc, data[:can_dlc])
|
||||
|
||||
def testSendFrame(self):
|
||||
cf, addr = self.s.recvfrom(self.bufsize)
|
||||
self.assertEqual(self.cf, cf)
|
||||
self.assertEqual(addr[0], self.interface)
|
||||
self.assertEqual(addr[1], socket.AF_CAN)
|
||||
|
||||
def _testSendFrame(self):
|
||||
self.cf = self.build_can_frame(0x00, b'\x01\x02\x03\x04\x05')
|
||||
self.cli.send(self.cf)
|
||||
|
||||
def testSendMaxFrame(self):
|
||||
cf, addr = self.s.recvfrom(self.bufsize)
|
||||
self.assertEqual(self.cf, cf)
|
||||
|
||||
def _testSendMaxFrame(self):
|
||||
self.cf = self.build_can_frame(0x00, b'\x07' * 8)
|
||||
self.cli.send(self.cf)
|
||||
|
||||
def testSendMultiFrames(self):
|
||||
cf, addr = self.s.recvfrom(self.bufsize)
|
||||
self.assertEqual(self.cf1, cf)
|
||||
|
||||
cf, addr = self.s.recvfrom(self.bufsize)
|
||||
self.assertEqual(self.cf2, cf)
|
||||
|
||||
def _testSendMultiFrames(self):
|
||||
self.cf1 = self.build_can_frame(0x07, b'\x44\x33\x22\x11')
|
||||
self.cli.send(self.cf1)
|
||||
|
||||
self.cf2 = self.build_can_frame(0x12, b'\x99\x22\x33')
|
||||
self.cli.send(self.cf2)
|
||||
|
||||
|
||||
@unittest.skipUnless(thread, 'Threading required for this test.')
|
||||
class BasicTCPTest(SocketConnectedTest):
|
||||
|
||||
|
|
@ -4194,6 +4357,7 @@ def test_main():
|
|||
if isTipcAvailable():
|
||||
tests.append(TIPCTest)
|
||||
tests.append(TIPCThreadableTest)
|
||||
tests.extend([BasicCANTest, CANTest])
|
||||
tests.extend([
|
||||
CmsgMacroTests,
|
||||
SendmsgUDPTest,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue