gh-110771: Decompose run_forever() into parts (#110773)

Effectively introduce an unstable, private (really: protected) API for subclasses that want to override `run_forever()`.
This commit is contained in:
Russell Keith-Magee 2023-10-13 16:12:32 +02:00 committed by GitHub
parent 0ed2329a16
commit a7e2a10a85
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 31 deletions

View file

@ -400,6 +400,8 @@ class BaseEventLoop(events.AbstractEventLoop):
self._clock_resolution = time.get_clock_info('monotonic').resolution self._clock_resolution = time.get_clock_info('monotonic').resolution
self._exception_handler = None self._exception_handler = None
self.set_debug(coroutines._is_debug_mode()) self.set_debug(coroutines._is_debug_mode())
# The preserved state of async generator hooks.
self._old_agen_hooks = None
# In debug mode, if the execution of a callback or a step of a task # In debug mode, if the execution of a callback or a step of a task
# exceed this duration in seconds, the slow callback/task is logged. # exceed this duration in seconds, the slow callback/task is logged.
self.slow_callback_duration = 0.1 self.slow_callback_duration = 0.1
@ -601,29 +603,52 @@ class BaseEventLoop(events.AbstractEventLoop):
raise RuntimeError( raise RuntimeError(
'Cannot run the event loop while another loop is running') 'Cannot run the event loop while another loop is running')
def run_forever(self): def _run_forever_setup(self):
"""Run until stop() is called.""" """Prepare the run loop to process events.
This method exists so that custom custom event loop subclasses (e.g., event loops
that integrate a GUI event loop with Python's event loop) have access to all the
loop setup logic.
"""
self._check_closed() self._check_closed()
self._check_running() self._check_running()
self._set_coroutine_origin_tracking(self._debug) self._set_coroutine_origin_tracking(self._debug)
old_agen_hooks = sys.get_asyncgen_hooks() self._old_agen_hooks = sys.get_asyncgen_hooks()
try: self._thread_id = threading.get_ident()
self._thread_id = threading.get_ident() sys.set_asyncgen_hooks(
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook) finalizer=self._asyncgen_finalizer_hook
)
events._set_running_loop(self) events._set_running_loop(self)
def _run_forever_cleanup(self):
"""Clean up after an event loop finishes the looping over events.
This method exists so that custom custom event loop subclasses (e.g., event loops
that integrate a GUI event loop with Python's event loop) have access to all the
loop cleanup logic.
"""
self._stopping = False
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
# Restore any pre-existing async generator hooks.
if self._old_agen_hooks is not None:
sys.set_asyncgen_hooks(*self._old_agen_hooks)
self._old_agen_hooks = None
def run_forever(self):
"""Run until stop() is called."""
try:
self._run_forever_setup()
while True: while True:
self._run_once() self._run_once()
if self._stopping: if self._stopping:
break break
finally: finally:
self._stopping = False self._run_forever_cleanup()
self._thread_id = None
events._set_running_loop(None)
self._set_coroutine_origin_tracking(False)
sys.set_asyncgen_hooks(*old_agen_hooks)
def run_until_complete(self, future): def run_until_complete(self, future):
"""Run until the Future is done. """Run until the Future is done.

View file

@ -314,24 +314,25 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
proactor = IocpProactor() proactor = IocpProactor()
super().__init__(proactor) super().__init__(proactor)
def run_forever(self): def _run_forever_setup(self):
try: assert self._self_reading_future is None
assert self._self_reading_future is None self.call_soon(self._loop_self_reading)
self.call_soon(self._loop_self_reading) super()._run_forever_setup()
super().run_forever()
finally: def _run_forever_cleanup(self):
if self._self_reading_future is not None: super()._run_forever_cleanup()
ov = self._self_reading_future._ov if self._self_reading_future is not None:
self._self_reading_future.cancel() ov = self._self_reading_future._ov
# self_reading_future was just cancelled so if it hasn't been self._self_reading_future.cancel()
# finished yet, it never will be (it's possible that it has # self_reading_future was just cancelled so if it hasn't been
# already finished and its callback is waiting in the queue, # finished yet, it never will be (it's possible that it has
# where it could still happen if the event loop is restarted). # already finished and its callback is waiting in the queue,
# Unregister it otherwise IocpProactor.close will wait for it # where it could still happen if the event loop is restarted).
# forever # Unregister it otherwise IocpProactor.close will wait for it
if ov is not None: # forever
self._proactor._unregister(ov) if ov is not None:
self._self_reading_future = None self._proactor._unregister(ov)
self._self_reading_future = None
async def create_pipe_connection(self, protocol_factory, address): async def create_pipe_connection(self, protocol_factory, address):
f = self._proactor.connect_pipe(address) f = self._proactor.connect_pipe(address)

View file

@ -922,6 +922,43 @@ class BaseEventLoopTests(test_utils.TestCase):
self.loop.run_forever() self.loop.run_forever()
self.loop._selector.select.assert_called_once_with(0) self.loop._selector.select.assert_called_once_with(0)
def test_custom_run_forever_integration(self):
# Test that the run_forever_setup() and run_forever_cleanup() primitives
# can be used to implement a custom run_forever loop.
self.loop._process_events = mock.Mock()
count = 0
def callback():
nonlocal count
count += 1
self.loop.call_soon(callback)
# Set up the custom event loop
self.loop._run_forever_setup()
# Confirm the loop has been started
self.assertEqual(asyncio.get_running_loop(), self.loop)
self.assertTrue(self.loop.is_running())
# Our custom "event loop" just iterates 10 times before exiting.
for i in range(10):
self.loop._run_once()
# Clean up the event loop
self.loop._run_forever_cleanup()
# Confirm the loop has been cleaned up
with self.assertRaises(RuntimeError):
asyncio.get_running_loop()
self.assertFalse(self.loop.is_running())
# Confirm the loop actually did run, processing events 10 times,
# and invoking the callback once.
self.assertEqual(self.loop._process_events.call_count, 10)
self.assertEqual(count, 1)
async def leave_unfinalized_asyncgen(self): async def leave_unfinalized_asyncgen(self):
# Create an async generator, iterate it partially, and leave it # Create an async generator, iterate it partially, and leave it
# to be garbage collected. # to be garbage collected.

View file

@ -0,0 +1 @@
Expose the setup and cleanup portions of ``asyncio.run_forever()`` as the standalone methods ``asyncio.run_forever_setup()`` and ``asyncio.run_forever_cleanup()``. This allows for tighter integration with GUI event loops.