bpo-33753: Refactor creating temporary files in test_fileinput. (GH-7377)

This commit is contained in:
Serhiy Storchaka 2018-06-05 12:08:36 +03:00 committed by GitHub
parent f822549653
commit 5f48e2644d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -8,6 +8,7 @@ import re
import fileinput import fileinput
import collections import collections
import builtins import builtins
import tempfile
import unittest import unittest
try: try:
@ -33,20 +34,15 @@ from unittest import mock
# all the work, and a few functions (input, etc.) that use a global _state # all the work, and a few functions (input, etc.) that use a global _state
# variable. # variable.
# Write lines (a list of lines) to temp file number i, and return the class BaseTests:
# temp file's name. # Write a content (str or bytes) to temp file, and return the
def writeTmp(i, lines, mode='w'): # opening in text mode is the default # temp file's name.
name = TESTFN + str(i) def writeTmp(self, content, *, mode='w'): # opening in text mode is the default
f = open(name, mode) fd, name = tempfile.mkstemp()
for line in lines: self.addCleanup(support.unlink, name)
f.write(line) with open(fd, mode) as f:
f.close() f.write(content)
return name return name
def remove_tempfiles(*names):
for name in names:
if name:
safe_unlink(name)
class LineReader: class LineReader:
@ -84,23 +80,19 @@ class LineReader:
def close(self): def close(self):
pass pass
class BufferSizesTests(unittest.TestCase): class BufferSizesTests(BaseTests, unittest.TestCase):
def test_buffer_sizes(self): def test_buffer_sizes(self):
# First, run the tests with default and teeny buffer size. # First, run the tests with default and teeny buffer size.
for round, bs in (0, 0), (1, 30): for round, bs in (0, 0), (1, 30):
t1 = t2 = t3 = t4 = None t1 = self.writeTmp(''.join("Line %s of file 1\n" % (i+1) for i in range(15)))
try: t2 = self.writeTmp(''.join("Line %s of file 2\n" % (i+1) for i in range(10)))
t1 = writeTmp(1, ["Line %s of file 1\n" % (i+1) for i in range(15)]) t3 = self.writeTmp(''.join("Line %s of file 3\n" % (i+1) for i in range(5)))
t2 = writeTmp(2, ["Line %s of file 2\n" % (i+1) for i in range(10)]) t4 = self.writeTmp(''.join("Line %s of file 4\n" % (i+1) for i in range(1)))
t3 = writeTmp(3, ["Line %s of file 3\n" % (i+1) for i in range(5)]) if bs:
t4 = writeTmp(4, ["Line %s of file 4\n" % (i+1) for i in range(1)]) with self.assertWarns(DeprecationWarning):
if bs:
with self.assertWarns(DeprecationWarning):
self.buffer_size_test(t1, t2, t3, t4, bs, round)
else:
self.buffer_size_test(t1, t2, t3, t4, bs, round) self.buffer_size_test(t1, t2, t3, t4, bs, round)
finally: else:
remove_tempfiles(t1, t2, t3, t4) self.buffer_size_test(t1, t2, t3, t4, bs, round)
def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0): def buffer_size_test(self, t1, t2, t3, t4, bs=0, round=0):
pat = re.compile(r'LINE (\d+) OF FILE (\d+)') pat = re.compile(r'LINE (\d+) OF FILE (\d+)')
@ -187,74 +179,59 @@ class UnconditionallyRaise:
self.invoked = True self.invoked = True
raise self.exception_type() raise self.exception_type()
class FileInputTests(unittest.TestCase): class FileInputTests(BaseTests, unittest.TestCase):
def test_zero_byte_files(self): def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None t1 = self.writeTmp("")
try: t2 = self.writeTmp("")
t1 = writeTmp(1, [""]) t3 = self.writeTmp("The only line there is.\n")
t2 = writeTmp(2, [""]) t4 = self.writeTmp("")
t3 = writeTmp(3, ["The only line there is.\n"]) fi = FileInput(files=(t1, t2, t3, t4))
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))
line = fi.readline() line = fi.readline()
self.assertEqual(line, 'The only line there is.\n') self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1) self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1) self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3) self.assertEqual(fi.filename(), t3)
line = fi.readline() line = fi.readline()
self.assertFalse(line) self.assertFalse(line)
self.assertEqual(fi.lineno(), 1) self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0) self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4) self.assertEqual(fi.filename(), t4)
fi.close() fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
def test_files_that_dont_end_with_newline(self): def test_files_that_dont_end_with_newline(self):
t1 = t2 = None t1 = self.writeTmp("A\nB\nC")
try: t2 = self.writeTmp("D\nE\nF")
t1 = writeTmp(1, ["A\nB\nC"]) fi = FileInput(files=(t1, t2))
t2 = writeTmp(2, ["D\nE\nF"]) lines = list(fi)
fi = FileInput(files=(t1, t2)) self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
lines = list(fi) self.assertEqual(fi.filelineno(), 3)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) self.assertEqual(fi.lineno(), 6)
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)
finally:
remove_tempfiles(t1, t2)
## def test_unicode_filenames(self): ## def test_unicode_filenames(self):
## # XXX A unicode string is always returned by writeTmp. ## # XXX A unicode string is always returned by writeTmp.
## # So is this needed? ## # So is this needed?
## try: ## t1 = self.writeTmp("A\nB")
## t1 = writeTmp(1, ["A\nB"]) ## encoding = sys.getfilesystemencoding()
## encoding = sys.getfilesystemencoding() ## if encoding is None:
## if encoding is None: ## encoding = 'ascii'
## encoding = 'ascii' ## fi = FileInput(files=str(t1, encoding))
## fi = FileInput(files=str(t1, encoding)) ## lines = list(fi)
## lines = list(fi) ## self.assertEqual(lines, ["A\n", "B"])
## self.assertEqual(lines, ["A\n", "B"])
## finally:
## remove_tempfiles(t1)
def test_fileno(self): def test_fileno(self):
t1 = t2 = None t1 = self.writeTmp("A\nB")
try: t2 = self.writeTmp("C\nD")
t1 = writeTmp(1, ["A\nB"]) fi = FileInput(files=(t1, t2))
t2 = writeTmp(2, ["C\nD"]) self.assertEqual(fi.fileno(), -1)
fi = FileInput(files=(t1, t2)) line = next(fi)
self.assertEqual(fi.fileno(), -1) self.assertNotEqual(fi.fileno(), -1)
line =next( fi) fi.nextfile()
self.assertNotEqual(fi.fileno(), -1) self.assertEqual(fi.fileno(), -1)
fi.nextfile() line = list(fi)
self.assertEqual(fi.fileno(), -1) self.assertEqual(fi.fileno(), -1)
line = list(fi)
self.assertEqual(fi.fileno(), -1)
finally:
remove_tempfiles(t1, t2)
def test_opening_mode(self): def test_opening_mode(self):
try: try:
@ -263,17 +240,13 @@ class FileInputTests(unittest.TestCase):
self.fail("FileInput should reject invalid mode argument") self.fail("FileInput should reject invalid mode argument")
except ValueError: except ValueError:
pass pass
t1 = None # try opening in universal newline mode
try: t1 = self.writeTmp(b"A\nB\r\nC\rD", mode="wb")
# try opening in universal newline mode with check_warnings(('', DeprecationWarning)):
t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb") fi = FileInput(files=t1, mode="U")
with check_warnings(('', DeprecationWarning)): with check_warnings(('', DeprecationWarning)):
fi = FileInput(files=t1, mode="U") lines = list(fi)
with check_warnings(('', DeprecationWarning)): self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
finally:
remove_tempfiles(t1)
def test_stdin_binary_mode(self): def test_stdin_binary_mode(self):
with mock.patch('sys.stdin') as m_stdin: with mock.patch('sys.stdin') as m_stdin:
@ -314,8 +287,7 @@ class FileInputTests(unittest.TestCase):
self.invoked = True self.invoked = True
return open(*args) return open(*args)
t = writeTmp(1, ["\n"]) t = self.writeTmp("\n")
self.addCleanup(remove_tempfiles, t)
custom_open_hook = CustomOpenHook() custom_open_hook = CustomOpenHook()
with FileInput([t], openhook=custom_open_hook) as fi: with FileInput([t], openhook=custom_open_hook) as fi:
fi.readline() fi.readline()
@ -358,27 +330,22 @@ class FileInputTests(unittest.TestCase):
self.assertEqual(fi.readline(), b'') self.assertEqual(fi.readline(), b'')
def test_context_manager(self): def test_context_manager(self):
try: t1 = self.writeTmp("A\nB\nC")
t1 = writeTmp(1, ["A\nB\nC"]) t2 = self.writeTmp("D\nE\nF")
t2 = writeTmp(2, ["D\nE\nF"]) with FileInput(files=(t1, t2)) as fi:
with FileInput(files=(t1, t2)) as fi: lines = list(fi)
lines = list(fi) self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"]) self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.filelineno(), 3) self.assertEqual(fi.lineno(), 6)
self.assertEqual(fi.lineno(), 6) self.assertEqual(fi._files, ())
self.assertEqual(fi._files, ())
finally:
remove_tempfiles(t1, t2)
def test_close_on_exception(self): def test_close_on_exception(self):
t1 = self.writeTmp("")
try: try:
t1 = writeTmp(1, [""])
with FileInput(files=t1) as fi: with FileInput(files=t1) as fi:
raise OSError raise OSError
except OSError: except OSError:
self.assertEqual(fi._files, ()) self.assertEqual(fi._files, ())
finally:
remove_tempfiles(t1)
def test_empty_files_list_specified_to_constructor(self): def test_empty_files_list_specified_to_constructor(self):
with FileInput(files=[]) as fi: with FileInput(files=[]) as fi:
@ -387,8 +354,7 @@ class FileInputTests(unittest.TestCase):
def test__getitem__(self): def test__getitem__(self):
"""Tests invoking FileInput.__getitem__() with the current """Tests invoking FileInput.__getitem__() with the current
line number""" line number"""
t = writeTmp(1, ["line1\n", "line2\n"]) t = self.writeTmp("line1\nline2\n")
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t]) as fi: with FileInput(files=[t]) as fi:
retval1 = fi[0] retval1 = fi[0]
self.assertEqual(retval1, "line1\n") self.assertEqual(retval1, "line1\n")
@ -398,8 +364,7 @@ class FileInputTests(unittest.TestCase):
def test__getitem__invalid_key(self): def test__getitem__invalid_key(self):
"""Tests invoking FileInput.__getitem__() with an index unequal to """Tests invoking FileInput.__getitem__() with an index unequal to
the line number""" the line number"""
t = writeTmp(1, ["line1\n", "line2\n"]) t = self.writeTmp("line1\nline2\n")
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t]) as fi: with FileInput(files=[t]) as fi:
with self.assertRaises(RuntimeError) as cm: with self.assertRaises(RuntimeError) as cm:
fi[1] fi[1]
@ -408,8 +373,7 @@ class FileInputTests(unittest.TestCase):
def test__getitem__eof(self): def test__getitem__eof(self):
"""Tests invoking FileInput.__getitem__() with the line number but at """Tests invoking FileInput.__getitem__() with the line number but at
end-of-input""" end-of-input"""
t = writeTmp(1, []) t = self.writeTmp('')
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t]) as fi: with FileInput(files=[t]) as fi:
with self.assertRaises(IndexError) as cm: with self.assertRaises(IndexError) as cm:
fi[0] fi[0]
@ -423,8 +387,8 @@ class FileInputTests(unittest.TestCase):
os_unlink_orig = os.unlink os_unlink_orig = os.unlink
os_unlink_replacement = UnconditionallyRaise(OSError) os_unlink_replacement = UnconditionallyRaise(OSError)
try: try:
t = writeTmp(1, ["\n"]) t = self.writeTmp("\n")
self.addCleanup(remove_tempfiles, t) self.addCleanup(support.unlink, t + '.bak')
with FileInput(files=[t], inplace=True) as fi: with FileInput(files=[t], inplace=True) as fi:
next(fi) # make sure the file is opened next(fi) # make sure the file is opened
os.unlink = os_unlink_replacement os.unlink = os_unlink_replacement
@ -443,8 +407,7 @@ class FileInputTests(unittest.TestCase):
os_fstat_orig = os.fstat os_fstat_orig = os.fstat
os_fstat_replacement = UnconditionallyRaise(OSError) os_fstat_replacement = UnconditionallyRaise(OSError)
try: try:
t = writeTmp(1, ["\n"]) t = self.writeTmp("\n")
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t], inplace=True) as fi: with FileInput(files=[t], inplace=True) as fi:
os.fstat = os_fstat_replacement os.fstat = os_fstat_replacement
fi.readline() fi.readline()
@ -463,8 +426,7 @@ class FileInputTests(unittest.TestCase):
os_chmod_orig = os.chmod os_chmod_orig = os.chmod
os_chmod_replacement = UnconditionallyRaise(OSError) os_chmod_replacement = UnconditionallyRaise(OSError)
try: try:
t = writeTmp(1, ["\n"]) t = self.writeTmp("\n")
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t], inplace=True) as fi: with FileInput(files=[t], inplace=True) as fi:
os.chmod = os_chmod_replacement os.chmod = os_chmod_replacement
fi.readline() fi.readline()
@ -483,8 +445,7 @@ class FileInputTests(unittest.TestCase):
self.__call__() self.__call__()
unconditionally_raise_ValueError = FilenoRaisesValueError() unconditionally_raise_ValueError = FilenoRaisesValueError()
t = writeTmp(1, ["\n"]) t = self.writeTmp("\n")
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t]) as fi: with FileInput(files=[t]) as fi:
file_backup = fi._file file_backup = fi._file
try: try:
@ -532,30 +493,22 @@ class FileInputTests(unittest.TestCase):
self.assertEqual(src.linesread, []) self.assertEqual(src.linesread, [])
def test_pathlib_file(self): def test_pathlib_file(self):
t1 = None t1 = Path(self.writeTmp("Pathlib file."))
try: with FileInput(t1) as fi:
t1 = Path(writeTmp(1, ["Pathlib file."])) line = fi.readline()
with FileInput(t1) as fi: self.assertEqual(line, 'Pathlib file.')
line = fi.readline() self.assertEqual(fi.lineno(), 1)
self.assertEqual(line, 'Pathlib file.') self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.lineno(), 1) self.assertEqual(fi.filename(), os.fspath(t1))
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), os.fspath(t1))
finally:
remove_tempfiles(t1)
def test_pathlib_file_inplace(self): def test_pathlib_file_inplace(self):
t1 = None t1 = Path(self.writeTmp('Pathlib file.'))
try: with FileInput(t1, inplace=True) as fi:
t1 = Path(writeTmp(1, ['Pathlib file.'])) line = fi.readline()
with FileInput(t1, inplace=True) as fi: self.assertEqual(line, 'Pathlib file.')
line = fi.readline() print('Modified %s' % line)
self.assertEqual(line, 'Pathlib file.') with open(t1) as f:
print('Modified %s' % line) self.assertEqual(f.read(), 'Modified Pathlib file.\n')
with open(t1) as f:
self.assertEqual(f.read(), 'Modified Pathlib file.\n')
finally:
remove_tempfiles(t1)
class MockFileInput: class MockFileInput: