mirror of
https://github.com/python/cpython.git
synced 2025-07-24 03:35:53 +00:00
[3.13] gh-110206: Fix multiprocessing test_notify_all (GH-130933) (#130950)
The test could deadlock trying join on the worker processes due to a
combination of behaviors:
* The use of `assertReachesEventually` did not ensure that workers
actually woken.release() because the SyncManager's Semaphore does not
implement get_value.
* This mean that the test could finish and the variable "sleeping" would
got out of scope and be collected. This unregisters the proxy leading
to failures in the worker or possibly the manager.
* The subsequent call to `p.join()` during cleanUp therefore never
finished.
This takes two approaches to fix this:
1) Use woken.acquire() to ensure that the workers actually finish
calling woken.release()
2) At the end of the test, wait until the workers are finished, while `cond`,
`sleeping`, and `woken` are still valid.
(cherry picked from commit c476410dc5
)
Co-authored-by: Sam Gross <colesbury@gmail.com>
This commit is contained in:
parent
e0838a2022
commit
94b94d0b12
1 changed files with 12 additions and 5 deletions
|
@ -1649,18 +1649,19 @@ class _TestCondition(BaseTestCase):
|
|||
woken = self.Semaphore(0)
|
||||
|
||||
# start some threads/processes which will timeout
|
||||
workers = []
|
||||
for i in range(3):
|
||||
p = self.Process(target=self.f,
|
||||
args=(cond, sleeping, woken, TIMEOUT1))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
self.addCleanup(p.join)
|
||||
workers.append(p)
|
||||
|
||||
t = threading.Thread(target=self.f,
|
||||
args=(cond, sleeping, woken, TIMEOUT1))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
self.addCleanup(t.join)
|
||||
workers.append(t)
|
||||
|
||||
# wait for them all to sleep
|
||||
for i in range(6):
|
||||
|
@ -1679,12 +1680,12 @@ class _TestCondition(BaseTestCase):
|
|||
p = self.Process(target=self.f, args=(cond, sleeping, woken))
|
||||
p.daemon = True
|
||||
p.start()
|
||||
self.addCleanup(p.join)
|
||||
workers.append(p)
|
||||
|
||||
t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
self.addCleanup(t.join)
|
||||
workers.append(t)
|
||||
|
||||
# wait for them to all sleep
|
||||
for i in range(6):
|
||||
|
@ -1700,11 +1701,17 @@ class _TestCondition(BaseTestCase):
|
|||
cond.release()
|
||||
|
||||
# check they have all woken
|
||||
self.assertReachesEventually(lambda: get_value(woken), 6)
|
||||
for i in range(6):
|
||||
woken.acquire()
|
||||
self.assertReturnsIfImplemented(0, get_value, woken)
|
||||
|
||||
# check state is not mucked up
|
||||
self.check_invariant(cond)
|
||||
|
||||
for w in workers:
|
||||
# NOTE: join_process and join_thread are the same
|
||||
threading_helper.join_thread(w)
|
||||
|
||||
def test_notify_n(self):
|
||||
cond = self.Condition()
|
||||
sleeping = self.Semaphore(0)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue