mirror of
https://github.com/python/cpython.git
synced 2025-10-09 16:34:44 +00:00
gh-108388: Convert test_concurrent_futures to package (#108401)
Convert test_concurrent_futures to a package of sub-tests.
(cherry picked from commit aa6f787faa
)
This commit is contained in:
parent
ce37fbc778
commit
bba9aa60ae
14 changed files with 1847 additions and 1677 deletions
343
Lib/test/test_concurrent_futures/test_shutdown.py
Normal file
343
Lib/test/test_concurrent_futures/test_shutdown.py
Normal file
|
@ -0,0 +1,343 @@
|
|||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import unittest
|
||||
from concurrent import futures
|
||||
|
||||
from test import support
|
||||
from test.support.script_helper import assert_python_ok
|
||||
|
||||
from .util import (
|
||||
BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
|
||||
ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
|
||||
create_executor_tests, setup_module)
|
||||
|
||||
|
||||
def sleep_and_print(t, msg):
|
||||
time.sleep(t)
|
||||
print(msg)
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
class ExecutorShutdownTest:
|
||||
def test_run_after_shutdown(self):
|
||||
self.executor.shutdown()
|
||||
self.assertRaises(RuntimeError,
|
||||
self.executor.submit,
|
||||
pow, 2, 5)
|
||||
|
||||
def test_interpreter_shutdown(self):
|
||||
# Test the atexit hook for shutdown of worker threads and processes
|
||||
rc, out, err = assert_python_ok('-c', """if 1:
|
||||
from concurrent.futures import {executor_type}
|
||||
from time import sleep
|
||||
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
||||
if __name__ == "__main__":
|
||||
context = '{context}'
|
||||
if context == "":
|
||||
t = {executor_type}(5)
|
||||
else:
|
||||
from multiprocessing import get_context
|
||||
context = get_context(context)
|
||||
t = {executor_type}(5, mp_context=context)
|
||||
t.submit(sleep_and_print, 1.0, "apple")
|
||||
""".format(executor_type=self.executor_type.__name__,
|
||||
context=getattr(self, "ctx", "")))
|
||||
# Errors in atexit hooks don't change the process exit code, check
|
||||
# stderr manually.
|
||||
self.assertFalse(err)
|
||||
self.assertEqual(out.strip(), b"apple")
|
||||
|
||||
def test_submit_after_interpreter_shutdown(self):
|
||||
# Test the atexit hook for shutdown of worker threads and processes
|
||||
rc, out, err = assert_python_ok('-c', """if 1:
|
||||
import atexit
|
||||
@atexit.register
|
||||
def run_last():
|
||||
try:
|
||||
t.submit(id, None)
|
||||
except RuntimeError:
|
||||
print("runtime-error")
|
||||
raise
|
||||
from concurrent.futures import {executor_type}
|
||||
if __name__ == "__main__":
|
||||
context = '{context}'
|
||||
if not context:
|
||||
t = {executor_type}(5)
|
||||
else:
|
||||
from multiprocessing import get_context
|
||||
context = get_context(context)
|
||||
t = {executor_type}(5, mp_context=context)
|
||||
t.submit(id, 42).result()
|
||||
""".format(executor_type=self.executor_type.__name__,
|
||||
context=getattr(self, "ctx", "")))
|
||||
# Errors in atexit hooks don't change the process exit code, check
|
||||
# stderr manually.
|
||||
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
|
||||
self.assertEqual(out.strip(), b"runtime-error")
|
||||
|
||||
def test_hang_issue12364(self):
|
||||
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
|
||||
self.executor.shutdown()
|
||||
for f in fs:
|
||||
f.result()
|
||||
|
||||
def test_cancel_futures(self):
|
||||
assert self.worker_count <= 5, "test needs few workers"
|
||||
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
|
||||
self.executor.shutdown(cancel_futures=True)
|
||||
# We can't guarantee the exact number of cancellations, but we can
|
||||
# guarantee that *some* were cancelled. With few workers, many of
|
||||
# the submitted futures should have been cancelled.
|
||||
cancelled = [fut for fut in fs if fut.cancelled()]
|
||||
self.assertGreater(len(cancelled), 20)
|
||||
|
||||
# Ensure the other futures were able to finish.
|
||||
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
|
||||
# that may have been left in a pending state.
|
||||
others = [fut for fut in fs if not fut.cancelled()]
|
||||
for fut in others:
|
||||
self.assertTrue(fut.done(), msg=f"{fut._state=}")
|
||||
self.assertIsNone(fut.exception())
|
||||
|
||||
# Similar to the number of cancelled futures, we can't guarantee the
|
||||
# exact number that completed. But, we can guarantee that at least
|
||||
# one finished.
|
||||
self.assertGreater(len(others), 0)
|
||||
|
||||
def test_hang_gh83386(self):
|
||||
"""shutdown(wait=False) doesn't hang at exit with running futures.
|
||||
|
||||
See https://github.com/python/cpython/issues/83386.
|
||||
"""
|
||||
if self.executor_type == futures.ProcessPoolExecutor:
|
||||
raise unittest.SkipTest(
|
||||
"Hangs, see https://github.com/python/cpython/issues/83386")
|
||||
|
||||
rc, out, err = assert_python_ok('-c', """if True:
|
||||
from concurrent.futures import {executor_type}
|
||||
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
||||
if __name__ == "__main__":
|
||||
if {context!r}: multiprocessing.set_start_method({context!r})
|
||||
t = {executor_type}(max_workers=3)
|
||||
t.submit(sleep_and_print, 1.0, "apple")
|
||||
t.shutdown(wait=False)
|
||||
""".format(executor_type=self.executor_type.__name__,
|
||||
context=getattr(self, 'ctx', None)))
|
||||
self.assertFalse(err)
|
||||
self.assertEqual(out.strip(), b"apple")
|
||||
|
||||
def test_hang_gh94440(self):
|
||||
"""shutdown(wait=True) doesn't hang when a future was submitted and
|
||||
quickly canceled right before shutdown.
|
||||
|
||||
See https://github.com/python/cpython/issues/94440.
|
||||
"""
|
||||
if not hasattr(signal, 'alarm'):
|
||||
raise unittest.SkipTest(
|
||||
"Tested platform does not support the alarm signal")
|
||||
|
||||
def timeout(_signum, _frame):
|
||||
raise RuntimeError("timed out waiting for shutdown")
|
||||
|
||||
kwargs = {}
|
||||
if getattr(self, 'ctx', None):
|
||||
kwargs['mp_context'] = self.get_context()
|
||||
executor = self.executor_type(max_workers=1, **kwargs)
|
||||
executor.submit(int).result()
|
||||
old_handler = signal.signal(signal.SIGALRM, timeout)
|
||||
try:
|
||||
signal.alarm(5)
|
||||
executor.submit(int).cancel()
|
||||
executor.shutdown(wait=True)
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
signal.signal(signal.SIGALRM, old_handler)
|
||||
|
||||
|
||||
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
|
||||
def test_threads_terminate(self):
|
||||
def acquire_lock(lock):
|
||||
lock.acquire()
|
||||
|
||||
sem = threading.Semaphore(0)
|
||||
for i in range(3):
|
||||
self.executor.submit(acquire_lock, sem)
|
||||
self.assertEqual(len(self.executor._threads), 3)
|
||||
for i in range(3):
|
||||
sem.release()
|
||||
self.executor.shutdown()
|
||||
for t in self.executor._threads:
|
||||
t.join()
|
||||
|
||||
def test_context_manager_shutdown(self):
|
||||
with futures.ThreadPoolExecutor(max_workers=5) as e:
|
||||
executor = e
|
||||
self.assertEqual(list(e.map(abs, range(-5, 5))),
|
||||
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
|
||||
|
||||
for t in executor._threads:
|
||||
t.join()
|
||||
|
||||
def test_del_shutdown(self):
|
||||
executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
res = executor.map(abs, range(-5, 5))
|
||||
threads = executor._threads
|
||||
del executor
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# Make sure the results were all computed before the
|
||||
# executor got shutdown.
|
||||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
||||
|
||||
def test_shutdown_no_wait(self):
|
||||
# Ensure that the executor cleans up the threads when calling
|
||||
# shutdown with wait=False
|
||||
executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
res = executor.map(abs, range(-5, 5))
|
||||
threads = executor._threads
|
||||
executor.shutdown(wait=False)
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# Make sure the results were all computed before the
|
||||
# executor got shutdown.
|
||||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
||||
|
||||
|
||||
def test_thread_names_assigned(self):
|
||||
executor = futures.ThreadPoolExecutor(
|
||||
max_workers=5, thread_name_prefix='SpecialPool')
|
||||
executor.map(abs, range(-5, 5))
|
||||
threads = executor._threads
|
||||
del executor
|
||||
support.gc_collect() # For PyPy or other GCs.
|
||||
|
||||
for t in threads:
|
||||
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
|
||||
t.join()
|
||||
|
||||
def test_thread_names_default(self):
|
||||
executor = futures.ThreadPoolExecutor(max_workers=5)
|
||||
executor.map(abs, range(-5, 5))
|
||||
threads = executor._threads
|
||||
del executor
|
||||
support.gc_collect() # For PyPy or other GCs.
|
||||
|
||||
for t in threads:
|
||||
# Ensure that our default name is reasonably sane and unique when
|
||||
# no thread_name_prefix was supplied.
|
||||
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
|
||||
t.join()
|
||||
|
||||
def test_cancel_futures_wait_false(self):
|
||||
# Can only be reliably tested for TPE, since PPE often hangs with
|
||||
# `wait=False` (even without *cancel_futures*).
|
||||
rc, out, err = assert_python_ok('-c', """if True:
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from test.test_concurrent_futures.test_shutdown import sleep_and_print
|
||||
if __name__ == "__main__":
|
||||
t = ThreadPoolExecutor()
|
||||
t.submit(sleep_and_print, .1, "apple")
|
||||
t.shutdown(wait=False, cancel_futures=True)
|
||||
""")
|
||||
# Errors in atexit hooks don't change the process exit code, check
|
||||
# stderr manually.
|
||||
self.assertFalse(err)
|
||||
self.assertEqual(out.strip(), b"apple")
|
||||
|
||||
|
||||
class ProcessPoolShutdownTest(ExecutorShutdownTest):
|
||||
def test_processes_terminate(self):
|
||||
def acquire_lock(lock):
|
||||
lock.acquire()
|
||||
|
||||
mp_context = self.get_context()
|
||||
if mp_context.get_start_method(allow_none=False) == "fork":
|
||||
# fork pre-spawns, not on demand.
|
||||
expected_num_processes = self.worker_count
|
||||
else:
|
||||
expected_num_processes = 3
|
||||
|
||||
sem = mp_context.Semaphore(0)
|
||||
for _ in range(3):
|
||||
self.executor.submit(acquire_lock, sem)
|
||||
self.assertEqual(len(self.executor._processes), expected_num_processes)
|
||||
for _ in range(3):
|
||||
sem.release()
|
||||
processes = self.executor._processes
|
||||
self.executor.shutdown()
|
||||
|
||||
for p in processes.values():
|
||||
p.join()
|
||||
|
||||
def test_context_manager_shutdown(self):
|
||||
with futures.ProcessPoolExecutor(
|
||||
max_workers=5, mp_context=self.get_context()) as e:
|
||||
processes = e._processes
|
||||
self.assertEqual(list(e.map(abs, range(-5, 5))),
|
||||
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
|
||||
|
||||
for p in processes.values():
|
||||
p.join()
|
||||
|
||||
def test_del_shutdown(self):
|
||||
executor = futures.ProcessPoolExecutor(
|
||||
max_workers=5, mp_context=self.get_context())
|
||||
res = executor.map(abs, range(-5, 5))
|
||||
executor_manager_thread = executor._executor_manager_thread
|
||||
processes = executor._processes
|
||||
call_queue = executor._call_queue
|
||||
executor_manager_thread = executor._executor_manager_thread
|
||||
del executor
|
||||
support.gc_collect() # For PyPy or other GCs.
|
||||
|
||||
# Make sure that all the executor resources were properly cleaned by
|
||||
# the shutdown process
|
||||
executor_manager_thread.join()
|
||||
for p in processes.values():
|
||||
p.join()
|
||||
call_queue.join_thread()
|
||||
|
||||
# Make sure the results were all computed before the
|
||||
# executor got shutdown.
|
||||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
||||
|
||||
def test_shutdown_no_wait(self):
|
||||
# Ensure that the executor cleans up the processes when calling
|
||||
# shutdown with wait=False
|
||||
executor = futures.ProcessPoolExecutor(
|
||||
max_workers=5, mp_context=self.get_context())
|
||||
res = executor.map(abs, range(-5, 5))
|
||||
processes = executor._processes
|
||||
call_queue = executor._call_queue
|
||||
executor_manager_thread = executor._executor_manager_thread
|
||||
executor.shutdown(wait=False)
|
||||
|
||||
# Make sure that all the executor resources were properly cleaned by
|
||||
# the shutdown process
|
||||
executor_manager_thread.join()
|
||||
for p in processes.values():
|
||||
p.join()
|
||||
call_queue.join_thread()
|
||||
|
||||
# Make sure the results were all computed before the executor got
|
||||
# shutdown.
|
||||
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
|
||||
|
||||
|
||||
create_executor_tests(globals(), ProcessPoolShutdownTest,
|
||||
executor_mixins=(ProcessPoolForkMixin,
|
||||
ProcessPoolForkserverMixin,
|
||||
ProcessPoolSpawnMixin))
|
||||
|
||||
|
||||
def setUpModule():
|
||||
setup_module()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Add table
Add a link
Reference in a new issue