mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Issue #22854: Merge UnsupportedOperation fixes from 3.5
This commit is contained in:
commit
047f3b7376
6 changed files with 130 additions and 35 deletions
|
@ -203,6 +203,9 @@ class MockUnseekableIO:
|
|||
def tell(self, *args):
|
||||
raise self.UnsupportedOperation("not seekable")
|
||||
|
||||
def truncate(self, *args):
|
||||
raise self.UnsupportedOperation("not seekable")
|
||||
|
||||
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
|
||||
UnsupportedOperation = io.UnsupportedOperation
|
||||
|
||||
|
@ -361,6 +364,107 @@ class IOTest(unittest.TestCase):
|
|||
self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
|
||||
self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
|
||||
|
||||
def test_optional_abilities(self):
|
||||
# Test for OSError when optional APIs are not supported
|
||||
# The purpose of this test is to try fileno(), reading, writing and
|
||||
# seeking operations with various objects that indicate they do not
|
||||
# support these operations.
|
||||
|
||||
def pipe_reader():
|
||||
[r, w] = os.pipe()
|
||||
os.close(w) # So that read() is harmless
|
||||
return self.FileIO(r, "r")
|
||||
|
||||
def pipe_writer():
|
||||
[r, w] = os.pipe()
|
||||
self.addCleanup(os.close, r)
|
||||
# Guarantee that we can write into the pipe without blocking
|
||||
thread = threading.Thread(target=os.read, args=(r, 100))
|
||||
thread.start()
|
||||
self.addCleanup(thread.join)
|
||||
return self.FileIO(w, "w")
|
||||
|
||||
def buffered_reader():
|
||||
return self.BufferedReader(self.MockUnseekableIO())
|
||||
|
||||
def buffered_writer():
|
||||
return self.BufferedWriter(self.MockUnseekableIO())
|
||||
|
||||
def buffered_random():
|
||||
return self.BufferedRandom(self.BytesIO())
|
||||
|
||||
def buffered_rw_pair():
|
||||
return self.BufferedRWPair(self.MockUnseekableIO(),
|
||||
self.MockUnseekableIO())
|
||||
|
||||
def text_reader():
|
||||
class UnseekableReader(self.MockUnseekableIO):
|
||||
writable = self.BufferedIOBase.writable
|
||||
write = self.BufferedIOBase.write
|
||||
return self.TextIOWrapper(UnseekableReader(), "ascii")
|
||||
|
||||
def text_writer():
|
||||
class UnseekableWriter(self.MockUnseekableIO):
|
||||
readable = self.BufferedIOBase.readable
|
||||
read = self.BufferedIOBase.read
|
||||
return self.TextIOWrapper(UnseekableWriter(), "ascii")
|
||||
|
||||
tests = (
|
||||
(pipe_reader, "fr"), (pipe_writer, "fw"),
|
||||
(buffered_reader, "r"), (buffered_writer, "w"),
|
||||
(buffered_random, "rws"), (buffered_rw_pair, "rw"),
|
||||
(text_reader, "r"), (text_writer, "w"),
|
||||
(self.BytesIO, "rws"), (self.StringIO, "rws"),
|
||||
)
|
||||
for [test, abilities] in tests:
|
||||
if test is pipe_writer and not threading:
|
||||
continue # Skip subtest that uses a background thread
|
||||
with self.subTest(test), test() as obj:
|
||||
readable = "r" in abilities
|
||||
self.assertEqual(obj.readable(), readable)
|
||||
writable = "w" in abilities
|
||||
self.assertEqual(obj.writable(), writable)
|
||||
seekable = "s" in abilities
|
||||
self.assertEqual(obj.seekable(), seekable)
|
||||
|
||||
if isinstance(obj, self.TextIOBase):
|
||||
data = "3"
|
||||
elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
|
||||
data = b"3"
|
||||
else:
|
||||
self.fail("Unknown base class")
|
||||
|
||||
if "f" in abilities:
|
||||
obj.fileno()
|
||||
else:
|
||||
self.assertRaises(OSError, obj.fileno)
|
||||
|
||||
if readable:
|
||||
obj.read(1)
|
||||
obj.read()
|
||||
else:
|
||||
self.assertRaises(OSError, obj.read, 1)
|
||||
self.assertRaises(OSError, obj.read)
|
||||
|
||||
if writable:
|
||||
obj.write(data)
|
||||
else:
|
||||
self.assertRaises(OSError, obj.write, data)
|
||||
|
||||
if seekable:
|
||||
obj.tell()
|
||||
obj.seek(0)
|
||||
else:
|
||||
self.assertRaises(OSError, obj.tell)
|
||||
self.assertRaises(OSError, obj.seek, 0)
|
||||
|
||||
if writable and seekable:
|
||||
obj.truncate()
|
||||
obj.truncate(0)
|
||||
else:
|
||||
self.assertRaises(OSError, obj.truncate)
|
||||
self.assertRaises(OSError, obj.truncate, 0)
|
||||
|
||||
def test_open_handles_NUL_chars(self):
|
||||
fn_with_NUL = 'foo\0bar'
|
||||
self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
|
||||
|
@ -751,12 +855,6 @@ class CommonBufferedTests:
|
|||
|
||||
self.assertEqual(42, bufio.fileno())
|
||||
|
||||
@unittest.skip('test having existential crisis')
|
||||
def test_no_fileno(self):
|
||||
# XXX will we always have fileno() function? If so, kill
|
||||
# this test. Else, write it.
|
||||
pass
|
||||
|
||||
def test_invalid_args(self):
|
||||
rawio = self.MockRawIO()
|
||||
bufio = self.tp(rawio)
|
||||
|
@ -784,13 +882,9 @@ class CommonBufferedTests:
|
|||
super().flush()
|
||||
rawio = self.MockRawIO()
|
||||
bufio = MyBufferedIO(rawio)
|
||||
writable = bufio.writable()
|
||||
del bufio
|
||||
support.gc_collect()
|
||||
if writable:
|
||||
self.assertEqual(record, [1, 2, 3])
|
||||
else:
|
||||
self.assertEqual(record, [1, 2])
|
||||
self.assertEqual(record, [1, 2, 3])
|
||||
|
||||
def test_context_manager(self):
|
||||
# Test usability as a context manager
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue