bpo-40291: Add support for CAN_J1939 sockets (GH-19538)

Add support for CAN_J1939 sockets that wrap SAE J1939 protocol
functionality provided by Linux 5.4+ kernels.
This commit is contained in:
karl ding 2020-04-29 15:31:19 -07:00 committed by GitHub
parent fd33cdbd05
commit 360371f79c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 197 additions and 5 deletions

View file

@ -80,6 +80,16 @@ def _have_socket_can_isotp():
s.close()
return True
def _have_socket_can_j1939():
"""Check whether CAN J1939 sockets are supported on this host."""
try:
s = socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939)
except (AttributeError, OSError):
return False
else:
s.close()
return True
def _have_socket_rds():
"""Check whether RDS sockets are supported on this host."""
try:
@ -143,6 +153,8 @@ HAVE_SOCKET_CAN = _have_socket_can()
HAVE_SOCKET_CAN_ISOTP = _have_socket_can_isotp()
HAVE_SOCKET_CAN_J1939 = _have_socket_can_j1939()
HAVE_SOCKET_RDS = _have_socket_rds()
HAVE_SOCKET_ALG = _have_socket_alg()
@ -2117,6 +2129,68 @@ class ISOTPTest(unittest.TestCase):
raise
@unittest.skipUnless(HAVE_SOCKET_CAN_J1939, 'CAN J1939 required for this test.')
class J1939Test(unittest.TestCase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.interface = "vcan0"
@unittest.skipUnless(hasattr(socket, "CAN_J1939"),
'socket.CAN_J1939 required for this test.')
def testJ1939Constants(self):
socket.CAN_J1939
socket.J1939_MAX_UNICAST_ADDR
socket.J1939_IDLE_ADDR
socket.J1939_NO_ADDR
socket.J1939_NO_NAME
socket.J1939_PGN_REQUEST
socket.J1939_PGN_ADDRESS_CLAIMED
socket.J1939_PGN_ADDRESS_COMMANDED
socket.J1939_PGN_PDU1_MAX
socket.J1939_PGN_MAX
socket.J1939_NO_PGN
# J1939 socket options
socket.SO_J1939_FILTER
socket.SO_J1939_PROMISC
socket.SO_J1939_SEND_PRIO
socket.SO_J1939_ERRQUEUE
socket.SCM_J1939_DEST_ADDR
socket.SCM_J1939_DEST_NAME
socket.SCM_J1939_PRIO
socket.SCM_J1939_ERRQUEUE
socket.J1939_NLA_PAD
socket.J1939_NLA_BYTES_ACKED
socket.J1939_EE_INFO_NONE
socket.J1939_EE_INFO_TX_ABORT
socket.J1939_FILTER_MAX
@unittest.skipUnless(hasattr(socket, "CAN_J1939"),
'socket.CAN_J1939 required for this test.')
def testCreateJ1939Socket(self):
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
pass
def testBind(self):
try:
with socket.socket(socket.PF_CAN, socket.SOCK_DGRAM, socket.CAN_J1939) as s:
addr = self.interface, socket.J1939_NO_NAME, socket.J1939_NO_PGN, socket.J1939_NO_ADDR
s.bind(addr)
self.assertEqual(s.getsockname(), addr)
except OSError as e:
if e.errno == errno.ENODEV:
self.skipTest('network interface `%s` does not exist' %
self.interface)
else:
raise
@unittest.skipUnless(HAVE_SOCKET_RDS, 'RDS sockets required for this test.')
class BasicRDSTest(unittest.TestCase):