mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (GH-14344)
This commit is contained in:
parent
5cbbbd73a6
commit
0d671c04c3
7 changed files with 378 additions and 72 deletions
|
@ -633,6 +633,7 @@ class SubprocessMixin:
|
|||
|
||||
self.assertIsNone(self.loop.run_until_complete(execute()))
|
||||
|
||||
|
||||
if sys.platform != 'win32':
|
||||
# Unix
|
||||
class SubprocessWatcherMixin(SubprocessMixin):
|
||||
|
@ -648,7 +649,24 @@ if sys.platform != 'win32':
|
|||
watcher = self.Watcher()
|
||||
watcher.attach_loop(self.loop)
|
||||
policy.set_child_watcher(watcher)
|
||||
self.addCleanup(policy.set_child_watcher, None)
|
||||
|
||||
def tearDown(self):
|
||||
super().tearDown()
|
||||
policy = asyncio.get_event_loop_policy()
|
||||
watcher = policy.get_child_watcher()
|
||||
policy.set_child_watcher(None)
|
||||
watcher.attach_loop(None)
|
||||
watcher.close()
|
||||
|
||||
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
|
||||
test_utils.TestCase):
|
||||
|
||||
Watcher = unix_events.ThreadedChildWatcher
|
||||
|
||||
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
|
||||
test_utils.TestCase):
|
||||
|
||||
Watcher = unix_events.MultiLoopChildWatcher
|
||||
|
||||
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
|
||||
test_utils.TestCase):
|
||||
|
@ -670,5 +688,25 @@ else:
|
|||
self.set_event_loop(self.loop)
|
||||
|
||||
|
||||
class GenericWatcherTests:
|
||||
|
||||
def test_create_subprocess_fails_with_inactive_watcher(self):
|
||||
|
||||
async def execute():
|
||||
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
|
||||
watcher.is_active.return_value = False
|
||||
asyncio.set_child_watcher(watcher)
|
||||
|
||||
with self.assertRaises(RuntimeError):
|
||||
await subprocess.create_subprocess_exec(
|
||||
support.FakePath(sys.executable), '-c', 'pass')
|
||||
|
||||
watcher.add_child_handler.assert_not_called()
|
||||
|
||||
self.assertIsNone(self.loop.run_until_complete(execute()))
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue