gh-74696: Do not change the current working directory in shutil.make_archive() if possible (GH-93160)

It is no longer changed when create a zip or tar archive.

It is still changed for custom archivers registered with shutil.register_archive_format()
if root_dir is not None.

Co-authored-by: Éric <merwok@netwok.org>
Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
Serhiy Storchaka 2022-06-22 11:47:25 +03:00 committed by GitHub
parent f805d37641
commit fda4b2f063
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 52 deletions

View file

@ -574,12 +574,18 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules.
.. note:: .. note::
This function is not thread-safe. This function is not thread-safe when custom archivers registered
with :func:`register_archive_format` are used. In this case it
temporarily changes the current working directory of the process
to perform archiving.
.. versionchanged:: 3.8 .. versionchanged:: 3.8
The modern pax (POSIX.1-2001) format is now used instead of The modern pax (POSIX.1-2001) format is now used instead of
the legacy GNU format for archives created with ``format="tar"``. the legacy GNU format for archives created with ``format="tar"``.
.. versionchanged:: 3.10.6
This function is now made thread-safe during creation of standard
``.zip`` and tar archives.
.. function:: get_archive_formats() .. function:: get_archive_formats()

View file

@ -897,7 +897,7 @@ def _get_uid(name):
return None return None
def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0, def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
owner=None, group=None, logger=None): owner=None, group=None, logger=None, root_dir=None):
"""Create a (possibly compressed) tar file from all the files under """Create a (possibly compressed) tar file from all the files under
'base_dir'. 'base_dir'.
@ -954,14 +954,20 @@ def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
if not dry_run: if not dry_run:
tar = tarfile.open(archive_name, 'w|%s' % tar_compression) tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
arcname = base_dir
if root_dir is not None:
base_dir = os.path.join(root_dir, base_dir)
try: try:
tar.add(base_dir, filter=_set_uid_gid) tar.add(base_dir, arcname, filter=_set_uid_gid)
finally: finally:
tar.close() tar.close()
if root_dir is not None:
archive_name = os.path.abspath(archive_name)
return archive_name return archive_name
def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None): def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
logger=None, owner=None, group=None, root_dir=None):
"""Create a zip file from all the files under 'base_dir'. """Create a zip file from all the files under 'base_dir'.
The output zip file will be named 'base_name' + ".zip". Returns the The output zip file will be named 'base_name' + ".zip". Returns the
@ -985,42 +991,60 @@ def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
if not dry_run: if not dry_run:
with zipfile.ZipFile(zip_filename, "w", with zipfile.ZipFile(zip_filename, "w",
compression=zipfile.ZIP_DEFLATED) as zf: compression=zipfile.ZIP_DEFLATED) as zf:
path = os.path.normpath(base_dir) arcname = os.path.normpath(base_dir)
if path != os.curdir: if root_dir is not None:
zf.write(path, path) base_dir = os.path.join(root_dir, base_dir)
base_dir = os.path.normpath(base_dir)
if arcname != os.curdir:
zf.write(base_dir, arcname)
if logger is not None: if logger is not None:
logger.info("adding '%s'", path) logger.info("adding '%s'", base_dir)
for dirpath, dirnames, filenames in os.walk(base_dir): for dirpath, dirnames, filenames in os.walk(base_dir):
arcdirpath = dirpath
if root_dir is not None:
arcdirpath = os.path.relpath(arcdirpath, root_dir)
arcdirpath = os.path.normpath(arcdirpath)
for name in sorted(dirnames): for name in sorted(dirnames):
path = os.path.normpath(os.path.join(dirpath, name)) path = os.path.join(dirpath, name)
zf.write(path, path) arcname = os.path.join(arcdirpath, name)
zf.write(path, arcname)
if logger is not None: if logger is not None:
logger.info("adding '%s'", path) logger.info("adding '%s'", path)
for name in filenames: for name in filenames:
path = os.path.normpath(os.path.join(dirpath, name)) path = os.path.join(dirpath, name)
path = os.path.normpath(path)
if os.path.isfile(path): if os.path.isfile(path):
zf.write(path, path) arcname = os.path.join(arcdirpath, name)
zf.write(path, arcname)
if logger is not None: if logger is not None:
logger.info("adding '%s'", path) logger.info("adding '%s'", path)
if root_dir is not None:
zip_filename = os.path.abspath(zip_filename)
return zip_filename return zip_filename
# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments
# * description
# * does it support the root_dir argument?
_ARCHIVE_FORMATS = { _ARCHIVE_FORMATS = {
'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"), 'tar': (_make_tarball, [('compress', None)],
"uncompressed tar file", True),
} }
if _ZLIB_SUPPORTED: if _ZLIB_SUPPORTED:
_ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')], _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
"gzip'ed tar-file") "gzip'ed tar-file", True)
_ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file") _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)
if _BZ2_SUPPORTED: if _BZ2_SUPPORTED:
_ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')], _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
"bzip2'ed tar-file") "bzip2'ed tar-file", True)
if _LZMA_SUPPORTED: if _LZMA_SUPPORTED:
_ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')], _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
"xz'ed tar-file") "xz'ed tar-file", True)
def get_archive_formats(): def get_archive_formats():
"""Returns a list of supported formats for archiving and unarchiving. """Returns a list of supported formats for archiving and unarchiving.
@ -1051,7 +1075,7 @@ def register_archive_format(name, function, extra_args=None, description=''):
if not isinstance(element, (tuple, list)) or len(element) !=2: if not isinstance(element, (tuple, list)) or len(element) !=2:
raise TypeError('extra_args elements are : (arg_name, value)') raise TypeError('extra_args elements are : (arg_name, value)')
_ARCHIVE_FORMATS[name] = (function, extra_args, description) _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)
def unregister_archive_format(name): def unregister_archive_format(name):
del _ARCHIVE_FORMATS[name] del _ARCHIVE_FORMATS[name]
@ -1075,36 +1099,38 @@ def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
uses the current owner and group. uses the current owner and group.
""" """
sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir) sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
save_cwd = os.getcwd()
if root_dir is not None:
if logger is not None:
logger.debug("changing into '%s'", root_dir)
base_name = os.path.abspath(base_name)
if not dry_run:
os.chdir(root_dir)
if base_dir is None:
base_dir = os.curdir
kwargs = {'dry_run': dry_run, 'logger': logger}
try: try:
format_info = _ARCHIVE_FORMATS[format] format_info = _ARCHIVE_FORMATS[format]
except KeyError: except KeyError:
raise ValueError("unknown archive format '%s'" % format) from None raise ValueError("unknown archive format '%s'" % format) from None
kwargs = {'dry_run': dry_run, 'logger': logger,
'owner': owner, 'group': group}
func = format_info[0] func = format_info[0]
for arg, val in format_info[1]: for arg, val in format_info[1]:
kwargs[arg] = val kwargs[arg] = val
if format != 'zip': if base_dir is None:
kwargs['owner'] = owner base_dir = os.curdir
kwargs['group'] = group
support_root_dir = format_info[3]
save_cwd = None
if root_dir is not None:
if support_root_dir:
kwargs['root_dir'] = root_dir
else:
save_cwd = os.getcwd()
if logger is not None:
logger.debug("changing into '%s'", root_dir)
base_name = os.path.abspath(base_name)
if not dry_run:
os.chdir(root_dir)
try: try:
filename = func(base_name, base_dir, **kwargs) filename = func(base_name, base_dir, **kwargs)
finally: finally:
if root_dir is not None: if save_cwd is not None:
if logger is not None: if logger is not None:
logger.debug("changing back to '%s'", save_cwd) logger.debug("changing back to '%s'", save_cwd)
os.chdir(save_cwd) os.chdir(save_cwd)
@ -1217,6 +1243,11 @@ def _unpack_tarfile(filename, extract_dir):
finally: finally:
tarobj.close() tarobj.close()
# Maps the name of the unpack format to a tuple containing:
# * extensions
# * the unpacking function
# * extra keyword arguments
# * description
_UNPACK_FORMATS = { _UNPACK_FORMATS = {
'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"),

View file

@ -51,6 +51,9 @@ try:
except ImportError: except ImportError:
_winapi = None _winapi = None
no_chdir = unittest.mock.patch('os.chdir',
side_effect=AssertionError("shouldn't call os.chdir()"))
def _fake_rename(*args, **kwargs): def _fake_rename(*args, **kwargs):
# Pretend the destination path is on a different filesystem. # Pretend the destination path is on a different filesystem.
raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link")
@ -1342,7 +1345,7 @@ class TestArchives(BaseTest, unittest.TestCase):
work_dir = os.path.dirname(tmpdir2) work_dir = os.path.dirname(tmpdir2)
rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
with os_helper.change_cwd(work_dir): with os_helper.change_cwd(work_dir), no_chdir:
base_name = os.path.abspath(rel_base_name) base_name = os.path.abspath(rel_base_name)
tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
@ -1356,7 +1359,7 @@ class TestArchives(BaseTest, unittest.TestCase):
'./file1', './file2', './sub/file3']) './file1', './file2', './sub/file3'])
# trying an uncompressed one # trying an uncompressed one
with os_helper.change_cwd(work_dir): with os_helper.change_cwd(work_dir), no_chdir:
tarball = make_archive(rel_base_name, 'tar', root_dir, '.') tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
self.assertEqual(tarball, base_name + '.tar') self.assertEqual(tarball, base_name + '.tar')
self.assertTrue(os.path.isfile(tarball)) self.assertTrue(os.path.isfile(tarball))
@ -1392,7 +1395,8 @@ class TestArchives(BaseTest, unittest.TestCase):
def test_tarfile_vs_tar(self): def test_tarfile_vs_tar(self):
root_dir, base_dir = self._create_files() root_dir, base_dir = self._create_files()
base_name = os.path.join(self.mkdtemp(), 'archive') base_name = os.path.join(self.mkdtemp(), 'archive')
tarball = make_archive(base_name, 'gztar', root_dir, base_dir) with no_chdir:
tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
# check if the compressed tarball was created # check if the compressed tarball was created
self.assertEqual(tarball, base_name + '.tar.gz') self.assertEqual(tarball, base_name + '.tar.gz')
@ -1409,13 +1413,15 @@ class TestArchives(BaseTest, unittest.TestCase):
self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
# trying an uncompressed one # trying an uncompressed one
tarball = make_archive(base_name, 'tar', root_dir, base_dir) with no_chdir:
tarball = make_archive(base_name, 'tar', root_dir, base_dir)
self.assertEqual(tarball, base_name + '.tar') self.assertEqual(tarball, base_name + '.tar')
self.assertTrue(os.path.isfile(tarball)) self.assertTrue(os.path.isfile(tarball))
# now for a dry_run # now for a dry_run
tarball = make_archive(base_name, 'tar', root_dir, base_dir, with no_chdir:
dry_run=True) tarball = make_archive(base_name, 'tar', root_dir, base_dir,
dry_run=True)
self.assertEqual(tarball, base_name + '.tar') self.assertEqual(tarball, base_name + '.tar')
self.assertTrue(os.path.isfile(tarball)) self.assertTrue(os.path.isfile(tarball))
@ -1431,7 +1437,7 @@ class TestArchives(BaseTest, unittest.TestCase):
work_dir = os.path.dirname(tmpdir2) work_dir = os.path.dirname(tmpdir2)
rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
with os_helper.change_cwd(work_dir): with os_helper.change_cwd(work_dir), no_chdir:
base_name = os.path.abspath(rel_base_name) base_name = os.path.abspath(rel_base_name)
res = make_archive(rel_base_name, 'zip', root_dir) res = make_archive(rel_base_name, 'zip', root_dir)
@ -1444,7 +1450,7 @@ class TestArchives(BaseTest, unittest.TestCase):
'dist/file1', 'dist/file2', 'dist/sub/file3', 'dist/file1', 'dist/file2', 'dist/sub/file3',
'outer']) 'outer'])
with os_helper.change_cwd(work_dir): with os_helper.change_cwd(work_dir), no_chdir:
base_name = os.path.abspath(rel_base_name) base_name = os.path.abspath(rel_base_name)
res = make_archive(rel_base_name, 'zip', root_dir, base_dir) res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
@ -1462,7 +1468,8 @@ class TestArchives(BaseTest, unittest.TestCase):
def test_zipfile_vs_zip(self): def test_zipfile_vs_zip(self):
root_dir, base_dir = self._create_files() root_dir, base_dir = self._create_files()
base_name = os.path.join(self.mkdtemp(), 'archive') base_name = os.path.join(self.mkdtemp(), 'archive')
archive = make_archive(base_name, 'zip', root_dir, base_dir) with no_chdir:
archive = make_archive(base_name, 'zip', root_dir, base_dir)
# check if ZIP file was created # check if ZIP file was created
self.assertEqual(archive, base_name + '.zip') self.assertEqual(archive, base_name + '.zip')
@ -1488,7 +1495,8 @@ class TestArchives(BaseTest, unittest.TestCase):
def test_unzip_zipfile(self): def test_unzip_zipfile(self):
root_dir, base_dir = self._create_files() root_dir, base_dir = self._create_files()
base_name = os.path.join(self.mkdtemp(), 'archive') base_name = os.path.join(self.mkdtemp(), 'archive')
archive = make_archive(base_name, 'zip', root_dir, base_dir) with no_chdir:
archive = make_archive(base_name, 'zip', root_dir, base_dir)
# check if ZIP file was created # check if ZIP file was created
self.assertEqual(archive, base_name + '.zip') self.assertEqual(archive, base_name + '.zip')
@ -1546,7 +1554,7 @@ class TestArchives(BaseTest, unittest.TestCase):
base_name = os.path.join(self.mkdtemp(), 'archive') base_name = os.path.join(self.mkdtemp(), 'archive')
group = grp.getgrgid(0)[0] group = grp.getgrgid(0)[0]
owner = pwd.getpwuid(0)[0] owner = pwd.getpwuid(0)[0]
with os_helper.change_cwd(root_dir): with os_helper.change_cwd(root_dir), no_chdir:
archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
owner=owner, group=group) owner=owner, group=group)
@ -1564,23 +1572,30 @@ class TestArchives(BaseTest, unittest.TestCase):
def test_make_archive_cwd(self): def test_make_archive_cwd(self):
current_dir = os.getcwd() current_dir = os.getcwd()
root_dir = self.mkdtemp()
def _breaks(*args, **kw): def _breaks(*args, **kw):
raise RuntimeError() raise RuntimeError()
dirs = []
def _chdir(path):
dirs.append(path)
orig_chdir(path)
register_archive_format('xxx', _breaks, [], 'xxx file') register_archive_format('xxx', _breaks, [], 'xxx file')
try: try:
try: with support.swap_attr(os, 'chdir', _chdir) as orig_chdir:
make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) try:
except Exception: make_archive('xxx', 'xxx', root_dir=root_dir)
pass except Exception:
pass
self.assertEqual(os.getcwd(), current_dir) self.assertEqual(os.getcwd(), current_dir)
self.assertEqual(dirs, [root_dir, current_dir])
finally: finally:
unregister_archive_format('xxx') unregister_archive_format('xxx')
def test_make_tarfile_in_curdir(self): def test_make_tarfile_in_curdir(self):
# Issue #21280 # Issue #21280
root_dir = self.mkdtemp() root_dir = self.mkdtemp()
with os_helper.change_cwd(root_dir): with os_helper.change_cwd(root_dir), no_chdir:
self.assertEqual(make_archive('test', 'tar'), 'test.tar') self.assertEqual(make_archive('test', 'tar'), 'test.tar')
self.assertTrue(os.path.isfile('test.tar')) self.assertTrue(os.path.isfile('test.tar'))
@ -1588,7 +1603,7 @@ class TestArchives(BaseTest, unittest.TestCase):
def test_make_zipfile_in_curdir(self): def test_make_zipfile_in_curdir(self):
# Issue #21280 # Issue #21280
root_dir = self.mkdtemp() root_dir = self.mkdtemp()
with os_helper.change_cwd(root_dir): with os_helper.change_cwd(root_dir), no_chdir:
self.assertEqual(make_archive('test', 'zip'), 'test.zip') self.assertEqual(make_archive('test', 'zip'), 'test.zip')
self.assertTrue(os.path.isfile('test.zip')) self.assertTrue(os.path.isfile('test.zip'))

View file

@ -0,0 +1,2 @@
:func:`shutil.make_archive` no longer temporarily changes the current
working directory during creation of standard ``.zip`` or tar archives.