gh-124008: Fix calculation of the number of written bytes for the Windows console (GH-124059)

Since the MultiByteToWideChar()/WideCharToMultiByte() round trip is not
reversible if the data contains invalid UTF-8 sequences, use binary search
to calculate the number of written bytes from the number of written
characters.

Also fix writing incomplete UTF-8 sequences.

Also fix handling of memory allocation failures.
This commit is contained in:
Serhiy Storchaka 2024-11-27 13:38:12 +02:00 committed by GitHub
parent 83926d3b4c
commit 3cf83d91a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 115 additions and 28 deletions

View file

@ -142,6 +142,29 @@ class WindowsConsoleIOTests(unittest.TestCase):
with ConIO('CONOUT$', 'w') as f: with ConIO('CONOUT$', 'w') as f:
self.assertEqual(f.write(b''), 0) self.assertEqual(f.write(b''), 0)
@requires_resource('console')
def test_write(self):
    # Raw console writes must report how many input bytes were consumed.
    # Every prefix is combined with a 2-, 3- and 4-byte UTF-8 sequence,
    # both complete and truncated at each possible position.
    prefixes = [
        b'',
        b'abc',
        b'\xc2\xa7\xe2\x98\x83\xf0\x9f\x90\x8d',
        b'\xff'*10,
    ]
    sequences = (b'\xc2\xa7', b'\xe2\x98\x83', b'\xf0\x9f\x90\x8d')
    fully_written = []
    with ConIO('CONOUT$', 'w') as f:
        for prefix in prefixes:
            for seq in sequences:
                fully_written.append(prefix + seq)
                for cut in range(1, len(seq)):
                    truncated = prefix + seq[:cut]
                    # A truncated sequence followed by any other byte is
                    # expected to be written in full (checked below).
                    fully_written.append(truncated + b'z')
                    fully_written.append(truncated + b'\xff')
                    # incomplete multibyte sequence: only the prefix is
                    # expected to be reported as written
                    with self.subTest(data=truncated):
                        self.assertEqual(f.write(truncated), len(prefix))
        # Data not ending in a truncated sequence is written whole.
        for data in fully_written:
            with self.subTest(data=data):
                self.assertEqual(f.write(data), len(data))
def assertStdinRoundTrip(self, text): def assertStdinRoundTrip(self, text):
stdin = open('CONIN$', 'r') stdin = open('CONIN$', 'r')
old_stdin = sys.stdin old_stdin = sys.stdin

View file

@ -0,0 +1,2 @@
Fix a possible crash (in debug builds), incorrect output, or an incorrect
return value from raw binary ``write()`` when writing to the console on Windows.

View file

@ -135,19 +135,67 @@ char _PyIO_get_console_type(PyObject *path_or_fd) {
} }
/* NOTE(review): the lines below come from a side-by-side diff rendering.
 * Where a line was changed, the pre-change text appears first and the
 * post-change text follows on the same line, so the block is not compilable
 * as written.
 *
 * Post-change behaviour: inspect at most the last 3 bytes of buf[0..len),
 * scanning backwards for a UTF-8 lead byte.
 *   - If an ASCII byte (< 0x80) is met first, return len.
 *   - If a lead byte (>= 0xc0) is met and fewer bytes follow it than a 2-, 3-
 *     or 4-byte sequence requires, return len - count, i.e. the length with
 *     the incomplete trailing sequence cut off.
 *   - Otherwise (complete or invalid sequence, or no lead byte found within
 *     the last 3 bytes) return len.
 * Unlike the pre-change version, the post-change version can return 0 (for
 * example when the whole buffer is a single incomplete sequence).
 */
static DWORD static DWORD
_find_last_utf8_boundary(const char *buf, DWORD len) _find_last_utf8_boundary(const unsigned char *buf, DWORD len)
{ {
/* This function never returns 0, returns the original len instead */ for (DWORD count = 1; count < 4 && count <= len; count++) {
DWORD count = 1; unsigned char c = buf[len - count];
if (len == 0 || (buf[len - 1] & 0x80) == 0) { if (c < 0x80) {
return len; /* No starting byte found. */
}
for (;; count++) {
if (count > 3 || count >= len) {
return len; return len;
} }
if ((buf[len - count] & 0xc0) != 0x80) { if (c >= 0xc0) {
return len - count; if (c < 0xe0 /* 2-bytes sequence */ ? count < 2 :
c < 0xf0 /* 3-bytes sequence */ ? count < 3 :
c < 0xf8 /* 4-bytes sequence */)
{
/* Incomplete multibyte sequence. */
return len - count;
}
/* Either complete or invalid sequence. */
return len;
}
}
/* Either complete 4-bytes sequence or invalid sequence. */
return len;
}
/* Find the number of UTF-8 bytes that corresponds to the specified number of
 * wchars.
 * I.e. find x <= len so that MultiByteToWideChar(CP_UTF8, 0, s, x, NULL, 0) == n.
 *
 *   s   -- start of the (possibly invalid) UTF-8 data
 *   len -- number of bytes of s to consider
 *   n   -- number of wchars to account for
 *
 * Returns the byte count x.  If n ends inside a unit that cannot be split any
 * further (e.g. half of a surrogate pair), the result is rounded up to the
 * end of that unit.
 *
 * WideCharToMultiByte() cannot be used for this, because the UTF-8 -> wchar
 * conversion is not reversible (invalid UTF-8 byte produces \ufffd which
 * will be converted back to 3-bytes UTF-8 sequence \xef\xbf\xbd).
 * So we need to use binary search.
 */
static DWORD
_wchar_to_utf8_count(const unsigned char *s, DWORD len, DWORD n)
{
    /* Number of bytes already known to be covered by the wchars consumed
     * so far; s always points at offset `start` of the original buffer. */
    DWORD start = 0;
    while (1) {
        if (n == 0) {
            /* Every wchar is accounted for, so none of the remaining bytes
             * belongs to the result.  Without this check the search would
             * narrow down to the next unit and count it as well (one
             * character too many), and could spin forever on a 1-byte
             * range holding a lone lead byte. */
            return start;
        }
        DWORD mid = 0;
        for (DWORD i = len / 2; i <= len; i++) {
            mid = _find_last_utf8_boundary(s, i);
            if (mid != 0) {
                break;
            }
            /* The middle could split the first multibytes sequence. */
        }
        if (mid == len) {
            /* No split point inside the range: treat it as one unit. */
            return start + len;
        }
        if (mid == 0) {
            /* The whole range is an incomplete sequence; split off its
             * last byte (or take the single byte) to guarantee progress. */
            mid = len > 1 ? len - 1 : 1;
        }
        DWORD wlen = MultiByteToWideChar(CP_UTF8, 0, s, mid, NULL, 0);
        if (wlen <= n) {
            /* The first half is fully covered: continue in the second. */
            s += mid;
            start += mid;
            len -= mid;
            n -= wlen;
        }
        else {
            /* The answer lies inside the first half. */
            len = mid;
        }
    }
}
@ -563,8 +611,10 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
int err = 0, sig = 0; int err = 0, sig = 0;
wchar_t *buf = (wchar_t*)PyMem_Malloc(maxlen * sizeof(wchar_t)); wchar_t *buf = (wchar_t*)PyMem_Malloc(maxlen * sizeof(wchar_t));
if (!buf) if (!buf) {
PyErr_NoMemory();
goto error; goto error;
}
*readlen = 0; *readlen = 0;
@ -622,6 +672,7 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
Py_UNBLOCK_THREADS Py_UNBLOCK_THREADS
if (!newbuf) { if (!newbuf) {
sig = -1; sig = -1;
PyErr_NoMemory();
break; break;
} }
buf = newbuf; buf = newbuf;
@ -645,8 +696,10 @@ read_console_w(HANDLE handle, DWORD maxlen, DWORD *readlen) {
if (*readlen > 0 && buf[0] == L'\x1a') { if (*readlen > 0 && buf[0] == L'\x1a') {
PyMem_Free(buf); PyMem_Free(buf);
buf = (wchar_t *)PyMem_Malloc(sizeof(wchar_t)); buf = (wchar_t *)PyMem_Malloc(sizeof(wchar_t));
if (!buf) if (!buf) {
PyErr_NoMemory();
goto error; goto error;
}
buf[0] = L'\0'; buf[0] = L'\0';
*readlen = 0; *readlen = 0;
} }
@ -824,8 +877,10 @@ _io__WindowsConsoleIO_readall_impl(winconsoleio *self)
bufsize = BUFSIZ; bufsize = BUFSIZ;
buf = (wchar_t*)PyMem_Malloc((bufsize + 1) * sizeof(wchar_t)); buf = (wchar_t*)PyMem_Malloc((bufsize + 1) * sizeof(wchar_t));
if (buf == NULL) if (buf == NULL) {
PyErr_NoMemory();
return NULL; return NULL;
}
while (1) { while (1) {
wchar_t *subbuf; wchar_t *subbuf;
@ -847,6 +902,7 @@ _io__WindowsConsoleIO_readall_impl(winconsoleio *self)
(bufsize + 1) * sizeof(wchar_t)); (bufsize + 1) * sizeof(wchar_t));
if (tmp == NULL) { if (tmp == NULL) {
PyMem_Free(buf); PyMem_Free(buf);
PyErr_NoMemory();
return NULL; return NULL;
} }
buf = tmp; buf = tmp;
@ -1022,43 +1078,49 @@ _io__WindowsConsoleIO_write_impl(winconsoleio *self, PyTypeObject *cls,
len = (DWORD)b->len; len = (DWORD)b->len;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, NULL, 0);
/* issue11395 there is an unspecified upper bound on how many bytes /* issue11395 there is an unspecified upper bound on how many bytes
can be written at once. We cap at 32k - the caller will have to can be written at once. We cap at 32k - the caller will have to
handle partial writes. handle partial writes.
Since we don't know how many input bytes are being ignored, we Since we don't know how many input bytes are being ignored, we
have to reduce and recalculate. */ have to reduce and recalculate. */
while (wlen > 32766 / sizeof(wchar_t)) { const DWORD max_wlen = 32766 / sizeof(wchar_t);
len /= 2; /* UTF-8 to wchar ratio is at most 3:1. */
len = Py_MIN(len, max_wlen * 3);
while (1) {
/* Fix for github issues gh-110913 and gh-82052. */ /* Fix for github issues gh-110913 and gh-82052. */
len = _find_last_utf8_boundary(b->buf, len); len = _find_last_utf8_boundary(b->buf, len);
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, NULL, 0); wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, NULL, 0);
if (wlen <= max_wlen) {
break;
}
len /= 2;
} }
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (!wlen) if (!wlen) {
return PyErr_SetFromWindowsErr(0); return PyLong_FromLong(0);
}
wbuf = (wchar_t*)PyMem_Malloc(wlen * sizeof(wchar_t)); wbuf = (wchar_t*)PyMem_Malloc(wlen * sizeof(wchar_t));
if (!wbuf) {
PyErr_NoMemory();
return NULL;
}
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, wbuf, wlen); wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len, wbuf, wlen);
if (wlen) { if (wlen) {
res = WriteConsoleW(handle, wbuf, wlen, &n, NULL); res = WriteConsoleW(handle, wbuf, wlen, &n, NULL);
#ifdef Py_DEBUG
if (res) {
#else
if (res && n < wlen) { if (res && n < wlen) {
#endif
/* Wrote fewer characters than expected, which means our /* Wrote fewer characters than expected, which means our
* len value may be wrong. So recalculate it from the * len value may be wrong. So recalculate it from the
* characters that were written. As this could potentially * characters that were written.
* result in a different value, we also validate that value.
*/ */
len = WideCharToMultiByte(CP_UTF8, 0, wbuf, n, len = _wchar_to_utf8_count(b->buf, len, n);
NULL, 0, NULL, NULL);
if (len) {
wlen = MultiByteToWideChar(CP_UTF8, 0, b->buf, len,
NULL, 0);
assert(wlen == len);
}
} }
} else } else
res = 0; res = 0;