mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
bpo-43651: Fix EncodingWarning in fileinput and its test (GH-25648)
This commit is contained in:
parent
caae717c29
commit
878bc8b6c2
2 changed files with 39 additions and 35 deletions
|
@ -210,7 +210,8 @@ class FileInput:
|
||||||
|
|
||||||
# We can not use io.text_encoding() here because old openhook doesn't
|
# We can not use io.text_encoding() here because old openhook doesn't
|
||||||
# take encoding parameter.
|
# take encoding parameter.
|
||||||
if "b" not in mode and encoding is None and sys.flags.warn_default_encoding:
|
if (sys.flags.warn_default_encoding and
|
||||||
|
"b" not in mode and encoding is None and openhook is None):
|
||||||
import warnings
|
import warnings
|
||||||
warnings.warn("'encoding' argument not specified.",
|
warnings.warn("'encoding' argument not specified.",
|
||||||
EncodingWarning, 2)
|
EncodingWarning, 2)
|
||||||
|
@ -330,6 +331,13 @@ class FileInput:
|
||||||
self._file = None
|
self._file = None
|
||||||
self._isstdin = False
|
self._isstdin = False
|
||||||
self._backupfilename = 0
|
self._backupfilename = 0
|
||||||
|
|
||||||
|
# EncodingWarning is emitted in __init__() already
|
||||||
|
if "b" not in self._mode:
|
||||||
|
encoding = self._encoding or "locale"
|
||||||
|
else:
|
||||||
|
encoding = None
|
||||||
|
|
||||||
if self._filename == '-':
|
if self._filename == '-':
|
||||||
self._filename = '<stdin>'
|
self._filename = '<stdin>'
|
||||||
if 'b' in self._mode:
|
if 'b' in self._mode:
|
||||||
|
@ -347,18 +355,18 @@ class FileInput:
|
||||||
pass
|
pass
|
||||||
# The next few lines may raise OSError
|
# The next few lines may raise OSError
|
||||||
os.rename(self._filename, self._backupfilename)
|
os.rename(self._filename, self._backupfilename)
|
||||||
self._file = open(self._backupfilename, self._mode)
|
self._file = open(self._backupfilename, self._mode, encoding=encoding)
|
||||||
try:
|
try:
|
||||||
perm = os.fstat(self._file.fileno()).st_mode
|
perm = os.fstat(self._file.fileno()).st_mode
|
||||||
except OSError:
|
except OSError:
|
||||||
self._output = open(self._filename, self._write_mode)
|
self._output = open(self._filename, self._write_mode, encoding=encoding)
|
||||||
else:
|
else:
|
||||||
mode = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
mode = os.O_CREAT | os.O_WRONLY | os.O_TRUNC
|
||||||
if hasattr(os, 'O_BINARY'):
|
if hasattr(os, 'O_BINARY'):
|
||||||
mode |= os.O_BINARY
|
mode |= os.O_BINARY
|
||||||
|
|
||||||
fd = os.open(self._filename, mode, perm)
|
fd = os.open(self._filename, mode, perm)
|
||||||
self._output = os.fdopen(fd, self._write_mode)
|
self._output = os.fdopen(fd, self._write_mode, encoding=encoding)
|
||||||
try:
|
try:
|
||||||
os.chmod(self._filename, perm)
|
os.chmod(self._filename, perm)
|
||||||
except OSError:
|
except OSError:
|
||||||
|
@ -376,11 +384,6 @@ class FileInput:
|
||||||
self._file = self._openhook(
|
self._file = self._openhook(
|
||||||
self._filename, self._mode, encoding=self._encoding, errors=self._errors)
|
self._filename, self._mode, encoding=self._encoding, errors=self._errors)
|
||||||
else:
|
else:
|
||||||
# EncodingWarning is emitted in __init__() already
|
|
||||||
if "b" not in self._mode:
|
|
||||||
encoding = self._encoding or "locale"
|
|
||||||
else:
|
|
||||||
encoding = None
|
|
||||||
self._file = open(self._filename, self._mode, encoding=encoding, errors=self._errors)
|
self._file = open(self._filename, self._mode, encoding=encoding, errors=self._errors)
|
||||||
self._readline = self._file.readline # hide FileInput._readline
|
self._readline = self._file.readline # hide FileInput._readline
|
||||||
return self._readline()
|
return self._readline()
|
||||||
|
|
|
@ -44,7 +44,8 @@ class BaseTests:
|
||||||
def writeTmp(self, content, *, mode='w'): # opening in text mode is the default
|
def writeTmp(self, content, *, mode='w'): # opening in text mode is the default
|
||||||
fd, name = tempfile.mkstemp()
|
fd, name = tempfile.mkstemp()
|
||||||
self.addCleanup(os_helper.unlink, name)
|
self.addCleanup(os_helper.unlink, name)
|
||||||
with open(fd, mode) as f:
|
encoding = None if "b" in mode else "utf-8"
|
||||||
|
with open(fd, mode, encoding=encoding) as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
return name
|
return name
|
||||||
|
|
||||||
|
@ -96,7 +97,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
print('1. Simple iteration')
|
print('1. Simple iteration')
|
||||||
fi = FileInput(files=(t1, t2, t3, t4))
|
fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8")
|
||||||
lines = list(fi)
|
lines = list(fi)
|
||||||
fi.close()
|
fi.close()
|
||||||
self.assertEqual(len(lines), 31)
|
self.assertEqual(len(lines), 31)
|
||||||
|
@ -107,7 +108,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
print('2. Status variables')
|
print('2. Status variables')
|
||||||
fi = FileInput(files=(t1, t2, t3, t4))
|
fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8")
|
||||||
s = "x"
|
s = "x"
|
||||||
while s and s != 'Line 6 of file 2\n':
|
while s and s != 'Line 6 of file 2\n':
|
||||||
s = fi.readline()
|
s = fi.readline()
|
||||||
|
@ -126,7 +127,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
print('4. Stdin')
|
print('4. Stdin')
|
||||||
fi = FileInput(files=(t1, t2, t3, t4, '-'))
|
fi = FileInput(files=(t1, t2, t3, t4, '-'), encoding="utf-8")
|
||||||
savestdin = sys.stdin
|
savestdin = sys.stdin
|
||||||
try:
|
try:
|
||||||
sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
|
sys.stdin = StringIO("Line 1 of stdin\nLine 2 of stdin\n")
|
||||||
|
@ -140,7 +141,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
if verbose:
|
if verbose:
|
||||||
print('5. Boundary conditions')
|
print('5. Boundary conditions')
|
||||||
fi = FileInput(files=(t1, t2, t3, t4))
|
fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8")
|
||||||
self.assertEqual(fi.lineno(), 0)
|
self.assertEqual(fi.lineno(), 0)
|
||||||
self.assertEqual(fi.filename(), None)
|
self.assertEqual(fi.filename(), None)
|
||||||
fi.nextfile()
|
fi.nextfile()
|
||||||
|
@ -151,7 +152,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
print('6. Inplace')
|
print('6. Inplace')
|
||||||
savestdout = sys.stdout
|
savestdout = sys.stdout
|
||||||
try:
|
try:
|
||||||
fi = FileInput(files=(t1, t2, t3, t4), inplace=1)
|
fi = FileInput(files=(t1, t2, t3, t4), inplace=1, encoding="utf-8")
|
||||||
for line in fi:
|
for line in fi:
|
||||||
line = line[:-1].upper()
|
line = line[:-1].upper()
|
||||||
print(line)
|
print(line)
|
||||||
|
@ -159,7 +160,7 @@ class BufferSizesTests(BaseTests, unittest.TestCase):
|
||||||
finally:
|
finally:
|
||||||
sys.stdout = savestdout
|
sys.stdout = savestdout
|
||||||
|
|
||||||
fi = FileInput(files=(t1, t2, t3, t4))
|
fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8")
|
||||||
for line in fi:
|
for line in fi:
|
||||||
self.assertEqual(line[-1], '\n')
|
self.assertEqual(line[-1], '\n')
|
||||||
m = pat.match(line[:-1])
|
m = pat.match(line[:-1])
|
||||||
|
@ -182,7 +183,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
t2 = self.writeTmp("")
|
t2 = self.writeTmp("")
|
||||||
t3 = self.writeTmp("The only line there is.\n")
|
t3 = self.writeTmp("The only line there is.\n")
|
||||||
t4 = self.writeTmp("")
|
t4 = self.writeTmp("")
|
||||||
fi = FileInput(files=(t1, t2, t3, t4))
|
fi = FileInput(files=(t1, t2, t3, t4), encoding="utf-8")
|
||||||
|
|
||||||
line = fi.readline()
|
line = fi.readline()
|
||||||
self.assertEqual(line, 'The only line there is.\n')
|
self.assertEqual(line, 'The only line there is.\n')
|
||||||
|
@ -200,7 +201,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
def test_files_that_dont_end_with_newline(self):
|
def test_files_that_dont_end_with_newline(self):
|
||||||
t1 = self.writeTmp("A\nB\nC")
|
t1 = self.writeTmp("A\nB\nC")
|
||||||
t2 = self.writeTmp("D\nE\nF")
|
t2 = self.writeTmp("D\nE\nF")
|
||||||
fi = FileInput(files=(t1, t2))
|
fi = FileInput(files=(t1, t2), encoding="utf-8")
|
||||||
lines = list(fi)
|
lines = list(fi)
|
||||||
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
|
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
|
||||||
self.assertEqual(fi.filelineno(), 3)
|
self.assertEqual(fi.filelineno(), 3)
|
||||||
|
@ -213,14 +214,14 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
## encoding = sys.getfilesystemencoding()
|
## encoding = sys.getfilesystemencoding()
|
||||||
## if encoding is None:
|
## if encoding is None:
|
||||||
## encoding = 'ascii'
|
## encoding = 'ascii'
|
||||||
## fi = FileInput(files=str(t1, encoding))
|
## fi = FileInput(files=str(t1, encoding), encoding="utf-8")
|
||||||
## lines = list(fi)
|
## lines = list(fi)
|
||||||
## self.assertEqual(lines, ["A\n", "B"])
|
## self.assertEqual(lines, ["A\n", "B"])
|
||||||
|
|
||||||
def test_fileno(self):
|
def test_fileno(self):
|
||||||
t1 = self.writeTmp("A\nB")
|
t1 = self.writeTmp("A\nB")
|
||||||
t2 = self.writeTmp("C\nD")
|
t2 = self.writeTmp("C\nD")
|
||||||
fi = FileInput(files=(t1, t2))
|
fi = FileInput(files=(t1, t2), encoding="utf-8")
|
||||||
self.assertEqual(fi.fileno(), -1)
|
self.assertEqual(fi.fileno(), -1)
|
||||||
line = next(fi)
|
line = next(fi)
|
||||||
self.assertNotEqual(fi.fileno(), -1)
|
self.assertNotEqual(fi.fileno(), -1)
|
||||||
|
@ -232,7 +233,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
def test_opening_mode(self):
|
def test_opening_mode(self):
|
||||||
try:
|
try:
|
||||||
# invalid mode, should raise ValueError
|
# invalid mode, should raise ValueError
|
||||||
fi = FileInput(mode="w")
|
fi = FileInput(mode="w", encoding="utf-8")
|
||||||
self.fail("FileInput should reject invalid mode argument")
|
self.fail("FileInput should reject invalid mode argument")
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
@ -281,7 +282,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
self.invoked = False
|
self.invoked = False
|
||||||
def __call__(self, *args, **kargs):
|
def __call__(self, *args, **kargs):
|
||||||
self.invoked = True
|
self.invoked = True
|
||||||
return open(*args)
|
return open(*args, encoding="utf-8")
|
||||||
|
|
||||||
t = self.writeTmp("\n")
|
t = self.writeTmp("\n")
|
||||||
custom_open_hook = CustomOpenHook()
|
custom_open_hook = CustomOpenHook()
|
||||||
|
@ -346,7 +347,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
def test_context_manager(self):
|
def test_context_manager(self):
|
||||||
t1 = self.writeTmp("A\nB\nC")
|
t1 = self.writeTmp("A\nB\nC")
|
||||||
t2 = self.writeTmp("D\nE\nF")
|
t2 = self.writeTmp("D\nE\nF")
|
||||||
with FileInput(files=(t1, t2)) as fi:
|
with FileInput(files=(t1, t2), encoding="utf-8") as fi:
|
||||||
lines = list(fi)
|
lines = list(fi)
|
||||||
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
|
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
|
||||||
self.assertEqual(fi.filelineno(), 3)
|
self.assertEqual(fi.filelineno(), 3)
|
||||||
|
@ -356,13 +357,13 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
def test_close_on_exception(self):
|
def test_close_on_exception(self):
|
||||||
t1 = self.writeTmp("")
|
t1 = self.writeTmp("")
|
||||||
try:
|
try:
|
||||||
with FileInput(files=t1) as fi:
|
with FileInput(files=t1, encoding="utf-8") as fi:
|
||||||
raise OSError
|
raise OSError
|
||||||
except OSError:
|
except OSError:
|
||||||
self.assertEqual(fi._files, ())
|
self.assertEqual(fi._files, ())
|
||||||
|
|
||||||
def test_empty_files_list_specified_to_constructor(self):
|
def test_empty_files_list_specified_to_constructor(self):
|
||||||
with FileInput(files=[]) as fi:
|
with FileInput(files=[], encoding="utf-8") as fi:
|
||||||
self.assertEqual(fi._files, ('-',))
|
self.assertEqual(fi._files, ('-',))
|
||||||
|
|
||||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||||
|
@ -370,7 +371,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
"""Tests invoking FileInput.__getitem__() with the current
|
"""Tests invoking FileInput.__getitem__() with the current
|
||||||
line number"""
|
line number"""
|
||||||
t = self.writeTmp("line1\nline2\n")
|
t = self.writeTmp("line1\nline2\n")
|
||||||
with FileInput(files=[t]) as fi:
|
with FileInput(files=[t], encoding="utf-8") as fi:
|
||||||
retval1 = fi[0]
|
retval1 = fi[0]
|
||||||
self.assertEqual(retval1, "line1\n")
|
self.assertEqual(retval1, "line1\n")
|
||||||
retval2 = fi[1]
|
retval2 = fi[1]
|
||||||
|
@ -388,7 +389,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
"""Tests invoking FileInput.__getitem__() with an index unequal to
|
"""Tests invoking FileInput.__getitem__() with an index unequal to
|
||||||
the line number"""
|
the line number"""
|
||||||
t = self.writeTmp("line1\nline2\n")
|
t = self.writeTmp("line1\nline2\n")
|
||||||
with FileInput(files=[t]) as fi:
|
with FileInput(files=[t], encoding="utf-8") as fi:
|
||||||
with self.assertRaises(RuntimeError) as cm:
|
with self.assertRaises(RuntimeError) as cm:
|
||||||
fi[1]
|
fi[1]
|
||||||
self.assertEqual(cm.exception.args, ("accessing lines out of order",))
|
self.assertEqual(cm.exception.args, ("accessing lines out of order",))
|
||||||
|
@ -398,7 +399,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
"""Tests invoking FileInput.__getitem__() with the line number but at
|
"""Tests invoking FileInput.__getitem__() with the line number but at
|
||||||
end-of-input"""
|
end-of-input"""
|
||||||
t = self.writeTmp('')
|
t = self.writeTmp('')
|
||||||
with FileInput(files=[t]) as fi:
|
with FileInput(files=[t], encoding="utf-8") as fi:
|
||||||
with self.assertRaises(IndexError) as cm:
|
with self.assertRaises(IndexError) as cm:
|
||||||
fi[0]
|
fi[0]
|
||||||
self.assertEqual(cm.exception.args, ("end of input reached",))
|
self.assertEqual(cm.exception.args, ("end of input reached",))
|
||||||
|
@ -413,7 +414,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
try:
|
try:
|
||||||
t = self.writeTmp("\n")
|
t = self.writeTmp("\n")
|
||||||
self.addCleanup(safe_unlink, t + '.bak')
|
self.addCleanup(safe_unlink, t + '.bak')
|
||||||
with FileInput(files=[t], inplace=True) as fi:
|
with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
|
||||||
next(fi) # make sure the file is opened
|
next(fi) # make sure the file is opened
|
||||||
os.unlink = os_unlink_replacement
|
os.unlink = os_unlink_replacement
|
||||||
fi.nextfile()
|
fi.nextfile()
|
||||||
|
@ -432,7 +433,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
os_fstat_replacement = UnconditionallyRaise(OSError)
|
os_fstat_replacement = UnconditionallyRaise(OSError)
|
||||||
try:
|
try:
|
||||||
t = self.writeTmp("\n")
|
t = self.writeTmp("\n")
|
||||||
with FileInput(files=[t], inplace=True) as fi:
|
with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
|
||||||
os.fstat = os_fstat_replacement
|
os.fstat = os_fstat_replacement
|
||||||
fi.readline()
|
fi.readline()
|
||||||
finally:
|
finally:
|
||||||
|
@ -450,7 +451,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
os_chmod_replacement = UnconditionallyRaise(OSError)
|
os_chmod_replacement = UnconditionallyRaise(OSError)
|
||||||
try:
|
try:
|
||||||
t = self.writeTmp("\n")
|
t = self.writeTmp("\n")
|
||||||
with FileInput(files=[t], inplace=True) as fi:
|
with FileInput(files=[t], inplace=True, encoding="utf-8") as fi:
|
||||||
os.chmod = os_chmod_replacement
|
os.chmod = os_chmod_replacement
|
||||||
fi.readline()
|
fi.readline()
|
||||||
finally:
|
finally:
|
||||||
|
@ -469,7 +470,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
unconditionally_raise_ValueError = FilenoRaisesValueError()
|
unconditionally_raise_ValueError = FilenoRaisesValueError()
|
||||||
t = self.writeTmp("\n")
|
t = self.writeTmp("\n")
|
||||||
with FileInput(files=[t]) as fi:
|
with FileInput(files=[t], encoding="utf-8") as fi:
|
||||||
file_backup = fi._file
|
file_backup = fi._file
|
||||||
try:
|
try:
|
||||||
fi._file = unconditionally_raise_ValueError
|
fi._file = unconditionally_raise_ValueError
|
||||||
|
@ -517,7 +518,7 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
def test_pathlib_file(self):
|
def test_pathlib_file(self):
|
||||||
t1 = Path(self.writeTmp("Pathlib file."))
|
t1 = Path(self.writeTmp("Pathlib file."))
|
||||||
with FileInput(t1) as fi:
|
with FileInput(t1, encoding="utf-8") as fi:
|
||||||
line = fi.readline()
|
line = fi.readline()
|
||||||
self.assertEqual(line, 'Pathlib file.')
|
self.assertEqual(line, 'Pathlib file.')
|
||||||
self.assertEqual(fi.lineno(), 1)
|
self.assertEqual(fi.lineno(), 1)
|
||||||
|
@ -526,11 +527,11 @@ class FileInputTests(BaseTests, unittest.TestCase):
|
||||||
|
|
||||||
def test_pathlib_file_inplace(self):
|
def test_pathlib_file_inplace(self):
|
||||||
t1 = Path(self.writeTmp('Pathlib file.'))
|
t1 = Path(self.writeTmp('Pathlib file.'))
|
||||||
with FileInput(t1, inplace=True) as fi:
|
with FileInput(t1, inplace=True, encoding="utf-8") as fi:
|
||||||
line = fi.readline()
|
line = fi.readline()
|
||||||
self.assertEqual(line, 'Pathlib file.')
|
self.assertEqual(line, 'Pathlib file.')
|
||||||
print('Modified %s' % line)
|
print('Modified %s' % line)
|
||||||
with open(t1) as f:
|
with open(t1, encoding="utf-8") as f:
|
||||||
self.assertEqual(f.read(), 'Modified Pathlib file.\n')
|
self.assertEqual(f.read(), 'Modified Pathlib file.\n')
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue