mirror of
https://github.com/python/cpython.git
synced 2025-07-08 03:45:36 +00:00

Copy the list of dangling threads to make sure that the list of "Dangling thread" entries is complete. Previously, the list was incomplete if threads completed just before the list was displayed. Changes: * Rewrite the warning to make it easier to understand. * Use support.sleeping_retry(). * threading_cleanup() no longer copies threading._dangling, but only counts the number of dangling threads. * Remove support.gc_support() call.
250 lines
7.9 KiB
Python
250 lines
7.9 KiB
Python
import _thread
|
|
import contextlib
|
|
import functools
|
|
import sys
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
from test import support
|
|
|
|
|
|
#=======================================================================
|
|
# Threading support to prevent reporting refleaks when running regrtest.py -R
|
|
|
|
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its _bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# _thread._count(), on the other hand, only gets decremented *after* the
# _bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
|
|
|
|
|
|
def threading_setup():
    """Snapshot the current thread bookkeeping.

    Returns a 2-tuple ``(thread_count, dangling_count)`` where
    ``thread_count`` is ``_thread._count()`` and ``dangling_count`` is the
    number of entries currently in ``threading._dangling``.  The tuple is
    meant to be passed back, unpacked, to threading_cleanup().
    """
    thread_count = _thread._count()
    dangling_count = len(threading._dangling)
    return thread_count, dangling_count
|
|
|
|
|
|
def threading_cleanup(*original_values):
    """Wait for the thread count to return to its value at setup time.

    *original_values* is the ``(thread_count, dangling_count)`` pair
    produced by threading_setup().  ``_thread._count()`` is polled through
    ``support.sleeping_retry()`` for up to one second.  If the count is
    back at (or below) the original value, the function returns silently.
    Otherwise ``support.environment_altered`` is set and a warning listing
    every dangling thread is printed.
    """
    orig_count, orig_ndangling = original_values

    timeout = 1.0
    for _ in support.sleeping_retry(timeout, error=False):
        # threading._dangling is a WeakSet whose content changes while it is
        # being read: take a copy so that the report below is consistent.
        dangling_threads = list(threading._dangling)
        count = _thread._count()

        if count <= orig_count:
            return

    # The retry loop ran out of time: report the leftover threads.
    support.environment_altered = True
    warning = (
        f"threading_cleanup() failed to clean up threads "
        f"in {timeout:.1f} seconds\n"
        f" before: thread count={orig_count}, dangling={orig_ndangling}\n"
        f" after: thread count={count}, dangling={len(dangling_threads)}")
    support.print_warning(warning)
    for thread in dangling_threads:
        support.print_warning(f"Dangling thread: {thread!r}")

    # The warning happens when a test spawns threads and some of these threads
    # are still running after the test completes. To fix this warning, join
    # threads explicitly to wait until they complete.
    #
    # To make the warning more likely, reduce the timeout.
|
|
|
|
|
|
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.

    The returned wrapper takes a threading_setup() snapshot, calls *func*
    with the positional and keyword arguments it received, and always runs
    threading_cleanup() with that snapshot afterwards, whether *func*
    returned or raised.  The wrapper returns whatever *func* returns.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            # Forward keyword arguments too, so that the decorated callable
            # keeps the same calling convention as the original one.
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
|
|
|
|
|
|
@contextlib.contextmanager
def wait_threads_exit(timeout=None):
    """
    bpo-31234: Context manager to wait until all threads created in the with
    statement exit.

    Use _thread._count() to check if threads exited: the count taken before
    the with body runs is compared with the count observed afterwards.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to clean up threads started by _thread.start_new_thread(),
    which offers no way to wait for the thread to exit, whereas
    threading.Thread has a join() method.

    If *timeout* is None, support.SHORT_TIMEOUT is used.  An AssertionError
    is raised if the thread count has not dropped back to its initial value
    once the retry loop is exhausted.
    """
    timeout = support.SHORT_TIMEOUT if timeout is None else timeout
    old_count = _thread._count()
    try:
        yield
    finally:
        start_time = time.monotonic()
        threads_exited = False
        for _ in support.sleeping_retry(timeout, error=False):
            support.gc_collect()
            count = _thread._count()
            if count <= old_count:
                threads_exited = True
                break

        if not threads_exited:
            # The retry loop ran to completion without the count going down.
            dt = time.monotonic() - start_time
            msg = (f"wait_threads() failed to cleanup {count - old_count} "
                   f"threads after {dt:.1f} seconds "
                   f"(count: {count}, old count: {old_count})")
            raise AssertionError(msg)
|
|
|
|
|
|
def join_thread(thread, timeout=None):
    """Join *thread*, failing loudly if it does not finish in time.

    *timeout* is the number of seconds passed to ``thread.join()``; when it
    is None, ``support.SHORT_TIMEOUT`` is used.  Raise an AssertionError if
    the thread is still alive after the join returns.
    """
    timeout = support.SHORT_TIMEOUT if timeout is None else timeout
    thread.join(timeout)
    if not thread.is_alive():
        return
    raise AssertionError(f"failed to join the thread in {timeout:.1f} seconds")
|
|
|
|
|
|
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager starting *threads* and joining them on exit.

    Every thread of the *threads* iterable is started before the with body
    runs.  If starting a thread raises, the exception is re-raised (after a
    diagnostic message when support.verbose is set); the threads that were
    already started are still joined.

    On exit, *unlock* is called if it is truthy, then the started threads
    are joined in up to 15 rounds, the deadline being pushed back by 60
    seconds at each round.  If some threads are still alive in the end, the
    tracebacks are dumped to sys.stdout with faulthandler (when it can be
    imported) and an AssertionError is raised.
    """
    try:
        import faulthandler
    except ImportError:
        # It isn't supported on subinterpreters yet.
        faulthandler = None

    requested = list(threads)
    running = []
    try:
        try:
            for thread in requested:
                thread.start()
                running.append(thread)
        except BaseException:
            if support.verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(requested), len(running)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            deadline = time.monotonic()
            for minutes in range(1, 16):
                deadline += 60
                for thread in running:
                    # Always wait a little, even when the deadline of this
                    # round has already passed.
                    remaining = deadline - time.monotonic()
                    thread.join(max(remaining, 0.01))
                running = [thread for thread in running if thread.is_alive()]
                if not running:
                    break
                if support.verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(running), minutes))
        finally:
            # Re-check here: this code also runs when unlock() or a join
            # raised before the list of running threads was refreshed.
            running = [thread for thread in running if thread.is_alive()]
            if running:
                if faulthandler is not None:
                    faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(running))
|
|
|
|
|
|
class catch_threading_exception:
    """
    Context manager catching threading.Thread exception using
    threading.excepthook.

    Attributes set when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    The attributes are None while no exception has been caught, and they
    are deleted at the context manager exit.  The same instance can be
    entered again afterwards: the attributes are reset to None on entry.

    Usage:

        with threading_helper.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
        # exist at this point
        # (to avoid reference cycles)
    """

    def __init__(self):
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        self._old_hook = None

    def _hook(self, args):
        # Installed as threading.excepthook: record what the hook received.
        self.exc_type = args.exc_type
        self.exc_value = args.exc_value
        self.exc_traceback = args.exc_traceback
        self.thread = args.thread

    def __enter__(self):
        # __exit__() deletes these attributes.  Recreate them so that a
        # reused instance starts from a clean state and so that __exit__()
        # does not fail with AttributeError when no exception is caught.
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        threading.excepthook = self._old_hook
        # Drop the references to the exception, its traceback and the thread
        # to avoid reference cycles.
        del self.exc_type
        del self.exc_value
        del self.exc_traceback
        del self.thread
|
|
|
|
|
|
def _can_start_thread() -> bool:
    """Detect whether Python can start new threads.

    Some WebAssembly platforms do not provide a working pthread
    implementation. Thread support is stubbed and any attempt
    to create a new thread fails.

    - wasm32-wasi does not have threading.
    - wasm32-emscripten can be compiled with or without pthread
      support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
    """
    platform = sys.platform
    if platform == "wasi":
        return False
    if platform == "emscripten":
        return sys._emscripten_info.pthreads
    # assume all other platforms have working thread support.
    return True


# Computed once at import time; read by requires_working_threading().
can_start_thread = _can_start_thread()
|
|
|
|
def requires_working_threading(*, module=False):
    """Skip tests or modules that require working threading.

    Can be used as a function/class decorator or to skip an entire module.

    With ``module=False`` (the default), return a ``unittest.skipUnless()``
    decorator driven by ``can_start_thread``.  With ``module=True``, raise
    ``unittest.SkipTest`` right away if threads cannot be started, and
    return None otherwise.
    """
    msg = "requires threading support"
    if not module:
        return unittest.skipUnless(can_start_thread, msg)
    if not can_start_thread:
        raise unittest.SkipTest(msg)
|