mirror of
https://github.com/python/cpython.git
synced 2025-09-26 10:19:53 +00:00
Issue 18984: Remove ._stopped Event from Thread internals.
The fix for issue 18808 left us checking two things to be sure a Thread was done: an Event (._stopped) and a mutex (._tstate_lock). Clumsy & brittle. This patch removes the Event, leaving just a happy lock :-) The bulk of the patch removes two excruciating tests, which were verifying sanity of the internals of the ._stopped Event after a fork. Thanks to Antoine Pitrou for verifying that's the only real value these tests had. One consequence of moving from an Event to a mutex: waiters (threads calling Thread.join()) used to block each on their own unique mutex (internal to the ._stopped event), but now all contend on the same mutex (._tstate_lock). These approaches have different performance characteristics on different platforms. I don't think it matters in this context.
This commit is contained in:
parent
050b62d1a6
commit
c363a23eff
2 changed files with 30 additions and 167 deletions
|
@ -647,144 +647,8 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
||||||
"""
|
"""
|
||||||
self._run_and_join(script)
|
self._run_and_join(script)
|
||||||
|
|
||||||
def assertScriptHasOutput(self, script, expected_output):
|
|
||||||
rc, out, err = assert_python_ok("-c", script)
|
|
||||||
data = out.decode().replace('\r', '')
|
|
||||||
self.assertEqual(data, expected_output)
|
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
|
|
||||||
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
|
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
|
||||||
def test_4_joining_across_fork_in_worker_thread(self):
|
def test_4_daemon_threads(self):
|
||||||
# There used to be a possible deadlock when forking from a child
|
|
||||||
# thread. See http://bugs.python.org/issue6643.
|
|
||||||
|
|
||||||
# The script takes the following steps:
|
|
||||||
# - The main thread in the parent process starts a new thread and then
|
|
||||||
# tries to join it.
|
|
||||||
# - The join operation acquires the Lock inside the thread's _block
|
|
||||||
# Condition. (See threading.py:Thread.join().)
|
|
||||||
# - We stub out the acquire method on the condition to force it to wait
|
|
||||||
# until the child thread forks. (See LOCK ACQUIRED HERE)
|
|
||||||
# - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
|
|
||||||
# HERE)
|
|
||||||
# - The main thread of the parent process enters Condition.wait(),
|
|
||||||
# which releases the lock on the child thread.
|
|
||||||
# - The child process returns. Without the necessary fix, when the
|
|
||||||
# main thread of the child process (which used to be the child thread
|
|
||||||
# in the parent process) attempts to exit, it will try to acquire the
|
|
||||||
# lock in the Thread._block Condition object and hang, because the
|
|
||||||
# lock was held across the fork.
|
|
||||||
|
|
||||||
script = """if 1:
|
|
||||||
import os, time, threading
|
|
||||||
|
|
||||||
finish_join = False
|
|
||||||
start_fork = False
|
|
||||||
|
|
||||||
def worker():
|
|
||||||
# Wait until this thread's lock is acquired before forking to
|
|
||||||
# create the deadlock.
|
|
||||||
global finish_join
|
|
||||||
while not start_fork:
|
|
||||||
time.sleep(0.01)
|
|
||||||
# LOCK HELD: Main thread holds lock across this call.
|
|
||||||
childpid = os.fork()
|
|
||||||
finish_join = True
|
|
||||||
if childpid != 0:
|
|
||||||
# Parent process just waits for child.
|
|
||||||
os.waitpid(childpid, 0)
|
|
||||||
# Child process should just return.
|
|
||||||
|
|
||||||
w = threading.Thread(target=worker)
|
|
||||||
|
|
||||||
# Stub out the private condition variable's lock acquire method.
|
|
||||||
# This acquires the lock and then waits until the child has forked
|
|
||||||
# before returning, which will release the lock soon after. If
|
|
||||||
# someone else tries to fix this test case by acquiring this lock
|
|
||||||
# before forking instead of resetting it, the test case will
|
|
||||||
# deadlock when it shouldn't.
|
|
||||||
condition = w._stopped._cond
|
|
||||||
orig_acquire = condition.acquire
|
|
||||||
call_count_lock = threading.Lock()
|
|
||||||
call_count = 0
|
|
||||||
def my_acquire():
|
|
||||||
global call_count
|
|
||||||
global start_fork
|
|
||||||
orig_acquire() # LOCK ACQUIRED HERE
|
|
||||||
start_fork = True
|
|
||||||
if call_count == 0:
|
|
||||||
while not finish_join:
|
|
||||||
time.sleep(0.01) # WORKER THREAD FORKS HERE
|
|
||||||
with call_count_lock:
|
|
||||||
call_count += 1
|
|
||||||
condition.acquire = my_acquire
|
|
||||||
|
|
||||||
w.start()
|
|
||||||
w.join()
|
|
||||||
print('end of main')
|
|
||||||
"""
|
|
||||||
self.assertScriptHasOutput(script, "end of main\n")
|
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
|
|
||||||
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
|
|
||||||
def test_5_clear_waiter_locks_to_avoid_crash(self):
|
|
||||||
# Check that a spawned thread that forks doesn't segfault on certain
|
|
||||||
# platforms, namely OS X. This used to happen if there was a waiter
|
|
||||||
# lock in the thread's condition variable's waiters list. Even though
|
|
||||||
# we know the lock will be held across the fork, it is not safe to
|
|
||||||
# release locks held across forks on all platforms, so releasing the
|
|
||||||
# waiter lock caused a segfault on OS X. Furthermore, since locks on
|
|
||||||
# OS X are (as of this writing) implemented with a mutex + condition
|
|
||||||
# variable instead of a semaphore, while we know that the Python-level
|
|
||||||
# lock will be acquired, we can't know if the internal mutex will be
|
|
||||||
# acquired at the time of the fork.
|
|
||||||
|
|
||||||
script = """if True:
|
|
||||||
import os, time, threading
|
|
||||||
|
|
||||||
start_fork = False
|
|
||||||
|
|
||||||
def worker():
|
|
||||||
# Wait until the main thread has attempted to join this thread
|
|
||||||
# before continuing.
|
|
||||||
while not start_fork:
|
|
||||||
time.sleep(0.01)
|
|
||||||
childpid = os.fork()
|
|
||||||
if childpid != 0:
|
|
||||||
# Parent process just waits for child.
|
|
||||||
(cpid, rc) = os.waitpid(childpid, 0)
|
|
||||||
assert cpid == childpid
|
|
||||||
assert rc == 0
|
|
||||||
print('end of worker thread')
|
|
||||||
else:
|
|
||||||
# Child process should just return.
|
|
||||||
pass
|
|
||||||
|
|
||||||
w = threading.Thread(target=worker)
|
|
||||||
|
|
||||||
# Stub out the private condition variable's _release_save method.
|
|
||||||
# This releases the condition's lock and flips the global that
|
|
||||||
# causes the worker to fork. At this point, the problematic waiter
|
|
||||||
# lock has been acquired once by the waiter and has been put onto
|
|
||||||
# the waiters list.
|
|
||||||
condition = w._stopped._cond
|
|
||||||
orig_release_save = condition._release_save
|
|
||||||
def my_release_save():
|
|
||||||
global start_fork
|
|
||||||
orig_release_save()
|
|
||||||
# Waiter lock held here, condition lock released.
|
|
||||||
start_fork = True
|
|
||||||
condition._release_save = my_release_save
|
|
||||||
|
|
||||||
w.start()
|
|
||||||
w.join()
|
|
||||||
print('end of main thread')
|
|
||||||
"""
|
|
||||||
output = "end of worker thread\nend of main thread\n"
|
|
||||||
self.assertScriptHasOutput(script, output)
|
|
||||||
|
|
||||||
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
|
|
||||||
def test_6_daemon_threads(self):
|
|
||||||
# Check that a daemon thread cannot crash the interpreter on shutdown
|
# Check that a daemon thread cannot crash the interpreter on shutdown
|
||||||
# by manipulating internal structures that are being disposed of in
|
# by manipulating internal structures that are being disposed of in
|
||||||
# the main thread.
|
# the main thread.
|
||||||
|
|
|
@ -549,7 +549,7 @@ class Thread:
|
||||||
self._ident = None
|
self._ident = None
|
||||||
self._tstate_lock = None
|
self._tstate_lock = None
|
||||||
self._started = Event()
|
self._started = Event()
|
||||||
self._stopped = Event()
|
self._is_stopped = False
|
||||||
self._initialized = True
|
self._initialized = True
|
||||||
# sys.stderr is not stored in the class like
|
# sys.stderr is not stored in the class like
|
||||||
# sys.exc_info since it can be changed between instances
|
# sys.exc_info since it can be changed between instances
|
||||||
|
@ -561,7 +561,6 @@ class Thread:
|
||||||
# private! Called by _after_fork() to reset our internal locks as
|
# private! Called by _after_fork() to reset our internal locks as
|
||||||
# they may be in an invalid state leading to a deadlock or crash.
|
# they may be in an invalid state leading to a deadlock or crash.
|
||||||
self._started._reset_internal_locks()
|
self._started._reset_internal_locks()
|
||||||
self._stopped._reset_internal_locks()
|
|
||||||
if is_alive:
|
if is_alive:
|
||||||
self._set_tstate_lock()
|
self._set_tstate_lock()
|
||||||
else:
|
else:
|
||||||
|
@ -574,7 +573,7 @@ class Thread:
|
||||||
status = "initial"
|
status = "initial"
|
||||||
if self._started.is_set():
|
if self._started.is_set():
|
||||||
status = "started"
|
status = "started"
|
||||||
if self._stopped.is_set():
|
if self._is_stopped:
|
||||||
status = "stopped"
|
status = "stopped"
|
||||||
if self._daemonic:
|
if self._daemonic:
|
||||||
status += " daemon"
|
status += " daemon"
|
||||||
|
@ -696,7 +695,6 @@ class Thread:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
with _active_limbo_lock:
|
with _active_limbo_lock:
|
||||||
self._stop()
|
|
||||||
try:
|
try:
|
||||||
# We don't call self._delete() because it also
|
# We don't call self._delete() because it also
|
||||||
# grabs _active_limbo_lock.
|
# grabs _active_limbo_lock.
|
||||||
|
@ -705,7 +703,8 @@ class Thread:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _stop(self):
|
def _stop(self):
|
||||||
self._stopped.set()
|
self._is_stopped = True
|
||||||
|
self._tstate_lock = None
|
||||||
|
|
||||||
def _delete(self):
|
def _delete(self):
|
||||||
"Remove current thread from the dict of currently running threads."
|
"Remove current thread from the dict of currently running threads."
|
||||||
|
@ -749,29 +748,24 @@ class Thread:
|
||||||
raise RuntimeError("cannot join thread before it is started")
|
raise RuntimeError("cannot join thread before it is started")
|
||||||
if self is current_thread():
|
if self is current_thread():
|
||||||
raise RuntimeError("cannot join current thread")
|
raise RuntimeError("cannot join current thread")
|
||||||
if not self.is_alive():
|
if timeout is None:
|
||||||
return
|
self._wait_for_tstate_lock()
|
||||||
self._stopped.wait(timeout)
|
else:
|
||||||
if self._stopped.is_set():
|
self._wait_for_tstate_lock(timeout=timeout)
|
||||||
self._wait_for_tstate_lock(timeout is None)
|
|
||||||
|
|
||||||
def _wait_for_tstate_lock(self, block):
|
def _wait_for_tstate_lock(self, block=True, timeout=-1):
|
||||||
# Issue #18808: wait for the thread state to be gone.
|
# Issue #18808: wait for the thread state to be gone.
|
||||||
# When self._stopped is set, the Python part of the thread is done,
|
# At the end of the thread's life, after all knowledge of the thread
|
||||||
# but the thread's tstate has not yet been destroyed. The C code
|
# is removed from C data structures, C code releases our _tstate_lock.
|
||||||
# releases self._tstate_lock when the C part of the thread is done
|
# This method passes its arguments to _tstate_lock.aquire().
|
||||||
# (the code at the end of the thread's life to remove all knowledge
|
# If the lock is acquired, the C code is done, and self._stop() is
|
||||||
# of the thread from the C data structures).
|
# called. That sets ._is_stopped to True, and ._tstate_lock to None.
|
||||||
# This method waits to acquire _tstate_lock if `block` is True, or
|
|
||||||
# sees whether it can be acquired immediately if `block` is False.
|
|
||||||
# If it does acquire the lock, the C code is done, and _tstate_lock
|
|
||||||
# is set to None.
|
|
||||||
lock = self._tstate_lock
|
lock = self._tstate_lock
|
||||||
if lock is None:
|
if lock is None: # already determined that the C code is done
|
||||||
return # already determined that the C code is done
|
assert self._is_stopped
|
||||||
if lock.acquire(block):
|
elif lock.acquire(block, timeout):
|
||||||
lock.release()
|
lock.release()
|
||||||
self._tstate_lock = None
|
self._stop()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def name(self):
|
def name(self):
|
||||||
|
@ -790,14 +784,10 @@ class Thread:
|
||||||
|
|
||||||
def is_alive(self):
|
def is_alive(self):
|
||||||
assert self._initialized, "Thread.__init__() not called"
|
assert self._initialized, "Thread.__init__() not called"
|
||||||
if not self._started.is_set():
|
if self._is_stopped or not self._started.is_set():
|
||||||
return False
|
return False
|
||||||
if not self._stopped.is_set():
|
|
||||||
return True
|
|
||||||
# The Python part of the thread is done, but the C part may still be
|
|
||||||
# waiting to run.
|
|
||||||
self._wait_for_tstate_lock(False)
|
self._wait_for_tstate_lock(False)
|
||||||
return self._tstate_lock is not None
|
return not self._is_stopped
|
||||||
|
|
||||||
isAlive = is_alive
|
isAlive = is_alive
|
||||||
|
|
||||||
|
@ -861,6 +851,7 @@ class _MainThread(Thread):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
Thread.__init__(self, name="MainThread", daemon=False)
|
Thread.__init__(self, name="MainThread", daemon=False)
|
||||||
|
self._set_tstate_lock()
|
||||||
self._started.set()
|
self._started.set()
|
||||||
self._set_ident()
|
self._set_ident()
|
||||||
with _active_limbo_lock:
|
with _active_limbo_lock:
|
||||||
|
@ -925,6 +916,14 @@ from _thread import stack_size
|
||||||
_main_thread = _MainThread()
|
_main_thread = _MainThread()
|
||||||
|
|
||||||
def _shutdown():
|
def _shutdown():
|
||||||
|
# Obscure: other threads may be waiting to join _main_thread. That's
|
||||||
|
# dubious, but some code does it. We can't wait for C code to release
|
||||||
|
# the main thread's tstate_lock - that won't happen until the interpreter
|
||||||
|
# is nearly dead. So we release it here. Note that just calling _stop()
|
||||||
|
# isn't enough: other threads may already be waiting on _tstate_lock.
|
||||||
|
assert _main_thread._tstate_lock is not None
|
||||||
|
assert _main_thread._tstate_lock.locked()
|
||||||
|
_main_thread._tstate_lock.release()
|
||||||
_main_thread._stop()
|
_main_thread._stop()
|
||||||
t = _pickSomeNonDaemonThread()
|
t = _pickSomeNonDaemonThread()
|
||||||
while t:
|
while t:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue