Issue #5689: Add support for lzma compression to the tarfile module.

This commit is contained in:
Lars Gustäbel 2011-12-10 20:38:14 +01:00
parent ce2af33562
commit 0a9dd2f11d
4 changed files with 146 additions and 22 deletions

View file

@ -21,6 +21,10 @@ try:
import bz2
except ImportError:
bz2 = None
try:
import lzma
except ImportError:
lzma = None
def md5sum(data):
return md5(data).hexdigest()
@ -29,6 +33,7 @@ TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
@ -201,13 +206,15 @@ class CommonReadTest(ReadTest):
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
elif self.mode.endswith(":xz"):
_open = lzma.LZMAFile
else:
_open = open
_open = io.FileIO
for char in (b'\0', b'a'):
# Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
# are ignored correctly.
with _open(tmpname, "wb") as fobj:
with _open(tmpname, "w") as fobj:
fobj.write(char * 1024)
fobj.write(tarfile.TarInfo("foo").tobuf())
@ -222,9 +229,10 @@ class CommonReadTest(ReadTest):
class MiscReadTest(CommonReadTest):
def test_no_name_argument(self):
if self.mode.endswith("bz2"):
# BZ2File has no name attribute.
return
if self.mode.endswith(("bz2", "xz")):
# BZ2File and LZMAFile have no name attribute.
self.skipTest("no name attribute")
with open(self.tarname, "rb") as fobj:
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name))
@ -265,10 +273,12 @@ class MiscReadTest(CommonReadTest):
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
elif self.mode.endswith(":xz"):
_open = lzma.LZMAFile
else:
_open = open
fobj = _open(self.tarname, "rb")
try:
_open = io.FileIO
with _open(self.tarname) as fobj:
fobj.seek(offset)
# Test if the tarfile starts with the second member.
@ -281,8 +291,6 @@ class MiscReadTest(CommonReadTest):
self.assertEqual(tar.extractfile(t).read(), data,
"seek back did not work")
tar.close()
finally:
fobj.close()
def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
@ -526,6 +534,18 @@ class DetectReadTest(unittest.TestCase):
testfunc(bz2name, "r|*")
testfunc(bz2name, "r|bz2")
if lzma:
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz")
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz")
self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:")
self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|")
testfunc(xzname, "r")
testfunc(xzname, "r:*")
testfunc(xzname, "r:xz")
testfunc(xzname, "r|*")
testfunc(xzname, "r|xz")
def test_detect_file(self):
self._test_modes(self._testfunc_file)
@ -1096,6 +1116,9 @@ class StreamWriteTest(WriteTestBase):
data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0,
"found trailing data")
elif self.mode.endswith("xz"):
with lzma.LZMAFile(tmpname) as fobj:
data = fobj.read()
else:
with open(tmpname, "rb") as fobj:
data = fobj.read()
@ -1510,6 +1533,12 @@ class AppendTest(unittest.TestCase):
self._create_testtar("w:bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
def test_append_lzma(self):
if lzma is None:
self.skipTest("lzma module not available")
self._create_testtar("w:xz")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
# Append mode is supposed to fail if the tarfile to append to
# does not end with a zero block.
def _test_error(self, data):
@ -1788,6 +1817,21 @@ class Bz2PartialReadTest(unittest.TestCase):
self._test_partial_input("r:bz2")
class LzmaMiscReadTest(MiscReadTest):
tarname = xzname
mode = "r:xz"
class LzmaUstarReadTest(UstarReadTest):
tarname = xzname
mode = "r:xz"
class LzmaStreamReadTest(StreamReadTest):
tarname = xzname
mode = "r|xz"
class LzmaWriteTest(WriteTest):
mode = "w:xz"
class LzmaStreamWriteTest(StreamWriteTest):
mode = "w|xz"
def test_main():
support.unlink(TEMPDIR)
os.makedirs(TEMPDIR)
@ -1850,6 +1894,20 @@ def test_main():
Bz2PartialReadTest,
]
if lzma:
# Create testtar.tar.xz and add lzma-specific tests.
support.unlink(xzname)
with lzma.LZMAFile(xzname, "w") as tar:
tar.write(data)
tests += [
LzmaMiscReadTest,
LzmaUstarReadTest,
LzmaStreamReadTest,
LzmaWriteTest,
LzmaStreamWriteTest,
]
try:
support.run_unittest(*tests)
finally: