Make all the multibyte codec tests pass.

Changes to io.py, necessary to make this work:
- Redid io.StringIO as a TextIOWrapper on top of a BytesIO instance.
- Got rid of _MemoryIOMixin, folding it into BytesIO instead.
- The read() functions that take -1 to mean "eveything" now also take None.
- Added readline() support to BufferedIOBase. :-(
This commit is contained in:
Guido van Rossum 2007-05-17 23:59:11 +00:00
parent f4cfc8f6bb
commit 024da5c257
10 changed files with 1225 additions and 1227 deletions

129
Lib/io.py
View file

@ -415,8 +415,8 @@ class BufferedIOBase(IOBase):
def read(self, n: int = -1) -> bytes:
"""read(n: int = -1) -> bytes. Read and return up to n bytes.
If the argument is omitted, or negative, reads and returns all
data until EOF.
If the argument is omitted, None, or negative, reads and
returns all data until EOF.
If the argument is positive, and the underlying raw stream is
not 'interactive', multiple raw reads may be issued to satisfy
@ -450,6 +450,20 @@ class BufferedIOBase(IOBase):
b[:n] = data
return n
def readline(self, sizehint: int = -1) -> bytes:
"""For backwards compatibility, a (slow) readline()."""
if sizehint is None:
sizehint = -1
res = b""
while sizehint < 0 or len(res) < sizehint:
b = self.read(1)
if not b:
break
res += b
if b == b"\n":
break
return res
def write(self, b: bytes) -> int:
"""write(b: bytes) -> int. Write the given buffer to the IO stream.
@ -518,19 +532,25 @@ class _BufferedIOMixin(BufferedIOBase):
return self.raw.isatty()
class _MemoryIOMixin(BufferedIOBase):
class BytesIO(BufferedIOBase):
# XXX docstring
"""Buffered I/O implementation using an in-memory bytes buffer."""
def __init__(self, buffer):
# XXX More docs
def __init__(self, initial_bytes=None):
buffer = b""
if initial_bytes is not None:
buffer += initial_bytes
self._buffer = buffer
self._pos = 0
def getvalue(self):
return self._buffer
def read(self, n=-1):
assert n is not None
def read(self, n=None):
if n is None:
n = -1
if n < 0:
n = len(self._buffer)
newpos = min(len(self._buffer), self._pos + n)
@ -538,6 +558,9 @@ class _MemoryIOMixin(BufferedIOBase):
self._pos = newpos
return b
def read1(self, n):
return self.read(n)
def write(self, b):
n = len(b)
newpos = self._pos + n
@ -575,65 +598,6 @@ class _MemoryIOMixin(BufferedIOBase):
return True
class BytesIO(_MemoryIOMixin):
"""Buffered I/O implementation using a bytes buffer, like StringIO."""
# XXX More docs
def __init__(self, initial_bytes=None):
buffer = b""
if initial_bytes is not None:
buffer += initial_bytes
_MemoryIOMixin.__init__(self, buffer)
# XXX This should inherit from TextIOBase
class StringIO(_MemoryIOMixin):
"""Buffered I/O implementation using a string buffer, like StringIO."""
# XXX More docs
# Reuses the same code as BytesIO, but encode strings on the way in
# and decode them on the way out.
charsize = len("!".encode("unicode-internal"))
def __init__(self, initial_string=None):
if initial_string is not None:
buffer = initial_string.encode("unicode-internal")
else:
buffer = b""
_MemoryIOMixin.__init__(self, buffer)
def getvalue(self):
return self._buffer.encode("unicode-internal")
def read(self, n=-1):
return super(StringIO, self).read(n*self.charsize) \
.decode("unicode-internal")
def write(self, s):
return super(StringIO, self).write(s.encode("unicode-internal")) \
//self.charsize
def seek(self, pos, whence=0):
return super(StringIO, self).seek(self.charsize*pos, whence) \
//self.charsize
def tell(self):
return super(StringIO, self).tell()//self.charsize
def truncate(self, pos=None):
if pos is not None:
pos *= self.charsize
return super(StringIO, self).truncate(pos)//self.charsize
def readinto(self, b: bytes) -> int:
self._unsupported("readinto")
class BufferedReader(_BufferedIOMixin):
"""Buffer for a readable sequential RawIO object."""
@ -646,7 +610,7 @@ class BufferedReader(_BufferedIOMixin):
self._read_buf = b""
self.buffer_size = buffer_size
def read(self, n=-1):
def read(self, n=None):
"""Read n bytes.
Returns exactly n bytes of data unless the underlying raw IO
@ -654,7 +618,8 @@ class BufferedReader(_BufferedIOMixin):
mode. If n is negative, read until EOF or until read() would
block.
"""
assert n is not None
if n is None:
n = -1
nodata_val = b""
while n < 0 or len(self._read_buf) < n:
to_read = max(self.buffer_size,
@ -801,7 +766,9 @@ class BufferedRWPair(BufferedIOBase):
self.reader = BufferedReader(reader, buffer_size)
self.writer = BufferedWriter(writer, buffer_size, max_buffer_size)
def read(self, n=-1):
def read(self, n=None):
if n is None:
n = -1
return self.reader.read(n)
def readinto(self, b):
@ -861,7 +828,9 @@ class BufferedRandom(BufferedWriter, BufferedReader):
else:
return self.raw.tell() - len(self._read_buf)
def read(self, n=-1):
def read(self, n=None):
if n is None:
n = -1
self.flush()
return BufferedReader.read(self, n)
@ -1129,7 +1098,9 @@ class TextIOWrapper(TextIOBase):
except UnicodeEncodeError:
return u
def read(self, n: int = -1):
def read(self, n=None):
if n is None:
n = -1
decoder = self._decoder or self._get_decoder()
res = self._pending
if n < 0:
@ -1146,7 +1117,7 @@ class TextIOWrapper(TextIOBase):
self._pending = res[n:]
return self._simplify(res[:n])
def __next__(self) -> str:
def __next__(self):
self._telling = False
line = self.readline()
if not line:
@ -1218,3 +1189,17 @@ class TextIOWrapper(TextIOBase):
return self._simplify(line[:endpos] + "\n")
else:
return self._simplify(line[:nextpos])
class StringIO(TextIOWrapper):
# XXX This is really slow, but fully functional
def __init__(self, initial_value=""):
super(StringIO, self).__init__(BytesIO(), "utf-8")
if initial_value:
self.write(initial_value)
self.seek(0)
def getvalue(self):
return self.buffer.getvalue().decode("utf-8")