mirror of
https://github.com/python/cpython.git
synced 2025-08-03 00:23:06 +00:00

We don't need to open the files in text mode just to create them (or update their modification time).
362 lines
14 KiB
Python
362 lines
14 KiB
Python
"""Tests for the packaging.install module."""
|
|
|
|
import os
|
|
from tempfile import mkstemp
|
|
from packaging import install
|
|
from packaging.pypi.xmlrpc import Client
|
|
from packaging.metadata import Metadata
|
|
|
|
from packaging.tests.support import LoggingCatcher, TempdirManager, unittest
|
|
try:
|
|
import threading
|
|
from packaging.tests.pypi_server import use_xmlrpc_server
|
|
except ImportError:
|
|
threading = None
|
|
use_xmlrpc_server = None
|
|
|
|
|
|
class InstalledDist:
|
|
"""Distribution object, represent distributions currently installed on the
|
|
system"""
|
|
def __init__(self, name, version, deps):
|
|
self.metadata = Metadata()
|
|
self.name = name
|
|
self.version = version
|
|
self.metadata['Name'] = name
|
|
self.metadata['Version'] = version
|
|
self.metadata['Requires-Dist'] = deps
|
|
|
|
def __repr__(self):
|
|
return '<InstalledDist %s>' % self.metadata['Name']
|
|
|
|
|
|
class ToInstallDist:
|
|
"""Distribution that will be installed"""
|
|
|
|
def __init__(self, files=False):
|
|
self._files = files
|
|
self.install_called = False
|
|
self.install_called_with = {}
|
|
self.uninstall_called = False
|
|
self._real_files = []
|
|
self.name = "fake"
|
|
self.version = "fake"
|
|
if files:
|
|
for f in range(0, 3):
|
|
self._real_files.append(mkstemp())
|
|
|
|
def _unlink_installed_files(self):
|
|
if self._files:
|
|
for f in self._real_files:
|
|
os.unlink(f[1])
|
|
|
|
def list_installed_files(self, **args):
|
|
if self._files:
|
|
return [f[1] for f in self._real_files]
|
|
|
|
def get_install(self, **args):
|
|
return self.list_installed_files()
|
|
|
|
|
|
class MagicMock:
|
|
def __init__(self, return_value=None, raise_exception=False):
|
|
self.called = False
|
|
self._times_called = 0
|
|
self._called_with = []
|
|
self._return_value = return_value
|
|
self._raise = raise_exception
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
self.called = True
|
|
self._times_called = self._times_called + 1
|
|
self._called_with.append((args, kwargs))
|
|
iterable = hasattr(self._raise, '__iter__')
|
|
if self._raise:
|
|
if ((not iterable and self._raise)
|
|
or self._raise[self._times_called - 1]):
|
|
raise Exception
|
|
return self._return_value
|
|
|
|
def called_with(self, *args, **kwargs):
|
|
return (args, kwargs) in self._called_with
|
|
|
|
|
|
def get_installed_dists(dists):
|
|
"""Return a list of fake installed dists.
|
|
The list is name, version, deps"""
|
|
objects = []
|
|
for name, version, deps in dists:
|
|
objects.append(InstalledDist(name, version, deps))
|
|
return objects
|
|
|
|
|
|
class TestInstall(LoggingCatcher, TempdirManager, unittest.TestCase):
|
|
def _get_client(self, server, *args, **kwargs):
|
|
return Client(server.full_address, *args, **kwargs)
|
|
|
|
def _get_results(self, output):
|
|
"""return a list of results"""
|
|
installed = [(o.name, str(o.version)) for o in output['install']]
|
|
remove = [(o.name, str(o.version)) for o in output['remove']]
|
|
conflict = [(o.name, str(o.version)) for o in output['conflict']]
|
|
return installed, remove, conflict
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_existing_deps(self, server):
|
|
# Test that the installer get the dependencies from the metadatas
|
|
# and ask the index for this dependencies.
|
|
# In this test case, we have choxie that is dependent from towel-stuff
|
|
# 0.1, which is in-turn dependent on bacon <= 0.2:
|
|
# choxie -> towel-stuff -> bacon.
|
|
# Each release metadata is not provided in metadata 1.2.
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (<= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.1',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
installed = get_installed_dists([('bacon', '0.1', [])])
|
|
output = install.get_infos("choxie", index=client,
|
|
installed=installed)
|
|
|
|
# we don't have installed bacon as it's already installed system-wide
|
|
self.assertEqual(0, len(output['remove']))
|
|
self.assertEqual(2, len(output['install']))
|
|
readable_output = [(o.name, str(o.version))
|
|
for o in output['install']]
|
|
self.assertIn(('towel-stuff', '0.1'), readable_output)
|
|
self.assertIn(('choxie', '2.0.0.9'), readable_output)
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_upgrade_existing_deps(self, server):
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (>= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.2',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
|
|
output = install.get_infos("choxie", index=client,
|
|
installed=get_installed_dists([('bacon', '0.1', [])]))
|
|
installed = [(o.name, str(o.version)) for o in output['install']]
|
|
|
|
# we need bacon 0.2, but 0.1 is installed.
|
|
# So we expect to remove 0.1 and to install 0.2 instead.
|
|
remove = [(o.name, str(o.version)) for o in output['remove']]
|
|
self.assertIn(('choxie', '2.0.0.9'), installed)
|
|
self.assertIn(('towel-stuff', '0.1'), installed)
|
|
self.assertIn(('bacon', '0.2'), installed)
|
|
self.assertIn(('bacon', '0.1'), remove)
|
|
self.assertEqual(0, len(output['conflict']))
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_conflicts(self, server):
|
|
# Tests that conflicts are detected
|
|
client = self._get_client(server)
|
|
archive_path = '%s/distribution.tar.gz' % server.full_address
|
|
|
|
# choxie depends on towel-stuff, which depends on bacon.
|
|
server.xmlrpc.set_distributions([
|
|
{'name': 'choxie',
|
|
'version': '2.0.0.9',
|
|
'requires_dist': ['towel-stuff (0.1)'],
|
|
'url': archive_path},
|
|
{'name': 'towel-stuff',
|
|
'version': '0.1',
|
|
'requires_dist': ['bacon (>= 0.2)'],
|
|
'url': archive_path},
|
|
{'name': 'bacon',
|
|
'version': '0.2',
|
|
'requires_dist': [],
|
|
'url': archive_path},
|
|
])
|
|
|
|
# name, version, deps.
|
|
already_installed = [('bacon', '0.1', []),
|
|
('chicken', '1.1', ['bacon (0.1)'])]
|
|
output = install.get_infos(
|
|
'choxie', index=client,
|
|
installed=get_installed_dists(already_installed))
|
|
|
|
# we need bacon 0.2, but 0.1 is installed.
|
|
# So we expect to remove 0.1 and to install 0.2 instead.
|
|
installed, remove, conflict = self._get_results(output)
|
|
self.assertIn(('choxie', '2.0.0.9'), installed)
|
|
self.assertIn(('towel-stuff', '0.1'), installed)
|
|
self.assertIn(('bacon', '0.2'), installed)
|
|
self.assertIn(('bacon', '0.1'), remove)
|
|
self.assertIn(('chicken', '1.1'), conflict)
|
|
|
|
@unittest.skipIf(threading is None, 'needs threading')
|
|
@use_xmlrpc_server()
|
|
def test_installation_unexisting_project(self, server):
|
|
# Test that the isntalled raises an exception if the project does not
|
|
# exists.
|
|
client = self._get_client(server)
|
|
self.assertRaises(install.InstallationException,
|
|
install.get_infos,
|
|
'unexisting project', index=client)
|
|
|
|
def test_move_files(self):
|
|
# test that the files are really moved, and that the new path is
|
|
# returned.
|
|
path = self.mkdtemp()
|
|
newpath = self.mkdtemp()
|
|
files = [os.path.join(path, str(x)) for x in range(1, 20)]
|
|
for f in files:
|
|
open(f, 'ab+').close()
|
|
output = [o for o in install._move_files(files, newpath)]
|
|
|
|
# check that output return the list of old/new places
|
|
for f in files:
|
|
self.assertIn((f, '%s%s' % (newpath, f)), output)
|
|
|
|
# remove the files
|
|
for f in [o[1] for o in output]: # o[1] is the new place
|
|
os.remove(f)
|
|
|
|
def test_update_infos(self):
|
|
tests = [[
|
|
{'foo': ['foobar', 'foo', 'baz'], 'baz': ['foo', 'foo']},
|
|
{'foo': ['additional_content', 'yeah'], 'baz': ['test', 'foo']},
|
|
{'foo': ['foobar', 'foo', 'baz', 'additional_content', 'yeah'],
|
|
'baz': ['foo', 'foo', 'test', 'foo']},
|
|
]]
|
|
|
|
for dict1, dict2, expect in tests:
|
|
install._update_infos(dict1, dict2)
|
|
for key in expect:
|
|
self.assertEqual(expect[key], dict1[key])
|
|
|
|
def test_install_dists_rollback(self):
|
|
# if one of the distribution installation fails, call uninstall on all
|
|
# installed distributions.
|
|
|
|
old_install_dist = install._install_dist
|
|
old_uninstall = getattr(install, 'uninstall', None)
|
|
|
|
install._install_dist = MagicMock(return_value=[],
|
|
raise_exception=(False, True))
|
|
install.remove = MagicMock()
|
|
try:
|
|
d1 = ToInstallDist()
|
|
d2 = ToInstallDist()
|
|
path = self.mkdtemp()
|
|
self.assertRaises(Exception, install.install_dists, [d1, d2], path)
|
|
self.assertTrue(install._install_dist.called_with(d1, path))
|
|
self.assertTrue(install.remove.called)
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
install.remove = old_uninstall
|
|
|
|
def test_install_dists_success(self):
|
|
old_install_dist = install._install_dist
|
|
install._install_dist = MagicMock(return_value=[])
|
|
try:
|
|
# test that the install method is called on each distributions
|
|
d1 = ToInstallDist()
|
|
d2 = ToInstallDist()
|
|
|
|
# should call install
|
|
path = self.mkdtemp()
|
|
install.install_dists([d1, d2], path)
|
|
for dist in (d1, d2):
|
|
self.assertTrue(install._install_dist.called_with(dist, path))
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
|
|
def test_install_from_infos_conflict(self):
|
|
# assert conflicts raise an exception
|
|
self.assertRaises(install.InstallationConflict,
|
|
install.install_from_infos,
|
|
conflicts=[ToInstallDist()])
|
|
|
|
def test_install_from_infos_remove_success(self):
|
|
old_install_dists = install.install_dists
|
|
install.install_dists = lambda x, y=None: None
|
|
try:
|
|
dists = []
|
|
for i in range(2):
|
|
dists.append(ToInstallDist(files=True))
|
|
install.install_from_infos(remove=dists)
|
|
|
|
# assert that the files have been removed
|
|
for dist in dists:
|
|
for f in dist.list_installed_files():
|
|
self.assertFalse(os.path.exists(f))
|
|
finally:
|
|
install.install_dists = old_install_dists
|
|
|
|
def test_install_from_infos_remove_rollback(self):
|
|
old_install_dist = install._install_dist
|
|
old_uninstall = getattr(install, 'uninstall', None)
|
|
|
|
install._install_dist = MagicMock(return_value=[],
|
|
raise_exception=(False, True))
|
|
install.uninstall = MagicMock()
|
|
try:
|
|
# assert that if an error occurs, the removed files are restored.
|
|
remove = []
|
|
for i in range(2):
|
|
remove.append(ToInstallDist(files=True))
|
|
to_install = [ToInstallDist(), ToInstallDist()]
|
|
temp_dir = self.mkdtemp()
|
|
|
|
self.assertRaises(Exception, install.install_from_infos,
|
|
install_path=temp_dir, install=to_install,
|
|
remove=remove)
|
|
# assert that the files are in the same place
|
|
# assert that the files have been removed
|
|
for dist in remove:
|
|
for f in dist.list_installed_files():
|
|
self.assertTrue(os.path.exists(f))
|
|
dist._unlink_installed_files()
|
|
finally:
|
|
install.install_dist = old_install_dist
|
|
install.uninstall = old_uninstall
|
|
|
|
def test_install_from_infos_install_succes(self):
|
|
old_install_dist = install._install_dist
|
|
install._install_dist = MagicMock([])
|
|
try:
|
|
# assert that the distribution can be installed
|
|
install_path = "my_install_path"
|
|
to_install = [ToInstallDist(), ToInstallDist()]
|
|
|
|
install.install_from_infos(install_path, install=to_install)
|
|
for dist in to_install:
|
|
install._install_dist.called_with(install_path)
|
|
finally:
|
|
install._install_dist = old_install_dist
|
|
|
|
|
|
def test_suite():
|
|
suite = unittest.TestSuite()
|
|
suite.addTest(unittest.makeSuite(TestInstall))
|
|
return suite
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main(defaultTest='test_suite')
|