mirror of
https://github.com/python/cpython.git
synced 2025-08-30 13:38:43 +00:00
bpo-36888: Add multiprocessing.parent_process() (GH-13247)
This commit is contained in:
parent
5ae1c84bcd
commit
c09a9f56c0
12 changed files with 155 additions and 20 deletions
|
@ -269,6 +269,64 @@ class _TestProcess(BaseTestCase):
|
|||
q.put(bytes(current.authkey))
|
||||
q.put(current.pid)
|
||||
|
||||
def test_parent_process_attributes(self):
|
||||
if self.TYPE == "threads":
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
self.assertIsNone(self.parent_process())
|
||||
|
||||
rconn, wconn = self.Pipe(duplex=False)
|
||||
p = self.Process(target=self._test_send_parent_process, args=(wconn,))
|
||||
p.start()
|
||||
p.join()
|
||||
parent_pid, parent_name = rconn.recv()
|
||||
self.assertEqual(parent_pid, self.current_process().pid)
|
||||
self.assertEqual(parent_pid, os.getpid())
|
||||
self.assertEqual(parent_name, self.current_process().name)
|
||||
|
||||
@classmethod
|
||||
def _test_send_parent_process(cls, wconn):
|
||||
from multiprocessing.process import parent_process
|
||||
wconn.send([parent_process().pid, parent_process().name])
|
||||
|
||||
def test_parent_process(self):
|
||||
if self.TYPE == "threads":
|
||||
self.skipTest('test not appropriate for {}'.format(self.TYPE))
|
||||
|
||||
# Launch a child process. Make it launch a grandchild process. Kill the
|
||||
# child process and make sure that the grandchild notices the death of
|
||||
# its parent (a.k.a the child process).
|
||||
rconn, wconn = self.Pipe(duplex=False)
|
||||
p = self.Process(
|
||||
target=self._test_create_grandchild_process, args=(wconn, ))
|
||||
p.start()
|
||||
|
||||
if not rconn.poll(timeout=5):
|
||||
raise AssertionError("Could not communicate with child process")
|
||||
parent_process_status = rconn.recv()
|
||||
self.assertEqual(parent_process_status, "alive")
|
||||
|
||||
p.terminate()
|
||||
p.join()
|
||||
|
||||
if not rconn.poll(timeout=5):
|
||||
raise AssertionError("Could not communicate with child process")
|
||||
parent_process_status = rconn.recv()
|
||||
self.assertEqual(parent_process_status, "not alive")
|
||||
|
||||
@classmethod
|
||||
def _test_create_grandchild_process(cls, wconn):
|
||||
p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
|
||||
p.start()
|
||||
time.sleep(100)
|
||||
|
||||
@classmethod
|
||||
def _test_report_parent_status(cls, wconn):
|
||||
from multiprocessing.process import parent_process
|
||||
wconn.send("alive" if parent_process().is_alive() else "not alive")
|
||||
parent_process().join(timeout=5)
|
||||
wconn.send("alive" if parent_process().is_alive() else "not alive")
|
||||
|
||||
def test_process(self):
|
||||
q = self.Queue(1)
|
||||
e = self.Event()
|
||||
|
@ -5398,6 +5456,7 @@ class ProcessesMixin(BaseMixin):
|
|||
Process = multiprocessing.Process
|
||||
connection = multiprocessing.connection
|
||||
current_process = staticmethod(multiprocessing.current_process)
|
||||
parent_process = staticmethod(multiprocessing.parent_process)
|
||||
active_children = staticmethod(multiprocessing.active_children)
|
||||
Pool = staticmethod(multiprocessing.Pool)
|
||||
Pipe = staticmethod(multiprocessing.Pipe)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue