bpo-33578: Add getstate/setstate for CJK codec (GH-6984)

This implements getstate and setstate for the cjkcodecs multibyte incremental encoders/decoders, primarily to fix issues with seek/tell.

The encoder getstate/setstate is slightly tricky as the "state" is pending bytes + MultibyteCodec_State but only an integer can be returned. The approach I've taken is to encode this data into a long, similar to how .tell() encodes a "cookie_type" as a long.


https://bugs.python.org/issue33578
This commit is contained in:
Christopher Thorne 2018-11-01 10:48:49 +00:00 committed by Miss Islington (bot)
parent 4b5e62dbb2
commit ac22f6aa98
8 changed files with 416 additions and 22 deletions

View file

@ -117,6 +117,88 @@ class Test_IncrementalEncoder(unittest.TestCase):
self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
def test_state_methods_with_buffer_state(self):
# euc_jis_2004 stores state as a buffer of pending bytes
encoder = codecs.getincrementalencoder('euc_jis_2004')()
initial_state = encoder.getstate()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
encoder.setstate(initial_state)
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6'), b'')
partial_state = encoder.getstate()
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
encoder.setstate(partial_state)
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
def test_state_methods_with_non_buffer_state(self):
# iso2022_jp stores state without using a buffer
encoder = codecs.getincrementalencoder('iso2022_jp')()
self.assertEqual(encoder.encode('z'), b'z')
en_state = encoder.getstate()
self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
jp_state = encoder.getstate()
self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')
encoder.setstate(jp_state)
self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')
encoder.setstate(en_state)
self.assertEqual(encoder.encode('z'), b'z')
def test_getstate_returns_expected_value(self):
# Note: getstate is implemented such that these state values
# are expected to be the same across all builds of Python,
# regardless of x32/64 bit, endianness and compiler.
# euc_jis_2004 stores state as a buffer of pending bytes
buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
self.assertEqual(buffer_state_encoder.getstate(), 0)
buffer_state_encoder.encode('\u00e6')
self.assertEqual(buffer_state_encoder.getstate(),
int.from_bytes(
b"\x02"
b"\xc3\xa6"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little'))
buffer_state_encoder.encode('\u0300')
self.assertEqual(buffer_state_encoder.getstate(), 0)
# iso2022_jp stores state without using a buffer
non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
self.assertEqual(non_buffer_state_encoder.getstate(),
int.from_bytes(
b"\x00"
b"\x42\x42\x00\x00\x00\x00\x00\x00",
'little'))
non_buffer_state_encoder.encode('\u3042')
self.assertEqual(non_buffer_state_encoder.getstate(),
int.from_bytes(
b"\x00"
b"\xc2\x42\x00\x00\x00\x00\x00\x00",
'little'))
def test_setstate_validates_input_size(self):
encoder = codecs.getincrementalencoder('euc_jp')()
pending_size_nine = int.from_bytes(
b"\x09"
b"\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little')
self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine)
def test_setstate_validates_input_bytes(self):
encoder = codecs.getincrementalencoder('euc_jp')()
invalid_utf8 = int.from_bytes(
b"\x01"
b"\xff"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little')
self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8)
def test_issue5640(self):
encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
self.assertEqual(encoder.encode('\xff'), b'\\xff')
@ -165,6 +247,37 @@ class Test_IncrementalDecoder(unittest.TestCase):
decoder = codecs.getincrementaldecoder(enc)()
self.assertRaises(TypeError, decoder.decode, "")
def test_state_methods(self):
decoder = codecs.getincrementaldecoder('euc_jp')()
# Decode a complete input sequence
self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
pending1, _ = decoder.getstate()
self.assertEqual(pending1, b'')
# Decode first half of a partial input sequence
self.assertEqual(decoder.decode(b'\xa4'), '')
pending2, flags2 = decoder.getstate()
self.assertEqual(pending2, b'\xa4')
# Decode second half of a partial input sequence
self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
pending3, _ = decoder.getstate()
self.assertEqual(pending3, b'')
# Jump back and decode second half of partial input sequence again
decoder.setstate((pending2, flags2))
self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
pending4, _ = decoder.getstate()
self.assertEqual(pending4, b'')
def test_setstate_validates_input(self):
decoder = codecs.getincrementaldecoder('euc_jp')()
self.assertRaises(TypeError, decoder.setstate, 123)
self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
self.assertRaises(UnicodeError, decoder.setstate, (b"123456789", 0))
class Test_StreamReader(unittest.TestCase):
def test_bug1728403(self):
try: