mirror of
https://github.com/python/cpython.git
synced 2025-07-21 10:15:46 +00:00

Fix compileall.compile_dir() ddir= behavior on sub-packages. Fixes compileall.compile_dir's ddir parameter and the compileall command-line flag `-d` so that they no longer write the wrong pathname to the generated pyc file for submodules beneath the root of the directory tree being compiled. This fixes a regression introduced in Python 3.5. Also marks the compile_dir parameters that are _new_ in 3.9 (from PR #16012) as keyword-only (as that is the only way they will be used) and fixes an omission of them in one place in the docs.
580 lines
18 KiB
Python
580 lines
18 KiB
Python
import abc
|
|
import builtins
|
|
import contextlib
|
|
import errno
|
|
import functools
|
|
import importlib
|
|
from importlib import machinery, util, invalidate_caches
|
|
from importlib.abc import ResourceReader
|
|
import io
|
|
import marshal
|
|
import os
|
|
import os.path
|
|
from pathlib import Path, PurePath
|
|
from test import support
|
|
import unittest
|
|
import sys
|
|
import tempfile
|
|
import types
|
|
|
|
from . import data01
|
|
from . import zipdata01
|
|
|
|
|
|
# Names used by tests that need a built-in module: ``good_name`` is a module
# listed in sys.builtin_module_names, ``bad_name`` is one that is not.  Either
# stays None when the running interpreter does not fit the expectation.
BUILTINS = types.SimpleNamespace(
    good_name=('errno'
               if 'errno' in sys.builtin_module_names else None),
    bad_name=('importlib'
              if 'importlib' not in sys.builtin_module_names else None),
)
|
|
|
|
# Details about the extension module used by the tests.  Everything except
# ``name`` stays None unless a matching file is found on sys.path.
EXTENSIONS = types.SimpleNamespace(
    path=None,
    ext=None,
    filename=None,
    file_path=None,
    name='_testcapi',
)


def _extension_details():
    """Record where the EXTENSIONS.name extension module lives.

    Every sys.path entry is combined with every suffix in
    machinery.EXTENSION_SUFFIXES; the first candidate that exists on disk
    is stored on EXTENSIONS and the search stops.
    """
    for path in sys.path:
        for ext in machinery.EXTENSION_SUFFIXES:
            filename = EXTENSIONS.name + ext
            file_path = os.path.join(path, filename)
            if not os.path.exists(file_path):
                continue
            EXTENSIONS.path = path
            EXTENSIONS.ext = ext
            EXTENSIONS.filename = filename
            EXTENSIONS.file_path = file_path
            return


_extension_details()
|
|
|
|
|
|
def import_importlib(module_name):
    """Import a module from importlib both w/ and w/o _frozen_importlib.

    Returns a dict with the keys 'Frozen' and 'Source'.  The 'Source' copy
    is imported with the frozen bootstrap modules blocked; when
    *module_name* is a dotted name, 'importlib' itself is imported fresh
    as well.
    """
    blocked = ('_frozen_importlib', '_frozen_importlib_external')
    if '.' in module_name:
        fresh = ('importlib',)
    else:
        fresh = ()
    frozen = support.import_fresh_module(module_name)
    source = support.import_fresh_module(module_name, fresh=fresh,
                                         blocked=blocked)
    return {'Frozen': frozen, 'Source': source}
|
|
|
|
|
|
def specialize_class(cls, kind, base=None, **kwargs):
    """Return a new class named '<kind>_<cls name>' deriving from cls and base.

    *base* may be None (unittest.TestCase is used), a class, or a mapping
    indexed by *kind*.  Every keyword argument must be a mapping as well;
    its entry for *kind* is set on the new class under the keyword's name.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        parent = unittest.TestCase
    elif isinstance(base, type):
        parent = base
    else:
        parent = base[kind]
    specialized = types.new_class('{}_{}'.format(kind, cls.__name__),
                                  (cls, parent))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr, per_kind in kwargs.items():
        setattr(specialized, attr, per_kind[kind])
    return specialized
|
|
|
|
|
|
def split_frozen(cls, base=None, **kwargs):
    """Return the ('Frozen', 'Source') specializations of *cls* as a tuple."""
    return tuple(specialize_class(cls, kind, base, **kwargs)
                 for kind in ('Frozen', 'Source'))
|
|
|
|
|
|
def test_both(test_class, base=None, **kwargs):
    """Alias of split_frozen(): return the Frozen and Source test classes."""
    frozen_and_source = split_frozen(test_class, base, **kwargs)
    return frozen_and_source
|
|
|
|
|
|
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
if sys.platform in ('win32', 'cygwin'):
    CASE_INSENSITIVE_FS = True
else:
    # Probe the file system: look this file up under a differently-cased name.
    changed_name = __file__.upper()
    if changed_name == __file__:
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
|
|
|
|
# The pure-Python ("Source") copy of importlib, plus a per-kind mapping of the
# __import__ implementation to exercise.  staticmethod() keeps the functions
# from being bound when the mapping values are set as class attributes.
source_importlib = import_importlib('importlib')['Source']
__import__ = dict(
    Frozen=staticmethod(builtins.__import__),
    Source=staticmethod(source_importlib.__import__),
)
|
|
|
|
|
|
def case_insensitive_tests(test):
    """Class decorator that nullifies tests requiring a case-insensitive
    file system."""
    decorate = unittest.skipUnless(CASE_INSENSITIVE_FS,
                                   "requires a case-insensitive filesystem")
    return decorate(test)
|
|
|
|
|
|
def submodule(parent, name, pkg_dir, content=''):
    """Write <pkg_dir>/<name>.py holding *content*.

    Returns a 2-tuple of the dotted name '<parent>.<name>' and the path of
    the file that was written.
    """
    path = os.path.join(pkg_dir, name + '.py')
    with open(path, 'w') as subfile:
        subfile.write(content)
    qualified_name = '{}.{}'.format(parent, name)
    return qualified_name, path
|
|
|
|
|
|
def get_code_from_pyc(pyc_path):
    """Reads a pyc file and returns the unmarshalled code object within.

    No header validation is performed.
    """
    header_size = 16
    with open(pyc_path, 'rb') as pyc_f:
        # Discard the header; the marshalled code object follows it.
        pyc_f.read(header_size)
        code = marshal.load(pyc_f)
    return code
|
|
|
|
|
|
@contextlib.contextmanager
def uncache(*names):
    """Uncache a module from sys.modules.

    Each name is dropped from sys.modules on entry and again on exit;
    names that are not cached are ignored.  A basic sanity check refuses
    modules that cannot or should not be uncached by raising ValueError.

    """
    protected = ('sys', 'marshal', 'imp')
    for name in names:
        if name in protected:
            raise ValueError(
                "cannot uncache {0}".format(name))
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)
|
|
|
|
|
|
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create a module or package called *name* in a temporary cwd.

    While the context is active the temporary directory is the current
    directory and is on sys.path, and *name* plus every cached module whose
    top-level name is *name* are uncached.  Yields the path of the module
    without its '.py' suffix (for a package, the package directory).

    With pkg=True and content=None no __init__.py is written.
    """
    conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            modpath = os.path.join(location, '__init__.py')
            os.mkdir(name)
        else:
            modpath = location + '.py'
            if content is None:
                # Make sure the module file gets created.
                content = ''
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
|
|
|
|
|
|
@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    Accepted keywords are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'; an attribute that is not passed is replaced by
    an empty list (an empty dict for 'path_importer_cache').  The previous
    values are put back on exit.  Any other keyword raises ValueError.

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = (('meta_path', []), ('path', []),
                ('path_hooks', []),
                ('path_importer_cache', {}))
    originals = {}
    try:
        for attr, default in defaults:
            originals[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.pop(attr, default))
        if kwargs:
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)
|
|
|
|
|
|
class _ImporterMock:

    """Base class to help with creating importer mocks.

    One module object is created per name passed to the constructor and
    stored in ``self.modules`` under its import name.  A name ending in
    '.__init__' creates a package: the suffix is dropped from the import
    name and the module is given a ``__path__``.  Used as a context
    manager, the mock keeps its module names uncached in sys.modules.
    """

    def __init__(self, *names, module_code=None):
        """Create the mock modules.

        :param names: Module names; 'pkg.__init__' denotes the package 'pkg'.
        :param module_code: Optional mapping of import name to a
            zero-argument callable stored in ``self.module_code``.
        """
        # None instead of a shared ``{}`` default avoids the mutable default
        # argument pitfall; passing nothing still means "no module code".
        if module_code is None:
            module_code = {}
        self.modules = {}
        self.module_code = {}
        for name in names:
            if not name.endswith('.__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            if '.' not in name:
                # Top-level module.
                package = None
            elif import_name == name:
                # Plain submodule: its package is everything before the
                # final dot.
                package = name.rsplit('.', 1)[0]
            else:
                # A package's __init__: the package is the module itself.
                package = import_name
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            # Record the name exactly as it was passed in.
            module.attr = name
            if import_name != name:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        """Return the mock module stored under the import name *name*."""
        return self.modules[name]

    def __enter__(self):
        self._uncache = uncache(*self.modules.keys())
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        self._uncache.__exit__(None, None, None)
|
|
|
|
|
|
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return this mock if it knows *fullname*, otherwise None."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Put the mock module in sys.modules, run its code and return it.

        Raises ImportError for an unknown name.  If the module's code
        raises, the module is removed from sys.modules before the
        exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        sys.modules[fullname] = self.modules[fullname]
        if fullname in self.module_code:
            try:
                self.module_code[fullname]()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
|
|
|
|
|
|
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for a known mock module, or None if unknown."""
        if fullname not in self.modules:
            return None
        module = self.modules[fullname]
        search_locations = getattr(module, '__path__', None)
        return util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=search_locations)

    def create_module(self, spec):
        """Return the pre-built mock module for *spec* or raise ImportError."""
        if spec.name in self.modules:
            return self.modules[spec.name]
        raise ImportError

    def exec_module(self, module):
        """Run the code registered for *module*, if any."""
        # A KeyError is ignored whether it comes from the lookup or from the
        # registered callable itself.
        with contextlib.suppress(KeyError):
            self.module_code[module.__spec__.name]()
|
|
|
|
|
|
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    If sys.dont_write_bytecode is already true at decoration time, *fxn* is
    replaced by a function that accepts anything and returns None.
    """
    if sys.dont_write_bytecode:
        return lambda *args, **kwargs: None

    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        saved = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            # Put the flag back even when fxn raises.
            sys.dont_write_bytecode = saved

    return wrapper
|
|
|
|
|
|
def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    Only the directory directly containing the file is created; any other
    OSError from os.mkdir() propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    pycache_dir = os.path.dirname(bytecode_path)
    try:
        os.mkdir(pycache_dir)
    except FileExistsError:
        # errno.EEXIST: something with that name is already there.
        pass
|
|
|
|
|
|
@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
    """Adjust and restore sys.pycache_prefix.

    sys.pycache_prefix is set to *prefix* inside the with block and put
    back to its previous value afterwards, even on error.
    """
    saved_prefix = sys.pycache_prefix
    sys.pycache_prefix = prefix
    try:
        yield
    finally:
        sys.pycache_prefix = saved_prefix
|
|
|
|
|
|
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). While the context is active that directory is the
    only entry on sys.path and the created modules are uncached. When the
    context manager exits the whole temporary directory is removed.

    The yielded mapping goes from each passed-in name to the path of its
    source file; the extra key '.root' maps to the temporary directory.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    init_suffix = '.__init__'
    # Create the directory before entering the try block: if mkdtemp() fails
    # there is nothing to clean up, and the finally clause must never run
    # with temp_dir unbound.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping = {'.root': temp_dir}
        import_names = set()
        for name in names:
            # Test for the same suffix that gets stripped, so that a module
            # whose name merely ends in '__init__' is left untouched.
            if name.endswith(init_suffix):
                import_name = name[:-len(init_suffix)]
            else:
                import_name = name
            import_names.add(import_name)
            sys.modules.pop(import_name, None)
            *package_parts, module_part = name.split('.')
            file_path = temp_dir
            for directory in package_parts:
                file_path = os.path.join(file_path, directory)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
            file_path = os.path.join(file_path, module_part + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        # The managers are left in reverse order: the import state is
        # restored first, then the modules are uncached again.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
|
|
|
|
|
|
def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    The returned hook hands back *importer* for any path entry listed in
    *entries* and raises ImportError for every other one.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook
|
|
|
|
|
|
class CASEOKTestBase:

    def caseok_env_changed(self, *, should_exist):
        """Skip the test if PYTHONCASEOK's presence is not as expected.

        The variable is looked up, as both bytes and str, in the environ
        mapping of the _os module used by self.importlib's
        _bootstrap_external.
        """
        environ = self.importlib._bootstrap_external._os.environ
        possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
        present = any(x in environ for x in possibilities)
        if present != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')
|
|
|
|
|
|
def create_package(file, path, is_package=True, contents=()):
    """Return a module named 'testingpackage' backed by a stub ResourceReader.

    The module's loader answers the ResourceReader API from the arguments:
    *file* is what open_resource() returns, *path* is what resource_path()
    returns and *contents* is what contents() yields.  Passing an exception
    instance as *file* or *path* makes the corresponding calls raise it.
    The loader stores the most recently requested resource name in its
    ``_path`` attribute.
    """
    def raise_or_return(value):
        # Exception instances stand for "this call fails".
        if isinstance(value, Exception):
            raise value
        return value

    class Reader(ResourceReader):
        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            return raise_or_return(file)

        def resource_path(self, path_):
            self._path = path_
            return raise_or_return(path)

        def is_resource(self, path_):
            self._path = path_
            raise_or_return(path)
            # Only entries without a '/' can name a resource.
            return any(entry.split('/') == [path_] for entry in contents)

        def contents(self):
            raise_or_return(path)
            yield from contents

    name = 'testingpackage'
    loader = Reader()
    module = types.ModuleType(name)
    module.__spec__ = machinery.ModuleSpec(
        name, loader,
        origin='does-not-exist',
        is_package=is_package)
    module.__loader__ = loader
    return module
|
|
|
|
|
|
class CommonResourceTests(abc.ABC):
    """Tests shared by everything that takes a (package, path) pair.

    Concrete test classes implement execute(); each test below calls it
    with a different kind of package or path argument and checks whether
    the call succeeds or raises.
    """

    @abc.abstractmethod
    def execute(self, package, path):
        # Implemented by subclasses: perform the operation under test for the
        # given package (name or module object) and resource path.
        raise NotImplementedError

    def test_package_name(self):
        # Passing in the package name should succeed.
        self.execute(data01.__name__, 'utf-8.file')

    def test_package_object(self):
        # Passing in the package itself should succeed.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # Passing in a string for the path should succeed.
        path = 'utf-8.file'
        self.execute(data01, path)

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # Passing in a pathlib.PurePath object for the path should succeed.
        path = PurePath('utf-8.file')
        self.execute(data01, path)

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        path = Path(__file__)
        full_path = path.parent/'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, full_path)

    def test_relative_path(self):
        # A relative path is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # The anchor package is removed from sys.modules first, so execute()
        # is handed only the name of a package that is no longer cached.
        del sys.modules[data01.__name__]
        self.execute(data01.__name__, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            module = sys.modules['test.test_importlib.util']
            self.execute(module, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        # The stub loader can open the resource but raises from
        # resource_path(); it must still have been asked for 'utf-8.file'.
        bytes_data = io.BytesIO(b'Hello, world!')
        package = create_package(file=bytes_data, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        # Same as above, but the stub loader also returns a path (this file).
        bytes_data = io.BytesIO(b'Hello, world!')
        path = __file__
        package = create_package(file=bytes_data, path=path)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        # A loader that fails both ways makes execute() raise
        # FileNotFoundError.
        package = create_package(file=FileNotFoundError(),
                                 path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')
|
|
|
|
|
|
class ZipSetupBase:
    """Fixture that imports the 'ziptestdata' package from a zip file.

    Subclasses set ZIP_MODULE to a module located in the directory that
    contains 'ziptestdata.zip'.  setUpClass() appends that zip file to
    sys.path and stores the imported package as ``cls.data``;
    tearDownClass() undoes every one of those changes.
    """

    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        # Every cleanup step runs on its own, so a piece of state that is
        # already gone (e.g. no importer cache entry) cannot cause a later
        # step to be skipped.
        zip_path = getattr(cls, '_zip_path', None)
        data = getattr(cls, 'data', None)

        if zip_path is not None:
            with contextlib.suppress(ValueError):
                sys.path.remove(zip_path)
            sys.path_importer_cache.pop(zip_path, None)
        if data is not None:
            sys.modules.pop(data.__name__, None)

        for attr in ('data', '_zip_path'):
            # AttributeError: the attribute is not set on this class itself.
            with contextlib.suppress(AttributeError):
                delattr(cls, attr)

    def setUp(self):
        # Snapshot sys.modules and restore it when the test finishes.
        modules = support.modules_setup()
        self.addCleanup(support.modules_cleanup, *modules)
|
|
|
|
|
|
class ZipSetup(ZipSetupBase):
    """ZipSetupBase with ZIP_MODULE set to the zipdata01 package."""

    ZIP_MODULE = zipdata01                          # type: ignore
|