mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
Fix, refactor and extend tests for shutil.make_archive().
This commit is contained in:
parent
2a23adf440
commit
527ef0792f
1 changed files with 82 additions and 88 deletions
|
@ -14,7 +14,7 @@ import subprocess
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
from os.path import splitdrive
|
from os.path import splitdrive
|
||||||
from distutils.spawn import find_executable, spawn
|
from distutils.spawn import find_executable, spawn
|
||||||
from shutil import (_make_tarball, _make_zipfile, make_archive,
|
from shutil import (make_archive,
|
||||||
register_archive_format, unregister_archive_format,
|
register_archive_format, unregister_archive_format,
|
||||||
get_archive_formats, Error, unpack_archive,
|
get_archive_formats, Error, unpack_archive,
|
||||||
register_unpack_format, RegistryError,
|
register_unpack_format, RegistryError,
|
||||||
|
@ -86,6 +86,18 @@ def read_file(path, binary=False):
|
||||||
with open(path, 'rb' if binary else 'r') as fp:
|
with open(path, 'rb' if binary else 'r') as fp:
|
||||||
return fp.read()
|
return fp.read()
|
||||||
|
|
||||||
|
def rlistdir(path):
|
||||||
|
res = []
|
||||||
|
for name in sorted(os.listdir(path)):
|
||||||
|
p = os.path.join(path, name)
|
||||||
|
if os.path.isdir(p) and not os.path.islink(p):
|
||||||
|
res.append(name + '/')
|
||||||
|
for n in rlistdir(p):
|
||||||
|
res.append(name + '/' + n)
|
||||||
|
else:
|
||||||
|
res.append(name)
|
||||||
|
return res
|
||||||
|
|
||||||
|
|
||||||
class TestShutil(unittest.TestCase):
|
class TestShutil(unittest.TestCase):
|
||||||
|
|
||||||
|
@ -951,114 +963,105 @@ class TestShutil(unittest.TestCase):
|
||||||
@requires_zlib
|
@requires_zlib
|
||||||
def test_make_tarball(self):
|
def test_make_tarball(self):
|
||||||
# creating something to tar
|
# creating something to tar
|
||||||
tmpdir = self.mkdtemp()
|
root_dir, base_dir = self._create_files('')
|
||||||
write_file((tmpdir, 'file1'), 'xxx')
|
|
||||||
write_file((tmpdir, 'file2'), 'xxx')
|
|
||||||
os.mkdir(os.path.join(tmpdir, 'sub'))
|
|
||||||
write_file((tmpdir, 'sub', 'file3'), 'xxx')
|
|
||||||
|
|
||||||
tmpdir2 = self.mkdtemp()
|
tmpdir2 = self.mkdtemp()
|
||||||
# force shutil to create the directory
|
# force shutil to create the directory
|
||||||
os.rmdir(tmpdir2)
|
os.rmdir(tmpdir2)
|
||||||
unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
|
unittest.skipUnless(splitdrive(root_dir)[0] == splitdrive(tmpdir2)[0],
|
||||||
"source and target should be on same drive")
|
"source and target should be on same drive")
|
||||||
|
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
base_name = os.path.join(tmpdir2, 'archive')
|
||||||
|
|
||||||
# working with relative paths to avoid tar warnings
|
# working with relative paths to avoid tar warnings
|
||||||
with support.change_cwd(tmpdir):
|
make_archive(splitdrive(base_name)[1], 'gztar', root_dir, '.')
|
||||||
_make_tarball(splitdrive(base_name)[1], '.')
|
|
||||||
|
|
||||||
# check if the compressed tarball was created
|
# check if the compressed tarball was created
|
||||||
tarball = base_name + '.tar.gz'
|
tarball = base_name + '.tar.gz'
|
||||||
self.assertTrue(os.path.exists(tarball))
|
self.assertTrue(os.path.isfile(tarball))
|
||||||
|
self.assertTrue(tarfile.is_tarfile(tarball))
|
||||||
|
with tarfile.open(tarball, 'r:gz') as tf:
|
||||||
|
self.assertCountEqual(tf.getnames(),
|
||||||
|
['.', './sub', './sub2',
|
||||||
|
'./file1', './file2', './sub/file3'])
|
||||||
|
|
||||||
# trying an uncompressed one
|
# trying an uncompressed one
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
base_name = os.path.join(tmpdir2, 'archive')
|
||||||
with support.change_cwd(tmpdir):
|
make_archive(splitdrive(base_name)[1], 'tar', root_dir, '.')
|
||||||
_make_tarball(splitdrive(base_name)[1], '.', compress=None)
|
|
||||||
tarball = base_name + '.tar'
|
tarball = base_name + '.tar'
|
||||||
self.assertTrue(os.path.exists(tarball))
|
self.assertTrue(os.path.isfile(tarball))
|
||||||
|
self.assertTrue(tarfile.is_tarfile(tarball))
|
||||||
|
with tarfile.open(tarball, 'r') as tf:
|
||||||
|
self.assertCountEqual(tf.getnames(),
|
||||||
|
['.', './sub', './sub2',
|
||||||
|
'./file1', './file2', './sub/file3'])
|
||||||
|
|
||||||
def _tarinfo(self, path):
|
def _tarinfo(self, path):
|
||||||
tar = tarfile.open(path)
|
with tarfile.open(path) as tar:
|
||||||
try:
|
|
||||||
names = tar.getnames()
|
names = tar.getnames()
|
||||||
names.sort()
|
names.sort()
|
||||||
return tuple(names)
|
return tuple(names)
|
||||||
finally:
|
|
||||||
tar.close()
|
|
||||||
|
|
||||||
def _create_files(self):
|
def _create_files(self, base_dir='dist'):
|
||||||
# creating something to tar
|
# creating something to tar
|
||||||
tmpdir = self.mkdtemp()
|
root_dir = self.mkdtemp()
|
||||||
dist = os.path.join(tmpdir, 'dist')
|
dist = os.path.join(root_dir, base_dir)
|
||||||
os.mkdir(dist)
|
os.makedirs(dist, exist_ok=True)
|
||||||
write_file((dist, 'file1'), 'xxx')
|
write_file((dist, 'file1'), 'xxx')
|
||||||
write_file((dist, 'file2'), 'xxx')
|
write_file((dist, 'file2'), 'xxx')
|
||||||
os.mkdir(os.path.join(dist, 'sub'))
|
os.mkdir(os.path.join(dist, 'sub'))
|
||||||
write_file((dist, 'sub', 'file3'), 'xxx')
|
write_file((dist, 'sub', 'file3'), 'xxx')
|
||||||
os.mkdir(os.path.join(dist, 'sub2'))
|
os.mkdir(os.path.join(dist, 'sub2'))
|
||||||
tmpdir2 = self.mkdtemp()
|
if base_dir:
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
write_file((root_dir, 'outer'), 'xxx')
|
||||||
return tmpdir, tmpdir2, base_name
|
return root_dir, base_dir
|
||||||
|
|
||||||
@requires_zlib
|
@requires_zlib
|
||||||
@unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
|
@unittest.skipUnless(find_executable('tar'),
|
||||||
'Need the tar command to run')
|
'Need the tar command to run')
|
||||||
def test_tarfile_vs_tar(self):
|
def test_tarfile_vs_tar(self):
|
||||||
tmpdir, tmpdir2, base_name = self._create_files()
|
root_dir, base_dir = self._create_files()
|
||||||
with support.change_cwd(tmpdir):
|
base_name = os.path.join(self.mkdtemp(), 'archive')
|
||||||
_make_tarball(base_name, 'dist')
|
make_archive(base_name, 'gztar', root_dir, base_dir)
|
||||||
|
|
||||||
# check if the compressed tarball was created
|
# check if the compressed tarball was created
|
||||||
tarball = base_name + '.tar.gz'
|
tarball = base_name + '.tar.gz'
|
||||||
self.assertTrue(os.path.exists(tarball))
|
self.assertTrue(os.path.isfile(tarball))
|
||||||
|
|
||||||
# now create another tarball using `tar`
|
# now create another tarball using `tar`
|
||||||
tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
|
tarball2 = os.path.join(root_dir, 'archive2.tar')
|
||||||
tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
|
tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
|
||||||
gzip_cmd = ['gzip', '-f9', 'archive2.tar']
|
with support.change_cwd(root_dir), captured_stdout():
|
||||||
with support.change_cwd(tmpdir):
|
spawn(tar_cmd)
|
||||||
with captured_stdout() as s:
|
|
||||||
spawn(tar_cmd)
|
|
||||||
spawn(gzip_cmd)
|
|
||||||
|
|
||||||
self.assertTrue(os.path.exists(tarball2))
|
self.assertTrue(os.path.isfile(tarball2))
|
||||||
# let's compare both tarballs
|
# let's compare both tarballs
|
||||||
self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
|
self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
|
||||||
|
|
||||||
# trying an uncompressed one
|
# trying an uncompressed one
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
make_archive(base_name, 'tar', root_dir, base_dir)
|
||||||
with support.change_cwd(tmpdir):
|
|
||||||
_make_tarball(base_name, 'dist', compress=None)
|
|
||||||
tarball = base_name + '.tar'
|
tarball = base_name + '.tar'
|
||||||
self.assertTrue(os.path.exists(tarball))
|
self.assertTrue(os.path.isfile(tarball))
|
||||||
|
|
||||||
# now for a dry_run
|
# now for a dry_run
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
make_archive(base_name, 'tar', root_dir, base_dir, dry_run=True)
|
||||||
with support.change_cwd(tmpdir):
|
|
||||||
_make_tarball(base_name, 'dist', compress=None, dry_run=True)
|
|
||||||
tarball = base_name + '.tar'
|
tarball = base_name + '.tar'
|
||||||
self.assertTrue(os.path.exists(tarball))
|
self.assertTrue(os.path.isfile(tarball))
|
||||||
|
|
||||||
@requires_zlib
|
@requires_zlib
|
||||||
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
|
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
|
||||||
def test_make_zipfile(self):
|
def test_make_zipfile(self):
|
||||||
# creating something to tar
|
# creating something to zip
|
||||||
tmpdir = self.mkdtemp()
|
root_dir, base_dir = self._create_files()
|
||||||
write_file((tmpdir, 'file1'), 'xxx')
|
base_name = os.path.join(self.mkdtemp(), 'archive')
|
||||||
write_file((tmpdir, 'file2'), 'xxx')
|
res = make_archive(base_name, 'zip', root_dir, 'dist')
|
||||||
|
|
||||||
tmpdir2 = self.mkdtemp()
|
self.assertEqual(res, base_name + '.zip')
|
||||||
# force shutil to create the directory
|
self.assertTrue(os.path.isfile(res))
|
||||||
os.rmdir(tmpdir2)
|
self.assertTrue(zipfile.is_zipfile(res))
|
||||||
base_name = os.path.join(tmpdir2, 'archive')
|
with zipfile.ZipFile(res) as zf:
|
||||||
_make_zipfile(base_name, tmpdir)
|
self.assertCountEqual(zf.namelist(),
|
||||||
|
['dist/file1', 'dist/file2', 'dist/sub/file3'])
|
||||||
# check if the compressed tarball was created
|
|
||||||
tarball = base_name + '.zip'
|
|
||||||
self.assertTrue(os.path.exists(tarball))
|
|
||||||
|
|
||||||
|
|
||||||
def test_make_archive(self):
|
def test_make_archive(self):
|
||||||
|
@ -1076,36 +1079,37 @@ class TestShutil(unittest.TestCase):
|
||||||
else:
|
else:
|
||||||
group = owner = 'root'
|
group = owner = 'root'
|
||||||
|
|
||||||
base_dir, root_dir, base_name = self._create_files()
|
root_dir, base_dir = self._create_files()
|
||||||
base_name = os.path.join(self.mkdtemp() , 'archive')
|
base_name = os.path.join(self.mkdtemp(), 'archive')
|
||||||
res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
|
res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
|
||||||
group=group)
|
group=group)
|
||||||
self.assertTrue(os.path.exists(res))
|
self.assertTrue(os.path.isfile(res))
|
||||||
|
|
||||||
res = make_archive(base_name, 'zip', root_dir, base_dir)
|
res = make_archive(base_name, 'zip', root_dir, base_dir)
|
||||||
self.assertTrue(os.path.exists(res))
|
self.assertTrue(os.path.isfile(res))
|
||||||
|
|
||||||
res = make_archive(base_name, 'tar', root_dir, base_dir,
|
res = make_archive(base_name, 'tar', root_dir, base_dir,
|
||||||
owner=owner, group=group)
|
owner=owner, group=group)
|
||||||
self.assertTrue(os.path.exists(res))
|
self.assertTrue(os.path.isfile(res))
|
||||||
|
|
||||||
res = make_archive(base_name, 'tar', root_dir, base_dir,
|
res = make_archive(base_name, 'tar', root_dir, base_dir,
|
||||||
owner='kjhkjhkjg', group='oihohoh')
|
owner='kjhkjhkjg', group='oihohoh')
|
||||||
self.assertTrue(os.path.exists(res))
|
self.assertTrue(os.path.isfile(res))
|
||||||
|
|
||||||
|
|
||||||
@requires_zlib
|
@requires_zlib
|
||||||
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
|
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
|
||||||
def test_tarfile_root_owner(self):
|
def test_tarfile_root_owner(self):
|
||||||
tmpdir, tmpdir2, base_name = self._create_files()
|
root_dir, base_dir = self._create_files()
|
||||||
|
base_name = os.path.join(self.mkdtemp(), 'archive')
|
||||||
group = grp.getgrgid(0)[0]
|
group = grp.getgrgid(0)[0]
|
||||||
owner = pwd.getpwuid(0)[0]
|
owner = pwd.getpwuid(0)[0]
|
||||||
with support.change_cwd(tmpdir):
|
with support.change_cwd(root_dir):
|
||||||
archive_name = _make_tarball(base_name, 'dist', compress=None,
|
archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
|
||||||
owner=owner, group=group)
|
owner=owner, group=group)
|
||||||
|
|
||||||
# check if the compressed tarball was created
|
# check if the compressed tarball was created
|
||||||
self.assertTrue(os.path.exists(archive_name))
|
self.assertTrue(os.path.isfile(archive_name))
|
||||||
|
|
||||||
# now checks the rights
|
# now checks the rights
|
||||||
archive = tarfile.open(archive_name)
|
archive = tarfile.open(archive_name)
|
||||||
|
@ -1162,40 +1166,30 @@ class TestShutil(unittest.TestCase):
|
||||||
formats = [name for name, params in get_archive_formats()]
|
formats = [name for name, params in get_archive_formats()]
|
||||||
self.assertNotIn('xxx', formats)
|
self.assertNotIn('xxx', formats)
|
||||||
|
|
||||||
def _compare_dirs(self, dir1, dir2):
|
|
||||||
# check that dir1 and dir2 are equivalent,
|
|
||||||
# return the diff
|
|
||||||
diff = []
|
|
||||||
for root, dirs, files in os.walk(dir1):
|
|
||||||
for file_ in files:
|
|
||||||
path = os.path.join(root, file_)
|
|
||||||
target_path = os.path.join(dir2, os.path.split(path)[-1])
|
|
||||||
if not os.path.exists(target_path):
|
|
||||||
diff.append(file_)
|
|
||||||
return diff
|
|
||||||
|
|
||||||
@requires_zlib
|
@requires_zlib
|
||||||
def test_unpack_archive(self):
|
def test_unpack_archive(self):
|
||||||
formats = ['tar', 'gztar', 'zip']
|
formats = ['tar', 'gztar', 'zip']
|
||||||
if BZ2_SUPPORTED:
|
if BZ2_SUPPORTED:
|
||||||
formats.append('bztar')
|
formats.append('bztar')
|
||||||
|
|
||||||
|
root_dir, base_dir = self._create_files()
|
||||||
for format in formats:
|
for format in formats:
|
||||||
tmpdir = self.mkdtemp()
|
expected = rlistdir(root_dir)
|
||||||
base_dir, root_dir, base_name = self._create_files()
|
expected.remove('outer')
|
||||||
tmpdir2 = self.mkdtemp()
|
if format == 'zip':
|
||||||
|
expected.remove('dist/sub2/')
|
||||||
|
base_name = os.path.join(self.mkdtemp(), 'archive')
|
||||||
filename = make_archive(base_name, format, root_dir, base_dir)
|
filename = make_archive(base_name, format, root_dir, base_dir)
|
||||||
|
|
||||||
# let's try to unpack it now
|
# let's try to unpack it now
|
||||||
|
tmpdir2 = self.mkdtemp()
|
||||||
unpack_archive(filename, tmpdir2)
|
unpack_archive(filename, tmpdir2)
|
||||||
diff = self._compare_dirs(tmpdir, tmpdir2)
|
self.assertEqual(rlistdir(tmpdir2), expected)
|
||||||
self.assertEqual(diff, [])
|
|
||||||
|
|
||||||
# and again, this time with the format specified
|
# and again, this time with the format specified
|
||||||
tmpdir3 = self.mkdtemp()
|
tmpdir3 = self.mkdtemp()
|
||||||
unpack_archive(filename, tmpdir3, format=format)
|
unpack_archive(filename, tmpdir3, format=format)
|
||||||
diff = self._compare_dirs(tmpdir, tmpdir3)
|
self.assertEqual(rlistdir(tmpdir3), expected)
|
||||||
self.assertEqual(diff, [])
|
|
||||||
self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
|
self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
|
||||||
self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
|
self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue