gh-131492, gh-131461: handle exceptions in GzipFile constructor while owning resources (#131462)

Co-authored-by: Victor Stinner <vstinner@python.org>
This commit is contained in:
Thomas Grainger 2025-03-20 17:06:21 +00:00 committed by GitHub
parent f53e7de6a8
commit ce79274e9f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 65 additions and 49 deletions

View file

@ -202,51 +202,58 @@ class GzipFile(_compression.BaseStream):
raise ValueError("Invalid mode: {!r}".format(mode))
if mode and 'b' not in mode:
mode += 'b'
if fileobj is None:
fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
if filename is None:
filename = getattr(fileobj, 'name', '')
if not isinstance(filename, (str, bytes)):
filename = ''
else:
filename = os.fspath(filename)
origmode = mode
if mode is None:
mode = getattr(fileobj, 'mode', 'rb')
try:
if fileobj is None:
fileobj = self.myfileobj = builtins.open(filename, mode or 'rb')
if filename is None:
filename = getattr(fileobj, 'name', '')
if not isinstance(filename, (str, bytes)):
filename = ''
else:
filename = os.fspath(filename)
origmode = mode
if mode is None:
mode = getattr(fileobj, 'mode', 'rb')
if mode.startswith('r'):
self.mode = READ
raw = _GzipReader(fileobj)
self._buffer = io.BufferedReader(raw)
self.name = filename
if mode.startswith('r'):
self.mode = READ
raw = _GzipReader(fileobj)
self._buffer = io.BufferedReader(raw)
self.name = filename
elif mode.startswith(('w', 'a', 'x')):
if origmode is None:
import warnings
warnings.warn(
"GzipFile was opened for writing, but this will "
"change in future Python releases. "
"Specify the mode argument for opening it for writing.",
FutureWarning, 2)
self.mode = WRITE
self._init_write(filename)
self.compress = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self._write_mtime = mtime
self._buffer_size = _WRITE_BUFFER_SIZE
self._buffer = io.BufferedWriter(_WriteBufferStream(self),
buffer_size=self._buffer_size)
else:
raise ValueError("Invalid mode: {!r}".format(mode))
elif mode.startswith(('w', 'a', 'x')):
if origmode is None:
import warnings
warnings.warn(
"GzipFile was opened for writing, but this will "
"change in future Python releases. "
"Specify the mode argument for opening it for writing.",
FutureWarning, 2)
self.mode = WRITE
self._init_write(filename)
self.compress = zlib.compressobj(compresslevel,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0)
self._write_mtime = mtime
self._buffer_size = _WRITE_BUFFER_SIZE
self._buffer = io.BufferedWriter(_WriteBufferStream(self),
buffer_size=self._buffer_size)
else:
raise ValueError("Invalid mode: {!r}".format(mode))
self.fileobj = fileobj
self.fileobj = fileobj
if self.mode == WRITE:
self._write_gzip_header(compresslevel)
if self.mode == WRITE:
self._write_gzip_header(compresslevel)
except:
# Avoid a ResourceWarning if the write fails,
# eg read-only file or KeyboardInterrupt
self._close()
raise
@property
def mtime(self):
@ -387,11 +394,14 @@ class GzipFile(_compression.BaseStream):
elif self.mode == READ:
self._buffer.close()
finally:
self.fileobj = None
myfileobj = self.myfileobj
if myfileobj:
self.myfileobj = None
myfileobj.close()
self._close()
def _close(self):
self.fileobj = None
myfileobj = self.myfileobj
if myfileobj is not None:
self.myfileobj = None
myfileobj.close()
def flush(self,zlib_mode=zlib.Z_SYNC_FLUSH):
self._check_not_closed()

View file

@ -19,6 +19,7 @@ from test import archiver_tests
from test import support
from test.support import os_helper
from test.support import script_helper
from test.support import warnings_helper
# Check for our compression modules.
try:
@ -1638,10 +1639,13 @@ class WriteTest(WriteTestBase, unittest.TestCase):
raise exctype
f = BadFile()
with self.assertRaises(exctype):
tar = tarfile.open(tmpname, self.mode, fileobj=f,
format=tarfile.PAX_FORMAT,
pax_headers={'non': 'empty'})
with (
warnings_helper.check_no_resource_warning(self),
self.assertRaises(exctype),
):
tarfile.open(tmpname, self.mode, fileobj=f,
format=tarfile.PAX_FORMAT,
pax_headers={'non': 'empty'})
self.assertFalse(f.closed)
def test_missing_fileobj(self):

View file

@ -0,0 +1 @@
Fix :exc:`ResourceWarning` when constructing a :class:`gzip.GzipFile` in write mode with a broken file object.

View file

@ -0,0 +1 @@
Fix a resource leak when constructing a :class:`gzip.GzipFile` with a filename fails, for example when passing an invalid ``compresslevel``.