Mirror of https://github.com/python/cpython.git (synced 2025-08-04 08:59:19 +00:00)
gh-135034: Normalize link targets in tarfile, add os.path.realpath(strict='allow_missing') (#135037)
Addresses CVEs 2024-12718, 2025-4138, 2025-4330, and 2025-4517.

Signed-off-by: Łukasz Langa <lukasz@langa.pl>
Co-authored-by: Petr Viktorin <encukou@gmail.com>
Co-authored-by: Seth Michael Larson <seth@python.org>
Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com>
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
parent ec12559eba
commit 3612d8f517
11 changed files with 969 additions and 172 deletions
169
Lib/tarfile.py
169
Lib/tarfile.py
|
@ -67,7 +67,7 @@ __all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError",
|
|||
"DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
|
||||
"tar_filter", "FilterError", "AbsoluteLinkError",
|
||||
"OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
|
||||
"LinkOutsideDestinationError"]
|
||||
"LinkOutsideDestinationError", "LinkFallbackError"]
|
||||
|
||||
|
||||
#---------------------------------------------------------
|
||||
|
@ -766,10 +766,22 @@ class LinkOutsideDestinationError(FilterError):
|
|||
super().__init__(f'{tarinfo.name!r} would link to {path!r}, '
|
||||
+ 'which is outside the destination')
|
||||
|
||||
class LinkFallbackError(FilterError):
|
||||
def __init__(self, tarinfo, path):
|
||||
self.tarinfo = tarinfo
|
||||
self._path = path
|
||||
super().__init__(f'link {tarinfo.name!r} would be extracted as a '
|
||||
+ f'copy of {path!r}, which was rejected')
|
||||
|
||||
# Errors caused by filters -- both "fatal" and "non-fatal" -- that
|
||||
# we consider to be issues with the argument, rather than a bug in the
|
||||
# filter function
|
||||
_FILTER_ERRORS = (FilterError, OSError, ExtractError)
|
||||
|
||||
def _get_filtered_attrs(member, dest_path, for_data=True):
|
||||
new_attrs = {}
|
||||
name = member.name
|
||||
dest_path = os.path.realpath(dest_path)
|
||||
dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING)
|
||||
# Strip leading / (tar's directory separator) from filenames.
|
||||
# Include os.sep (target OS directory separator) as well.
|
||||
if name.startswith(('/', os.sep)):
|
||||
|
@ -779,7 +791,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
|
|||
# For example, 'C:/foo' on Windows.
|
||||
raise AbsolutePathError(member)
|
||||
# Ensure we stay in the destination
|
||||
target_path = os.path.realpath(os.path.join(dest_path, name))
|
||||
target_path = os.path.realpath(os.path.join(dest_path, name),
|
||||
strict=os.path.ALLOW_MISSING)
|
||||
if os.path.commonpath([target_path, dest_path]) != dest_path:
|
||||
raise OutsideDestinationError(member, target_path)
|
||||
# Limit permissions (no high bits, and go-w)
|
||||
|
@ -817,6 +830,9 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
|
|||
if member.islnk() or member.issym():
|
||||
if os.path.isabs(member.linkname):
|
||||
raise AbsoluteLinkError(member)
|
||||
normalized = os.path.normpath(member.linkname)
|
||||
if normalized != member.linkname:
|
||||
new_attrs['linkname'] = normalized
|
||||
if member.issym():
|
||||
target_path = os.path.join(dest_path,
|
||||
os.path.dirname(name),
|
||||
|
@ -824,7 +840,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
|
|||
else:
|
||||
target_path = os.path.join(dest_path,
|
||||
member.linkname)
|
||||
target_path = os.path.realpath(target_path)
|
||||
target_path = os.path.realpath(target_path,
|
||||
strict=os.path.ALLOW_MISSING)
|
||||
if os.path.commonpath([target_path, dest_path]) != dest_path:
|
||||
raise LinkOutsideDestinationError(member, target_path)
|
||||
return new_attrs
|
||||
|
@ -2386,30 +2403,58 @@ class TarFile(object):
|
|||
members = self
|
||||
|
||||
for member in members:
|
||||
tarinfo = self._get_extract_tarinfo(member, filter_function, path)
|
||||
tarinfo, unfiltered = self._get_extract_tarinfo(
|
||||
member, filter_function, path)
|
||||
if tarinfo is None:
|
||||
continue
|
||||
if tarinfo.isdir():
|
||||
# For directories, delay setting attributes until later,
|
||||
# since permissions can interfere with extraction and
|
||||
# extracting contents can reset mtime.
|
||||
directories.append(tarinfo)
|
||||
directories.append(unfiltered)
|
||||
self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(),
|
||||
numeric_owner=numeric_owner)
|
||||
numeric_owner=numeric_owner,
|
||||
filter_function=filter_function)
|
||||
|
||||
# Reverse sort directories.
|
||||
directories.sort(key=lambda a: a.name, reverse=True)
|
||||
|
||||
|
||||
# Set correct owner, mtime and filemode on directories.
|
||||
for tarinfo in directories:
|
||||
dirpath = os.path.join(path, tarinfo.name)
|
||||
for unfiltered in directories:
|
||||
try:
|
||||
# Need to re-apply any filter, to take the *current* filesystem
|
||||
# state into account.
|
||||
try:
|
||||
tarinfo = filter_function(unfiltered, path)
|
||||
except _FILTER_ERRORS as exc:
|
||||
self._log_no_directory_fixup(unfiltered, repr(exc))
|
||||
continue
|
||||
if tarinfo is None:
|
||||
self._log_no_directory_fixup(unfiltered,
|
||||
'excluded by filter')
|
||||
continue
|
||||
dirpath = os.path.join(path, tarinfo.name)
|
||||
try:
|
||||
lstat = os.lstat(dirpath)
|
||||
except FileNotFoundError:
|
||||
self._log_no_directory_fixup(tarinfo, 'missing')
|
||||
continue
|
||||
if not stat.S_ISDIR(lstat.st_mode):
|
||||
# This is no longer a directory; presumably a later
|
||||
# member overwrote the entry.
|
||||
self._log_no_directory_fixup(tarinfo, 'not a directory')
|
||||
continue
|
||||
self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
|
||||
self.utime(tarinfo, dirpath)
|
||||
self.chmod(tarinfo, dirpath)
|
||||
except ExtractError as e:
|
||||
self._handle_nonfatal_error(e)
|
||||
|
||||
def _log_no_directory_fixup(self, member, reason):
|
||||
self._dbg(2, "tarfile: Not fixing up directory %r (%s)" %
|
||||
(member.name, reason))
|
||||
|
||||
def extract(self, member, path="", set_attrs=True, *, numeric_owner=False,
|
||||
filter=None):
|
||||
"""Extract a member from the archive to the current working directory,
|
||||
|
@ -2425,41 +2470,56 @@ class TarFile(object):
|
|||
String names of common filters are accepted.
|
||||
"""
|
||||
filter_function = self._get_filter_function(filter)
|
||||
tarinfo = self._get_extract_tarinfo(member, filter_function, path)
|
||||
tarinfo, unfiltered = self._get_extract_tarinfo(
|
||||
member, filter_function, path)
|
||||
if tarinfo is not None:
|
||||
self._extract_one(tarinfo, path, set_attrs, numeric_owner)
|
||||
|
||||
def _get_extract_tarinfo(self, member, filter_function, path):
|
||||
"""Get filtered TarInfo (or None) from member, which might be a str"""
|
||||
if isinstance(member, str):
|
||||
tarinfo = self.getmember(member)
|
||||
else:
|
||||
tarinfo = member
|
||||
"""Get (filtered, unfiltered) TarInfos from *member*
|
||||
|
||||
unfiltered = tarinfo
|
||||
*member* might be a string.
|
||||
|
||||
Return (None, None) if not found.
|
||||
"""
|
||||
|
||||
if isinstance(member, str):
|
||||
unfiltered = self.getmember(member)
|
||||
else:
|
||||
unfiltered = member
|
||||
|
||||
filtered = None
|
||||
try:
|
||||
tarinfo = filter_function(tarinfo, path)
|
||||
filtered = filter_function(unfiltered, path)
|
||||
except (OSError, UnicodeEncodeError, FilterError) as e:
|
||||
self._handle_fatal_error(e)
|
||||
except ExtractError as e:
|
||||
self._handle_nonfatal_error(e)
|
||||
if tarinfo is None:
|
||||
if filtered is None:
|
||||
self._dbg(2, "tarfile: Excluded %r" % unfiltered.name)
|
||||
return None
|
||||
# Prepare the link target for makelink().
|
||||
if tarinfo.islnk():
|
||||
tarinfo = copy.copy(tarinfo)
|
||||
tarinfo._link_target = os.path.join(path, tarinfo.linkname)
|
||||
return tarinfo
|
||||
return None, None
|
||||
|
||||
def _extract_one(self, tarinfo, path, set_attrs, numeric_owner):
|
||||
"""Extract from filtered tarinfo to disk"""
|
||||
# Prepare the link target for makelink().
|
||||
if filtered.islnk():
|
||||
filtered = copy.copy(filtered)
|
||||
filtered._link_target = os.path.join(path, filtered.linkname)
|
||||
return filtered, unfiltered
|
||||
|
||||
def _extract_one(self, tarinfo, path, set_attrs, numeric_owner,
|
||||
filter_function=None):
|
||||
"""Extract from filtered tarinfo to disk.
|
||||
|
||||
filter_function is only used when extracting a *different*
|
||||
member (e.g. as fallback to creating a symlink)
|
||||
"""
|
||||
self._check("r")
|
||||
|
||||
try:
|
||||
self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
|
||||
set_attrs=set_attrs,
|
||||
numeric_owner=numeric_owner)
|
||||
numeric_owner=numeric_owner,
|
||||
filter_function=filter_function,
|
||||
extraction_root=path)
|
||||
except (OSError, UnicodeEncodeError) as e:
|
||||
self._handle_fatal_error(e)
|
||||
except ExtractError as e:
|
||||
|
@ -2517,9 +2577,13 @@ class TarFile(object):
|
|||
return None
|
||||
|
||||
def _extract_member(self, tarinfo, targetpath, set_attrs=True,
|
||||
numeric_owner=False):
|
||||
"""Extract the TarInfo object tarinfo to a physical
|
||||
numeric_owner=False, *, filter_function=None,
|
||||
extraction_root=None):
|
||||
"""Extract the filtered TarInfo object tarinfo to a physical
|
||||
file called targetpath.
|
||||
|
||||
filter_function is only used when extracting a *different*
|
||||
member (e.g. as fallback to creating a symlink)
|
||||
"""
|
||||
# Fetch the TarInfo object for the given name
|
||||
# and build the destination pathname, replacing
|
||||
|
@ -2548,7 +2612,10 @@ class TarFile(object):
|
|||
elif tarinfo.ischr() or tarinfo.isblk():
|
||||
self.makedev(tarinfo, targetpath)
|
||||
elif tarinfo.islnk() or tarinfo.issym():
|
||||
self.makelink(tarinfo, targetpath)
|
||||
self.makelink_with_filter(
|
||||
tarinfo, targetpath,
|
||||
filter_function=filter_function,
|
||||
extraction_root=extraction_root)
|
||||
elif tarinfo.type not in SUPPORTED_TYPES:
|
||||
self.makeunknown(tarinfo, targetpath)
|
||||
else:
|
||||
|
@ -2631,10 +2698,18 @@ class TarFile(object):
|
|||
os.makedev(tarinfo.devmajor, tarinfo.devminor))
|
||||
|
||||
def makelink(self, tarinfo, targetpath):
|
||||
return self.makelink_with_filter(tarinfo, targetpath, None, None)
|
||||
|
||||
def makelink_with_filter(self, tarinfo, targetpath,
|
||||
filter_function, extraction_root):
|
||||
"""Make a (symbolic) link called targetpath. If it cannot be created
|
||||
(platform limitation), we try to make a copy of the referenced file
|
||||
instead of a link.
|
||||
|
||||
filter_function is only used when extracting a *different*
|
||||
member (e.g. as fallback to creating a link).
|
||||
"""
|
||||
keyerror_to_extracterror = False
|
||||
try:
|
||||
# For systems that support symbolic and hard links.
|
||||
if tarinfo.issym():
|
||||
|
@ -2642,18 +2717,38 @@ class TarFile(object):
|
|||
# Avoid FileExistsError on following os.symlink.
|
||||
os.unlink(targetpath)
|
||||
os.symlink(tarinfo.linkname, targetpath)
|
||||
return
|
||||
else:
|
||||
if os.path.exists(tarinfo._link_target):
|
||||
os.link(tarinfo._link_target, targetpath)
|
||||
else:
|
||||
self._extract_member(self._find_link_target(tarinfo),
|
||||
targetpath)
|
||||
return
|
||||
except symlink_exception:
|
||||
keyerror_to_extracterror = True
|
||||
|
||||
try:
|
||||
unfiltered = self._find_link_target(tarinfo)
|
||||
except KeyError:
|
||||
if keyerror_to_extracterror:
|
||||
raise ExtractError(
|
||||
"unable to resolve link inside archive") from None
|
||||
else:
|
||||
raise
|
||||
|
||||
if filter_function is None:
|
||||
filtered = unfiltered
|
||||
else:
|
||||
if extraction_root is None:
|
||||
raise ExtractError(
|
||||
"makelink_with_filter: if filter_function is not None, "
|
||||
+ "extraction_root must also not be None")
|
||||
try:
|
||||
self._extract_member(self._find_link_target(tarinfo),
|
||||
targetpath)
|
||||
except KeyError:
|
||||
raise ExtractError("unable to resolve link inside archive") from None
|
||||
filtered = filter_function(unfiltered, extraction_root)
|
||||
except _FILTER_ERRORS as cause:
|
||||
raise LinkFallbackError(tarinfo, unfiltered.name) from cause
|
||||
if filtered is not None:
|
||||
self._extract_member(filtered, targetpath,
|
||||
filter_function=filter_function,
|
||||
extraction_root=extraction_root)
|
||||
|
||||
def chown(self, tarinfo, targetpath, numeric_owner):
|
||||
"""Set owner of targetpath according to tarinfo. If numeric_owner
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue