mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
gh-132099: Harmonize Bluetooth address handling (GH-132486)
Now all protocols always accept the Bluetooth address as string and getsockname() always returns the Bluetooth address as string. * BTPROTO_SCO now accepts not only bytes, but str. * BTPROTO_SCO now checks address for embedded null. * On *BSD, BTPROTO_HCI now accepts str instead of bytes. * On FreeBSD, getsockname() for BTPROTO_HCI now returns str instead of bytes. * On NetBSD and DragonFly BDS, BTPROTO_HCI now checks address for embedded null.
This commit is contained in:
parent
ccad61e35d
commit
1fc1df8dcc
3 changed files with 57 additions and 37 deletions
|
@ -158,9 +158,8 @@ created. Socket addresses are represented as follows:
|
|||
|
||||
- On Linux it accepts a tuple ``(device_id,)`` where ``device_id``
|
||||
is an integer specifying the number of the Bluetooth device.
|
||||
- On FreeBSD, NetBSD and DragonFly BSD it accepts ``bdaddr`` where ``bdaddr``
|
||||
is a :class:`bytes` object containing the Bluetooth address in a
|
||||
string format. (ex. ``b'12:23:34:45:56:67'``)
|
||||
- On FreeBSD, NetBSD and DragonFly BSD it accepts ``bdaddr``
|
||||
where ``bdaddr`` is the Bluetooth address as a string.
|
||||
|
||||
.. versionchanged:: 3.2
|
||||
NetBSD and DragonFlyBSD support added.
|
||||
|
@ -168,9 +167,9 @@ created. Socket addresses are represented as follows:
|
|||
.. versionchanged:: 3.13.3
|
||||
FreeBSD support added.
|
||||
|
||||
- :const:`BTPROTO_SCO` accepts ``bdaddr`` where ``bdaddr`` is a
|
||||
:class:`bytes` object containing the Bluetooth address in a
|
||||
string format. (ex. ``b'12:23:34:45:56:67'``)
|
||||
- :const:`BTPROTO_SCO` accepts ``bdaddr`` where ``bdaddr`` is
|
||||
the Bluetooth address as a string or a :class:`bytes` object.
|
||||
(ex. ``'12:23:34:45:56:67'`` or ``b'12:23:34:45:56:67'``)
|
||||
|
||||
.. versionchanged:: next
|
||||
FreeBSD support added.
|
||||
|
|
|
@ -2681,6 +2681,8 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
f.bind(socket.BDADDR_ANY)
|
||||
with self.assertRaises(OSError):
|
||||
f.bind((socket.BDADDR_ANY.encode(), 0x1001))
|
||||
with self.assertRaises(OSError):
|
||||
f.bind(('\ud812', 0x1001))
|
||||
|
||||
def testBindRfcommSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM) as s:
|
||||
|
@ -2712,6 +2714,8 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
s.bind((socket.BDADDR_ANY, channel, 0))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY + '\0', channel))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind('\ud812')
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(('invalid', channel))
|
||||
|
||||
|
@ -2719,7 +2723,7 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
def testBindHciSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
|
||||
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, socket.BDADDR_ANY)
|
||||
else:
|
||||
|
@ -2738,14 +2742,17 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_RAW, socket.BTPROTO_HCI) as s:
|
||||
if sys.platform.startswith(('netbsd', 'dragonfly', 'freebsd')):
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY.encode(),))
|
||||
if sys.platform.startswith('freebsd'):
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY.encode() + b'\0')
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY.encode() + b' '*100)
|
||||
s.bind((socket.BDADDR_ANY,))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY + '\0')
|
||||
with self.assertRaises((ValueError, OSError)):
|
||||
s.bind(socket.BDADDR_ANY + ' '*100)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind('\ud812')
|
||||
with self.assertRaises(OSError):
|
||||
s.bind('invalid')
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(b'invalid')
|
||||
else:
|
||||
|
@ -2756,11 +2763,18 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
s.bind((dev, 0))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(dev)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'BTPROTO_SCO'), 'Bluetooth SCO sockets required for this test')
|
||||
def testBindScoSocket(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
addr = s.getsockname()
|
||||
self.assertEqual(addr, socket.BDADDR_ANY)
|
||||
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
||||
s.bind(socket.BDADDR_ANY.encode())
|
||||
addr = s.getsockname()
|
||||
|
@ -2770,9 +2784,17 @@ class BasicBluetoothTest(unittest.TestCase):
|
|||
def testBadScoAddr(self):
|
||||
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_SCO) as s:
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(socket.BDADDR_ANY)
|
||||
s.bind((socket.BDADDR_ANY,))
|
||||
with self.assertRaises(OSError):
|
||||
s.bind((socket.BDADDR_ANY.encode(),))
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY + '\0')
|
||||
with self.assertRaises(ValueError):
|
||||
s.bind(socket.BDADDR_ANY.encode() + b'\0')
|
||||
with self.assertRaises(UnicodeEncodeError):
|
||||
s.bind('\ud812')
|
||||
with self.assertRaises(OSError):
|
||||
s.bind('invalid')
|
||||
with self.assertRaises(OSError):
|
||||
s.bind(b'invalid')
|
||||
|
||||
|
|
|
@ -1546,7 +1546,7 @@ makesockaddr(SOCKET_T sockfd, struct sockaddr *addr, size_t addrlen, int proto)
|
|||
#elif defined(__FreeBSD__)
|
||||
const char *node = _BT_HCI_MEMB(a, node);
|
||||
size_t len = strnlen(node, sizeof(_BT_HCI_MEMB(a, node)));
|
||||
return PyBytes_FromStringAndSize(node, (Py_ssize_t)len);
|
||||
return PyUnicode_FromStringAndSize(node, (Py_ssize_t)len);
|
||||
#else
|
||||
return makebdaddr(&_BT_HCI_MEMB(a, bdaddr));
|
||||
#endif
|
||||
|
@ -2145,36 +2145,25 @@ getsockaddrarg(PySocketSockObject *s, PyObject *args,
|
|||
return 0;
|
||||
}
|
||||
_BT_HCI_MEMB(addr, dev) = dev;
|
||||
#elif defined(__FreeBSD__)
|
||||
if (!PyBytes_Check(args)) {
|
||||
#else
|
||||
const char *straddr;
|
||||
if (!PyArg_Parse(args, "s", &straddr)) {
|
||||
PyErr_Format(PyExc_OSError, "%s: "
|
||||
"wrong node format", caller);
|
||||
"wrong format", caller);
|
||||
return 0;
|
||||
}
|
||||
const char *straddr = PyBytes_AS_STRING(args);
|
||||
size_t len = PyBytes_GET_SIZE(args);
|
||||
if (strlen(straddr) != len) {
|
||||
PyErr_Format(PyExc_ValueError, "%s: "
|
||||
"node contains embedded null character", caller);
|
||||
return 0;
|
||||
}
|
||||
if (len > sizeof(_BT_HCI_MEMB(addr, node))) {
|
||||
# if defined(__FreeBSD__)
|
||||
if (strlen(straddr) > sizeof(_BT_HCI_MEMB(addr, node))) {
|
||||
PyErr_Format(PyExc_ValueError, "%s: "
|
||||
"node too long", caller);
|
||||
return 0;
|
||||
}
|
||||
strncpy(_BT_HCI_MEMB(addr, node), straddr,
|
||||
sizeof(_BT_HCI_MEMB(addr, node)));
|
||||
#else
|
||||
const char *straddr;
|
||||
if (!PyBytes_Check(args)) {
|
||||
PyErr_Format(PyExc_OSError, "%s: "
|
||||
"wrong format", caller);
|
||||
return 0;
|
||||
}
|
||||
straddr = PyBytes_AS_STRING(args);
|
||||
# else
|
||||
if (setbdaddr(straddr, &_BT_HCI_MEMB(addr, bdaddr)) < 0)
|
||||
return 0;
|
||||
# endif
|
||||
#endif
|
||||
*len_ret = sizeof *addr;
|
||||
return 1;
|
||||
|
@ -2188,12 +2177,22 @@ getsockaddrarg(PySocketSockObject *s, PyObject *args,
|
|||
struct sockaddr_sco *addr = &addrbuf->bt_sco;
|
||||
memset(addr, 0, sizeof(struct sockaddr_sco));
|
||||
_BT_SCO_MEMB(addr, family) = AF_BLUETOOTH;
|
||||
if (!PyBytes_Check(args)) {
|
||||
|
||||
if (PyBytes_Check(args)) {
|
||||
if (!PyArg_Parse(args, "y", &straddr)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
else if (PyUnicode_Check(args)) {
|
||||
if (!PyArg_Parse(args, "s", &straddr)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
else {
|
||||
PyErr_Format(PyExc_OSError,
|
||||
"%s(): wrong format", caller);
|
||||
return 0;
|
||||
}
|
||||
straddr = PyBytes_AS_STRING(args);
|
||||
if (setbdaddr(straddr, &_BT_SCO_MEMB(addr, bdaddr)) < 0)
|
||||
return 0;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue