mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 15:58:57 +00:00 
			
		
		
		
	 760872efec
			
		
	
	
		760872efec
		
			
		
	
	
	
	
		
			
			There was a deadlock when `ProcessPoolExecutor` shuts down at the same time that a queueing thread handles an error processing a task. Don't use `_shutdown_lock` to protect the `_ThreadWakeup` pipes -- use an internal lock instead. This fixes the ordering deadlock where the `ExecutorManagerThread` holds the `_shutdown_lock` and joins the queueing thread, while the queueing thread is attempting to acquire the `_shutdown_lock` while closing the `_ThreadWakeup`.
		
			
				
	
	
		
			345 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			345 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import signal
 | |
| import sys
 | |
| import threading
 | |
| import time
 | |
| import unittest
 | |
| from concurrent import futures
 | |
| 
 | |
| from test import support
 | |
| from test.support.script_helper import assert_python_ok
 | |
| 
 | |
| from .util import (
 | |
|     BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
 | |
|     ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
 | |
|     create_executor_tests, setup_module)
 | |
| 
 | |
| 
 | |
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* to stdout and flush it.

    The subprocess scripts in this file import this helper and submit it
    to an executor; the explicit flush pushes the text out of the stdout
    buffer as soon as it has been printed.
    """
    time.sleep(t)
    stream = sys.stdout
    print(msg, file=stream)
    stream.flush()
 | |
| 
 | |
| 
 | |
class ExecutorShutdownTest:
    """Executor shutdown tests shared by several executor flavours.

    This class has no base class of its own.  Its methods use
    ``self.executor``, ``self.executor_type``, ``self.worker_count`` and,
    when a multiprocessing start method is involved, ``self.ctx`` and
    ``self.get_context()``, as well as the ``unittest.TestCase``
    assertion methods; all of those must come from the classes it is
    combined with.
    """

    def test_run_after_shutdown(self):
        # submit() on an executor that was already shut down must raise.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # Queue far more short tasks than are likely to run at once, shut
        # down, and make sure every future can still deliver its result.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        # Use a real assertion method rather than a bare ``assert`` so the
        # precondition is still checked when Python runs with -O.
        self.assertLessEqual(self.worker_count, 5, "test needs few workers")
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # The child script must import multiprocessing itself before calling
        # set_start_method(); without the import the branch would fail with
        # NameError as soon as a start-method context is configured.
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                if {context!r}:
                    import multiprocessing
                    multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        # Run one task to completion before the alarm is armed.
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            # If shutdown() hangs, SIGALRM fires after 5 seconds and the
            # handler turns the hang into a RuntimeError.
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            # Always disarm the alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
 | |
| 
 | |
| 
 | |
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests that are specific to ``ThreadPoolExecutor``.

    Most tests capture the executor's private ``_threads`` collection and
    join every worker thread, so a worker that never terminates shows up
    as a hanging test.
    """

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        # Three tasks block on the semaphore, so three workers must exist.
        sem = threading.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for _ in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  assertEqual is used instead of a bare
        # ``assert`` so the check is not stripped under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  assertEqual is used instead of a bare
        # ``assert`` so the check is not stripped under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        # gh-116682: stdout may be empty if shutdown happens before task
        # starts executing.
        self.assertIn(out.strip(), [b"apple", b""])
 | |
| 
 | |
| 
 | |
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests that are specific to ``ProcessPoolExecutor``.

    The class is not a ``TestCase`` by itself; it is passed to
    ``create_executor_tests`` further down in this module together with
    the process-pool mixins.  The tests grab private executor attributes
    (``_processes``, ``_call_queue``, ``_executor_manager_thread``) before
    shutting down so they can join them afterwards.
    """

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        # Keep a reference to the process table before shutting down so the
        # workers can still be joined afterwards.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.  assertEqual is used instead of a bare
        # ``assert`` so the check is not stripped under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.  assertEqual is used instead of a bare ``assert`` so the
        # check is not stripped under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
 | |
| 
 | |
| 
 | |
# create_executor_tests comes from .util; presumably it combines
# ProcessPoolShutdownTest with each mixin below and stores the resulting
# test classes in the namespace passed as its first argument -- confirm
# against .util.
create_executor_tests(
    globals(),
    ProcessPoolShutdownTest,
    executor_mixins=(
        ProcessPoolForkMixin,
        ProcessPoolForkserverMixin,
        ProcessPoolSpawnMixin,
    ),
)
 | |
| 
 | |
| 
 | |
def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module``."""
    setup_module()
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
 |