Merged revisions 67762 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r67762 | antoine.pitrou | 2008-12-14 18:40:51 +0100 (dim., 14 déc. 2008) | 3 lines

  Backport r67759 (fix io.IncrementalNewlineDecoder for UTF-16 et al.).
........
This commit is contained in:
Antoine Pitrou 2008-12-14 18:08:37 +00:00
parent 30327242b3
commit f8638a8d21
3 changed files with 89 additions and 56 deletions

View file

@ -680,8 +680,9 @@ class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
@classmethod
def lookupTestDecoder(cls, name):
if cls.codecEnabled and name == 'test_decoder':
latin1 = codecs.lookup('latin-1')
return codecs.CodecInfo(
name='test_decoder', encode=None, decode=None,
name='test_decoder', encode=latin1.encode, decode=None,
incrementalencoder=None,
streamreader=None, streamwriter=None,
incrementaldecoder=cls)
@ -840,8 +841,11 @@ class TextIOWrapperTest(unittest.TestCase):
[ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
[ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
]
encodings = ('utf-8', 'latin-1')
encodings = (
'utf-8', 'latin-1',
'utf-16', 'utf-16-le', 'utf-16-be',
'utf-32', 'utf-32-le', 'utf-32-be',
)
# Try a range of buffer sizes to test the case where \r is the last
# character in TextIOWrapper._pending_line.
@ -1195,55 +1199,83 @@ class TextIOWrapperTest(unittest.TestCase):
self.assertEqual(buffer.seekable(), txt.seekable())
def test_newline_decoder(self):
import codecs
decoder = codecs.getincrementaldecoder("utf-8")()
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
def check_newline_decoder_utf8(self, decoder):
# UTF-8 specific tests for a newline decoder
def _check_decode(b, s, **kwargs):
# We exercise getstate() / setstate() as well as decode()
state = decoder.getstate()
self.assertEquals(decoder.decode(b, **kwargs), s)
decoder.setstate(state)
self.assertEquals(decoder.decode(b, **kwargs), s)
self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
_check_decode(b'\xe8\xa2\x88', "\u8888")
self.assertEquals(decoder.decode(b'\xe8'), u"")
self.assertEquals(decoder.decode(b'\xa2'), u"")
self.assertEquals(decoder.decode(b'\x88'), u"\u8888")
_check_decode(b'\xe8', "")
_check_decode(b'\xa2', "")
_check_decode(b'\x88', "\u8888")
self.assertEquals(decoder.decode(b'\xe8'), u"")
_check_decode(b'\xe8', "")
_check_decode(b'\xa2', "")
_check_decode(b'\x88', "\u8888")
_check_decode(b'\xe8', "")
self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
decoder.setstate((b'', 0))
self.assertEquals(decoder.decode(b'\n'), u"\n")
self.assertEquals(decoder.decode(b'\r'), u"")
self.assertEquals(decoder.decode(b'', final=True), u"\n")
self.assertEquals(decoder.decode(b'\r', final=True), u"\n")
decoder.reset()
_check_decode(b'\n', "\n")
_check_decode(b'\r', "")
_check_decode(b'', "\n", final=True)
_check_decode(b'\r', "\n", final=True)
self.assertEquals(decoder.decode(b'\r'), u"")
self.assertEquals(decoder.decode(b'a'), u"\na")
_check_decode(b'\r', "")
_check_decode(b'a', "\na")
self.assertEquals(decoder.decode(b'\r\r\n'), u"\n\n")
self.assertEquals(decoder.decode(b'\r'), u"")
self.assertEquals(decoder.decode(b'\r'), u"\n")
self.assertEquals(decoder.decode(b'\na'), u"\na")
_check_decode(b'\r\r\n', "\n\n")
_check_decode(b'\r', "")
_check_decode(b'\r', "\n")
_check_decode(b'\na', "\na")
self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), u"\u8888\n")
self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), u"\u8888")
self.assertEquals(decoder.decode(b'\n'), u"\n")
self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), u"\u8888")
self.assertEquals(decoder.decode(b'\n'), u"\n")
_check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
_check_decode(b'\xe8\xa2\x88', "\u8888")
_check_decode(b'\n', "\n")
_check_decode(b'\xe8\xa2\x88\r', "\u8888")
_check_decode(b'\n', "\n")
def check_newline_decoder(self, decoder, encoding):
result = []
encoder = codecs.getincrementalencoder(encoding)()
def _decode_bytewise(s):
for b in encoder.encode(s):
result.append(decoder.decode(b))
self.assertEquals(decoder.newlines, None)
_decode_bytewise("abc\n\r")
self.assertEquals(decoder.newlines, '\n')
_decode_bytewise("\nabc")
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
_decode_bytewise("abc\r")
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
_decode_bytewise("abc")
self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
_decode_bytewise("abc\r")
self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
decoder.reset()
self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
self.assertEquals(decoder.newlines, None)
def test_newline_decoder(self):
encodings = (
'utf-8', 'latin-1',
'utf-16', 'utf-16-le', 'utf-16-be',
'utf-32', 'utf-32-le', 'utf-32-be',
)
for enc in encodings:
decoder = codecs.getincrementaldecoder(enc)()
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
self.check_newline_decoder(decoder, enc)
decoder = codecs.getincrementaldecoder("utf-8")()
decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
self.assertEquals(decoder.newlines, None)
decoder.decode(b"abc\n\r")
self.assertEquals(decoder.newlines, u'\n')
decoder.decode(b"\nabc")
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
decoder.decode(b"abc\r")
self.assertEquals(decoder.newlines, ('\n', '\r\n'))
decoder.decode(b"abc")
self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
decoder.decode(b"abc\r")
decoder.reset()
self.assertEquals(decoder.decode(b"abc"), "abc")
self.assertEquals(decoder.newlines, None)
self.check_newline_decoder_utf8(decoder)
# XXX Tests for open()