GH-66285: fix forking in asyncio (#99539)

`asyncio` now does not shares event loop and signal wakeupfd in forked processes.
This commit is contained in:
Kumar Aditya 2022-11-24 09:10:27 +05:30 committed by GitHub
parent 9dc08361be
commit 0c1fbc17b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 0 deletions

View file

@ -17,6 +17,7 @@ import socket
import subprocess import subprocess
import sys import sys
import threading import threading
import signal
from . import format_helpers from . import format_helpers
@ -665,6 +666,14 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
def __init__(self): def __init__(self):
self._local = self._Local() self._local = self._Local()
if hasattr(os, 'fork'):
def on_fork():
# Reset the loop and wakeupfd in the forked child process.
self._local = self._Local()
signal.set_wakeup_fd(-1)
os.register_at_fork(after_in_child=on_fork)
def get_event_loop(self): def get_event_loop(self):
"""Get the event loop for the current context. """Get the event loop for the current context.

View file

@ -11,10 +11,13 @@ import stat
import sys import sys
import threading import threading
import unittest import unittest
import time
from unittest import mock from unittest import mock
import warnings import warnings
import multiprocessing
from test.support import os_helper from test.support import os_helper
from test.support import socket_helper from test.support import socket_helper
from test.support import wait_process
if sys.platform == 'win32': if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only') raise unittest.SkipTest('UNIX only')
@ -1867,5 +1870,97 @@ class TestFunctional(unittest.TestCase):
wsock.close() wsock.close()
@unittest.skipUnless(hasattr(os, 'fork'), 'requires os.fork()')
class TestFork(unittest.IsolatedAsyncioTestCase):
async def test_fork_not_share_event_loop(self):
# The forked process should not share the event loop with the parent
loop = asyncio.get_running_loop()
r, w = os.pipe()
self.addCleanup(os.close, r)
self.addCleanup(os.close, w)
pid = os.fork()
if pid == 0:
# child
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
os.write(w, str(id(loop)).encode())
finally:
os._exit(0)
else:
# parent
child_loop = int(os.read(r, 100).decode())
self.assertNotEqual(child_loop, id(loop))
wait_process(pid, exitcode=0)
def test_fork_signal_handling(self):
# Sending signal to the forked process should not affect the parent
# process
ctx = multiprocessing.get_context('fork')
manager = ctx.Manager()
self.addCleanup(manager.shutdown)
child_started = manager.Event()
child_handled = manager.Event()
parent_handled = manager.Event()
def child_main():
signal.signal(signal.SIGTERM, lambda *args: child_handled.set())
child_started.set()
time.sleep(1)
async def main():
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGTERM, lambda *args: parent_handled.set())
process = ctx.Process(target=child_main)
process.start()
child_started.wait()
os.kill(process.pid, signal.SIGTERM)
process.join()
async def func():
await asyncio.sleep(0.1)
return 42
# Test parent's loop is still functional
self.assertEqual(await asyncio.create_task(func()), 42)
asyncio.run(main())
self.assertFalse(parent_handled.is_set())
self.assertTrue(child_handled.is_set())
def test_fork_asyncio_run(self):
ctx = multiprocessing.get_context('fork')
manager = ctx.Manager()
self.addCleanup(manager.shutdown)
result = manager.Value('i', 0)
async def child_main():
await asyncio.sleep(0.1)
result.value = 42
process = ctx.Process(target=lambda: asyncio.run(child_main()))
process.start()
process.join()
self.assertEqual(result.value, 42)
def test_fork_asyncio_subprocess(self):
ctx = multiprocessing.get_context('fork')
manager = ctx.Manager()
self.addCleanup(manager.shutdown)
result = manager.Value('i', 1)
async def child_main():
proc = await asyncio.create_subprocess_exec(sys.executable, '-c', 'pass')
result.value = await proc.wait()
process = ctx.Process(target=lambda: asyncio.run(child_main()))
process.start()
process.join()
self.assertEqual(result.value, 0)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -0,0 +1 @@
Fix :mod:`asyncio` to not share event loop and signal wakeupfd in forked processes. Patch by Kumar Aditya.