Docs and one small improvement for issue #25304, by Vincent Michel. (Merge 3.4->3.5.)

This commit is contained in:
Guido van Rossum 2015-10-05 16:23:13 -07:00
commit 5db034acfa
4 changed files with 75 additions and 4 deletions

View file

@ -96,10 +96,9 @@ the same thread. But when the task uses ``yield from``, the task is suspended
and the event loop executes the next task. and the event loop executes the next task.
To schedule a callback from a different thread, the To schedule a callback from a different thread, the
:meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example to :meth:`BaseEventLoop.call_soon_threadsafe` method should be used. Example::
schedule a coroutine from a different thread::
loop.call_soon_threadsafe(asyncio.ensure_future, coro_func()) loop.call_soon_threadsafe(callback, *args)
Most asyncio objects are not thread safe. You should only worry if you access Most asyncio objects are not thread safe. You should only worry if you access
objects outside the event loop. For example, to cancel a future, don't call objects outside the event loop. For example, to cancel a future, don't call
@ -110,6 +109,13 @@ directly its :meth:`Future.cancel` method, but::
To handle signals and to execute subprocesses, the event loop must be run in To handle signals and to execute subprocesses, the event loop must be run in
the main thread. the main thread.
To schedule a coroutine object from a different thread, the
:func:`run_coroutine_threadsafe` function should be used. It returns a
:class:`concurrent.futures.Future` to access the result::
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout) # Wait for the result with a timeout
The :meth:`BaseEventLoop.run_in_executor` method can be used with a thread pool The :meth:`BaseEventLoop.run_in_executor` method can be used with a thread pool
executor to execute a callback in different thread to not block the thread of executor to execute a callback in different thread to not block the thread of
the event loop. the event loop.

View file

@ -683,3 +683,42 @@ Task functions
.. versionchanged:: 3.4.3 .. versionchanged:: 3.4.3
If the wait is cancelled, the future *fut* is now also cancelled. If the wait is cancelled, the future *fut* is now also cancelled.
.. function:: run_coroutine_threadsafe(coro, loop)
Submit a :ref:`coroutine object <coroutine>` to a given event loop.
Return a :class:`concurrent.futures.Future` to access the result.
This function is meant to be called from a different thread than the one
where the event loop is running. Usage::
# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3
If an exception is raised in the coroutine, the returned future will be
notified. It can also be used to cancel the task in the event loop::
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print('The coroutine raised an exception: {!r}'.format(exc))
else:
print('The coroutine returned: {!r}'.format(result))
See the :ref:`concurrency and multithreading <asyncio-multithreading>`
section of the documentation.
.. note::
Unlike the functions above, :func:`run_coroutine_threadsafe` requires the
*loop* argument to be passed explicitely.
.. versionadded:: 3.4.4

View file

@ -704,7 +704,12 @@ def run_coroutine_threadsafe(coro, loop):
future = concurrent.futures.Future() future = concurrent.futures.Future()
def callback(): def callback():
futures._chain_future(ensure_future(coro, loop=loop), future) try:
futures._chain_future(ensure_future(coro, loop=loop), future)
except Exception as exc:
if future.set_running_or_notify_cancel():
future.set_exception(exc)
raise
loop.call_soon_threadsafe(callback) loop.call_soon_threadsafe(callback)
return future return future

View file

@ -2166,6 +2166,27 @@ class RunCoroutineThreadsafeTests(test_utils.TestCase):
with self.assertRaises(asyncio.CancelledError): with self.assertRaises(asyncio.CancelledError):
self.loop.run_until_complete(future) self.loop.run_until_complete(future)
def test_run_coroutine_threadsafe_task_factory_exception(self):
"""Test coroutine submission from a tread to an event loop
when the task factory raise an exception."""
# Clear the time generator
asyncio.ensure_future(self.add(1, 2), loop=self.loop)
# Schedule the target
future = self.loop.run_in_executor(None, self.target)
# Set corrupted task factory
self.loop.set_task_factory(lambda loop, coro: wrong_name)
# Set exception handler
callback = test_utils.MockCallback()
self.loop.set_exception_handler(callback)
# Run event loop
with self.assertRaises(NameError) as exc_context:
self.loop.run_until_complete(future)
# Check exceptions
self.assertIn('wrong_name', exc_context.exception.args[0])
self.assertEqual(len(callback.call_args_list), 1)
(loop, context), kwargs = callback.call_args
self.assertEqual(context['exception'], exc_context.exception)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()