mirror of
https://github.com/python/cpython.git
synced 2025-07-30 06:34:15 +00:00
Issue #28162: Fixes Ctrl+Z handling in console readall()
This commit is contained in:
commit
9ffb5d7828
2 changed files with 53 additions and 38 deletions
|
@ -107,16 +107,15 @@ class WindowsConsoleIOTests(unittest.TestCase):
|
|||
source = 'ϼўТλФЙ\r\n'.encode('utf-16-le')
|
||||
expected = 'ϼўТλФЙ\r\n'.encode('utf-8')
|
||||
for read_count in range(1, 16):
|
||||
stdin = open('CONIN$', 'rb', buffering=0)
|
||||
write_input(stdin, source)
|
||||
with open('CONIN$', 'rb', buffering=0) as stdin:
|
||||
write_input(stdin, source)
|
||||
|
||||
actual = b''
|
||||
while not actual.endswith(b'\n'):
|
||||
b = stdin.read(read_count)
|
||||
actual += b
|
||||
actual = b''
|
||||
while not actual.endswith(b'\n'):
|
||||
b = stdin.read(read_count)
|
||||
actual += b
|
||||
|
||||
self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
|
||||
stdin.close()
|
||||
self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
|
||||
|
||||
def test_partial_surrogate_reads(self):
|
||||
# Test that reading less than 1 full character works when stdin
|
||||
|
@ -125,17 +124,24 @@ class WindowsConsoleIOTests(unittest.TestCase):
|
|||
source = '\U00101FFF\U00101001\r\n'.encode('utf-16-le')
|
||||
expected = '\U00101FFF\U00101001\r\n'.encode('utf-8')
|
||||
for read_count in range(1, 16):
|
||||
stdin = open('CONIN$', 'rb', buffering=0)
|
||||
with open('CONIN$', 'rb', buffering=0) as stdin:
|
||||
write_input(stdin, source)
|
||||
|
||||
actual = b''
|
||||
while not actual.endswith(b'\n'):
|
||||
b = stdin.read(read_count)
|
||||
actual += b
|
||||
|
||||
self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
|
||||
|
||||
def test_ctrl_z(self):
|
||||
with open('CONIN$', 'rb', buffering=0) as stdin:
|
||||
source = '\xC4\x1A\r\n'.encode('utf-16-le')
|
||||
expected = '\xC4'.encode('utf-8')
|
||||
write_input(stdin, source)
|
||||
|
||||
actual = b''
|
||||
while not actual.endswith(b'\n'):
|
||||
b = stdin.read(read_count)
|
||||
actual += b
|
||||
|
||||
self.assertEqual(actual, expected, 'stdin.read({})'.format(read_count))
|
||||
stdin.close()
|
||||
|
||||
a, b = stdin.read(1), stdin.readall()
|
||||
self.assertEqual(expected[0:1], a)
|
||||
self.assertEqual(expected[1:], b)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -816,44 +816,53 @@ _io__WindowsConsoleIO_readall_impl(winconsoleio *self)
|
|||
|
||||
PyMem_Free(subbuf);
|
||||
|
||||
/* when the read starts with ^Z or is empty we break */
|
||||
if (n == 0 || buf[len] == '\x1a')
|
||||
/* when the read is empty we break */
|
||||
if (n == 0)
|
||||
break;
|
||||
|
||||
len += n;
|
||||
}
|
||||
|
||||
if (len == 0 || buf[0] == '\x1a' && _buflen(self) == 0) {
|
||||
if (len == 0 && _buflen(self) == 0) {
|
||||
/* when the result starts with ^Z we return an empty buffer */
|
||||
PyMem_Free(buf);
|
||||
return PyBytes_FromStringAndSize(NULL, 0);
|
||||
}
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
bytes_size = WideCharToMultiByte(CP_UTF8, 0, buf, len,
|
||||
NULL, 0, NULL, NULL);
|
||||
Py_END_ALLOW_THREADS
|
||||
if (len) {
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
bytes_size = WideCharToMultiByte(CP_UTF8, 0, buf, len,
|
||||
NULL, 0, NULL, NULL);
|
||||
Py_END_ALLOW_THREADS
|
||||
|
||||
if (!bytes_size) {
|
||||
DWORD err = GetLastError();
|
||||
PyMem_Free(buf);
|
||||
return PyErr_SetFromWindowsErr(err);
|
||||
if (!bytes_size) {
|
||||
DWORD err = GetLastError();
|
||||
PyMem_Free(buf);
|
||||
return PyErr_SetFromWindowsErr(err);
|
||||
}
|
||||
} else {
|
||||
bytes_size = 0;
|
||||
}
|
||||
|
||||
bytes_size += _buflen(self);
|
||||
bytes = PyBytes_FromStringAndSize(NULL, bytes_size);
|
||||
rn = _copyfrombuf(self, PyBytes_AS_STRING(bytes), bytes_size);
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
bytes_size = WideCharToMultiByte(CP_UTF8, 0, buf, len,
|
||||
&PyBytes_AS_STRING(bytes)[rn], bytes_size - rn, NULL, NULL);
|
||||
Py_END_ALLOW_THREADS
|
||||
if (len) {
|
||||
Py_BEGIN_ALLOW_THREADS
|
||||
bytes_size = WideCharToMultiByte(CP_UTF8, 0, buf, len,
|
||||
&PyBytes_AS_STRING(bytes)[rn], bytes_size - rn, NULL, NULL);
|
||||
Py_END_ALLOW_THREADS
|
||||
|
||||
if (!bytes_size) {
|
||||
DWORD err = GetLastError();
|
||||
PyMem_Free(buf);
|
||||
Py_CLEAR(bytes);
|
||||
return PyErr_SetFromWindowsErr(err);
|
||||
if (!bytes_size) {
|
||||
DWORD err = GetLastError();
|
||||
PyMem_Free(buf);
|
||||
Py_CLEAR(bytes);
|
||||
return PyErr_SetFromWindowsErr(err);
|
||||
}
|
||||
|
||||
/* add back the number of preserved bytes */
|
||||
bytes_size += rn;
|
||||
}
|
||||
|
||||
PyMem_Free(buf);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue