mirror of
https://github.com/python/cpython.git
synced 2025-12-04 00:30:19 +00:00
gh-114911: Add CPUStopwatch test helper (GH-114912)
A few of our tests measure the time of CPU-bound operation, mainly to avoid quadratic or worse behaviour. Add a helper to ignore GC and time spent in other processes.
This commit is contained in:
parent
3b63d0769f
commit
7acf1fb5a7
3 changed files with 75 additions and 42 deletions
|
|
@ -2381,6 +2381,46 @@ def sleeping_retry(timeout, err_msg=None, /,
|
|||
delay = min(delay * 2, max_delay)
|
||||
|
||||
|
||||
class CPUStopwatch:
|
||||
"""Context manager to roughly time a CPU-bound operation.
|
||||
|
||||
Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of
|
||||
other processes).
|
||||
|
||||
N.B.:
|
||||
- This *includes* time spent in other threads.
|
||||
- Some systems only have a coarse resolution; check
|
||||
stopwatch.clock_info.rseolution if.
|
||||
|
||||
Usage:
|
||||
|
||||
with ProcessStopwatch() as stopwatch:
|
||||
...
|
||||
elapsed = stopwatch.seconds
|
||||
resolution = stopwatch.clock_info.resolution
|
||||
"""
|
||||
def __enter__(self):
|
||||
get_time = time.process_time
|
||||
clock_info = time.get_clock_info('process_time')
|
||||
if get_time() <= 0: # some platforms like WASM lack process_time()
|
||||
get_time = time.monotonic
|
||||
clock_info = time.get_clock_info('monotonic')
|
||||
self.context = disable_gc()
|
||||
self.context.__enter__()
|
||||
self.get_time = get_time
|
||||
self.clock_info = clock_info
|
||||
self.start_time = get_time()
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc):
|
||||
try:
|
||||
end_time = self.get_time()
|
||||
finally:
|
||||
result = self.context.__exit__(*exc)
|
||||
self.seconds = end_time - self.start_time
|
||||
return result
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def adjust_int_max_str_digits(max_digits):
|
||||
"""Temporarily change the integer string conversion length limit."""
|
||||
|
|
|
|||
|
|
@ -664,84 +664,78 @@ class IntStrDigitLimitsTests(unittest.TestCase):
|
|||
"""Regression test: ensure we fail before performing O(N**2) work."""
|
||||
maxdigits = sys.get_int_max_str_digits()
|
||||
assert maxdigits < 50_000, maxdigits # A test prerequisite.
|
||||
get_time = time.process_time
|
||||
if get_time() <= 0: # some platforms like WASM lack process_time()
|
||||
get_time = time.monotonic
|
||||
|
||||
huge_int = int(f'0x{"c"*65_000}', base=16) # 78268 decimal digits.
|
||||
digits = 78_268
|
||||
with support.adjust_int_max_str_digits(digits):
|
||||
start = get_time()
|
||||
with (
|
||||
support.adjust_int_max_str_digits(digits),
|
||||
support.CPUStopwatch() as sw_convert):
|
||||
huge_decimal = str(huge_int)
|
||||
seconds_to_convert = get_time() - start
|
||||
self.assertEqual(len(huge_decimal), digits)
|
||||
# Ensuring that we chose a slow enough conversion to measure.
|
||||
# It takes 0.1 seconds on a Zen based cloud VM in an opt build.
|
||||
# Some OSes have a low res 1/64s timer, skip if hard to measure.
|
||||
if seconds_to_convert < 1/64:
|
||||
if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
|
||||
raise unittest.SkipTest('"slow" conversion took only '
|
||||
f'{seconds_to_convert} seconds.')
|
||||
f'{sw_convert.seconds} seconds.')
|
||||
|
||||
# We test with the limit almost at the size needed to check performance.
|
||||
# The performant limit check is slightly fuzzy, give it a some room.
|
||||
with support.adjust_int_max_str_digits(int(.995 * digits)):
|
||||
with self.assertRaises(ValueError) as err:
|
||||
start = get_time()
|
||||
with (
|
||||
self.assertRaises(ValueError) as err,
|
||||
support.CPUStopwatch() as sw_fail_huge):
|
||||
str(huge_int)
|
||||
seconds_to_fail_huge = get_time() - start
|
||||
self.assertIn('conversion', str(err.exception))
|
||||
self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
|
||||
self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
|
||||
|
||||
# Now we test that a conversion that would take 30x as long also fails
|
||||
# in a similarly fast fashion.
|
||||
extra_huge_int = int(f'0x{"c"*500_000}', base=16) # 602060 digits.
|
||||
with self.assertRaises(ValueError) as err:
|
||||
start = get_time()
|
||||
with (
|
||||
self.assertRaises(ValueError) as err,
|
||||
support.CPUStopwatch() as sw_fail_extra_huge):
|
||||
# If not limited, 8 seconds said Zen based cloud VM.
|
||||
str(extra_huge_int)
|
||||
seconds_to_fail_extra_huge = get_time() - start
|
||||
self.assertIn('conversion', str(err.exception))
|
||||
self.assertLess(seconds_to_fail_extra_huge, seconds_to_convert/2)
|
||||
self.assertLess(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
|
||||
|
||||
def test_denial_of_service_prevented_str_to_int(self):
|
||||
"""Regression test: ensure we fail before performing O(N**2) work."""
|
||||
maxdigits = sys.get_int_max_str_digits()
|
||||
assert maxdigits < 100_000, maxdigits # A test prerequisite.
|
||||
get_time = time.process_time
|
||||
if get_time() <= 0: # some platforms like WASM lack process_time()
|
||||
get_time = time.monotonic
|
||||
|
||||
digits = 133700
|
||||
huge = '8'*digits
|
||||
with support.adjust_int_max_str_digits(digits):
|
||||
start = get_time()
|
||||
with (
|
||||
support.adjust_int_max_str_digits(digits),
|
||||
support.CPUStopwatch() as sw_convert):
|
||||
int(huge)
|
||||
seconds_to_convert = get_time() - start
|
||||
# Ensuring that we chose a slow enough conversion to measure.
|
||||
# It takes 0.1 seconds on a Zen based cloud VM in an opt build.
|
||||
# Some OSes have a low res 1/64s timer, skip if hard to measure.
|
||||
if seconds_to_convert < 1/64:
|
||||
if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
|
||||
raise unittest.SkipTest('"slow" conversion took only '
|
||||
f'{seconds_to_convert} seconds.')
|
||||
f'{sw_convert.seconds} seconds.')
|
||||
|
||||
with support.adjust_int_max_str_digits(digits - 1):
|
||||
with self.assertRaises(ValueError) as err:
|
||||
start = get_time()
|
||||
with (
|
||||
self.assertRaises(ValueError) as err,
|
||||
support.CPUStopwatch() as sw_fail_huge):
|
||||
int(huge)
|
||||
seconds_to_fail_huge = get_time() - start
|
||||
self.assertIn('conversion', str(err.exception))
|
||||
self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
|
||||
self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
|
||||
|
||||
# Now we test that a conversion that would take 30x as long also fails
|
||||
# in a similarly fast fashion.
|
||||
extra_huge = '7'*1_200_000
|
||||
with self.assertRaises(ValueError) as err:
|
||||
start = get_time()
|
||||
with (
|
||||
self.assertRaises(ValueError) as err,
|
||||
support.CPUStopwatch() as sw_fail_extra_huge):
|
||||
# If not limited, 8 seconds in the Zen based cloud VM.
|
||||
int(extra_huge)
|
||||
seconds_to_fail_extra_huge = get_time() - start
|
||||
self.assertIn('conversion', str(err.exception))
|
||||
self.assertLessEqual(seconds_to_fail_extra_huge, seconds_to_convert/2)
|
||||
self.assertLessEqual(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
|
||||
|
||||
def test_power_of_two_bases_unlimited(self):
|
||||
"""The limit does not apply to power of 2 bases."""
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
from test.support import (gc_collect, bigmemtest, _2G,
|
||||
cpython_only, captured_stdout,
|
||||
check_disallow_instantiation, is_emscripten, is_wasi,
|
||||
warnings_helper, SHORT_TIMEOUT)
|
||||
warnings_helper, SHORT_TIMEOUT, CPUStopwatch)
|
||||
import locale
|
||||
import re
|
||||
import string
|
||||
|
|
@ -2284,17 +2284,16 @@ class ReTests(unittest.TestCase):
|
|||
|
||||
def test_search_anchor_at_beginning(self):
|
||||
s = 'x'*10**7
|
||||
start = time.perf_counter()
|
||||
for p in r'\Ay', r'^y':
|
||||
self.assertIsNone(re.search(p, s))
|
||||
self.assertEqual(re.split(p, s), [s])
|
||||
self.assertEqual(re.findall(p, s), [])
|
||||
self.assertEqual(list(re.finditer(p, s)), [])
|
||||
self.assertEqual(re.sub(p, '', s), s)
|
||||
t = time.perf_counter() - start
|
||||
with CPUStopwatch() as stopwatch:
|
||||
for p in r'\Ay', r'^y':
|
||||
self.assertIsNone(re.search(p, s))
|
||||
self.assertEqual(re.split(p, s), [s])
|
||||
self.assertEqual(re.findall(p, s), [])
|
||||
self.assertEqual(list(re.finditer(p, s)), [])
|
||||
self.assertEqual(re.sub(p, '', s), s)
|
||||
# Without optimization it takes 1 second on my computer.
|
||||
# With optimization -- 0.0003 seconds.
|
||||
self.assertLess(t, 0.1)
|
||||
self.assertLess(stopwatch.seconds, 0.1)
|
||||
|
||||
def test_possessive_quantifiers(self):
|
||||
"""Test Possessive Quantifiers
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue