mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
Close #17828: better handling of codec errors
- output type errors now redirect users to the type-neutral convenience functions in the codecs module
- stateless errors that occur during encoding and decoding will now be automatically wrapped in exceptions that give the name of the codec involved
This commit is contained in:
parent
59799a8399
commit
8b097b4ed7
7 changed files with 414 additions and 46 deletions
|
@ -1,5 +1,6 @@
|
|||
import _testcapi
|
||||
import codecs
|
||||
import contextlib
|
||||
import io
|
||||
import locale
|
||||
import sys
|
||||
|
@ -2292,28 +2293,31 @@ class TransformCodecTest(unittest.TestCase):
|
|||
def test_basics(self):
    """Round-trip all 256 byte values through each transform codec.

    For every name in ``bytes_transform_encodings`` the stateless encoder
    and decoder are fetched through the generic ``codecs`` interface; the
    reported consumed lengths and the decoded payload are then checked.
    """
    binput = bytes(range(256))
    for encoding in bytes_transform_encodings:
        # One sub-test per codec, labelled with the codec name.
        with self.subTest(encoding=encoding):
            # generic codecs interface
            encoded, consumed = codecs.getencoder(encoding)(binput)
            self.assertEqual(consumed, len(binput))
            decoded, consumed = codecs.getdecoder(encoding)(encoded)
            self.assertEqual(consumed, len(encoded))
            self.assertEqual(decoded, binput)
|
||||
|
||||
def test_read(self):
    """Check that a codec StreamReader's ``read()`` decodes a single byte.

    ``b"\\x80"`` is encoded with each transform codec, wrapped in a
    ``BytesIO`` and read back through ``codecs.getreader``.
    """
    for encoding in bytes_transform_encodings:
        # One sub-test per codec, labelled with the codec name.
        with self.subTest(encoding=encoding):
            sin = codecs.encode(b"\x80", encoding)
            reader = codecs.getreader(encoding)(io.BytesIO(sin))
            sout = reader.read()
            self.assertEqual(sout, b"\x80")
|
||||
|
||||
def test_readline(self):
    """Check that a codec StreamReader's ``readline()`` decodes one byte.

    Mirrors ``test_read`` but goes through ``readline()``; the
    ``uu_codec`` and ``zlib_codec`` codecs are skipped.
    """
    for encoding in bytes_transform_encodings:
        # NOTE(review): the reason these two codecs are excluded is not
        # stated here -- presumably readline() is not usable with them;
        # confirm before extending the list.
        if encoding in ['uu_codec', 'zlib_codec']:
            continue
        with self.subTest(encoding=encoding):
            sin = codecs.encode(b"\x80", encoding)
            reader = codecs.getreader(encoding)(io.BytesIO(sin))
            sout = reader.readline()
            self.assertEqual(sout, b"\x80")
|
||||
|
||||
def test_buffer_api_usage(self):
|
||||
# We check all the transform codecs accept memoryview input
|
||||
|
@ -2321,17 +2325,158 @@ class TransformCodecTest(unittest.TestCase):
|
|||
# and also that they roundtrip correctly
|
||||
original = b"12345\x80"
|
||||
for encoding in bytes_transform_encodings:
|
||||
data = original
|
||||
view = memoryview(data)
|
||||
data = codecs.encode(data, encoding)
|
||||
view_encoded = codecs.encode(view, encoding)
|
||||
self.assertEqual(view_encoded, data)
|
||||
view = memoryview(data)
|
||||
data = codecs.decode(data, encoding)
|
||||
self.assertEqual(data, original)
|
||||
view_decoded = codecs.decode(view, encoding)
|
||||
self.assertEqual(view_decoded, data)
|
||||
with self.subTest(encoding=encoding):
|
||||
data = original
|
||||
view = memoryview(data)
|
||||
data = codecs.encode(data, encoding)
|
||||
view_encoded = codecs.encode(view, encoding)
|
||||
self.assertEqual(view_encoded, data)
|
||||
view = memoryview(data)
|
||||
data = codecs.decode(data, encoding)
|
||||
self.assertEqual(data, original)
|
||||
view_decoded = codecs.decode(view, encoding)
|
||||
self.assertEqual(view_decoded, data)
|
||||
|
||||
def test_type_error_for_text_input(self):
    """Check binary -> binary codecs give a good error for str input.

    ``str.encode`` with each transform codec must raise a ``TypeError``
    whose message starts with the operation and codec name, and whose
    ``__cause__`` is itself a ``TypeError``.
    """
    bad_input = "bad input type"
    for encoding in bytes_transform_encodings:
        with self.subTest(encoding=encoding):
            msg = "^encoding with '{}' codec failed".format(encoding)
            with self.assertRaisesRegex(TypeError, msg) as failure:
                bad_input.encode(encoding)
            # assertIsInstance reports the actual type on failure, unlike
            # assertTrue(isinstance(...)) which only says "False is not true".
            self.assertIsInstance(failure.exception.__cause__, TypeError)
|
||||
|
||||
def test_type_error_for_binary_input(self):
    """Check the str -> str codec gives a good error for binary input.

    Decoding ``bytes`` or ``bytearray`` with ``rot_13`` must raise an
    ``AttributeError`` whose message starts with the operation and codec
    name, and whose ``__cause__`` is itself an ``AttributeError``.
    """
    for bad_input in (b"immutable", bytearray(b"mutable")):
        with self.subTest(bad_input=bad_input):
            msg = "^decoding with 'rot_13' codec failed"
            with self.assertRaisesRegex(AttributeError, msg) as failure:
                bad_input.decode("rot_13")
            # assertIsInstance reports the actual type on failure, unlike
            # assertTrue(isinstance(...)) which only says "False is not true".
            self.assertIsInstance(failure.exception.__cause__,
                                  AttributeError)
|
||||
|
||||
def test_bad_decoding_output_type(self):
    """Check the error for decoding with a binary -> binary codec.

    ``bytes.decode`` and ``bytearray.decode`` must raise a ``TypeError``
    that names the codec and points the user at ``codecs.decode()``.
    """
    data = b"encode first to ensure we meet any format restrictions"
    for encoding in bytes_transform_encodings:
        with self.subTest(encoding=encoding):
            encoded_data = codecs.encode(data, encoding)
            # Raw strings: "\(" in a normal literal is an invalid escape
            # sequence; the regex itself is unchanged.
            fmt = (r"'{}' decoder returned 'bytes' instead of 'str'; "
                   r"use codecs.decode\(\) to decode to arbitrary types")
            msg = fmt.format(encoding)
            with self.assertRaisesRegex(TypeError, msg):
                encoded_data.decode(encoding)
            with self.assertRaisesRegex(TypeError, msg):
                bytearray(encoded_data).decode(encoding)
|
||||
|
||||
def test_bad_encoding_output_type(self):
    """Check str.encode gives a good error message for str -> str codecs.

    Encoding with ``rot_13`` must raise a ``TypeError`` that names the
    codec and points the user at ``codecs.encode()``.
    """
    # Raw strings: "\(" in a normal literal is an invalid escape
    # sequence; the regex itself is unchanged.
    msg = (r"'rot_13' encoder returned 'str' instead of 'bytes'; "
           r"use codecs.encode\(\) to encode to arbitrary types")
    with self.assertRaisesRegex(TypeError, msg):
        "just an example message".encode("rot_13")
|
||||
|
||||
|
||||
# The codec system tries to wrap exceptions in order to ensure the error
|
||||
# mentions the operation being performed and the codec involved. We
|
||||
# currently *only* want this to happen for relatively stateless
|
||||
# exceptions, where the only significant information they contain is their
|
||||
# type and a single str argument.
|
||||
class ExceptionChainingTest(unittest.TestCase):
    """Tests for when codec errors are wrapped and when they are not.

    Each test installs a codec whose encode and decode functions raise a
    chosen exception, then runs it through ``str.encode``,
    ``codecs.encode``, ``bytes.decode`` and ``codecs.decode``.
    """

    def setUp(self):
        """Register a per-test codec search function."""
        # There's no way to unregister a codec search function, so we just
        # ensure we render this one fairly harmless after the test
        # case finishes by using the test case repr as the codec name
        # The codecs module normalizes codec names, although this doesn't
        # appear to be formally documented...
        self.codec_name = repr(self).lower().replace(" ", "-")
        self.codec_info = None
        codecs.register(self.get_codec)

    def get_codec(self, codec_name):
        """Codec search function: answer only for this test's codec name.

        Returns the currently configured ``CodecInfo`` (possibly ``None``)
        when *codec_name* matches, otherwise ``None``.
        """
        if codec_name != self.codec_name:
            return None
        return self.codec_info

    def set_codec(self, obj_to_raise):
        """Install a codec whose encoder and decoder raise *obj_to_raise*.

        *obj_to_raise* is used directly in a ``raise`` statement, so it may
        be an exception class or an exception instance.
        """
        def raise_obj(*args, **kwds):
            raise obj_to_raise
        # The same raising callable serves as both encode and decode.
        self.codec_info = codecs.CodecInfo(raise_obj, raise_obj,
                                           name=self.codec_name)

    @contextlib.contextmanager
    def assertWrapped(self, operation, exc_type, msg):
        """Assert the body raises *exc_type* with the wrapped message.

        The expected message has the form
        ``"<operation> with '<codec>' codec failed (<ExcType>: <msg>)"``
        and is matched as a regular expression.
        """
        # Raw string: "\(" in a normal literal is an invalid escape
        # sequence; the resulting pattern is unchanged.
        full_msg = r"{} with '{}' codec failed \({}: {}\)".format(
            operation, self.codec_name, exc_type.__name__, msg)
        with self.assertRaisesRegex(exc_type, full_msg) as caught:
            yield caught

    def check_wrapped(self, obj_to_raise, msg):
        """Check *obj_to_raise* is wrapped by all four entry points."""
        self.set_codec(obj_to_raise)
        with self.assertWrapped("encoding", RuntimeError, msg):
            "str_input".encode(self.codec_name)
        with self.assertWrapped("encoding", RuntimeError, msg):
            codecs.encode("str_input", self.codec_name)
        with self.assertWrapped("decoding", RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertWrapped("decoding", RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_raise_by_type(self):
        """Raising a bare exception class is wrapped with an empty message."""
        self.check_wrapped(RuntimeError, "")

    def test_raise_by_value(self):
        """Raising an instance with one str argument is wrapped."""
        msg = "This should be wrapped"
        self.check_wrapped(RuntimeError(msg), msg)

    @contextlib.contextmanager
    def assertNotWrapped(self, operation, exc_type, msg):
        """Assert the body raises *exc_type* matching *msg*, unwrapped.

        After the exception is caught, its text must mention neither the
        operation name nor the codec name.
        """
        with self.assertRaisesRegex(exc_type, msg) as caught:
            yield caught
        # caught.exception is only populated once the context above exits.
        actual_msg = str(caught.exception)
        self.assertNotIn(operation, actual_msg)
        self.assertNotIn(self.codec_name, actual_msg)

    def check_not_wrapped(self, obj_to_raise, msg):
        """Check *obj_to_raise* passes through all four entry points as-is."""
        self.set_codec(obj_to_raise)
        with self.assertNotWrapped("encoding", RuntimeError, msg):
            "str input".encode(self.codec_name)
        with self.assertNotWrapped("encoding", RuntimeError, msg):
            codecs.encode("str input", self.codec_name)
        with self.assertNotWrapped("decoding", RuntimeError, msg):
            b"bytes input".decode(self.codec_name)
        with self.assertNotWrapped("decoding", RuntimeError, msg):
            codecs.decode(b"bytes input", self.codec_name)

    def test_init_override_is_not_wrapped(self):
        """An exception type overriding ``__init__`` is left unwrapped."""
        class CustomInit(RuntimeError):
            def __init__(self):
                pass
        self.check_not_wrapped(CustomInit, "")

    def test_new_override_is_not_wrapped(self):
        """An exception type overriding ``__new__`` is left unwrapped."""
        class CustomNew(RuntimeError):
            def __new__(cls):
                return super().__new__(cls)
        self.check_not_wrapped(CustomNew, "")

    def test_instance_attribute_is_not_wrapped(self):
        """An exception carrying an extra instance attribute is unwrapped."""
        msg = "This should NOT be wrapped"
        exc = RuntimeError(msg)
        exc.attr = 1
        self.check_not_wrapped(exc, msg)

    def test_non_str_arg_is_not_wrapped(self):
        """An exception whose single argument is not a str is unwrapped."""
        self.check_not_wrapped(RuntimeError(1), "1")

    def test_multiple_args_is_not_wrapped(self):
        """An exception with several arguments is left unwrapped."""
        # Raw string: "\(" in a normal literal is an invalid escape
        # sequence; the resulting pattern is unchanged.
        msg = r"\('a', 'b', 'c'\)"
        self.check_not_wrapped(RuntimeError('a', 'b', 'c'), msg)
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == 'win32',
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue