mirror of
https://github.com/python/cpython.git
synced 2025-07-07 19:35:27 +00:00
gh-132561: Fix the public multiprocessing.SemLock.locked
method (#132586)
Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
This commit is contained in:
parent
0c356c865a
commit
15c75d7a8b
3 changed files with 40 additions and 1 deletions
|
@ -91,7 +91,7 @@ class SemLock(object):
|
|||
self.release = self._semlock.release
|
||||
|
||||
def locked(self):
|
||||
return self._semlock._count() != 0
|
||||
return self._semlock._is_zero()
|
||||
|
||||
def __enter__(self):
|
||||
return self._semlock.__enter__()
|
||||
|
|
|
@ -1492,6 +1492,27 @@ class _TestLock(BaseTestCase):
|
|||
self.assertFalse(lock.locked())
|
||||
self.assertRaises((ValueError, threading.ThreadError), lock.release)
|
||||
|
||||
@classmethod
|
||||
def _test_lock_locked_2processes(cls, lock, event, res):
|
||||
lock.acquire()
|
||||
res.value = lock.locked()
|
||||
event.set()
|
||||
|
||||
def test_lock_locked_2processes(self):
|
||||
if self.TYPE != 'processes':
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
lock = self.Lock()
|
||||
event = self.Event()
|
||||
res = self.Value('b', 0)
|
||||
p = self.Process(target=self._test_lock_locked_2processes,
|
||||
args=(lock, event, res))
|
||||
p.start()
|
||||
event.wait()
|
||||
self.assertTrue(lock.locked())
|
||||
self.assertTrue(res.value)
|
||||
p.join()
|
||||
|
||||
@staticmethod
|
||||
def _acquire_release(lock, timeout, l=None, n=1):
|
||||
for _ in range(n):
|
||||
|
@ -1561,6 +1582,22 @@ class _TestLock(BaseTestCase):
|
|||
self.assertFalse(lock.locked())
|
||||
self.assertRaises((AssertionError, RuntimeError), lock.release)
|
||||
|
||||
def test_rlock_locked_2processes(self):
|
||||
if self.TYPE != 'processes':
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
rlock = self.RLock()
|
||||
event = self.Event()
|
||||
res = Value('b', 0)
|
||||
# target is the same as for the test_lock_locked_2processes test.
|
||||
p = self.Process(target=self._test_lock_locked_2processes,
|
||||
args=(rlock, event, res))
|
||||
p.start()
|
||||
event.wait()
|
||||
self.assertTrue(rlock.locked())
|
||||
self.assertTrue(res.value)
|
||||
p.join()
|
||||
|
||||
def test_lock_context(self):
|
||||
with self.Lock() as locked:
|
||||
self.assertTrue(locked)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Fix the public ``locked`` method of ``multiprocessing.SemLock`` class.
|
||||
Also adding 2 tests for the derivated :class:`multiprocessing.Lock` and :class:`multiprocessing.RLock` classes.
|
Loading…
Add table
Add a link
Reference in a new issue