cpython/Lib/test/test_concurrent_futures/executor.py
Victor Stinner 0855b2c8b6
[3.12] gh-108834: Sync libregrtest with the main branch (#108966)
* gh-108834: regrtest reruns failed tests in subprocesses (#108839)

When using --rerun option, regrtest now re-runs failed tests
in verbose mode in fresh worker processes to have more
deterministic behavior. So it can write its final report even
if a test killed a worker progress.

Add --fail-rerun option to regrtest: exit with non-zero exit code
if a test failed pass passed when re-run in verbose mode (in a
fresh process). That's now more useful since tests can pass
when re-run in a fresh worker progress, whereas they failed
when run after other tests when tests are run sequentially.

Rename --verbose2 option (-w) to --rerun. Keep --verbose2 as a
deprecated alias.

Changes:

* Fix and enhance statistics in regrtest summary. Add "(filtered)"
  when --match and/or --ignore options are used.
* Add RunTests class.
* Add TestResult.get_rerun_match_tests() method
* Rewrite code to serialize/deserialize worker arguments as JSON
  using a new WorkerJob class.
* Fix stats when a test is run with --forever --rerun.
* If failed test names cannot be parsed, log a warning and don't
  filter tests.
* test_regrtest.test_rerun_success() now uses a marker file, since
  the test is re-run in a separated process.
* Add tests on normalize_test_name() function.
* Add test_success() and test_skip() tests to test_regrtest.

(cherry picked from commit 31c2945f14)

* gh-108834: regrtest --fail-rerun exits with code 5 (#108896)

When the --fail-rerun option is used and a test fails and then pass,
regrtest now uses exit code 5 ("rerun) instead of 2 ("bad test").

(cherry picked from commit 1170d5a292)

* gh-108416: Mark slow but not CPU bound test methods with requires_resource('walltime') (GH-108480)

(cherry picked from commit 1e0d62793a)

* Manually sync Lib/test/libregrtest/ from main

---------

Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
2023-09-08 15:14:17 +02:00

108 lines
3.6 KiB
Python

import threading
import time
import weakref
from concurrent import futures
from test import support
def mul(x, y):
return x * y
def capture(*args, **kwargs):
return args, kwargs
class MyObject(object):
def my_method(self):
pass
def make_dummy_object(_):
return MyObject()
class ExecutorTest:
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEqual(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEqual(16, future.result())
future = self.executor.submit(capture, 1, self=2, fn=3)
self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
with self.assertRaises(TypeError):
self.executor.submit(fn=capture, arg=1)
with self.assertRaises(TypeError):
self.executor.submit(arg=1)
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
self.assertEqual(
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
list(map(pow, range(10), range(10))))
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
@support.requires_resource('walltime')
def test_map_timeout(self):
results = []
try:
for i in self.executor.map(time.sleep,
[0, 0, 6],
timeout=5):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')
self.assertEqual([None, None], results)
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes
# have exited).
self.executor.map(str, [2] * (self.worker_count + 1))
self.executor.shutdown()
@support.cpython_only
def test_no_stale_references(self):
# Issue #16284: check that the executors don't unnecessarily hang onto
# references.
my_object = MyObject()
my_object_collected = threading.Event()
my_object_callback = weakref.ref(
my_object, lambda obj: my_object_collected.set())
# Deliberately discarding the future.
self.executor.submit(my_object.my_method)
del my_object
collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
self.assertTrue(collected,
"Stale reference not collected within timeout.")
def test_max_workers_negative(self):
for number in (0, -1):
with self.assertRaisesRegex(ValueError,
"max_workers must be greater "
"than 0"):
self.executor_type(max_workers=number)
def test_free_reference(self):
# Issue #14406: Result iterator should not keep an internal
# reference to result objects.
for obj in self.executor.map(make_dummy_object, range(10)):
wr = weakref.ref(obj)
del obj
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())