mirror of
https://github.com/python/cpython.git
synced 2025-10-12 01:43:12 +00:00
[3.6] bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed (GH-3247) (#4254)
* bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed
* Avoid mucking with process state in test.
Add a warning if the semaphore process died, as semaphores may then be leaked.
* Add NEWS entry
(cherry picked from commit cbe1756
)
This commit is contained in:
parent
f8b3f6b178
commit
b5f09acf0a
3 changed files with 57 additions and 7 deletions
|
@ -4,6 +4,7 @@
|
|||
|
||||
import unittest
|
||||
import queue as pyqueue
|
||||
import contextlib
|
||||
import time
|
||||
import io
|
||||
import itertools
|
||||
|
@ -4125,14 +4126,14 @@ class TestStartMethod(unittest.TestCase):
|
|||
self.fail("failed spawning forkserver or grandchild")
|
||||
|
||||
|
||||
#
|
||||
# Check that killing process does not leak named semaphores
|
||||
#
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32",
|
||||
"test semantics don't make sense on Windows")
|
||||
class TestSemaphoreTracker(unittest.TestCase):
|
||||
|
||||
def test_semaphore_tracker(self):
|
||||
#
|
||||
# Check that killing process does not leak named semaphores
|
||||
#
|
||||
import subprocess
|
||||
cmd = '''if 1:
|
||||
import multiprocessing as mp, time, os
|
||||
|
@ -4166,6 +4167,40 @@ class TestSemaphoreTracker(unittest.TestCase):
|
|||
self.assertRegex(err, expected)
|
||||
self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
|
||||
|
||||
def check_semaphore_tracker_death(self, signum, should_die):
|
||||
# bpo-31310: if the semaphore tracker process has died, it should
|
||||
# be restarted implicitly.
|
||||
from multiprocessing.semaphore_tracker import _semaphore_tracker
|
||||
_semaphore_tracker.ensure_running()
|
||||
pid = _semaphore_tracker._pid
|
||||
os.kill(pid, signum)
|
||||
time.sleep(1.0) # give it time to die
|
||||
|
||||
ctx = multiprocessing.get_context("spawn")
|
||||
with contextlib.ExitStack() as stack:
|
||||
if should_die:
|
||||
stack.enter_context(self.assertWarnsRegex(
|
||||
UserWarning,
|
||||
"semaphore_tracker: process died"))
|
||||
sem = ctx.Semaphore()
|
||||
sem.acquire()
|
||||
sem.release()
|
||||
wr = weakref.ref(sem)
|
||||
# ensure `sem` gets collected, which triggers communication with
|
||||
# the semaphore tracker
|
||||
del sem
|
||||
gc.collect()
|
||||
self.assertIsNone(wr())
|
||||
|
||||
def test_semaphore_tracker_sigint(self):
|
||||
# Catchable signal (ignored by semaphore tracker)
|
||||
self.check_semaphore_tracker_death(signal.SIGINT, False)
|
||||
|
||||
def test_semaphore_tracker_sigkill(self):
|
||||
# Uncatchable signal.
|
||||
self.check_semaphore_tracker_death(signal.SIGKILL, True)
|
||||
|
||||
|
||||
class TestSimpleQueue(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue