mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
Issue #8796: codecs.open() calls the builtin open() function instead of using
StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter, StreamRecoder and EncodedFile() of the codec module. Use the builtin open() function or io.TextIOWrapper instead.
This commit is contained in:
parent
c556e10b94
commit
98fe1a0c3b
4 changed files with 148 additions and 59 deletions
|
@ -1,7 +1,10 @@
|
|||
from test import support
|
||||
import unittest
|
||||
import _testcapi
|
||||
import codecs
|
||||
import sys, _testcapi, io
|
||||
import io
|
||||
import sys
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
class Queue(object):
|
||||
"""
|
||||
|
@ -63,7 +66,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
# the StreamReader and check that the results equal the appropriate
|
||||
# entries from partialresults.
|
||||
q = Queue(b"")
|
||||
r = codecs.getreader(self.encoding)(q)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
r = codecs.getreader(self.encoding)(q)
|
||||
result = ""
|
||||
for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
|
||||
q.write(bytes([c]))
|
||||
|
@ -106,7 +111,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
return codecs.getreader(self.encoding)(stream)
|
||||
|
||||
def readalllines(input, keepends=True, size=None):
|
||||
reader = getreader(input)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = getreader(input)
|
||||
lines = []
|
||||
while True:
|
||||
line = reader.readline(size=size, keepends=keepends)
|
||||
|
@ -215,14 +222,18 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
' \r\n',
|
||||
]
|
||||
stream = io.BytesIO("".join(s).encode(self.encoding))
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
for (i, line) in enumerate(reader):
|
||||
self.assertEqual(line, s[i])
|
||||
|
||||
def test_readlinequeue(self):
|
||||
q = Queue(b"")
|
||||
writer = codecs.getwriter(self.encoding)(q)
|
||||
reader = codecs.getreader(self.encoding)(q)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
writer = codecs.getwriter(self.encoding)(q)
|
||||
reader = codecs.getreader(self.encoding)(q)
|
||||
|
||||
# No lineends
|
||||
writer.write("foo\r")
|
||||
|
@ -253,7 +264,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
|
||||
s = (s1+s2+s3).encode(self.encoding)
|
||||
stream = io.BytesIO(s)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
self.assertEqual(reader.readline(), s1)
|
||||
self.assertEqual(reader.readline(), s2)
|
||||
self.assertEqual(reader.readline(), s3)
|
||||
|
@ -268,7 +281,9 @@ class ReadTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
|
||||
s = (s1+s2+s3+s4+s5).encode(self.encoding)
|
||||
stream = io.BytesIO(s)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
self.assertEqual(reader.readline(), s1)
|
||||
self.assertEqual(reader.readline(), s2)
|
||||
self.assertEqual(reader.readline(), s3)
|
||||
|
@ -290,7 +305,9 @@ class UTF32Test(ReadTest):
|
|||
_,_,reader,writer = codecs.lookup(self.encoding)
|
||||
# encode some stream
|
||||
s = io.BytesIO()
|
||||
f = writer(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = writer(s)
|
||||
f.write("spam")
|
||||
f.write("spam")
|
||||
d = s.getvalue()
|
||||
|
@ -298,16 +315,22 @@ class UTF32Test(ReadTest):
|
|||
self.assertTrue(d == self.spamle or d == self.spambe)
|
||||
# try to read it back
|
||||
s = io.BytesIO(d)
|
||||
f = reader(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = reader(s)
|
||||
self.assertEqual(f.read(), "spamspam")
|
||||
|
||||
def test_badbom(self):
|
||||
s = io.BytesIO(4*b"\xff")
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
s = io.BytesIO(8*b"\xff")
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
def test_partial(self):
|
||||
|
@ -454,7 +477,9 @@ class UTF16Test(ReadTest):
|
|||
_,_,reader,writer = codecs.lookup(self.encoding)
|
||||
# encode some stream
|
||||
s = io.BytesIO()
|
||||
f = writer(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = writer(s)
|
||||
f.write("spam")
|
||||
f.write("spam")
|
||||
d = s.getvalue()
|
||||
|
@ -462,16 +487,22 @@ class UTF16Test(ReadTest):
|
|||
self.assertTrue(d == self.spamle or d == self.spambe)
|
||||
# try to read it back
|
||||
s = io.BytesIO(d)
|
||||
f = reader(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = reader(s)
|
||||
self.assertEqual(f.read(), "spamspam")
|
||||
|
||||
def test_badbom(self):
|
||||
s = io.BytesIO(b"\xff\xff")
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
s = io.BytesIO(b"\xff\xff\xff\xff")
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
def test_partial(self):
|
||||
|
@ -517,7 +548,8 @@ class UTF16Test(ReadTest):
|
|||
self.addCleanup(support.unlink, support.TESTFN)
|
||||
with open(support.TESTFN, 'wb') as fp:
|
||||
fp.write(s)
|
||||
with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader:
|
||||
with codecs.open(support.TESTFN, 'U',
|
||||
encoding=self.encoding) as reader:
|
||||
self.assertEqual(reader.read(), s1)
|
||||
|
||||
class UTF16LETest(ReadTest):
|
||||
|
@ -705,7 +737,9 @@ class UTF8SigTest(ReadTest):
|
|||
reader = codecs.getreader("utf-8-sig")
|
||||
for sizehint in [None] + list(range(1, 11)) + \
|
||||
[64, 128, 256, 512, 1024]:
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
ostream = io.StringIO()
|
||||
while 1:
|
||||
if sizehint is not None:
|
||||
|
@ -727,7 +761,9 @@ class UTF8SigTest(ReadTest):
|
|||
reader = codecs.getreader("utf-8-sig")
|
||||
for sizehint in [None] + list(range(1, 11)) + \
|
||||
[64, 128, 256, 512, 1024]:
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
ostream = io.StringIO()
|
||||
while 1:
|
||||
if sizehint is not None:
|
||||
|
@ -749,7 +785,9 @@ class EscapeDecodeTest(unittest.TestCase):
|
|||
class RecodingTest(unittest.TestCase):
|
||||
def test_recoding(self):
|
||||
f = io.BytesIO()
|
||||
f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
|
||||
f2.write("a")
|
||||
f2.close()
|
||||
# Python used to crash on this at exit because of a refcount
|
||||
|
@ -1126,7 +1164,9 @@ class IDNACodecTest(unittest.TestCase):
|
|||
self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
|
||||
|
||||
def test_stream(self):
|
||||
r = codecs.getreader("idna")(io.BytesIO(b"abc"))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
r = codecs.getreader("idna")(io.BytesIO(b"abc"))
|
||||
r.read(3)
|
||||
self.assertEqual(r.read(), "")
|
||||
|
||||
|
@ -1233,18 +1273,24 @@ class CodecsModuleTest(unittest.TestCase):
|
|||
class StreamReaderTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.reader = codecs.getreader('utf-8')
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
self.reader = codecs.getreader('utf-8')
|
||||
self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
|
||||
|
||||
def test_readlines(self):
|
||||
f = self.reader(self.stream)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = self.reader(self.stream)
|
||||
self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
|
||||
|
||||
class EncodedFileTest(unittest.TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
|
||||
ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
|
||||
self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
|
||||
|
||||
f = io.BytesIO()
|
||||
|
@ -1388,7 +1434,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
if encoding not in broken_unicode_with_streams:
|
||||
# check stream reader/writer
|
||||
q = Queue(b"")
|
||||
writer = codecs.getwriter(encoding)(q)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
writer = codecs.getwriter(encoding)(q)
|
||||
encodedresult = b""
|
||||
for c in s:
|
||||
writer.write(c)
|
||||
|
@ -1396,7 +1444,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
self.assertTrue(type(chunk) is bytes, type(chunk))
|
||||
encodedresult += chunk
|
||||
q = Queue(b"")
|
||||
reader = codecs.getreader(encoding)(q)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(q)
|
||||
decodedresult = ""
|
||||
for c in encodedresult:
|
||||
q.write(bytes([c]))
|
||||
|
@ -1470,7 +1520,9 @@ class BasicUnicodeTest(unittest.TestCase, MixInCheckStateHandling):
|
|||
continue
|
||||
if encoding in broken_unicode_with_streams:
|
||||
continue
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
|
||||
for t in range(5):
|
||||
# Test that calling seek resets the internal codec state and buffers
|
||||
reader.seek(0, 0)
|
||||
|
@ -1539,15 +1591,19 @@ class CharmapTest(unittest.TestCase):
|
|||
class WithStmtTest(unittest.TestCase):
|
||||
def test_encodedfile(self):
|
||||
f = io.BytesIO(b"\xc3\xbc")
|
||||
with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
|
||||
self.assertEqual(ef.read(), b"\xfc")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
|
||||
self.assertEqual(ef.read(), b"\xfc")
|
||||
|
||||
def test_streamreaderwriter(self):
|
||||
f = io.BytesIO(b"\xc3\xbc")
|
||||
info = codecs.lookup("utf-8")
|
||||
with codecs.StreamReaderWriter(f, info.streamreader,
|
||||
info.streamwriter, 'strict') as srw:
|
||||
self.assertEqual(srw.read(), "\xfc")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
with codecs.StreamReaderWriter(f, info.streamreader,
|
||||
info.streamwriter, 'strict') as srw:
|
||||
self.assertEqual(srw.read(), "\xfc")
|
||||
|
||||
class TypesTest(unittest.TestCase):
|
||||
def test_decode_unicode(self):
|
||||
|
@ -1644,15 +1700,15 @@ class BomTest(unittest.TestCase):
|
|||
|
||||
# (StreamWriter) Check that the BOM is written after a seek(0)
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.writer.write(data[0])
|
||||
self.assertNotEqual(f.writer.tell(), 0)
|
||||
f.writer.seek(0)
|
||||
f.writer.write(data)
|
||||
f.write(data[0])
|
||||
self.assertNotEqual(f.tell(), 0)
|
||||
f.seek(0)
|
||||
f.write(data)
|
||||
f.seek(0)
|
||||
self.assertEqual(f.read(), data)
|
||||
|
||||
# Check that the BOM is not written after a seek() at a position
|
||||
# different than the start
|
||||
# Check that the BOM is not written after a seek() at a
|
||||
# position different than the start
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.write(data)
|
||||
f.seek(f.tell())
|
||||
|
@ -1660,12 +1716,12 @@ class BomTest(unittest.TestCase):
|
|||
f.seek(0)
|
||||
self.assertEqual(f.read(), data * 2)
|
||||
|
||||
# (StreamWriter) Check that the BOM is not written after a seek()
|
||||
# at a position different than the start
|
||||
# (StreamWriter) Check that the BOM is not written after a
|
||||
# seek() at a position different than the start
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.writer.write(data)
|
||||
f.writer.seek(f.writer.tell())
|
||||
f.writer.write(data)
|
||||
f.write(data)
|
||||
f.seek(f.tell())
|
||||
f.write(data)
|
||||
f.seek(0)
|
||||
self.assertEqual(f.read(), data * 2)
|
||||
|
||||
|
@ -1704,7 +1760,9 @@ class TransformCodecTest(unittest.TestCase):
|
|||
def test_read(self):
|
||||
for encoding in bytes_transform_encodings:
|
||||
sin = codecs.encode(b"\x80", encoding)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
sout = reader.read()
|
||||
self.assertEqual(sout, b"\x80")
|
||||
|
||||
|
@ -1713,7 +1771,9 @@ class TransformCodecTest(unittest.TestCase):
|
|||
if encoding in ['uu_codec', 'zlib_codec']:
|
||||
continue
|
||||
sin = codecs.encode(b"\x80", encoding)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
sout = reader.readline()
|
||||
self.assertEqual(sout, b"\x80")
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue