[3.12] gh-125679: multiprocessing Lock and RLock - fix invalid representation string on MacOSX. (GH-125680) (#126534)

gh-125679: multiprocessing Lock and RLock - fix invalid representation string on MacOSX. (GH-125680)
(cherry picked from commit 75f7cf91ec)

Co-authored-by: Duprat <yduprat@gmail.com>
This commit is contained in:
Miss Islington (bot) 2024-11-07 10:50:46 +01:00 committed by GitHub
parent d71da0feda
commit 4f10b8eacf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 126 additions and 2 deletions

View file

@ -1363,6 +1363,66 @@ class _TestQueue(BaseTestCase):
class _TestLock(BaseTestCase):
@staticmethod
def _acquire(lock, l=None):
lock.acquire()
if l is not None:
l.append(repr(lock))
@staticmethod
def _acquire_event(lock, event):
lock.acquire()
event.set()
time.sleep(1.0)
def test_repr_lock(self):
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
lock = self.Lock()
self.assertEqual(f'<Lock(owner=None)>', repr(lock))
lock.acquire()
self.assertEqual(f'<Lock(owner=MainProcess)>', repr(lock))
lock.release()
tname = 'T1'
l = []
t = threading.Thread(target=self._acquire,
args=(lock, l),
name=tname)
t.start()
time.sleep(0.1)
self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0])
lock.release()
t = threading.Thread(target=self._acquire,
args=(lock,),
name=tname)
t.start()
time.sleep(0.1)
self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock))
lock.release()
pname = 'P1'
l = multiprocessing.Manager().list()
p = self.Process(target=self._acquire,
args=(lock, l),
name=pname)
p.start()
p.join()
self.assertEqual(f'<Lock(owner={pname})>', l[0])
lock = self.Lock()
event = self.Event()
p = self.Process(target=self._acquire_event,
args=(lock, event),
name='P2')
p.start()
event.wait()
self.assertEqual(f'<Lock(owner=SomeOtherProcess)>', repr(lock))
p.terminate()
def test_lock(self):
lock = self.Lock()
self.assertEqual(lock.acquire(), True)
@ -1370,6 +1430,68 @@ class _TestLock(BaseTestCase):
self.assertEqual(lock.release(), None)
self.assertRaises((ValueError, threading.ThreadError), lock.release)
@staticmethod
def _acquire_release(lock, timeout, l=None, n=1):
for _ in range(n):
lock.acquire()
if l is not None:
l.append(repr(lock))
time.sleep(timeout)
for _ in range(n):
lock.release()
def test_repr_rlock(self):
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
lock = self.RLock()
self.assertEqual('<RLock(None, 0)>', repr(lock))
n = 3
for _ in range(n):
lock.acquire()
self.assertEqual(f'<RLock(MainProcess, {n})>', repr(lock))
for _ in range(n):
lock.release()
t, l = [], []
for i in range(n):
t.append(threading.Thread(target=self._acquire_release,
args=(lock, 0.1, l, i+1),
name=f'T{i+1}'))
t[-1].start()
for t_ in t:
t_.join()
for i in range(n):
self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l)
t = threading.Thread(target=self._acquire_release,
args=(lock, 0.2),
name=f'T1')
t.start()
time.sleep(0.1)
self.assertEqual('<RLock(SomeOtherThread, nonzero)>', repr(lock))
time.sleep(0.2)
pname = 'P1'
l = multiprocessing.Manager().list()
p = self.Process(target=self._acquire_release,
args=(lock, 0.1, l),
name=pname)
p.start()
p.join()
self.assertEqual(f'<RLock({pname}, 1)>', l[0])
event = self.Event()
lock = self.RLock()
p = self.Process(target=self._acquire_event,
args=(lock, event))
p.start()
event.wait()
self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock))
p.join()
def test_rlock(self):
lock = self.RLock()
self.assertEqual(lock.acquire(), True)