mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
bpo-30987 - Support for ISO-TP protocol in SocketCAN (#2956)
* Added support for CAN_ISOTP protocol * Added unit tests for CAN ISOTP * Updated documentation for ISO-TP protocol * Removed trailing whitespace in documentation * Added blurb NEWS.d file * updated Misc/ACKS * Fixed broken unit test that was using isotp const outside of skippable section * Removed dependecy over third party project * Added implementation for getsockname + unit tests * Missing newline at end of ACKS file * Accidentally inserted a type in ACKS file * Followed tiran changes review #1 recommendations * Added spaces after comma
This commit is contained in:
parent
ed94a8b285
commit
a30f6d45ac
5 changed files with 146 additions and 8 deletions
|
@ -55,6 +55,16 @@ def _have_socket_can():
|
|||
s.close()
|
||||
return True
|
||||
|
||||
def _have_socket_can_isotp():
|
||||
"""Check whether CAN ISOTP sockets are supported on this host."""
|
||||
try:
|
||||
s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP)
|
||||
except (AttributeError, OSError):
|
||||
return False
|
||||
else:
|
||||
s.close()
|
||||
return True
|
||||
|
||||
def _have_socket_rds():
|
||||
"""Check whether RDS sockets are supported on this host."""
|
||||
try:
|
||||
|
@ -77,6 +87,8 @@ def _have_socket_alg():
|
|||
|
||||
HAVE_SOCKET_CAN = _have_socket_can()
|
||||
|
||||
HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
|
||||
|
||||
HAVE_SOCKET_RDS = _have_socket_rds()
|
||||
|
||||
HAVE_SOCKET_ALG = _have_socket_alg()
|
||||
|
@ -1709,6 +1721,49 @@ class CANTest(ThreadedCANSocketTest):
|
|||
self.assertEqual(bytes_sent, len(header_plus_frame))
|
||||
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_CAN_ISOTP, 'CAN ISOTP required for this test.')
|
||||
class ISOTPTest(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.interface = "vcan0"
|
||||
|
||||
def testCrucialConstants(self):
|
||||
socket.AF_CAN
|
||||
socket.PF_CAN
|
||||
socket.CAN_ISOTP
|
||||
socket.SOCK_DGRAM
|
||||
|
||||
def testCreateSocket(self):
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s:
|
||||
pass
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, "CAN_ISOTP"),
|
||||
'socket.CAN_ISOTP required for this test.')
|
||||
def testCreateISOTPSocket(self):
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
|
||||
pass
|
||||
|
||||
def testTooLongInterfaceName(self):
|
||||
# most systems limit IFNAMSIZ to 16, take 1024 to be sure
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
|
||||
with self.assertRaisesRegex(OSError, 'interface name too long'):
|
||||
s.bind(('x' * 1024, 1, 2))
|
||||
|
||||
def testBind(self):
|
||||
try:
|
||||
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_ISOTP) as s:
|
||||
addr = self.interface, 0x123, 0x456
|
||||
s.bind(addr)
|
||||
self.assertEqual(s.getsockname(), addr)
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENODEV:
|
||||
self.skipTest('network interface `%s` does not exist' %
|
||||
self.interface)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
|
||||
class BasicRDSTest(unittest.TestCase):
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue