mirror of
https://github.com/python/cpython.git
synced 2025-08-10 03:49:18 +00:00
gh-102209: Sync with zipp 3.15 moving complexity tests into dedicated module (#102232)
Sync with jaraco/zipp@757a4e1a.
This commit is contained in:
parent
207e1c5cae
commit
a35fd38b57
7 changed files with 87 additions and 51 deletions
|
@ -1,30 +0,0 @@
|
|||
import contextlib
|
||||
import time
|
||||
|
||||
|
||||
class DeadlineExceeded(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TimedContext(contextlib.ContextDecorator):
|
||||
"""
|
||||
A context that will raise DeadlineExceeded if the
|
||||
max duration is reached during the execution.
|
||||
|
||||
>>> TimedContext(1)(time.sleep)(.1)
|
||||
>>> TimedContext(0)(time.sleep)(.1)
|
||||
Traceback (most recent call last):
|
||||
...
|
||||
tests._context.DeadlineExceeded: (..., 0)
|
||||
"""
|
||||
|
||||
def __init__(self, max_duration: int):
|
||||
self.max_duration = max_duration
|
||||
|
||||
def __enter__(self):
|
||||
self.start = time.monotonic()
|
||||
|
||||
def __exit__(self, *err):
|
||||
duration = time.monotonic() - self.start
|
||||
if duration > self.max_duration:
|
||||
raise DeadlineExceeded(duration, self.max_duration)
|
|
@ -1,8 +0,0 @@
|
|||
try:
|
||||
from func_timeout import func_set_timeout as set_timeout
|
||||
except ImportError: # pragma: no cover
|
||||
# provide a fallback that doesn't actually time out
|
||||
from ._context import TimedContext as set_timeout
|
||||
|
||||
|
||||
__all__ = ['set_timeout']
|
|
@ -1,4 +1,6 @@
|
|||
import itertools
|
||||
from collections import deque
|
||||
from itertools import islice
|
||||
|
||||
|
||||
# from jaraco.itertools 6.3.0
|
||||
|
@ -39,3 +41,39 @@ def always_iterable(obj, base_type=(str, bytes)):
|
|||
return iter(obj)
|
||||
except TypeError:
|
||||
return iter((obj,))
|
||||
|
||||
|
||||
# from more_itertools v9.0.0
|
||||
def consume(iterator, n=None):
|
||||
"""Advance *iterable* by *n* steps. If *n* is ``None``, consume it
|
||||
entirely.
|
||||
Efficiently exhausts an iterator without returning values. Defaults to
|
||||
consuming the whole iterator, but an optional second argument may be
|
||||
provided to limit consumption.
|
||||
>>> i = (x for x in range(10))
|
||||
>>> next(i)
|
||||
0
|
||||
>>> consume(i, 3)
|
||||
>>> next(i)
|
||||
4
|
||||
>>> consume(i)
|
||||
>>> next(i)
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
StopIteration
|
||||
If the iterator has fewer items remaining than the provided limit, the
|
||||
whole iterator will be consumed.
|
||||
>>> i = (x for x in range(3))
|
||||
>>> consume(i, 5)
|
||||
>>> next(i)
|
||||
Traceback (most recent call last):
|
||||
File "<stdin>", line 1, in <module>
|
||||
StopIteration
|
||||
"""
|
||||
# Use functions that consume iterators at C speed.
|
||||
if n is None:
|
||||
# feed the entire iterator into a zero-length deque
|
||||
deque(iterator, maxlen=0)
|
||||
else:
|
||||
# advance to the empty slice starting at position n
|
||||
next(islice(iterator, n, n), None)
|
||||
|
|
9
Lib/test/test_zipfile/_support.py
Normal file
9
Lib/test/test_zipfile/_support.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
import importlib
|
||||
import unittest
|
||||
|
||||
|
||||
def import_or_skip(name):
|
||||
try:
|
||||
return importlib.import_module(name)
|
||||
except ImportError: # pragma: no cover
|
||||
raise unittest.SkipTest(f'Unable to import {name}')
|
24
Lib/test/test_zipfile/test_complexity.py
Normal file
24
Lib/test/test_zipfile/test_complexity.py
Normal file
|
@ -0,0 +1,24 @@
|
|||
import unittest
|
||||
import string
|
||||
import zipfile
|
||||
|
||||
from ._functools import compose
|
||||
from ._itertools import consume
|
||||
|
||||
from ._support import import_or_skip
|
||||
|
||||
|
||||
big_o = import_or_skip('big_o')
|
||||
|
||||
|
||||
class TestComplexity(unittest.TestCase):
|
||||
def test_implied_dirs_performance(self):
|
||||
best, others = big_o.big_o(
|
||||
compose(consume, zipfile.CompleteDirs._implied_dirs),
|
||||
lambda size: [
|
||||
'/'.join(string.ascii_lowercase + str(n)) for n in range(size)
|
||||
],
|
||||
max_n=1000,
|
||||
min_n=1,
|
||||
)
|
||||
assert best <= big_o.complexities.Linear
|
|
@ -3,7 +3,6 @@ import itertools
|
|||
import contextlib
|
||||
import pathlib
|
||||
import pickle
|
||||
import string
|
||||
import sys
|
||||
import unittest
|
||||
import zipfile
|
||||
|
@ -12,7 +11,6 @@ from ._functools import compose
|
|||
from ._itertools import Counter
|
||||
|
||||
from ._test_params import parameterize, Invoked
|
||||
from ._func_timeout_compat import set_timeout
|
||||
|
||||
from test.support.os_helper import temp_dir
|
||||
|
||||
|
@ -22,9 +20,6 @@ class jaraco:
|
|||
Counter = Counter
|
||||
|
||||
|
||||
consume = tuple
|
||||
|
||||
|
||||
def add_dirs(zf):
|
||||
"""
|
||||
Given a writable zip file zf, inject directory entries for
|
||||
|
@ -330,12 +325,6 @@ class TestPath(unittest.TestCase):
|
|||
# Check the file iterated all items
|
||||
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
|
||||
|
||||
# timeout disabled due to #102209
|
||||
# @set_timeout(3)
|
||||
def test_implied_dirs_performance(self):
|
||||
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
|
||||
zipfile.CompleteDirs._implied_dirs(data)
|
||||
|
||||
@pass_alpharep
|
||||
def test_read_does_not_close(self, alpharep):
|
||||
alpharep = self.zipfile_ondisk(alpharep)
|
||||
|
@ -513,7 +502,7 @@ class TestPath(unittest.TestCase):
|
|||
saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
|
||||
restored_1 = pickle.loads(saved_1)
|
||||
first, *rest = restored_1.iterdir()
|
||||
assert first.read_text().startswith('content of ')
|
||||
assert first.read_text(encoding='utf-8').startswith('content of ')
|
||||
|
||||
@pass_alpharep
|
||||
def test_extract_orig_with_implied_dirs(self, alpharep):
|
||||
|
@ -525,3 +514,12 @@ class TestPath(unittest.TestCase):
|
|||
# wrap the zipfile for its side effect
|
||||
zipfile.Path(zf)
|
||||
zf.extractall(source_path.parent)
|
||||
|
||||
@pass_alpharep
|
||||
def test_getinfo_missing(self, alpharep):
|
||||
"""
|
||||
Validate behavior of getinfo on original zipfile after wrapping.
|
||||
"""
|
||||
zipfile.Path(alpharep)
|
||||
with self.assertRaises(KeyError):
|
||||
alpharep.getinfo('does-not-exist')
|
||||
|
|
|
@ -86,6 +86,11 @@ class CompleteDirs(InitializedState, zipfile.ZipFile):
|
|||
"""
|
||||
A ZipFile subclass that ensures that implied directories
|
||||
are always included in the namelist.
|
||||
|
||||
>>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
|
||||
['foo/', 'foo/bar/']
|
||||
>>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
|
||||
['foo/']
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
|
@ -215,7 +220,7 @@ class Path:
|
|||
|
||||
Read text:
|
||||
|
||||
>>> c.read_text()
|
||||
>>> c.read_text(encoding='utf-8')
|
||||
'content of c'
|
||||
|
||||
existence:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue