mirror of
https://github.com/python/cpython.git
synced 2025-08-28 12:45:07 +00:00
Support name and mode attributes on all file types.
Don't read more than one line when reading text from a tty device. Add peek() and read1() methods. Return str instead of unicode when return ASCII characters in text mode.
This commit is contained in:
parent
913dd0be52
commit
13633bb8c5
3 changed files with 83 additions and 18 deletions
91
Lib/io.py
91
Lib/io.py
|
@ -18,6 +18,7 @@ XXX don't use assert to validate input requirements
|
||||||
XXX whenever an argument is None, use the default value
|
XXX whenever an argument is None, use the default value
|
||||||
XXX read/write ops should check readable/writable
|
XXX read/write ops should check readable/writable
|
||||||
XXX buffered readinto should work with arbitrary buffer objects
|
XXX buffered readinto should work with arbitrary buffer objects
|
||||||
|
XXX use incremental encoder for text output, at least for UTF-16
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__author__ = ("Guido van Rossum <guido@python.org>, "
|
__author__ = ("Guido van Rossum <guido@python.org>, "
|
||||||
|
@ -137,6 +138,8 @@ def open(file, mode="r", buffering=None, *, encoding=None, newline=None):
|
||||||
raise ValueError("invalid buffering size")
|
raise ValueError("invalid buffering size")
|
||||||
if buffering == 0:
|
if buffering == 0:
|
||||||
if binary:
|
if binary:
|
||||||
|
raw._name = file
|
||||||
|
raw._mode = mode
|
||||||
return raw
|
return raw
|
||||||
raise ValueError("can't have unbuffered text I/O")
|
raise ValueError("can't have unbuffered text I/O")
|
||||||
if updating:
|
if updating:
|
||||||
|
@ -147,8 +150,13 @@ def open(file, mode="r", buffering=None, *, encoding=None, newline=None):
|
||||||
assert reading
|
assert reading
|
||||||
buffer = BufferedReader(raw, buffering)
|
buffer = BufferedReader(raw, buffering)
|
||||||
if binary:
|
if binary:
|
||||||
|
buffer.name = file
|
||||||
|
buffer.mode = mode
|
||||||
return buffer
|
return buffer
|
||||||
return TextIOWrapper(buffer, encoding, newline)
|
text = TextIOWrapper(buffer, encoding, newline)
|
||||||
|
text.name = file
|
||||||
|
text.mode = mode
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
class IOBase:
|
class IOBase:
|
||||||
|
@ -349,6 +357,14 @@ class FileIO(_fileio._FileIO, RawIOBase):
|
||||||
_fileio._FileIO.close(self)
|
_fileio._FileIO.close(self)
|
||||||
RawIOBase.close(self)
|
RawIOBase.close(self)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mode(self):
|
||||||
|
return self._mode
|
||||||
|
|
||||||
|
|
||||||
class SocketIO(RawIOBase):
|
class SocketIO(RawIOBase):
|
||||||
|
|
||||||
|
@ -628,7 +644,6 @@ class BufferedReader(_BufferedIOMixin):
|
||||||
to_read = max(self.buffer_size,
|
to_read = max(self.buffer_size,
|
||||||
n if n is not None else 2*len(self._read_buf))
|
n if n is not None else 2*len(self._read_buf))
|
||||||
current = self.raw.read(to_read)
|
current = self.raw.read(to_read)
|
||||||
|
|
||||||
if current in (b"", None):
|
if current in (b"", None):
|
||||||
nodata_val = current
|
nodata_val = current
|
||||||
break
|
break
|
||||||
|
@ -642,6 +657,39 @@ class BufferedReader(_BufferedIOMixin):
|
||||||
out = nodata_val
|
out = nodata_val
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
def peek(self, n=0, *, unsafe=False):
|
||||||
|
"""Returns buffered bytes without advancing the position.
|
||||||
|
|
||||||
|
The argument indicates a desired minimal number of bytes; we
|
||||||
|
do at most one raw read to satisfy it. We never return more
|
||||||
|
than self.buffer_size.
|
||||||
|
|
||||||
|
Unless unsafe=True is passed, we return a copy.
|
||||||
|
"""
|
||||||
|
want = min(n, self.buffer_size)
|
||||||
|
have = len(self._read_buf)
|
||||||
|
if have < want:
|
||||||
|
to_read = self.buffer_size - have
|
||||||
|
current = self.raw.read(to_read)
|
||||||
|
if current:
|
||||||
|
self._read_buf += current
|
||||||
|
result = self._read_buf
|
||||||
|
if unsafe:
|
||||||
|
result = result[:]
|
||||||
|
return result
|
||||||
|
|
||||||
|
def read1(self, n):
|
||||||
|
"""Reads up to n bytes.
|
||||||
|
|
||||||
|
Returns up to n bytes. If at least one byte is buffered,
|
||||||
|
we only return buffered bytes. Otherwise, we do one
|
||||||
|
raw read.
|
||||||
|
"""
|
||||||
|
if n <= 0:
|
||||||
|
return b""
|
||||||
|
self.peek(1, unsafe=True)
|
||||||
|
return self.read(min(n, len(self._read_buf)))
|
||||||
|
|
||||||
def tell(self):
|
def tell(self):
|
||||||
return self.raw.tell() - len(self._read_buf)
|
return self.raw.tell() - len(self._read_buf)
|
||||||
|
|
||||||
|
@ -746,6 +794,12 @@ class BufferedRWPair(BufferedIOBase):
|
||||||
def write(self, b):
|
def write(self, b):
|
||||||
return self.writer.write(b)
|
return self.writer.write(b)
|
||||||
|
|
||||||
|
def peek(self, n=0, *, unsafe=False):
|
||||||
|
return self.reader.peek(n, unsafe=unsafe)
|
||||||
|
|
||||||
|
def read1(self, n):
|
||||||
|
return self.reader.read1(n)
|
||||||
|
|
||||||
def readable(self):
|
def readable(self):
|
||||||
return self.reader.readable()
|
return self.reader.readable()
|
||||||
|
|
||||||
|
@ -799,6 +853,14 @@ class BufferedRandom(BufferedWriter, BufferedReader):
|
||||||
self.flush()
|
self.flush()
|
||||||
return BufferedReader.readinto(self, b)
|
return BufferedReader.readinto(self, b)
|
||||||
|
|
||||||
|
def peek(self, n=0, *, unsafe=False):
|
||||||
|
self.flush()
|
||||||
|
return BufferedReader.peek(self, n, unsafe=unsafe)
|
||||||
|
|
||||||
|
def read1(self, n):
|
||||||
|
self.flush()
|
||||||
|
return BufferedReader.read1(self, n)
|
||||||
|
|
||||||
def write(self, b):
|
def write(self, b):
|
||||||
if self._read_buf:
|
if self._read_buf:
|
||||||
self.raw.seek(-len(self._read_buf), 1) # Undo readahead
|
self.raw.seek(-len(self._read_buf), 1) # Undo readahead
|
||||||
|
@ -932,6 +994,7 @@ class TextIOWrapper(TextIOBase):
|
||||||
b = bytes(b)
|
b = bytes(b)
|
||||||
n = self.buffer.write(b)
|
n = self.buffer.write(b)
|
||||||
if "\n" in s:
|
if "\n" in s:
|
||||||
|
# XXX only if isatty
|
||||||
self.flush()
|
self.flush()
|
||||||
self._snapshot = self._decoder = None
|
self._snapshot = self._decoder = None
|
||||||
return len(s)
|
return len(s)
|
||||||
|
@ -951,11 +1014,11 @@ class TextIOWrapper(TextIOBase):
|
||||||
def _read_chunk(self):
|
def _read_chunk(self):
|
||||||
assert self._decoder is not None
|
assert self._decoder is not None
|
||||||
if not self._telling:
|
if not self._telling:
|
||||||
readahead = self.buffer.read(self._CHUNK_SIZE)
|
readahead = self.buffer.read1(self._CHUNK_SIZE)
|
||||||
pending = self._decoder.decode(readahead, not readahead)
|
pending = self._decoder.decode(readahead, not readahead)
|
||||||
return readahead, pending
|
return readahead, pending
|
||||||
decoder_state = pickle.dumps(self._decoder, 2)
|
decoder_state = pickle.dumps(self._decoder, 2)
|
||||||
readahead = self.buffer.read(self._CHUNK_SIZE)
|
readahead = self.buffer.read1(self._CHUNK_SIZE)
|
||||||
pending = self._decoder.decode(readahead, not readahead)
|
pending = self._decoder.decode(readahead, not readahead)
|
||||||
self._snapshot = (decoder_state, readahead, pending)
|
self._snapshot = (decoder_state, readahead, pending)
|
||||||
return readahead, pending
|
return readahead, pending
|
||||||
|
@ -1043,6 +1106,14 @@ class TextIOWrapper(TextIOBase):
|
||||||
self._decoder = decoder
|
self._decoder = decoder
|
||||||
return orig_pos
|
return orig_pos
|
||||||
|
|
||||||
|
def _simplify(self, u):
|
||||||
|
# XXX Hack until str/unicode unification: return str instead
|
||||||
|
# of unicode if it's all ASCII
|
||||||
|
try:
|
||||||
|
return str(u)
|
||||||
|
except UnicodeEncodeError:
|
||||||
|
return u
|
||||||
|
|
||||||
def read(self, n: int = -1):
|
def read(self, n: int = -1):
|
||||||
decoder = self._decoder or self._get_decoder()
|
decoder = self._decoder or self._get_decoder()
|
||||||
res = self._pending
|
res = self._pending
|
||||||
|
@ -1050,7 +1121,7 @@ class TextIOWrapper(TextIOBase):
|
||||||
res += decoder.decode(self.buffer.read(), True)
|
res += decoder.decode(self.buffer.read(), True)
|
||||||
self._pending = ""
|
self._pending = ""
|
||||||
self._snapshot = None
|
self._snapshot = None
|
||||||
return res
|
return self._simplify(res)
|
||||||
else:
|
else:
|
||||||
while len(res) < n:
|
while len(res) < n:
|
||||||
readahead, pending = self._read_chunk()
|
readahead, pending = self._read_chunk()
|
||||||
|
@ -1058,7 +1129,7 @@ class TextIOWrapper(TextIOBase):
|
||||||
if not readahead:
|
if not readahead:
|
||||||
break
|
break
|
||||||
self._pending = res[n:]
|
self._pending = res[n:]
|
||||||
return res[:n]
|
return self._simplify(res[:n])
|
||||||
|
|
||||||
def next(self) -> str:
|
def next(self) -> str:
|
||||||
self._telling = False
|
self._telling = False
|
||||||
|
@ -1074,9 +1145,9 @@ class TextIOWrapper(TextIOBase):
|
||||||
# XXX Hack to support limit argument, for backwards compatibility
|
# XXX Hack to support limit argument, for backwards compatibility
|
||||||
line = self.readline()
|
line = self.readline()
|
||||||
if len(line) <= limit:
|
if len(line) <= limit:
|
||||||
return line
|
return self._simplify(line)
|
||||||
line, self._pending = line[:limit], line[limit:] + self._pending
|
line, self._pending = line[:limit], line[limit:] + self._pending
|
||||||
return line
|
return self._simplify(line)
|
||||||
|
|
||||||
line = self._pending
|
line = self._pending
|
||||||
start = 0
|
start = 0
|
||||||
|
@ -1129,6 +1200,6 @@ class TextIOWrapper(TextIOBase):
|
||||||
# XXX Update self.newlines here if we want to support that
|
# XXX Update self.newlines here if we want to support that
|
||||||
|
|
||||||
if self._fix_newlines and ending not in ("\n", ""):
|
if self._fix_newlines and ending not in ("\n", ""):
|
||||||
return line[:endpos] + "\n"
|
return self._simplify(line[:endpos] + "\n")
|
||||||
else:
|
else:
|
||||||
return line[:nextpos]
|
return self._simplify(line[:nextpos])
|
||||||
|
|
|
@ -163,14 +163,7 @@ class BytesTest(unittest.TestCase):
|
||||||
f.write(b)
|
f.write(b)
|
||||||
with open(tfn, "rb") as f:
|
with open(tfn, "rb") as f:
|
||||||
self.assertEqual(f.read(), sample)
|
self.assertEqual(f.read(), sample)
|
||||||
# Test writing in text mode
|
# Text mode is ambiguous; don't test
|
||||||
with open(tfn, "w") as f:
|
|
||||||
f.write(b)
|
|
||||||
with open(tfn, "r") as f:
|
|
||||||
self.assertEqual(f.read(), sample)
|
|
||||||
# Can't use readinto in text mode
|
|
||||||
with open(tfn, "r") as f:
|
|
||||||
self.assertRaises(TypeError, f.readinto, b)
|
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
os.remove(tfn)
|
os.remove(tfn)
|
||||||
|
|
|
@ -51,6 +51,7 @@ class AutoFileTests(unittest.TestCase):
|
||||||
|
|
||||||
self.assertEquals(f.mode, "w")
|
self.assertEquals(f.mode, "w")
|
||||||
self.assertEquals(f.closed, False)
|
self.assertEquals(f.closed, False)
|
||||||
|
self.assertEquals(f.name, TESTFN)
|
||||||
|
|
||||||
# verify the attributes are readonly
|
# verify the attributes are readonly
|
||||||
for attr in 'mode', 'closed':
|
for attr in 'mode', 'closed':
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue