mirror of
https://github.com/python/cpython.git
synced 2025-07-24 19:54:21 +00:00

*ordering* between objects; there is only a default equality test (defined by an object being equal to itself only). Read the comment in object.c. The current implementation never uses a three-way comparison to compute a rich comparison, but it does use a rich comparison to compute a three-way comparison. I'm not quite done ripping out all the calls to PyObject_Compare/Cmp, or replacing tp_compare implementations with tp_richcompare implementations; but much of that has happened (to make most unit tests pass). The following tests still fail, because I need help deciding or understanding: test_codeop -- depends on comparing code objects test_datetime -- need Tim Peters' opinion test_marshal -- depends on comparing code objects test_mutants -- need help understanding it The problem with test_codeop and test_marshal is this: these tests compare two different code objects and expect them to be equal. Is that still a feature we'd like to support? I've temporarily removed the comparison and hash code from code objects, so they use the default (equality by pointer only) comparison. For the other two tests, run them to see for yourself. (There may be more failing test with "-u all".) A general problem with getting lots of these tests to pass is the reality that for object types that have a natural total ordering, implementing __cmp__ is much more convenient than implementing __eq__, __ne__, __lt__, and so on. Should we go back to allowing __cmp__ to provide a total ordering? Should we provide some other way to implement rich comparison with a single method override? Alex proposed a __key__() method; I've considered a __richcmp__() method. Or perhaps __cmp__() just shouldn't be killed off...
325 lines
10 KiB
Python
325 lines
10 KiB
Python
r"""File-like objects that read from or write to a string buffer.

This implements (nearly) all stdio methods.

f = StringIO()      # ready for writing
f = StringIO(buf)   # ready for reading
f.close()           # explicitly release resources held
flag = f.isatty()   # always false
pos = f.tell()      # get current position
f.seek(pos)         # set current position
f.seek(pos, mode)   # mode 0: absolute; 1: relative; 2: relative to EOF
buf = f.read()      # read until EOF
buf = f.read(n)     # read up to n bytes
buf = f.readline()  # read until end of line ('\n') or EOF
list = f.readlines()# list of f.readline() results until EOF
f.truncate([size])  # truncate file to at most size (default: current pos)
f.write(buf)        # write at current position
f.writelines(list)  # for line in list: f.write(line)
f.getvalue()        # return whole file's contents as a string

Notes:
- Using a real file is often faster (but less convenient).
- There's also a much faster implementation in C, called cStringIO, but
  it's not subclassable.
- fileno() is left unimplemented so that code which uses it triggers
  an exception early.
- Seeking far beyond EOF and then writing will insert real null
  bytes that occupy space in the buffer.
- There's a simple test set (see end of this file).
"""
|
|
# EINVAL is the errno code attached to the IOError that truncate() raises
# for a negative size.  Fall back to the literal value 22 when the errno
# module cannot be imported.
try:
    from errno import EINVAL
except ImportError:
    EINVAL = 22

# Names exported by a star-import of this module.
__all__ = ["StringIO"]
|
|
|
|
def _complain_ifclosed(closed):
|
|
if closed:
|
|
raise ValueError, "I/O operation on closed file"
|
|
|
|
class StringIO:
    """class StringIO([buffer])

    When a StringIO object is created, it can be initialized to an existing
    string by passing the string to the constructor. If no string is given,
    the StringIO will start empty.

    The StringIO object can accept either Unicode or 8-bit strings, but
    mixing the two may take some care. If both are used, 8-bit strings that
    cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause
    a UnicodeError to be raised when getvalue() is called.

    Internally the contents are kept in ``self.buf`` plus a list of pending
    chunks (``self.buflist``) that is joined onto ``self.buf`` lazily, so
    that a run of appending writes does not re-copy the buffer each time.
    """

    def __init__(self, buf = ''):
        """Create the object, optionally pre-loaded with *buf*.

        A *buf* that is not a string or unicode object is converted with
        str().
        """
        # Force self.buf to be a string or unicode
        if not isinstance(buf, basestring):
            buf = str(buf)
        self.buf = buf          # consolidated contents
        self.len = len(buf)     # logical length, including pending chunks
        self.buflist = []       # chunks written but not yet joined into buf
        self.pos = 0            # current read/write position
        self.closed = False     # set to True by close()
        self.softspace = 0      # attribute used by the print statement

    def __iter__(self):
        """Return self; the object is its own iterator."""
        return self

    def next(self):
        """A file object is its own iterator, for example iter(f) returns f
        (unless f is closed). When a file is used as an iterator, typically
        in a for loop (for example, for line in f: print line), the next()
        method is called repeatedly. This method returns the next input line,
        or raises StopIteration when EOF is hit.
        """
        _complain_ifclosed(self.closed)
        r = self.readline()
        if not r:
            raise StopIteration
        return r

    def close(self):
        """Free the memory buffer.

        Calling close() more than once is harmless.
        """
        if not self.closed:
            self.closed = True
            del self.buf, self.pos
            # Pending chunks hold data too; drop them so the memory is
            # actually released.
            self.buflist = []

    def isatty(self):
        """Returns False because StringIO objects are not connected to a
        tty-like device.

        Raises ValueError if the object is closed.
        """
        _complain_ifclosed(self.closed)
        return False

    def seek(self, pos, mode = 0):
        """Set the file's current position.

        The mode argument is optional and defaults to 0 (absolute file
        positioning); other values are 1 (seek relative to the current
        position) and 2 (seek relative to the file's end).

        A resulting position below zero is clamped to zero.  There is no
        return value.  Raises ValueError if the object is closed.
        """
        _complain_ifclosed(self.closed)
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []
        if mode == 1:
            pos += self.pos
        elif mode == 2:
            pos += self.len
        self.pos = max(0, pos)

    def tell(self):
        """Return the file's current position.

        Raises ValueError if the object is closed.
        """
        _complain_ifclosed(self.closed)
        return self.pos

    def read(self, n=None):
        """Read at most n bytes from the file
        (less if the read hits EOF before obtaining n bytes).

        If the n argument is negative or omitted, read all data until EOF
        is reached. The bytes are returned as a string object. An empty
        string is returned when EOF is encountered immediately.

        Raises ValueError if the object is closed.
        """
        _complain_ifclosed(self.closed)
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []
        if n is None:
            n = -1
        if n < 0:
            newpos = self.len
        else:
            newpos = min(self.pos+n, self.len)
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readline(self, length=None):
        """Read one entire line from the file.

        A trailing newline character is kept in the string (but may be absent
        when a file ends with an incomplete line). If the length argument is
        present and non-negative, it is a maximum byte count (including the
        trailing newline) and an incomplete line may be returned.  A negative
        length is ignored.

        An empty string is returned only when EOF is encountered immediately.

        Note: Unlike stdio's fgets(), the returned string contains null
        characters ('\\0') if they occurred in the input.

        Raises ValueError if the object is closed.
        """
        _complain_ifclosed(self.closed)
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []
        i = self.buf.find('\n', self.pos)
        if i < 0:
            newpos = self.len
        else:
            newpos = i+1
        # Only a non-negative length limits the read; a negative one would
        # otherwise move the position backwards.
        if length is not None and length >= 0:
            if self.pos + length < newpos:
                newpos = self.pos + length
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readlines(self, sizehint = 0):
        """Read until EOF using readline() and return a list containing the
        lines thus read.

        If the optional sizehint argument is present and positive, instead
        of reading up to EOF, whole lines totalling approximately sizehint
        bytes (or more to accommodate a final whole line) are read.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def truncate(self, size=None):
        """Truncate the file's size.

        If the optional size argument is present, the file is truncated to
        (at most) that size. The size defaults to the current position.
        The current file position is not changed unless the position
        is beyond the new file size.

        If the specified size exceeds the file's current size, the
        file remains unchanged.

        Raises IOError(EINVAL) for a negative size and ValueError if the
        object is closed.
        """
        _complain_ifclosed(self.closed)
        if size is None:
            size = self.pos
        elif size < 0:
            raise IOError(EINVAL, "Negative size not allowed")
        elif size < self.pos:
            self.pos = size
        self.buf = self.getvalue()[:size]
        # Record the real length: when size is larger than the data the
        # slice above is a no-op and the length must not grow.
        self.len = len(self.buf)

    def write(self, s):
        """Write a string to the file.

        A false value (such as the empty string) is ignored; any other
        object that is not a string or unicode object is converted with
        str().  Writing past EOF first pads the gap with null bytes.

        There is no return value.  Raises ValueError if the object is
        closed.
        """
        _complain_ifclosed(self.closed)
        if not s:
            return
        # Force s to be a string or unicode
        if not isinstance(s, basestring):
            s = str(s)
        spos = self.pos
        slen = self.len
        if spos == slen:
            # Fast path: appending at EOF just queues the chunk.
            self.buflist.append(s)
            self.len = self.pos = spos + len(s)
            return
        if spos > slen:
            # Position is beyond EOF: fill the gap with real null bytes.
            self.buflist.append('\0'*(spos - slen))
            slen = spos
        newpos = spos + len(s)
        if spos < slen:
            # Overwrite inside existing data: consolidate, then rebuild the
            # contents as prefix + s + whatever follows the overwritten span.
            if self.buflist:
                self.buf += ''.join(self.buflist)
            self.buflist = [self.buf[:spos], s, self.buf[newpos:]]
            self.buf = ''
            if newpos > slen:
                slen = newpos
        else:
            self.buflist.append(s)
            slen = newpos
        self.len = slen
        self.pos = newpos

    def writelines(self, iterable):
        """Write a sequence of strings to the file. The sequence can be any
        iterable object producing strings, typically a list of strings. There
        is no return value.

        (The name is intended to match readlines(); writelines() does not add
        line separators.)
        """
        write = self.write
        for line in iterable:
            write(line)

    def flush(self):
        """Flush the internal buffer

        Nothing is actually flushed; the call only raises ValueError if the
        object is closed.
        """
        _complain_ifclosed(self.closed)

    def getvalue(self):
        """
        Retrieve the entire contents of the "file" at any time before
        the StringIO object's close() method is called.

        The StringIO object can accept either Unicode or 8-bit strings,
        but mixing the two may take some care. If both are used, 8-bit
        strings that cannot be interpreted as 7-bit ASCII (that use the
        8th bit) will cause a UnicodeError to be raised when getvalue()
        is called.

        Raises ValueError if the object is closed.
        """
        # Fail like every other method does on a closed object instead of
        # tripping over the deleted buffer attribute.
        _complain_ifclosed(self.closed)
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []
        return self.buf
|
|
|
|
|
|
# A little test suite
|
|
|
|
def test():
    """Exercise StringIO against the contents of a real text file.

    The file is named by the first command-line argument and defaults to
    '/etc/passwd'.  Its lines are written into a StringIO object, which is
    then read back, seeked and truncated.  Progress is printed to standard
    output, and RuntimeError is raised as soon as a check fails.

    The input needs several lines: the code indexes the first two lines and
    the last of the lines read after them.
    """
    import sys
    if sys.argv[1:]:
        path = sys.argv[1]
    else:
        path = '/etc/passwd'

    # Read the reference data both line-wise and as one string, and make
    # sure the handle is closed even if reading fails.
    fp = open(path, 'r')
    try:
        lines = fp.readlines()
        fp.seek(0)
        text = fp.read()
    finally:
        fp.close()

    f = StringIO()
    for line in lines[:-2]:
        f.write(line)
    f.writelines(lines[-2:])
    if f.getvalue() != text:
        raise RuntimeError('write failed')
    length = f.tell()
    print('File length = %s' % (length,))

    # Overwrite the second line with itself, then read from the start.
    f.seek(len(lines[0]))
    f.write(lines[1])
    f.seek(0)
    print('First line = %s' % (repr(f.readline()),))
    print('Position = %s' % (f.tell(),))
    line = f.readline()
    print('Second line = %s' % (repr(line),))

    # Relative seek backwards must land on the start of the second line.
    f.seek(-len(line), 1)
    line2 = f.read(len(line))
    if line != line2:
        raise RuntimeError('bad result after seek back')

    f.seek(len(line2), 1)
    remaining = f.readlines()
    line = remaining[-1]
    f.seek(f.tell() - len(line))
    line2 = f.read()
    if line != line2:
        raise RuntimeError('bad result after seek back from EOF')
    print('Read %s more lines' % (len(remaining),))
    print('File length = %s' % (f.tell(),))
    if f.tell() != length:
        raise RuntimeError('bad length')

    # Floor division keeps the size an integer regardless of how "/"
    # behaves for ints.
    half = length // 2
    f.truncate(half)
    f.seek(0, 2)
    print('Truncated length = %s' % (f.tell(),))
    if f.tell() != half:
        raise RuntimeError('truncate did not adjust length')
    f.close()
|
|
|
|
# Run the little test suite when this file is executed as a script.
if __name__ == '__main__':
    test()
|