added make_archive (and secondary APIs) to shutil

This commit is contained in:
Tarek Ziadé 2010-02-23 05:16:41 +00:00
parent b0aad6cd09
commit 48cc8dc958
3 changed files with 694 additions and 5 deletions

View file

@ -7,11 +7,71 @@ import sys
import stat
import os
import os.path
from os.path import splitdrive
from distutils.spawn import find_executable, spawn
from shutil import (_make_tarball, _make_zipfile, make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats)
import tarfile
import warnings
from test import test_support
from test.test_support import TESTFN
from test.test_support import TESTFN, check_warnings, captured_stdout
TESTFN2 = TESTFN + "2"
try:
import grp
import pwd
UID_GID_SUPPORT = True
except ImportError:
UID_GID_SUPPORT = False
try:
import zlib
except ImportError:
zlib = None
try:
import zipfile
ZIP_SUPPORT = True
except ImportError:
ZIP_SUPPORT = find_executable('zip')
class TestShutil(unittest.TestCase):
def setUp(self):
super(TestShutil, self).setUp()
self.tempdirs = []
def tearDown(self):
super(TestShutil, self).tearDown()
while self.tempdirs:
d = self.tempdirs.pop()
shutil.rmtree(d, os.name in ('nt', 'cygwin'))
def write_file(self, path, content='xxx'):
"""Writes a file in the given path.
path can be a string or a sequence.
"""
if isinstance(path, (list, tuple)):
path = os.path.join(*path)
f = open(path, 'w')
try:
f.write(content)
finally:
f.close()
def mkdtemp(self):
"""Create a temporary directory that will be cleaned up.
Returns the path of the directory.
"""
d = tempfile.mkdtemp()
self.tempdirs.append(d)
return d
def test_rmtree_errors(self):
# filename is guaranteed not to exist
filename = tempfile.mktemp()
@ -277,6 +337,261 @@ class TestShutil(unittest.TestCase):
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
@unittest.skipUnless(zlib, "requires zlib")
def test_make_tarball(self):
# creating something to tar
tmpdir = self.mkdtemp()
self.write_file([tmpdir, 'file1'], 'xxx')
self.write_file([tmpdir, 'file2'], 'xxx')
os.mkdir(os.path.join(tmpdir, 'sub'))
self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
tmpdir2 = self.mkdtemp()
unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
"source and target should be on same drive")
base_name = os.path.join(tmpdir2, 'archive')
# working with relative paths to avoid tar warnings
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
_make_tarball(splitdrive(base_name)[1], '.')
finally:
os.chdir(old_dir)
# check if the compressed tarball was created
tarball = base_name + '.tar.gz'
self.assertTrue(os.path.exists(tarball))
# trying an uncompressed one
base_name = os.path.join(tmpdir2, 'archive')
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
_make_tarball(splitdrive(base_name)[1], '.', compress=None)
finally:
os.chdir(old_dir)
tarball = base_name + '.tar'
self.assertTrue(os.path.exists(tarball))
def _tarinfo(self, path):
tar = tarfile.open(path)
try:
names = tar.getnames()
names.sort()
return tuple(names)
finally:
tar.close()
def _create_files(self):
# creating something to tar
tmpdir = self.mkdtemp()
dist = os.path.join(tmpdir, 'dist')
os.mkdir(dist)
self.write_file([dist, 'file1'], 'xxx')
self.write_file([dist, 'file2'], 'xxx')
os.mkdir(os.path.join(dist, 'sub'))
self.write_file([dist, 'sub', 'file3'], 'xxx')
os.mkdir(os.path.join(dist, 'sub2'))
tmpdir2 = self.mkdtemp()
base_name = os.path.join(tmpdir2, 'archive')
return tmpdir, tmpdir2, base_name
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
'Need the tar command to run')
def test_tarfile_vs_tar(self):
tmpdir, tmpdir2, base_name = self._create_files()
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
_make_tarball(base_name, 'dist')
finally:
os.chdir(old_dir)
# check if the compressed tarball was created
tarball = base_name + '.tar.gz'
self.assertTrue(os.path.exists(tarball))
# now create another tarball using `tar`
tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
gzip_cmd = ['gzip', '-f9', 'archive2.tar']
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
with captured_stdout() as s:
spawn(tar_cmd)
spawn(gzip_cmd)
finally:
os.chdir(old_dir)
self.assertTrue(os.path.exists(tarball2))
# let's compare both tarballs
self.assertEquals(self._tarinfo(tarball), self._tarinfo(tarball2))
# trying an uncompressed one
base_name = os.path.join(tmpdir2, 'archive')
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
_make_tarball(base_name, 'dist', compress=None)
finally:
os.chdir(old_dir)
tarball = base_name + '.tar'
self.assertTrue(os.path.exists(tarball))
# now for a dry_run
base_name = os.path.join(tmpdir2, 'archive')
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
_make_tarball(base_name, 'dist', compress=None, dry_run=True)
finally:
os.chdir(old_dir)
tarball = base_name + '.tar'
self.assertTrue(os.path.exists(tarball))
@unittest.skipUnless(find_executable('compress'),
'The compress program is required')
def test_compress_deprecated(self):
tmpdir, tmpdir2, base_name = self._create_files()
# using compress and testing the PendingDeprecationWarning
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
with captured_stdout() as s:
with check_warnings() as w:
warnings.simplefilter("always")
_make_tarball(base_name, 'dist', compress='compress')
finally:
os.chdir(old_dir)
tarball = base_name + '.tar.Z'
self.assertTrue(os.path.exists(tarball))
self.assertEquals(len(w.warnings), 1)
# same test with dry_run
os.remove(tarball)
old_dir = os.getcwd()
os.chdir(tmpdir)
try:
with captured_stdout() as s:
with check_warnings() as w:
warnings.simplefilter("always")
_make_tarball(base_name, 'dist', compress='compress',
dry_run=True)
finally:
os.chdir(old_dir)
self.assertTrue(not os.path.exists(tarball))
self.assertEquals(len(w.warnings), 1)
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
def test_make_zipfile(self):
# creating something to tar
tmpdir = self.mkdtemp()
self.write_file([tmpdir, 'file1'], 'xxx')
self.write_file([tmpdir, 'file2'], 'xxx')
tmpdir2 = self.mkdtemp()
base_name = os.path.join(tmpdir2, 'archive')
_make_zipfile(base_name, tmpdir)
# check if the compressed tarball was created
tarball = base_name + '.zip'
def test_make_archive(self):
tmpdir = self.mkdtemp()
base_name = os.path.join(tmpdir, 'archive')
self.assertRaises(ValueError, make_archive, base_name, 'xxx')
@unittest.skipUnless(zlib, "Requires zlib")
def test_make_archive_owner_group(self):
# testing make_archive with owner and group, with various combinations
# this works even if there's not gid/uid support
if UID_GID_SUPPORT:
group = grp.getgrgid(0)[0]
owner = pwd.getpwuid(0)[0]
else:
group = owner = 'root'
base_dir, root_dir, base_name = self._create_files()
base_name = os.path.join(self.mkdtemp() , 'archive')
res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
group=group)
self.assertTrue(os.path.exists(res))
res = make_archive(base_name, 'zip', root_dir, base_dir)
self.assertTrue(os.path.exists(res))
res = make_archive(base_name, 'tar', root_dir, base_dir,
owner=owner, group=group)
self.assertTrue(os.path.exists(res))
res = make_archive(base_name, 'tar', root_dir, base_dir,
owner='kjhkjhkjg', group='oihohoh')
self.assertTrue(os.path.exists(res))
@unittest.skipUnless(zlib, "Requires zlib")
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
tmpdir, tmpdir2, base_name = self._create_files()
old_dir = os.getcwd()
os.chdir(tmpdir)
group = grp.getgrgid(0)[0]
owner = pwd.getpwuid(0)[0]
try:
archive_name = _make_tarball(base_name, 'dist', compress=None,
owner=owner, group=group)
finally:
os.chdir(old_dir)
# check if the compressed tarball was created
self.assertTrue(os.path.exists(archive_name))
# now checks the rights
archive = tarfile.open(archive_name)
try:
for member in archive.getmembers():
self.assertEquals(member.uid, 0)
self.assertEquals(member.gid, 0)
finally:
archive.close()
def test_make_archive_cwd(self):
current_dir = os.getcwd()
def _breaks(*args, **kw):
raise RuntimeError()
register_archive_format('xxx', _breaks, [], 'xxx file')
try:
try:
make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
except Exception:
pass
self.assertEquals(os.getcwd(), current_dir)
finally:
unregister_archive_format('xxx')
def test_register_archive_format(self):
self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
1)
self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
[(1, 2), (1, 2, 3)])
register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
formats = [name for name, params in get_archive_formats()]
self.assertIn('xxx', formats)
unregister_archive_format('xxx')
formats = [name for name, params in get_archive_formats()]
self.assertNotIn('xxx', formats)
class TestMove(unittest.TestCase):