mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 19:34:08 +00:00 
			
		
		
		
	Adds `load_package_tests` function to test.support, uses it in test_asyncio, test_email, test_json, test_tools, test_importlib and all test_importlib sub-packages to implement test discovery.
		
			
				
	
	
		
			2241 lines
		
	
	
	
		
			76 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			2241 lines
		
	
	
	
		
			76 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Supporting definitions for the Python regression tests."""

# Refuse to load under any module name other than 'test.support' (for
# example when the file is executed directly as a script).
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')
 | 
						|
 | 
						|
import collections.abc
 | 
						|
import contextlib
 | 
						|
import errno
 | 
						|
import fnmatch
 | 
						|
import functools
 | 
						|
import gc
 | 
						|
import importlib
 | 
						|
import importlib.util
 | 
						|
import logging.handlers
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import re
 | 
						|
import shutil
 | 
						|
import socket
 | 
						|
import stat
 | 
						|
import struct
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import sysconfig
 | 
						|
import tempfile
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
import warnings
 | 
						|
 | 
						|
try:
 | 
						|
    import _thread, threading
 | 
						|
except ImportError:
 | 
						|
    _thread = None
 | 
						|
    threading = None
 | 
						|
try:
 | 
						|
    import multiprocessing.process
 | 
						|
except ImportError:
 | 
						|
    multiprocessing = None
 | 
						|
 | 
						|
try:
 | 
						|
    import zlib
 | 
						|
except ImportError:
 | 
						|
    zlib = None
 | 
						|
 | 
						|
try:
 | 
						|
    import gzip
 | 
						|
except ImportError:
 | 
						|
    gzip = None
 | 
						|
 | 
						|
try:
 | 
						|
    import bz2
 | 
						|
except ImportError:
 | 
						|
    bz2 = None
 | 
						|
 | 
						|
try:
 | 
						|
    import lzma
 | 
						|
except ImportError:
 | 
						|
    lzma = None
 | 
						|
 | 
						|
try:
 | 
						|
    import resource
 | 
						|
except ImportError:
 | 
						|
    resource = None
 | 
						|
 | 
						|
# Names exported by ``from test.support import *``, grouped by topic.
__all__ = [
    # globals
    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
    # exceptions
    "Error", "TestFailed", "ResourceDenied",
    # imports
    "import_module", "import_fresh_module", "CleanImport",
    # modules
    "unload", "forget",
    # io
    "record_original_stdout", "get_original_stdout", "captured_stdout",
    "captured_stdin", "captured_stderr",
    # filesystem
    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
    # unittest
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver", "check_syntax_error",
    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
    "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
    "anticipate_failure", "load_package_tests",
    # sys
    "is_jython", "check_impl_detail",
    # network
    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
    # processes
    'temp_umask', "reap_children",
    # logging
    "TestHandler",
    # threads
    "threading_setup", "threading_cleanup",
    # miscellaneous
    "check_warnings", "EnvironmentVarGuard", "run_with_locale", "swap_item",
    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    "run_with_tz",
    ]
 | 
						|
 | 
						|
class Error(Exception):
    """Base class for regression test exceptions.

    TestFailed derives from this class.
    """
 | 
						|
 | 
						|
class TestFailed(Error):
    """Raised to report that a test failed."""
 | 
						|
 | 
						|
class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """
 | 
						|
 | 
						|
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager that silences module/package deprecation warnings.

    While active, DeprecationWarnings whose message matches
    ".+ (module|package)" are ignored; the previous warning filters are
    restored on exit.  With *ignore* false, the body runs with the
    warning machinery untouched.
    """
    if not ignore:
        # Nothing to suppress: just run the managed block.
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
 | 
						|
 | 
						|
 | 
						|
def import_module(name, deprecated=False, *, required_on=()):
    """Import and return module *name*, or skip the test if it is missing.

    With *deprecated* true, module and package deprecation warnings are
    suppressed during the import.

    *required_on* is an iterable of platform prefixes compared against
    sys.platform.  On a matching platform the module is mandatory and
    the ImportError propagates; elsewhere the failure is converted into
    unittest.SkipTest carrying the original message.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            mandatory_here = sys.platform.startswith(tuple(required_on))
            if mandatory_here:
                raise
            raise unittest.SkipTest(str(exc))
        return module
 | 
						|
 | 
						|
 | 
						|
def _save_and_remove_module(name, orig_modules):
    """Stash module *name* and its submodules, then drop them from sys.modules.

    Every sys.modules entry equal to *name* or starting with ``name + '.'``
    is copied into the *orig_modules* dict and removed from the cache.

    Raise ImportError if the module can't be imported.
    """
    if name not in sys.modules:
        # Import once purely to prove the module is importable, then
        # discard the cache entry that the import created.
        __import__(name)
        del sys.modules[name]
    submodule_prefix = name + '.'
    # Iterate over a snapshot because the cache is mutated in the loop.
    for cached_name in tuple(sys.modules):
        if cached_name != name and not cached_name.startswith(submodule_prefix):
            continue
        orig_modules[cached_name] = sys.modules.pop(cached_name)
 | 
						|
 | 
						|
def _save_and_block_module(name, orig_modules):
    """Stash module *name* (if loaded) and block it in sys.modules.

    The sys.modules entry is replaced with None.  A module that was
    loaded beforehand is first recorded in the *orig_modules* dict.

    Return True if the module was in sys.modules, False otherwise.
    """
    was_loaded = name in sys.modules
    if was_loaded:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_loaded
 | 
						|
 | 
						|
 | 
						|
def anticipate_failure(condition):
    """Decorator to mark a test that is known to be broken in some cases

       Returns unittest.expectedFailure when *condition* is true and a
       pass-through decorator otherwise.

       Any use of this decorator should have a comment identifying the
       associated tracker issue.
    """
    return unittest.expectedFailure if condition else (lambda f: f)
 | 
						|
 | 
						|
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers tests under *pkg_dir* with *loader* (using the pattern
    "test*" when *pattern* is None), adds them to *standard_tests* and
    returns *standard_tests*.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    effective_pattern = "test*" if pattern is None else pattern
    # Climb three directory levels from this file: support -> test -> Lib.
    support_dir = os.path.dirname(__file__)
    test_dir = os.path.dirname(support_dir)
    lib_dir = os.path.dirname(test_dir)
    discovered = loader.discover(start_dir=pkg_dir,
                                 top_level_dir=lib_dir,
                                 pattern=effective_pattern)
    standard_tests.addTests(discovered)
    return standard_tests
 | 
						|
 | 
						|
 | 
						|
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a module, deliberately bypassing sys.modules.

    A fresh copy of module *name* is imported after the module has been
    removed from sys.modules.  Unlike reload, the original module object
    is not affected by this operation.

    *fresh* is an iterable of additional module names that are also
    removed from the sys.modules cache before the import.

    *blocked* is an iterable of module names whose cache entries are
    replaced with None during the import, so that attempts to import
    them raise ImportError.

    The named module and everything listed in *fresh* and *blocked* are
    saved first and put back into sys.modules once the fresh import is
    complete.

    Module and package deprecation messages are suppressed during this
    import if *deprecated* is True.

    ImportError is raised if the named module cannot be imported at all.
    An ImportError raised while handling *fresh*/*blocked* or during the
    fresh import itself is caught, and None is returned instead.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # saved_modules: entries to put back afterwards.
        # placeholder_names: blocking entries that have to be deleted again
        # because nothing was loaded under those names beforehand.
        saved_modules = {}
        placeholder_names = []
        _save_and_remove_module(name, saved_modules)
        try:
            for extra_name in fresh:
                _save_and_remove_module(extra_name, saved_modules)
            for blocked_name in blocked:
                was_loaded = _save_and_block_module(blocked_name,
                                                    saved_modules)
                if not was_loaded:
                    placeholder_names.append(blocked_name)
            module = importlib.import_module(name)
        except ImportError:
            module = None
        finally:
            sys.modules.update(saved_modules)
            for placeholder in placeholder_names:
                del sys.modules[placeholder]
        return module
 | 
						|
 | 
						|
 | 
						|
def get_attribute(obj, name):
    """Return attribute *name* of *obj*; raise SkipTest if it is missing."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
 | 
						|
 | 
						|
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
                         # (None makes is_resource_enabled() accept
                         # every resource.)
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0
failfast = False
match_tests = None

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
 | 
						|
def record_original_stdout(stdout):
    """Remember *stdout* as the stream get_original_stdout() hands back."""
    global _original_stdout
    _original_stdout = stdout
 | 
						|
 | 
						|
def get_original_stdout():
    """Return the recorded original stdout, falling back to sys.stdout.

    The fallback is used when nothing (or a false value) has been stored
    by record_original_stdout().
    """
    recorded = _original_stdout
    return recorded if recorded else sys.stdout
 | 
						|
 | 
						|
def unload(name):
    """Remove module *name* from sys.modules; do nothing if it is absent."""
    sys.modules.pop(name, None)
 | 
						|
 | 
						|
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait until the removal is visible.

        With *waitall* false, wait for the final component of *pathname*
        to disappear from its parent directory listing; with *waitall*
        true, wait for the directory *pathname* itself to become empty.
        A RuntimeWarning is issued if that has not happened once the
        backoff is exhausted.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            watched_dir = pathname
        else:
            parent, name = os.path.split(pathname)
            watched_dir = parent or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on a i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        delay = 0.001
        while delay < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            entries = os.listdir(watched_dir)
            still_pending = entries if waitall else name in entries
            if not still_pending:
                return
            # Increase the timeout and try again
            time.sleep(delay)
            delay *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Delete *filename* and wait for the deletion to take effect."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove directory *dirname* and wait for it to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete *path*, waiting at each directory level."""
        def _empty_directory(directory):
            for entry in os.listdir(directory):
                child = os.path.join(directory, entry)
                try:
                    mode = os.lstat(child).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s" % (child, exc),
                          file=sys.__stderr__)
                    # A mode of 0 is not a directory, so the entry is
                    # unlinked like a plain file below.
                    mode = 0
                if not stat.S_ISDIR(mode):
                    os.unlink(child)
                    continue
                _waitfor(_empty_directory, child, waitall=True)
                os.rmdir(child)
        _waitfor(_empty_directory, path, waitall=True)
        _waitfor(os.rmdir, path)
else:
    # On other platforms the plain library calls are used directly.
    _unlink, _rmdir, _rmtree = os.unlink, os.rmdir, shutil.rmtree
 | 
						|
 | 
						|
def unlink(filename):
    """Delete *filename*, ignoring a missing file or non-directory parent."""
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
 | 
						|
 | 
						|
def rmdir(dirname):
    """Remove directory *dirname*, ignoring the case where it is missing."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
 | 
						|
 | 
						|
def rmtree(path):
    """Recursively delete *path*, ignoring the case where it is missing."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
 | 
						|
 | 
						|
def make_legacy_pyc(source):
    """Move a PEP 3147 pyc/pyo file to its legacy pyc/pyo location.

    The choice of .pyc or .pyo extension is done based on the __debug__ flag
    value.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147 pyc file must exist.
        Relative paths, including ones with directory components, are
        resolved against the current working directory.
    :return: The absolute file system path to the legacy pyc file.
    :raises OSError: If the PEP 3147 file cannot be renamed.
    """
    pyc_file = importlib.util.cache_from_source(source)
    # Split the absolute path so that only the file name is appended to the
    # source directory.  Joining the directory with the full *source* string
    # would repeat the directory part of a relative path such as
    # 'pkg/mod.py' (giving '<cwd>/pkg/pkg/mod.pyc').
    source_dir, source_name = os.path.split(os.path.abspath(source))
    legacy_name = source_name + ('c' if __debug__ else 'o')
    legacy_pyc = os.path.join(source_dir, legacy_name)
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
 | 
						|
 | 
						|
def forget(modname):
    """'Forget' a module was ever imported.

    The module is dropped from sys.modules, and for every directory on
    sys.path any PEP 3147 or legacy .pyc and .pyo file belonging to
    ``modname + '.py'`` is deleted.
    """
    unload(modname)
    for directory in sys.path:
        source = os.path.join(directory, modname + '.py')
        # It doesn't matter if they exist or not, unlink all possible
        # combinations of PEP 3147 and legacy pyc and pyo files.
        for legacy_suffix in ('c', 'o'):
            unlink(source + legacy_suffix)
        for debug_flag in (True, False):
            unlink(importlib.util.cache_from_source(
                source, debug_override=debug_flag))
 | 
						|
 | 
						|
# Check whether a gui is actually available
def _is_gui_available():
    """Return True if GUI tests can run here, caching the outcome.

    The outcome is stored on the function object itself: ``result``
    holds the boolean and ``reason`` holds a description of why no GUI
    is usable (None when one is).  Later calls return the cached
    ``result`` without probing again.
    """
    try:
        return _is_gui_available.result
    except AttributeError:
        pass    # not probed yet
    reason = None
    if sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        user32 = ctypes.windll.user32
        station = user32.GetProcessWindowStation()
        if not station:
            raise ctypes.WinError()
        flags = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        succeeded = user32.GetUserObjectInformationW(
            station,
            UOI_FLAGS,
            ctypes.byref(flags),
            ctypes.sizeof(flags),
            ctypes.byref(needed))
        if not succeeded:
            raise ctypes.WinError()
        if not (flags.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            serial = ProcessSerialNumber()
            serial_ptr = pointer(serial)
            if (app_services.GetCurrentProcess(serial_ptr) < 0
                    or app_services.SetFrontProcess(serial_ptr) < 0):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    # but skip the test on OS X because it can cause segfaults in Cocoa Tk
    # when running regrtest with the -j option (multiple threads/subprocesses)
    if not (reason or sys.platform == 'darwin'):
        try:
            from tkinter import Tk
            root = Tk()
            root.destroy()
        except Exception as exc:
            detail = str(exc)
            # Keep the recorded reason short.
            if len(detail) > 50:
                detail = detail[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(exc).__name__,
                                                           detail)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
 | 
						|
 | 
						|
def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py.  When use_resources is None
    (i.e. not running under regrtest.py and nothing else has set it),
    every resource counts as enabled; otherwise *resource* must be
    contained in use_resources.
    """
    if use_resources is None:
        return True
    return resource in use_resources
 | 
						|
 | 
						|
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    The 'gui' resource is additionally checked with _is_gui_available();
    if no GUI is usable, the recorded reason becomes the exception
    message.  For a resource that is not enabled, *msg* is used as the
    message, or a default text when *msg* is None.
    """
    gui_missing = resource == 'gui' and not _is_gui_available()
    if gui_missing:
        raise ResourceDenied(_is_gui_available.reason)
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
 | 
						|
 | 
						|
def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.

    The check happens each time the decorated function is called.  The
    version is the part of platform.release() before the first '-'; if it
    does not consist of dot-separated integers, no check is made.  The
    wrapper exposes `min_version` as an attribute.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if platform.system() != sysname:
                return func(*args, **kw)
            version_txt = platform.release().split('-', 1)[0]
            try:
                version = tuple(int(part) for part in version_txt.split('.'))
            except ValueError:
                # Unparseable release string: run the test unconditionally.
                version = None
            if version is not None and version < min_version:
                min_version_txt = '.'.join(str(part) for part in min_version)
                raise unittest.SkipTest(
                    "%s version %s or higher required, not %s"
                    % (sysname, min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
 | 
						|
 | 
						|
def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    The version is given as separate integer arguments.  For example,
    @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version(sysname='FreeBSD', min_version=min_version)
 | 
						|
 | 
						|
def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    The version is given as separate integer arguments.  For example,
    @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version(sysname='Linux', min_version=min_version)
 | 
						|
 | 
						|
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.

    The check happens each time the decorated function is called.  If the
    version reported by platform.mac_ver() does not consist of
    dot-separated integers, no check is made.  The wrapper exposes
    `min_version` as an attribute.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform != 'darwin':
                return func(*args, **kw)
            version_txt = platform.mac_ver()[0]
            try:
                version = tuple(int(part) for part in version_txt.split('.'))
            except ValueError:
                # Unparseable version string: run the test unconditionally.
                version = None
            if version is not None and version < min_version:
                min_version_txt = '.'.join(str(part) for part in min_version)
                raise unittest.SkipTest(
                    "Mac OS X %s or higher required, not %s"
                    % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
 | 
						|
 | 
						|
 | 
						|
# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
HOST = "127.0.0.1"      # IPv4 loopback address; default host of bind_port()
HOSTv6 = "::1"          # IPv6 loopback address
 | 
						|
 | 
						|
 | 
						|
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given family and type
    (default is AF_INET, SOCK_STREAM), and binding it with bind_port() to
    that function's default host (HOST) with the port set to 0,
    eliciting an unused ephemeral port from the OS.  The temporary socket is
    then closed, and the ephemeral port is returned.  The socket is closed
    even if bind_port() raises.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    OSError will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it.
    """

    # The context manager closes the temporary socket on every exit path,
    # including when bind_port() raises, so the descriptor cannot leak.
    with socket.socket(family, socktype) as tempsock:
        return bind_port(tempsock)
 | 
						|
 | 
						|
def bind_port(sock, host=HOST):
    """Bind *sock* to an OS-chosen free port on *host* and return the port.

    Binding to port 0 asks the OS for an ephemeral port, so the returned
    port is unbound at the time of the call.  This matters because many
    tests may run at the same time, especially on buildbots.

    For an AF_INET/SOCK_STREAM socket, TestFailed is raised when
    SO_REUSEADDR or SO_REUSEPORT is already set on it: tests must never set
    those options on TCP/IP sockets.  The only legitimate use of them is
    multicast testing with several UDP sockets.

    For the same kind of socket, SO_EXCLUSIVEADDRUSE is set when the socket
    module defines it (i.e. on Windows), which stops anybody else from
    bind()'ing to our host/port for the duration of the test.
    """
    tcp_over_ipv4 = (sock.family == socket.AF_INET and
                     sock.type == socket.SOCK_STREAM)
    if tcp_over_ipv4:
        if (hasattr(socket, 'SO_REUSEADDR') and
                sock.getsockopt(socket.SOL_SOCKET,
                                socket.SO_REUSEADDR) == 1):
            raise TestFailed("tests should never set the SO_REUSEADDR "
                             "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
                if reuse_port == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
            except OSError:
                # The socket module was built against headers that define
                # SO_REUSEPORT, but the running kernel is too old to
                # support the option.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 lets the OS pick; read the chosen port back from the socket.
    sock.bind((host, 0))
    return sock.getsockname()[1]
 | 
						|
 | 
						|
def _is_ipv6_enabled():
    """Return True if an IPv6 TCP socket can be created and bound to HOSTv6."""
    if not socket.has_ipv6:
        return False
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    except OSError:
        return False
    try:
        probe.bind((HOSTv6, 0))
    except OSError:
        return False
    finally:
        # Always release the probe socket, whether or not bind() worked.
        probe.close()
    return True


# Evaluated once at import time.
IPV6_ENABLED = _is_ipv6_enabled()
 | 
						|
 | 
						|
 | 
						|
# Size chosen to likely exceed the OS pipe buffer, so that a write of this
# many bytes blocks.  Windows seems to top out around 512 B and many Unix
# kernels use 64 KiB or 16 * PAGE_SIZE; 4 MiB (+1) leaves a wide margin.
# See issue #17835 for a discussion of this number.
PIPE_MAX_SIZE = (4 << 20) + 1

# Size chosen to likely exceed the OS socket buffer, so that a write of this
# many bytes blocks.  Socket buffer sizes can usually be tuned system-wide
# (e.g. through sysctl on Linux) or per socket (SO_SNDBUF/SO_RCVBUF).
# See issue #18643 for a discussion of this number.
SOCK_MAX_SIZE = (16 << 20) + 1
 | 
						|
 | 
						|
# True when sys.platform starts with 'java' (i.e. running under Jython).
is_jython = sys.platform.startswith('java')

# Skip decorators for tests that need an optional compression module.
# Each module name is the module object when its import succeeded at the
# top of this file, or None otherwise.
requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
requires_zlib = unittest.skipUnless(zlib, 'requires zlib')

# Skip decorator for tests that only make sense with IEEE 754 doubles.
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles",
)
 | 
						|
 | 
						|
# Base filename used by the tests.  Jython disallows '@' in module names,
# so a '$' prefix is used there instead.
TESTFN = '$test' if os.name == 'java' else '@test'

# Append the process id so that parallel test runs do not collide, while
# keeping the result usable as a module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
 | 
						|
 | 
						|
# FS_NONASCII: a non-ASCII character that os.fsencode() can encode, or None
# when none of the candidates below can be encoded.
FS_NONASCII = None
for character in (
    # Printable, common characters are tried first so that the resulting
    # filename stays readable.  The encodings named next to each character
    # are only examples of encodings able to encode it, not a full list.

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Next come more "special" characters: they may be interpreted or
    # displayed differently depending on the exact locale encoding and
    # on the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        os.fsdecode(os.fsencode(character))
    except UnicodeError:
        # This candidate does not survive the round trip; try the next one.
        continue
    FS_NONASCII = character
    break
 | 
						|
 | 
						|
# Name of the encoding reported for the filesystem.
TESTFN_ENCODING = sys.getfilesystemencoding()

# TESTFN_UNICODE is TESTFN followed by a non-ASCII suffix.
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # Mac OS X's VFS API defines file names as canonically decomposed
    # Unicode encoded as UTF-8, so normalise to NFD up front.  See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
 | 
						|
 | 
						|
# TESTFN_UNENCODABLE is a str filename that the filesystem encoding must
# *not* be able to encode in strict mode.  It stays None when no such
# filename can be produced on this platform.
TESTFN_UNENCODABLE = None
if os.name in ('nt', 'ce'):
    # Platform ids 0 (win32s) and 1 (Windows 9x/ME) are skipped.
    if sys.getwindowsversion().platform >= 2:
        # Characters from several different scripts are mixed to minimise
        # the chance that the whole name is encodable to MBCS (issue #9819).
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            # Expected outcome: the name really is unencodable.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8), so it is left out.
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # With surrogateescape, 0xff decodes to the lone surrogate U+DCFF,
        # which the strict encoder then refuses.
        TESTFN_UNENCODABLE = TESTFN \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    # Otherwise the filesystem encoding (e.g. an ISO-8859-* encoding) can
    # handle the byte 0xff; TESTFN_UNENCODABLE stays None and some unicode
    # filename tests are skipped.
 | 
						|
 | 
						|
# TESTFN_UNDECODABLE is a bytes filename that must *not* be decodable from
# the filesystem encoding in strict mode.  It stays None when no such
# filename can be produced (e.g. latin1 can decode any byte sequence).
# On UNIX, os.fsdecode() can still decode TESTFN_UNDECODABLE thanks to the
# surrogateescape error handler (PEP 383); only strict decoding fails.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932.  Windows
    # either accepts it when creating a file or a directory, or refuses to
    # enter such a directory (when the bytes name is used).  So b'\xe7' is
    # tested first: it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and both candidates are lost.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # First candidate the strict decoder rejects: use it as the suffix.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
        break
 | 
						|
 | 
						|
# TESTFN with a non-ASCII suffix, or None when FS_NONASCII is not available.
TESTFN_NONASCII = (TESTFN + '-' + FS_NONASCII) if FS_NONASCII else None

# Working directory at the time this module is imported.
SAVEDCWD = os.getcwd()
 | 
						|
 | 
						|
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Context manager that provides a temporary directory.

    Arguments:

      path: the directory to create for the duration of the block.  When
        omitted or None, a fresh directory is made with tempfile.mkdtemp
        and its real path is used.

      quiet: when False (the default), failing to create *path* raises.
        When True, a RuntimeWarning is issued instead and *path* is
        yielded as is.

    The directory is removed on exit only if this call created it.
    """
    if path is None:
        path = os.path.realpath(tempfile.mkdtemp())
        remove_on_exit = True
    else:
        remove_on_exit = False
        try:
            os.mkdir(path)
        except OSError:
            if not quiet:
                raise
            warnings.warn('tests may fail, unable to create temp dir: ' + path,
                          RuntimeWarning, stacklevel=3)
        else:
            remove_on_exit = True
    try:
        yield path
    finally:
        # Never delete a directory that existed before we were called.
        if remove_on_exit:
            shutil.rmtree(path)
 | 
						|
 | 
						|
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Context manager that switches the current working directory.

    Arguments:

      path: the directory to make current for the duration of the block.

      quiet: when False (the default), failing to change directory raises.
        When True, a RuntimeWarning is issued instead and the working
        directory is left where it was.

    The block receives the value of os.getcwd() after the attempted change,
    and the original working directory is restored on exit.
    """
    original_dir = os.getcwd()
    try:
        os.chdir(path)
    except OSError:
        if quiet:
            warnings.warn('tests may fail, unable to change CWD to: ' + path,
                          RuntimeWarning, stacklevel=3)
        else:
            raise
    try:
        yield os.getcwd()
    finally:
        os.chdir(original_dir)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    A directory called *name* is created in the current directory via
    temp_dir() and then entered via change_cwd().  When *name* is None the
    directory is created with tempfile.mkdtemp instead.

    With *quiet* False (the default), failing to create the directory or to
    change into it raises an error.  With *quiet* True only a warning is
    issued and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
 | 
						|
 | 
						|
# temp_umask is only defined on platforms whose os module offers umask().
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that sets the process umask to *umask* for the
        duration of the block and restores the previous mask afterwards."""
        previous_mask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous_mask)
 | 
						|
 | 
						|
# TEST_SUPPORT_DIR is the directory that holds this file; TEST_HOME_DIR is
# its parent, i.e. the top level directory of the "test" package that
# contains Python's regression test suite.
TEST_SUPPORT_DIR = os.path.split(os.path.abspath(__file__))[0]
TEST_HOME_DIR = os.path.split(TEST_SUPPORT_DIR)[0]

# TEST_DATA_DIR is where remote resources get downloaded to.
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
 | 
						|
 | 
						|
def findfile(filename, subdir=None):
    """Look for *filename* in the test directory and then on sys.path.

    An absolute *filename* is returned unchanged.  Otherwise the first
    existing candidate is returned; when nothing is found the (possibly
    subdir-prefixed) name itself is returned, which does not necessarily
    signal failure since it may still be a legitimate path.

    When *subdir* is given, the file is looked up under that relative
    directory instead of directly in each search directory.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    # The test package directory takes precedence over sys.path entries.
    for directory in [TEST_HOME_DIR] + sys.path:
        candidate = os.path.join(directory, filename)
        if os.path.exists(candidate):
            return candidate
    return filename
 | 
						|
 | 
						|
def create_empty_file(filename):
    """Create *filename* as an empty file, truncating it if it exists."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # Opening with O_CREAT | O_TRUNC is all that is needed; close at once.
    os.close(os.open(filename, flags))
 | 
						|
 | 
						|
def sortdict(dict):
    "Return a repr-like string for *dict* with its items in sorted order."
    body = ", ".join("%r: %r" % pair for pair in sorted(dict.items()))
    return "{%s}" % body
 | 
						|
 | 
						|
def make_bad_fd():
    """
    Return a file descriptor that is no longer valid.

    TESTFN is opened for binary writing, its descriptor number is noted,
    and the file is then closed and unlinked before the number is returned.
    """
    handle = open(TESTFN, "wb")
    try:
        stale_fd = handle.fileno()
    finally:
        handle.close()
        unlink(TESTFN)
    return stale_fd
 | 
						|
 | 
						|
def check_syntax_error(testcase, statement):
    """Assert via *testcase* that compiling *statement* in 'exec' mode
    raises SyntaxError."""
    with testcase.assertRaises(SyntaxError):
        compile(statement, '<test string>', 'exec')
 | 
						|
 | 
						|
def open_urlresource(url, *args, **kw):
    """Open the resource at *url*, downloading it into TEST_DATA_DIR if needed.

    The last path component of *url* names the local copy.  Positional and
    keyword arguments are forwarded to open() for that copy, except for the
    optional keyword *check*: a callable given the opened file that returns
    true when the contents are acceptable.

    An existing local copy that passes *check* is returned directly;
    otherwise it is removed and the file is fetched again, which requires
    the 'urlfetch' resource.  TestFailed is raised when the freshly
    downloaded file does not pass *check* either.
    """
    import urllib.request
    import urllib.parse

    check = kw.pop('check', None)

    # URLs always use '/' as separator, whatever the local platform does.
    filename = urllib.parse.urlparse(url)[2].rsplit('/', 1)[-1]
    fn = os.path.join(TEST_DATA_DIR, filename)

    def open_if_valid(path):
        # Return an open file for *path*, or None when *check* rejects it.
        candidate = open(path, *args, **kw)
        if check is None:
            return candidate
        if check(candidate):
            candidate.seek(0)
            return candidate
        candidate.close()
        return None

    if os.path.exists(fn):
        cached = open_if_valid(fn)
        if cached is not None:
            return cached
        # The cached copy is unusable: discard it and download again.
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print('\tfetching %s ...' % url, file=get_original_stdout())
    response = urllib.request.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            chunk = response.read()
            while chunk:
                out.write(chunk)
                chunk = response.read()
    finally:
        response.close()

    downloaded = open_if_valid(fn)
    if downloaded is not None:
        return downloaded
    raise TestFailed('invalid resource %r' % fn)
 | 
						|
 | 
						|
 | 
						|
class WarningsRecorder(object):
    """Convenience wrapper around the list of warnings returned on entry
       to the warnings.catch_warnings() context manager.

       Unknown attributes are looked up on the most recent warning;
       reset() marks everything recorded so far as already seen.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning that has not been reset() away.
        self._last = 0

    def __getattr__(self, attr):
        has_new_warnings = len(self._warnings) > self._last
        if has_new_warnings:
            return getattr(self._warnings[-1], attr)
        if attr in warnings.WarningMessage._WARNING_DETAILS:
            # Nothing new recorded: standard warning details read as None.
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        """Warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Forget all warnings recorded so far."""
        self._last = len(self._warnings)
 | 
						|
 | 
						|
 | 
						|
def _filterwarnings(filters, quiet=False):
    """Generator that records warnings and then checks them against
    *filters*, a sequence of (message regexp, category) pairs.

    After the body has run, AssertionError is raised for the first recorded
    warning that no filter matched.  Unless 'quiet' is True, AssertionError
    is also raised for the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module so that warnings
    # already issued there can be raised again.
    caller_frame = sys._getframe(2)
    registry = caller_frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # Record every warning.  test_warnings swaps the warnings module,
        # so the live one has to be fetched from sys.modules.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Sort the recorded warnings into matched and unmatched ones.
    unhandled = list(recorded)
    missing = []
    for msg, cat in filters:
        matched = [entry for entry in unhandled
                   if re.match(msg, str(entry.message), re.I) and
                   issubclass(entry.message.__class__, cat)]
        for entry in matched:
            unhandled.remove(entry)
        if not matched and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if unhandled:
        raise AssertionError("unhandled warning %s" % unhandled[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Positional arguments are 2-tuples of the form:
        ("message regexp", WarningCategory)

    Optional keyword argument:
     - 'quiet': when True, a filter that catches nothing is not a failure
        (defaults to True when no filter is given,
         and to False when at least one filter is given)

    Calling it without any argument is the same as:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # No filter given: accept every warning and, to preserve backward
    # compatibility, be quiet unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
 | 
						|
 | 
						|
 | 
						|
class CleanImport(object):
    """Context manager that makes the next import of the given modules
    return a fresh module object.

    Handy for testing module-level behaviour, such as a DeprecationWarning
    emitted at import time.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        # Snapshot taken before anything is removed; used again on exit.
        self.original_modules = sys.modules.copy()
        for requested in module_names:
            if requested not in sys.modules:
                continue
            real_name = sys.modules[requested].__name__
            # *requested* may only be an alias for another module (e.g. a
            # stub for a module renamed in 3.x).  The real module then has
            # to go as well, or the import cache would still serve it.
            if real_name != requested:
                del sys.modules[real_name]
            del sys.modules[requested]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Put back every entry that was present when we were created.
        sys.modules.update(self.original_modules)
 | 
						|
 | 
						|
 | 
						|
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping view of os.environ that remembers the original value of
    every variable it sets or deletes and restores those values on exit
    when used as a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its original value (None if unset).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Only the value seen on the first access is remembered.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Only the value seen on the first access is remembered.
        self._changed.setdefault(envvar, self._environ.get(envvar))
        # Deleting a variable that is not set is silently accepted.
        self._environ.pop(envvar, None)

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, original in self._changed.items():
            if original is not None:
                self._environ[name] = original
            elif name in self._environ:
                # The variable did not exist originally: remove it again.
                del self._environ[name]
        os.environ = self._environ
 | 
						|
 | 
						|
 | 
						|
class DirsOnSysPath(object):
    """Context manager that temporarily appends directories to sys.path.

    On construction the current sys.path list object and a copy of its
    contents are saved, and the directories given as positional arguments
    are appended.  On exit both the object and its contents are put back.

    Consequently *all* sys.path modifications made inside the block,
    including rebinding sys.path to another object, are undone when the
    block ends.
    """

    def __init__(self, *paths):
        self.original_object = sys.path
        self.original_value = list(sys.path)
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore the saved contents into the saved list object, then make
        # that object sys.path again in case it was replaced.
        saved_list = self.original_object
        saved_list[:] = self.original_value
        sys.path = saved_list
 | 
						|
 | 
						|
 | 
						|
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class (or tuple of classes) to match; each
    keyword argument names an attribute the exception instance must have,
    together with the value it must compare equal to.
    """

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be the configured class or one of its
        # subclasses (e.g. ConnectionResetError for OSError).  The check
        # used to be written the other way round, so subclasses were never
        # matched while unrelated base classes could be.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every required attribute is present and equal.
                raise ResourceDenied("an optional resource is not available")


# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    *resource_name* is used in the ResourceDenied message.  When *timeout*
    is not None it becomes the default socket timeout for the duration of
    the block; the previous default is restored afterwards.  *errnos*
    replaces the built-in list of errno values treated as transient; when
    it is given, no getaddrinfo error codes are considered.
    """
    # (name, fallback number) pairs; the number is only used when the errno
    # module does not define the name.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    # Same idea for getaddrinfo errors, looked up on the socket module.
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise *denied* when *err* looks like a transient network problem.
        n = getattr(err, 'errno', None)
        is_transient = (
            isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            n in captured_errnos)
        if is_transient:
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), so
        # peel the wrappers off until the original error is reached.
        while True:
            wrapped = err.args
            if wrapped and isinstance(wrapped[0], OSError):
                err = wrapped[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(wrapped) >= 2 and isinstance(wrapped[1], OSError):
                err = wrapped[1]
            else:
                break
        filter_error(err)
        # Not a transient error: re-raise the exception as it was caught.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily replace the sys stream *stream_name* with an io.StringIO.

    *stream_name* is the attribute name on the sys module (for example
    "stdout").  The object yielded is whatever is bound to that attribute
    once the replacement has been installed.  The previous stream is put
    back when the block exits, including when it exits with an exception.
    """
    from io import StringIO

    saved_stream = getattr(sys, stream_name)
    setattr(sys, stream_name, StringIO())
    try:
        replacement = getattr(sys, stream_name)
        yield replacement
    finally:
        setattr(sys, stream_name, saved_stream)
 | 
						|
 | 
						|
def captured_stdout():
    """Capture the output of sys.stdout.

    Returns the result of captured_output("stdout"); use it in a with
    statement to get at the replacement stream:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\n")
    """
    return captured_output("stdout")
 | 
						|
 | 
						|
def captured_stderr():
    """Capture the output of sys.stderr.

    Returns the result of captured_output("stderr"); use it in a with
    statement to get at the replacement stream:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\n")
    """
    return captured_output("stderr")
 | 
						|
 | 
						|
def captured_stdin():
    """Capture the input to sys.stdin.

    Returns the result of captured_output("stdin").  Write the data the
    code under test should read into the replacement stream, then rewind
    it before that code runs:

       with captured_stdin() as stdin:
           stdin.write('hello\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")
 | 
						|
 | 
						|
 | 
						|
def gc_collect():
    """Try hard to get every garbage object collected.

    Implementations other than CPython do not guarantee prompt
    deallocation, and CPython itself delays it for reference cycles.  As a
    result __del__ methods can run later than a test expects and weakrefs
    can stay alive longer than expected.  This helper runs the collector
    three times in total, pausing briefly after the first pass when
    running on Jython.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()
 | 
						|
 | 
						|
@contextlib.contextmanager
def disable_gc():
    """Context manager that turns the garbage collector off for the block.

    The collector is switched back on at exit only if it was enabled when
    the block was entered.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
 | 
						|
 | 
						|
 | 
						|
def python_is_optimized():
    """Report whether the PY_CFLAGS build variable requests optimization.

    Only the last flag starting with '-O' counts.  The result is False
    when there is no such flag or when the last one is exactly '-O0'.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [flag for flag in cflags.split() if flag.startswith('-O')]
    if not opt_flags:
        return False
    return opt_flags[-1] != '-O0'
 | 
						|
 | 
						|
 | 
						|
# struct format strings wrapped around a caller-supplied format by the two
# helpers below.  Interpreters exposing sys.gettotalrefcount get two extra
# leading pointer fields and pointer alignment (presumably debug builds --
# TODO confirm).
if hasattr(sys, "gettotalrefcount"):
    _header = '2PnP'
    _align = '0P'
else:
    _header = 'nP'
    _align = '0n'
# Same header followed by one more 'n' field.
_vheader = _header + 'n'


def calcobjsize(fmt):
    """Return struct.calcsize() of *fmt* placed between _header and _align."""
    layout = ''.join((_header, fmt, _align))
    return struct.calcsize(layout)


def calcvobjsize(fmt):
    """Return struct.calcsize() of *fmt* placed between _vheader and _align."""
    layout = ''.join((_vheader, fmt, _align))
    return struct.calcsize(layout)
 | 
						|
 | 
						|
 | 
						|
# Bit masks tested against a type's __flags__ in check_sizeof().
_TPFLAGS_HAVE_GC = 1 << 14
_TPFLAGS_HEAPTYPE = 1 << 9


def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) equals *size*.

    *size* is the expected size without a GC header.
    _testcapi.SIZEOF_PYGC_HEAD is added to it when *o* is a direct instance
    of ``type`` with the heap-type flag set, or when *o* is any other
    object whose type has the have-GC flag set.  The comparison is made
    with test.assertEqual().
    """
    import _testcapi

    actual = sys.getsizeof(o)
    kind = type(o)
    # add GC header size
    if kind == type:
        has_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_head = kind.__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_head:
        size += _testcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' % (kind, actual, size)
    test.assertEqual(actual, size, msg)
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Decorator for running a function in a different locale, correctly resetting
 | 
						|
# it afterwards.
 | 
						|
 | 
						|
def run_with_locale(catstr, *locales):
    """Decorator factory: run the wrapped function under a different locale.

    *catstr* is the name of a category constant in the locale module (for
    example 'LC_ALL').  Each entry of *locales* is tried in order with
    locale.setlocale(); the first one accepted is used.  If none is
    accepted the function still runs, under the unchanged locale.  The
    original locale is restored afterwards, also when the function raises.

    An unknown *catstr* makes the wrapped function raise AttributeError
    when called.  If the current locale cannot be queried for any other
    reason, the function runs without the locale being touched.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                # (KeyboardInterrupt/SystemExit are deliberately not caught)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this candidate is not usable here; try the next
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Decorator for running a function in a specific timezone, correctly
 | 
						|
# resetting it afterwards.
 | 
						|
 | 
						|
def run_with_tz(tz):
    """Decorator factory: run the wrapped function with the TZ variable set.

    os.environ['TZ'] is set to *tz* and time.tzset() is called before the
    function runs.  Afterwards the previous value of TZ is restored (or
    the variable is removed if it was not set before) and tzset() is
    called again, also when the function raises.

    The wrapped function raises unittest.SkipTest when time.tzset is not
    available.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function may have removed
                    # TZ itself, and a KeyError here would mask its result
                    # or its exception.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()
        return inner
    return decorator
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Big-memory-test support. Separate from 'resources' because memory use
 | 
						|
# should be configurable.
 | 
						|
 | 
						|
# Some handy shorthands. Note that these are used for byte-limits as well
 | 
						|
# as size-limits, in the various bigmem tests
 | 
						|
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* and store it in the module-level memory-limit globals.

    *limit* is a string made of a number (optionally with a decimal part),
    a unit letter K, M, G or T (case-insensitive) and an optional trailing
    'b', e.g. '4G' or '2.5Gb'.

    On success real_max_memuse is set to the parsed byte count and
    max_memuse to that count capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* does not match that syntax or if the
    capped value is below _2G - 1.  Neither global is modified when
    ValueError is raised.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    usable = min(requested, MAX_Py_ssize_t)
    # Validate before touching the globals so a rejected limit cannot leave
    # real_max_memuse updated while max_memuse keeps its old value.
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    real_max_memuse = requested
    max_memuse = usable
 | 
						|
 | 
						|
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching is done by a child process running memory_watchdog.py,
    which is given this process' /proc/<pid>/statm file as its stdin.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Spawn the watchdog child process.

        If the statm file cannot be opened, a RuntimeWarning is issued and
        nothing is started.
        """
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        # Close our handle whether or not the child could be spawned, so a
        # failure in findfile() or Popen() does not leak the open file.
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f, stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the child process and wait for it, if one was started."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
 | 
						|
 | 
						|
 | 
						|
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the size the test is meant to run at (in arbitrary,
    test-interpreted units).  'memuse' is the number of bytes per unit of
    size the test needs, or a good estimate of it.

    The decorated test is called as f(self, maxsize).  When no memory
    limit has been configured, maxsize is a small dummy value instead of
    'size'.  If 'dry_run' is False, the test does not support such dummy
    runs and is skipped when no limit is configured.  The test is also
    skipped when the configured limit is below maxsize * memuse.
    """
    def decorator(f):
        def wrapper(self):
            # Read from the wrapper's attributes, not the closure, so the
            # values can be changed after decoration.
            size = wrapper.size
            memuse = wrapper.memuse
            maxsize = size if real_max_memuse else 5147

            must_check = real_max_memuse or not dry_run
            if must_check and real_max_memuse < maxsize * memuse:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
 | 
						|
 | 
						|
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test runs only when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised with a message that depends on
    the size of Py_ssize_t and on the configured limit.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            reason = "not enough memory: try a 32-bit build instead"
        else:
            reason = ("not enough memory: %.1fG minimum needed"
                      % (MAX_Py_ssize_t / (1024 ** 3)))
        raise unittest.SkipTest(reason)
    return wrapper
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# unittest integration.
 | 
						|
 | 
						|
class BasicTestRunner:
    """Runner that collects outcomes in a plain unittest.TestResult."""

    def run(self, test):
        """Call *test* with a fresh TestResult and return that result."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
 | 
						|
 | 
						|
def _id(obj):
    """Return *obj* unchanged (used as a do-nothing decorator)."""
    return obj
 | 
						|
 | 
						|
def requires_resource(resource):
    """Return a decorator that skips a test unless *resource* is usable.

    For the 'gui' resource the test is skipped with
    _is_gui_available.reason when _is_gui_available() is false.  Otherwise
    the test is skipped when is_resource_enabled(resource) is false, and
    left untouched when it is true.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
 | 
						|
 | 
						|
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.

    Equivalent to applying impl_detail(cpython=True) to *test*.
    """
    return impl_detail(cpython=True)(test)
 | 
						|
 | 
						|
def impl_detail(msg=None, **guards):
    """Return a decorator guarding an implementation-specific test.

    *guards* has the same meaning as for check_impl_detail().  When that
    check passes, the test is left untouched.  Otherwise the test is
    skipped, with *msg* as the reason if given, or with a reason built
    from the sorted guard names.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}"
                    if default else
                    "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)
 | 
						|
 | 
						|
def _parse_guards(guards):
    """Normalize a guards mapping.

    Returns a tuple ({platform_name: run_me}, default_value).  An empty
    mapping means "CPython only".  Otherwise every value must be the same
    (all True or all False) and the default is the negation of that value.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    first = flags[0]
    assert flags == [first] * len(guards)   # all True or all False
    return (guards, not first)
 | 
						|
 | 
						|
# Use the following check to guard CPython's implementation-specific tests --
 | 
						|
# or to run them only on the implementation(s) guarded by the arguments.
 | 
						|
def check_impl_detail(**guards):
    """Return True or False depending on the running Python implementation.

       The lower-cased platform.python_implementation() name is looked up
       in *guards*; if it is absent, the default derived by
       _parse_guards() is returned.

       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return guards.get(implementation, default)
 | 
						|
 | 
						|
 | 
						|
def no_tracing(func):
    """Decorator that switches tracing off while the test runs.

    The trace function reported by sys.gettrace() is saved, tracing is
    disabled for the call, and the saved function is reinstalled
    afterwards.  If sys has no gettrace attribute, *func* is returned
    unchanged.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(saved_trace)
    return wrapper
 | 
						|
 | 
						|
 | 
						|
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if it is not run by
    CPython.  After that, any trace function is unset during the test to
    prevent unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))
 | 
						|
 | 
						|
 | 
						|
def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate.

    The suite is modified in place.  Nested suites are always kept (after
    being filtered themselves); any other test is kept only if
    pred(test) is true.
    """
    kept = []
    for item in suite._tests:
        if isinstance(item, unittest.TestSuite):
            _filter_suite(item, pred)
        elif not pred(item):
            continue
        kept.append(item)
    suite._tests = kept
 | 
						|
 | 
						|
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    In verbose mode a TextTestRunner writing to sys.stdout is used,
    otherwise a BasicTestRunner.  TestFailed is raised unless the run was
    successful; when there is exactly one error or one failure, its
    traceback text is used as the message.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
                                         failfast=failfast)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
 | 
						|
 | 
						|
 | 
						|
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a module name that is a key in sys.modules, a
    TestSuite or TestCase instance, or anything accepted by
    unittest.makeSuite().  A string that is not in sys.modules raises
    ValueError.  When match_tests is set, only tests with a dotted-id
    component matching that pattern are run.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))

    def case_pred(test):
        if match_tests is None:
            return True
        return any(fnmatch.fnmatchcase(name, match_tests)
                   for name in test.id().split("."))

    _filter_suite(suite, case_pred)
    _run_suite(suite)
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Check for the presence of docstrings.
 | 
						|
 | 
						|
# Rather than trying to enumerate all the cases where docstrings may be
 | 
						|
# disabled, we just check for that directly
 | 
						|
 | 
						|
# Probe function: its only purpose is to carry a docstring, so that
# inspecting __doc__ below reveals whether docstrings were kept.
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# Truthy when check_impl_detail() passes with its default guards, the
# platform is not win32, and the WITH_DOC_STRINGS config variable is unset
# or false.
MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

# Docstrings count as available when the probe function kept its own
# docstring and MISSING_C_DOCSTRINGS is false.
HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

# Skip decorator for tests that only make sense when HAVE_DOCSTRINGS is true.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
 | 
						|
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# doctest driver.
 | 
						|
 | 
						|
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    If *verbosity* is not given (or is None), support's own verbose
    setting is passed on to doctest.  If any other value is given, None is
    passed instead, so doctest falls back to its usual behavior (it
    searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    # An explicit verbosity is deliberately replaced by None, see above.
    verbosity = verbose if verbosity is None else None

    failures, attempted = doctest.testmod(module, verbose=verbosity,
                                          optionflags=optionflags)
    if failures:
        raise TestFailed("%d of %d doctests failed" % (failures, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failures, attempted
 | 
						|
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Support for saving and restoring the imported modules.
 | 
						|
 | 
						|
def modules_setup():
    """Return a 1-tuple holding a shallow copy of sys.modules.

    The tuple shape lets the result be unpacked into modules_cleanup().
    """
    snapshot = sys.modules.copy()
    return (snapshot,)
 | 
						|
 | 
						|
def modules_cleanup(oldmodules):
    """Reset sys.modules to *oldmodules*, keeping loaded 'encodings.*' modules.

    sys.modules is emptied in place, the encodings submodules that were
    loaded are put back, and then the entries of *oldmodules* are added.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    preserved = {name: module for name, module in sys.modules.items()
                 if name.startswith('encodings.')}
    sys.modules.clear()
    sys.modules.update(preserved)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)
 | 
						|
 | 
						|
#=======================================================================
 | 
						|
# Threading support to prevent reporting refleaks when running regrtest.py -R
 | 
						|
 | 
						|
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
 | 
						|
# moral equivalent thereof) because a threading.Thread object is still alive
 | 
						|
# until its __bootstrap() method has returned, even after it has been
 | 
						|
# unregistered from the threading module.
 | 
						|
# _thread._count(), on the other hand, only gets decremented *after* the
 | 
						|
# __bootstrap() method has returned, which gives us reliable reference counts
 | 
						|
# at the end of a test run.
 | 
						|
 | 
						|
def threading_setup():
    """Return the state that threading_cleanup() later compares against.

    The result is (_thread._count(), a copy of threading._dangling), or
    (1, ()) when the _thread module is unavailable.
    """
    if not _thread:
        return 1, ()
    return _thread._count(), threading._dangling.copy()
 | 
						|
 | 
						|
def threading_cleanup(*original_values):
    """Wait for the threading state to return to *original_values*.

    *original_values* is the unpacked result of threading_setup().  The
    current (_thread._count(), threading._dangling) pair is polled up to
    100 times, sleeping 10 ms and running gc_collect() after each
    mismatch.  Does nothing when the _thread module is unavailable.
    """
    if not _thread:
        return
    _MAX_COUNT = 100
    attempts = 0
    while attempts < _MAX_COUNT:
        current = (_thread._count(), threading._dangling)
        if current == original_values:
            break
        time.sleep(0.01)
        gc_collect()
        attempts += 1
    # XXX print a warning in case of failure?
 | 
						|
 | 
						|
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The wrapper records threading_setup() before calling *func* and passes
    the recorded values to threading_cleanup() afterwards.  Positional and
    keyword arguments are forwarded to *func* unchanged.
    """
    if not _thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            # Forward keywords too; a *args-only wrapper would raise
            # TypeError for callers that pass them.
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
 | 
						|
 | 
						|
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Does nothing when os.waitpid is unavailable.  Any ordinary exception
    raised while reaping ends the loop silently; KeyboardInterrupt and
    SystemExit propagate.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except Exception:
                # Best effort: stop as soon as waitpid() fails, but do not
                # swallow KeyboardInterrupt/SystemExit as a bare except would.
                break
 | 
						|
 | 
						|
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    existed = hasattr(obj, attr)
    if existed:
        previous = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if existed:
            setattr(obj, attr, previous)
        else:
            delattr(obj, attr)
 | 
						|
 | 
						|
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    existed = item in obj
    if existed:
        previous = obj[item]
    obj[item] = new_val
    try:
        yield
    finally:
        if existed:
            obj[item] = previous
        else:
            del obj[item]
 | 
						|
 | 
						|
def strip_python_stderr(stderr):
    """Remove interpreter debug output from a Python process' stderr.

    *stderr* is a bytes object, typically taken from the communicate()
    method of a subprocess.Popen object.  Every "[N refs, M blocks]"
    marker (with its line ending, if any) is removed and the result is
    stripped of leading and trailing whitespace.
    """
    debug_marker = br"\[\d+ refs, \d+ blocks\]\r?\n?"
    cleaned = re.sub(debug_marker, b"", stderr)
    return cleaned.strip()
 | 
						|
 | 
						|
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions.

    Delegates to the private subprocess._args_from_interpreter_flags().
    """
    return subprocess._args_from_interpreter_flags()
 | 
						|
 | 
						|
#============================================================
 | 
						|
# Support for assertions about logging.
 | 
						|
#============================================================
 | 
						|
 | 
						|
class TestHandler(logging.handlers.BufferingHandler):
    """Buffering log handler that keeps each record's __dict__ for inspection.

    *matcher* is an object with a matches(d, **kwargs) method; it decides
    whether a saved record dict satisfies a query made through matches().
    """

    def __init__(self, matcher):
        # BufferingHandler takes a "capacity" argument
        # so as to know when to flush. As we're overriding
        # shouldFlush anyway, we can set a capacity of zero.
        # You can call flush() manually to clear out the
        # buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self):
        """Never request a flush."""
        return False

    def emit(self, record):
        """Format *record*, then save its attribute dict in the buffer."""
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Look for a saved dict whose keys/values match the supplied arguments.
        """
        return any(self.matcher.matches(saved, **kwargs)
                   for saved in self.buffer)
 | 
						|
 | 
						|
class Matcher(object):
    """Compare a dict of saved values against keyword queries."""

    # Keys whose string values are matched by substring instead of equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), wanted)
                   for key, wanted in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
 | 
						|
 | 
						|
 | 
						|
# Cached answer of can_symlink(); None until the first call has probed.
_can_symlink = None
def can_symlink():
    """Return whether os.symlink() works here, probing once and caching.

    The probe tries to create a link named TESTFN + "can_symlink" that
    points at TESTFN, and removes the link again if that succeeds.
    """
    global _can_symlink
    if _can_symlink is None:
        symlink_path = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, symlink_path)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            os.remove(symlink_path)
            _can_symlink = True
    return _can_symlink
 | 
						|
 | 
						|
def skip_unless_symlink(test):
    """Decorator that skips *test* unless can_symlink() reports support.

    Returns *test* unchanged when symlinks work, otherwise *test* wrapped
    by unittest.skip() with an explanatory message.
    """
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
 | 
						|
 | 
						|
# Cached answer of can_xattr(); None until the first probe has completed.
_can_xattr = None
def can_xattr():
    """Return True if extended attributes can be used by tests.

    The answer is computed once and cached in the module-level _can_xattr.
    It is False when:

    * the os module has no setxattr() function,
    * setting the "user.test" attribute on either a tempfile.mkstemp()
      file or on TESTFN raises OSError, or
    * platform.release() reports a 2.6 kernel older than 2.6.39.
    """
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_fp, tmp_name = tempfile.mkstemp()
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Raw string with escaped dots: match a literal "2.6."
                    # prefix and capture the patch level.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an already-open OS-level descriptor that
            # nothing else owns; close it so it is not leaked.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
    _can_xattr = can
    return can
 | 
						|
 | 
						|
def skip_unless_xattr(test):
    """Decorator that skips *test* unless can_xattr() reports support.

    Returns *test* unchanged when extended attributes work, otherwise
    *test* wrapped by unittest.skip() with an explanatory message.
    """
    if can_xattr():
        return test
    return unittest.skip("no non-broken extended attribute support")(test)
 | 
						|
 | 
						|
 | 
						|
def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory is case-insensitive.

    A temporary file is created inside *directory* and then looked up under
    a name whose case has been changed (upper-cased, or lower-cased when the
    name is already all upper case). The file system is reported as
    case-insensitive when os.path.samefile() says both names refer to the
    same file; a FileNotFoundError for the alternate spelling means it is
    case-sensitive and False is returned. The temporary file is always
    removed before returning.
    """
    base_fd, base_path = tempfile.mkstemp(dir=directory)
    try:
        # Only the file's existence matters. Close the descriptor right away
        # so it is not leaked and so the unlink below also works on
        # platforms that refuse to delete open files.
        os.close(base_fd)
        case_path = base_path.upper()
        if case_path == base_path:
            case_path = base_path.lower()
        return os.path.samefile(base_path, case_path)
    except FileNotFoundError:
        return False
    finally:
        os.unlink(base_path)
 | 
						|
 | 
						|
 | 
						|
class SuppressCrashReport:
    """Context manager that tries to keep crash reports from appearing.

    On Windows the Windows Error Reporting dialog is suppressed. On UNIX
    the soft core file size limit is set to 0 so no coredump file is
    created. The previous setting is restored on exit.
    """
    # Previous error mode (Windows) or previous RLIMIT_CORE pair (UNIX).
    # Stays None when nothing was saved, in which case __exit__ is a no-op.
    old_value = None

    def __enter__(self):
        """Disable crash reporting and remember the previous setting.

        On Windows, turn off the error reporting dialog via SetErrorMode.
        On UNIX, save the current core file size limit and set the soft
        limit to 0; on OS X additionally print a notice when the Crash
        Reporter is configured in 'developer' mode.

        Returns self.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            # GetErrorMode is not available on Windows XP and Windows Server 2003,
            # but SetErrorMode returns the previous value, so we can use that
            import ctypes
            SEM_NOGPFAULTERRORBOX = 0x02
            kernel32 = self._k32 = ctypes.windll.kernel32
            previous_mode = kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
            self.old_value = previous_mode
            kernel32.SetErrorMode(previous_mode | SEM_NOGPFAULTERRORBOX)
            return self

        if resource is not None:
            try:
                limits = resource.getrlimit(resource.RLIMIT_CORE)
                self.old_value = limits
                # Zero only the soft limit; the hard limit is kept as is.
                resource.setrlimit(resource.RLIMIT_CORE, (0, limits[1]))
            except (ValueError, OSError):
                pass

        if sys.platform == 'darwin':
            # The rlimit change does not silence the OS X 'Crash Reporter'.
            # If it is configured in 'Developer' mode it will still be
            # triggered, so tell the user that this is expected.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the Crash Reporter.
            command = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
            reader = subprocess.Popen(command, stdout=subprocess.PIPE)
            dialog_type = reader.communicate()[0]
            if dialog_type.strip() == b'developer':
                print("this test triggers the Crash Reporter, "
                      "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        saved = self.old_value
        if saved is None:
            # __enter__ saved nothing, so there is nothing to restore.
            return

        if sys.platform.startswith('win'):
            self._k32.SetErrorMode(saved)
        elif resource is not None:
            try:
                resource.setrlimit(resource.RLIMIT_CORE, saved)
            except (ValueError, OSError):
                pass
 | 
						|
 | 
						|
 | 
						|
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Temporarily replace 'object_to_patch'.'attr_name' with 'new_value'.

    A cleanup callable is registered through 'test_instance'.addCleanup()
    that undoes the change: if the attribute was stored in the object's own
    __dict__, the original value is put back; otherwise (for example an
    attribute inherited from the class) the override is deleted again.

    'attr_name' must already be an attribute of 'object_to_patch';
    AttributeError is raised if it is not.
    """
    # Fail early: raises AttributeError if 'attr_name' does not exist.
    getattr(object_to_patch, attr_name)

    # Remember the current value and whether it lives on the object itself.
    try:
        saved_value = object_to_patch.__dict__[attr_name]
    except (AttributeError, KeyError):
        saved_value = getattr(object_to_patch, attr_name, None)
        stored_on_object = False
    else:
        stored_on_object = True

    def restore():
        # Undo the override once the test has finished.
        if not stored_on_object:
            delattr(object_to_patch, attr_name)
        else:
            setattr(object_to_patch, attr_name, saved_value)

    test_instance.addCleanup(restore)

    # Install the replacement only after the cleanup has been registered.
    setattr(object_to_patch, attr_name, new_value)
 | 
						|
 | 
						|
 | 
						|
def run_in_subinterp(code):
    """
    Execute *code* in a subinterpreter via _testcapi.run_in_subinterp() and
    return that call's result.

    Raises unittest.SkipTest if the tracemalloc module is currently tracing
    memory allocations.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        tracing = False
    else:
        tracing = tracemalloc.is_tracing()
    if tracing:
        raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                "if tracemalloc module is tracing "
                                "memory allocations")
    import _testcapi
    return _testcapi.run_in_subinterp(code)
 |