mirror of
https://github.com/python/cpython.git
synced 2025-07-15 23:35:23 +00:00

Add also a PYTHONASYNCIODEBUG environment variable to debug coroutines since Python startup, to be able to debug coroutines defined directly in the asyncio module.
1681 lines
51 KiB
Python
1681 lines
51 KiB
Python
"""Tests for tasks.py."""
|
|
|
|
import gc
|
|
import os.path
|
|
import unittest
|
|
from test.script_helper import assert_python_ok
|
|
|
|
import asyncio
|
|
from asyncio import test_utils
|
|
|
|
|
|
@asyncio.coroutine
|
|
def coroutine_function():
|
|
pass
|
|
|
|
|
|
class Dummy:
|
|
|
|
def __repr__(self):
|
|
return 'Dummy()'
|
|
|
|
def __call__(self, *args):
|
|
pass
|
|
|
|
|
|
class TaskTests(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.loop = test_utils.TestLoop()
|
|
asyncio.set_event_loop(None)
|
|
|
|
def tearDown(self):
|
|
self.loop.close()
|
|
gc.collect()
|
|
|
|
def test_task_class(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t = asyncio.Task(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t._loop, self.loop)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
t = asyncio.Task(notmuch(), loop=loop)
|
|
self.assertIs(t._loop, loop)
|
|
loop.close()
|
|
|
|
def test_async_coroutine(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t = asyncio.async(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t._loop, self.loop)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
t = asyncio.async(notmuch(), loop=loop)
|
|
self.assertIs(t._loop, loop)
|
|
loop.close()
|
|
|
|
def test_async_future(self):
|
|
f_orig = asyncio.Future(loop=self.loop)
|
|
f_orig.set_result('ko')
|
|
|
|
f = asyncio.async(f_orig)
|
|
self.loop.run_until_complete(f)
|
|
self.assertTrue(f.done())
|
|
self.assertEqual(f.result(), 'ko')
|
|
self.assertIs(f, f_orig)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
with self.assertRaises(ValueError):
|
|
f = asyncio.async(f_orig, loop=loop)
|
|
|
|
loop.close()
|
|
|
|
f = asyncio.async(f_orig, loop=self.loop)
|
|
self.assertIs(f, f_orig)
|
|
|
|
def test_async_task(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ok'
|
|
t_orig = asyncio.Task(notmuch(), loop=self.loop)
|
|
t = asyncio.async(t_orig)
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'ok')
|
|
self.assertIs(t, t_orig)
|
|
|
|
loop = asyncio.new_event_loop()
|
|
|
|
with self.assertRaises(ValueError):
|
|
t = asyncio.async(t_orig, loop=loop)
|
|
|
|
loop.close()
|
|
|
|
t = asyncio.async(t_orig, loop=self.loop)
|
|
self.assertIs(t, t_orig)
|
|
|
|
def test_async_neither(self):
|
|
with self.assertRaises(TypeError):
|
|
asyncio.async('ok')
|
|
|
|
def test_task_repr(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
yield from []
|
|
return 'abc'
|
|
|
|
t = asyncio.Task(notmuch(), loop=self.loop)
|
|
t.add_done_callback(Dummy())
|
|
self.assertEqual(repr(t), 'Task(<notmuch>)<PENDING, [Dummy()]>')
|
|
t.cancel() # Does not take immediate effect!
|
|
self.assertEqual(repr(t), 'Task(<notmuch>)<CANCELLING, [Dummy()]>')
|
|
self.assertRaises(asyncio.CancelledError,
|
|
self.loop.run_until_complete, t)
|
|
self.assertEqual(repr(t), 'Task(<notmuch>)<CANCELLED>')
|
|
t = asyncio.Task(notmuch(), loop=self.loop)
|
|
self.loop.run_until_complete(t)
|
|
self.assertEqual(repr(t), "Task(<notmuch>)<result='abc'>")
|
|
|
|
def test_task_repr_custom(self):
|
|
@asyncio.coroutine
|
|
def coro():
|
|
pass
|
|
|
|
class T(asyncio.Future):
|
|
def __repr__(self):
|
|
return 'T[]'
|
|
|
|
class MyTask(asyncio.Task, T):
|
|
def __repr__(self):
|
|
return super().__repr__()
|
|
|
|
gen = coro()
|
|
t = MyTask(gen, loop=self.loop)
|
|
self.assertEqual(repr(t), 'T[](<coro>)')
|
|
gen.close()
|
|
|
|
def test_task_basics(self):
|
|
@asyncio.coroutine
|
|
def outer():
|
|
a = yield from inner1()
|
|
b = yield from inner2()
|
|
return a+b
|
|
|
|
@asyncio.coroutine
|
|
def inner1():
|
|
return 42
|
|
|
|
@asyncio.coroutine
|
|
def inner2():
|
|
return 1000
|
|
|
|
t = outer()
|
|
self.assertEqual(self.loop.run_until_complete(t), 1042)
|
|
|
|
def test_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from asyncio.sleep(10.0, loop=loop)
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
loop.call_soon(t.cancel)
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(t.cancelled())
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_cancel_yield(self):
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield
|
|
yield
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop) # start coro
|
|
t.cancel()
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, t)
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(t.cancelled())
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_cancel_inner_future(self):
|
|
f = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from f
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop) # start task
|
|
f.cancel()
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(t)
|
|
self.assertTrue(f.cancelled())
|
|
self.assertTrue(t.cancelled())
|
|
|
|
def test_cancel_both_task_and_inner_future(self):
|
|
f = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from f
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
|
|
f.cancel()
|
|
t.cancel()
|
|
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(t)
|
|
|
|
self.assertTrue(t.done())
|
|
self.assertTrue(f.cancelled())
|
|
self.assertTrue(t.cancelled())
|
|
|
|
def test_cancel_task_catching(self):
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from fut1
|
|
try:
|
|
yield from fut2
|
|
except asyncio.CancelledError:
|
|
return 42
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut1) # White-box test.
|
|
fut1.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut2) # White-box test.
|
|
t.cancel()
|
|
self.assertTrue(fut2.cancelled())
|
|
res = self.loop.run_until_complete(t)
|
|
self.assertEqual(res, 42)
|
|
self.assertFalse(t.cancelled())
|
|
|
|
def test_cancel_task_ignoring(self):
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
fut3 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
yield from fut1
|
|
try:
|
|
yield from fut2
|
|
except asyncio.CancelledError:
|
|
pass
|
|
res = yield from fut3
|
|
return res
|
|
|
|
t = asyncio.Task(task(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut1) # White-box test.
|
|
fut1.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut2) # White-box test.
|
|
t.cancel()
|
|
self.assertTrue(fut2.cancelled())
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(t._fut_waiter, fut3) # White-box test.
|
|
fut3.set_result(42)
|
|
res = self.loop.run_until_complete(t)
|
|
self.assertEqual(res, 42)
|
|
self.assertFalse(fut3.cancelled())
|
|
self.assertFalse(t.cancelled())
|
|
|
|
def test_cancel_current_task(self):
|
|
loop = asyncio.new_event_loop()
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
t.cancel()
|
|
self.assertTrue(t._must_cancel) # White-box test.
|
|
# The sleep should be cancelled immediately.
|
|
yield from asyncio.sleep(100, loop=loop)
|
|
return 12
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
self.assertRaises(
|
|
asyncio.CancelledError, loop.run_until_complete, t)
|
|
self.assertTrue(t.done())
|
|
self.assertFalse(t._must_cancel) # White-box test.
|
|
self.assertFalse(t.cancel())
|
|
|
|
def test_stop_while_run_in_complete(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0.1
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0.1
|
|
self.assertAlmostEqual(0.3, when)
|
|
yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
x = 0
|
|
waiters = []
|
|
|
|
@asyncio.coroutine
|
|
def task():
|
|
nonlocal x
|
|
while x < 10:
|
|
waiters.append(asyncio.sleep(0.1, loop=loop))
|
|
yield from waiters[-1]
|
|
x += 1
|
|
if x == 2:
|
|
loop.stop()
|
|
|
|
t = asyncio.Task(task(), loop=loop)
|
|
self.assertRaises(
|
|
RuntimeError, loop.run_until_complete, t)
|
|
self.assertFalse(t.done())
|
|
self.assertEqual(x, 2)
|
|
self.assertAlmostEqual(0.3, loop.time())
|
|
|
|
# close generators
|
|
for w in waiters:
|
|
w.close()
|
|
|
|
def test_wait_for(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
foo_running = None
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
nonlocal foo_running
|
|
foo_running = True
|
|
try:
|
|
yield from asyncio.sleep(0.2, loop=loop)
|
|
finally:
|
|
foo_running = False
|
|
return 'done'
|
|
|
|
fut = asyncio.Task(foo(), loop=loop)
|
|
|
|
with self.assertRaises(asyncio.TimeoutError):
|
|
loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop))
|
|
self.assertTrue(fut.done())
|
|
# it should have been cancelled due to the timeout
|
|
self.assertTrue(fut.cancelled())
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
self.assertEqual(foo_running, False)
|
|
|
|
def test_wait_for_blocking(self):
|
|
loop = test_utils.TestLoop()
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
return 'done'
|
|
|
|
res = loop.run_until_complete(asyncio.wait_for(coro(),
|
|
timeout=None,
|
|
loop=loop))
|
|
self.assertEqual(res, 'done')
|
|
|
|
def test_wait_for_with_global_loop(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.2, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.01, when)
|
|
yield 0.01
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
yield from asyncio.sleep(0.2, loop=loop)
|
|
return 'done'
|
|
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
fut = asyncio.Task(foo(), loop=loop)
|
|
with self.assertRaises(asyncio.TimeoutError):
|
|
loop.run_until_complete(asyncio.wait_for(fut, 0.01))
|
|
finally:
|
|
asyncio.set_event_loop(None)
|
|
|
|
self.assertAlmostEqual(0.01, loop.time())
|
|
self.assertTrue(fut.done())
|
|
self.assertTrue(fut.cancelled())
|
|
|
|
def test_wait(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
yield 0.15
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], loop=loop)
|
|
self.assertEqual(done, set([a, b]))
|
|
self.assertEqual(pending, set())
|
|
return 42
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertEqual(res, 42)
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
# Doing it again should take no time and exercise a different path.
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_wait_with_global_loop(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.01, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.015, when)
|
|
yield 0.015
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.01, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.015, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a])
|
|
self.assertEqual(done, set([a, b]))
|
|
self.assertEqual(pending, set())
|
|
return 42
|
|
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
res = loop.run_until_complete(
|
|
asyncio.Task(foo(), loop=loop))
|
|
finally:
|
|
asyncio.set_event_loop(None)
|
|
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_wait_duplicate_coroutines(self):
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
c = coro('test')
|
|
|
|
task = asyncio.Task(
|
|
asyncio.wait([c, c, coro('spam')], loop=self.loop),
|
|
loop=self.loop)
|
|
|
|
done, pending = self.loop.run_until_complete(task)
|
|
|
|
self.assertFalse(pending)
|
|
self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
|
|
|
|
def test_wait_errors(self):
|
|
self.assertRaises(
|
|
ValueError, self.loop.run_until_complete,
|
|
asyncio.wait(set(), loop=self.loop))
|
|
|
|
self.assertRaises(
|
|
ValueError, self.loop.run_until_complete,
|
|
asyncio.wait([asyncio.sleep(10.0, loop=self.loop)],
|
|
return_when=-1, loop=self.loop))
|
|
|
|
def test_wait_first_completed(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
|
|
loop=loop),
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertFalse(a.done())
|
|
self.assertTrue(b.done())
|
|
self.assertIsNone(b.result())
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_really_done(self):
|
|
# there is possibility that some tasks in the pending list
|
|
# became done but their callbacks haven't all been called yet
|
|
|
|
@asyncio.coroutine
|
|
def coro1():
|
|
yield
|
|
|
|
@asyncio.coroutine
|
|
def coro2():
|
|
yield
|
|
yield
|
|
|
|
a = asyncio.Task(coro1(), loop=self.loop)
|
|
b = asyncio.Task(coro2(), loop=self.loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
|
|
loop=self.loop),
|
|
loop=self.loop)
|
|
|
|
done, pending = self.loop.run_until_complete(task)
|
|
self.assertEqual({a, b}, done)
|
|
self.assertTrue(a.done())
|
|
self.assertIsNone(a.result())
|
|
self.assertTrue(b.done())
|
|
self.assertIsNone(b.result())
|
|
|
|
def test_wait_first_exception(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
# first_exception, task already has exception
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def exc():
|
|
raise ZeroDivisionError('err')
|
|
|
|
b = asyncio.Task(exc(), loop=loop)
|
|
task = asyncio.Task(
|
|
asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
|
|
loop=loop),
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertAlmostEqual(0, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_first_exception_in_wait(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.01, when)
|
|
yield 0.01
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
# first_exception, exception during waiting
|
|
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def exc():
|
|
yield from asyncio.sleep(0.01, loop=loop)
|
|
raise ZeroDivisionError('err')
|
|
|
|
b = asyncio.Task(exc(), loop=loop)
|
|
task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
|
|
loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(task)
|
|
self.assertEqual({b}, done)
|
|
self.assertEqual({a}, pending)
|
|
self.assertAlmostEqual(0.01, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_with_exception(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
yield 0.15
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper():
|
|
yield from asyncio.sleep(0.15, loop=loop)
|
|
raise ZeroDivisionError('really')
|
|
|
|
b = asyncio.Task(sleeper(), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], loop=loop)
|
|
self.assertEqual(len(done), 2)
|
|
self.assertEqual(pending, set())
|
|
errors = set(f for f in done if f.exception() is not None)
|
|
self.assertEqual(len(errors), 1)
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
def test_wait_with_timeout(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.11, when)
|
|
yield 0.11
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
done, pending = yield from asyncio.wait([b, a], timeout=0.11,
|
|
loop=loop)
|
|
self.assertEqual(done, set([a]))
|
|
self.assertEqual(pending, set([b]))
|
|
|
|
loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.11, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_wait_concurrent_complete(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.15, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
|
|
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
|
|
|
|
done, pending = loop.run_until_complete(
|
|
asyncio.wait([b, a], timeout=0.1, loop=loop))
|
|
|
|
self.assertEqual(done, set([a]))
|
|
self.assertEqual(pending, set([b]))
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_as_completed(self):
|
|
|
|
def gen():
|
|
yield 0
|
|
yield 0
|
|
yield 0.01
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
completed = set()
|
|
time_shifted = False
|
|
|
|
@asyncio.coroutine
|
|
def sleeper(dt, x):
|
|
nonlocal time_shifted
|
|
yield from asyncio.sleep(dt, loop=loop)
|
|
completed.add(x)
|
|
if not time_shifted and 'a' in completed and 'b' in completed:
|
|
time_shifted = True
|
|
loop.advance_time(0.14)
|
|
return x
|
|
|
|
a = sleeper(0.01, 'a')
|
|
b = sleeper(0.01, 'b')
|
|
c = sleeper(0.15, 'c')
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
values = []
|
|
for f in asyncio.as_completed([b, c, a], loop=loop):
|
|
values.append((yield from f))
|
|
return values
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
self.assertTrue('a' in res[:2])
|
|
self.assertTrue('b' in res[:2])
|
|
self.assertEqual(res[2], 'c')
|
|
|
|
# Doing it again should take no time and exercise a different path.
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertAlmostEqual(0.15, loop.time())
|
|
|
|
def test_as_completed_with_timeout(self):
|
|
|
|
def gen():
|
|
yield
|
|
yield 0
|
|
yield 0
|
|
yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.sleep(0.1, 'a', loop=loop)
|
|
b = asyncio.sleep(0.15, 'b', loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
values = []
|
|
for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
|
|
if values:
|
|
loop.advance_time(0.02)
|
|
try:
|
|
v = yield from f
|
|
values.append((1, v))
|
|
except asyncio.TimeoutError as exc:
|
|
values.append((2, exc))
|
|
return values
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
self.assertEqual(len(res), 2, res)
|
|
self.assertEqual(res[0], (1, 'a'))
|
|
self.assertEqual(res[1][0], 2)
|
|
self.assertIsInstance(res[1][1], asyncio.TimeoutError)
|
|
self.assertAlmostEqual(0.12, loop.time())
|
|
|
|
# move forward to close generator
|
|
loop.advance_time(10)
|
|
loop.run_until_complete(asyncio.wait([a, b], loop=loop))
|
|
|
|
def test_as_completed_with_unused_timeout(self):
|
|
|
|
def gen():
|
|
yield
|
|
yield 0
|
|
yield 0.01
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.sleep(0.01, 'a', loop=loop)
|
|
|
|
@asyncio.coroutine
|
|
def foo():
|
|
for f in asyncio.as_completed([a], timeout=1, loop=loop):
|
|
v = yield from f
|
|
self.assertEqual(v, 'a')
|
|
|
|
res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
|
|
|
|
def test_as_completed_reverse_wait(self):
|
|
|
|
def gen():
|
|
yield 0
|
|
yield 0.05
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.sleep(0.05, 'a', loop=loop)
|
|
b = asyncio.sleep(0.10, 'b', loop=loop)
|
|
fs = {a, b}
|
|
futs = list(asyncio.as_completed(fs, loop=loop))
|
|
self.assertEqual(len(futs), 2)
|
|
|
|
x = loop.run_until_complete(futs[1])
|
|
self.assertEqual(x, 'a')
|
|
self.assertAlmostEqual(0.05, loop.time())
|
|
loop.advance_time(0.05)
|
|
y = loop.run_until_complete(futs[0])
|
|
self.assertEqual(y, 'b')
|
|
self.assertAlmostEqual(0.10, loop.time())
|
|
|
|
def test_as_completed_concurrent(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.05, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(0.05, when)
|
|
yield 0.05
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
a = asyncio.sleep(0.05, 'a', loop=loop)
|
|
b = asyncio.sleep(0.05, 'b', loop=loop)
|
|
fs = {a, b}
|
|
futs = list(asyncio.as_completed(fs, loop=loop))
|
|
self.assertEqual(len(futs), 2)
|
|
waiter = asyncio.wait(futs, loop=loop)
|
|
done, pending = loop.run_until_complete(waiter)
|
|
self.assertEqual(set(f.result() for f in done), {'a', 'b'})
|
|
|
|
def test_as_completed_duplicate_coroutines(self):
|
|
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
|
|
@asyncio.coroutine
|
|
def runner():
|
|
result = []
|
|
c = coro('ham')
|
|
for f in asyncio.as_completed([c, c, coro('spam')],
|
|
loop=self.loop):
|
|
result.append((yield from f))
|
|
return result
|
|
|
|
fut = asyncio.Task(runner(), loop=self.loop)
|
|
self.loop.run_until_complete(fut)
|
|
result = fut.result()
|
|
self.assertEqual(set(result), {'ham', 'spam'})
|
|
self.assertEqual(len(result), 2)
|
|
|
|
def test_sleep(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.05, when)
|
|
when = yield 0.05
|
|
self.assertAlmostEqual(0.1, when)
|
|
yield 0.05
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper(dt, arg):
|
|
yield from asyncio.sleep(dt/2, loop=loop)
|
|
res = yield from asyncio.sleep(dt/2, arg, loop=loop)
|
|
return res
|
|
|
|
t = asyncio.Task(sleeper(0.1, 'yeah'), loop=loop)
|
|
loop.run_until_complete(t)
|
|
self.assertTrue(t.done())
|
|
self.assertEqual(t.result(), 'yeah')
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
def test_sleep_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop),
|
|
loop=loop)
|
|
|
|
handle = None
|
|
orig_call_later = loop.call_later
|
|
|
|
def call_later(self, delay, callback, *args):
|
|
nonlocal handle
|
|
handle = orig_call_later(self, delay, callback, *args)
|
|
return handle
|
|
|
|
loop.call_later = call_later
|
|
test_utils.run_briefly(loop)
|
|
|
|
self.assertFalse(handle._cancelled)
|
|
|
|
t.cancel()
|
|
test_utils.run_briefly(loop)
|
|
self.assertTrue(handle._cancelled)
|
|
|
|
def test_task_cancel_sleeping_task(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(0.1, when)
|
|
when = yield 0
|
|
self.assertAlmostEqual(5000, when)
|
|
yield 0.1
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
sleepfut = None
|
|
|
|
@asyncio.coroutine
|
|
def sleep(dt):
|
|
nonlocal sleepfut
|
|
sleepfut = asyncio.sleep(dt, loop=loop)
|
|
yield from sleepfut
|
|
|
|
@asyncio.coroutine
|
|
def doit():
|
|
sleeper = asyncio.Task(sleep(5000), loop=loop)
|
|
loop.call_later(0.1, sleeper.cancel)
|
|
try:
|
|
yield from sleeper
|
|
except asyncio.CancelledError:
|
|
return 'cancelled'
|
|
else:
|
|
return 'slept in'
|
|
|
|
doer = doit()
|
|
self.assertEqual(loop.run_until_complete(doer), 'cancelled')
|
|
self.assertAlmostEqual(0.1, loop.time())
|
|
|
|
def test_task_cancel_waiter_future(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
yield from fut
|
|
|
|
task = asyncio.Task(coro(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(task._fut_waiter, fut)
|
|
|
|
task.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, task)
|
|
self.assertIsNone(task._fut_waiter)
|
|
self.assertTrue(fut.cancelled())
|
|
|
|
def test_step_in_completed_task(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
return 'ko'
|
|
|
|
gen = notmuch()
|
|
task = asyncio.Task(gen, loop=self.loop)
|
|
task.set_result('ok')
|
|
|
|
self.assertRaises(AssertionError, task._step)
|
|
gen.close()
|
|
|
|
def test_step_result(self):
|
|
@asyncio.coroutine
|
|
def notmuch():
|
|
yield None
|
|
yield 1
|
|
return 'ko'
|
|
|
|
self.assertRaises(
|
|
RuntimeError, self.loop.run_until_complete, notmuch())
|
|
|
|
def test_step_result_future(self):
|
|
# If coroutine returns future, task waits on this future.
|
|
|
|
class Fut(asyncio.Future):
|
|
def __init__(self, *args, **kwds):
|
|
self.cb_added = False
|
|
super().__init__(*args, **kwds)
|
|
|
|
def add_done_callback(self, fn):
|
|
self.cb_added = True
|
|
super().add_done_callback(fn)
|
|
|
|
fut = Fut(loop=self.loop)
|
|
result = None
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
nonlocal result
|
|
result = yield from fut
|
|
|
|
t = asyncio.Task(wait_for_future(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(fut.cb_added)
|
|
|
|
res = object()
|
|
fut.set_result(res)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(res, result)
|
|
self.assertTrue(t.done())
|
|
self.assertIsNone(t.result())
|
|
|
|
def test_step_with_baseexception(self):
|
|
@asyncio.coroutine
|
|
def notmutch():
|
|
raise BaseException()
|
|
|
|
task = asyncio.Task(notmutch(), loop=self.loop)
|
|
self.assertRaises(BaseException, task._step)
|
|
|
|
self.assertTrue(task.done())
|
|
self.assertIsInstance(task.exception(), BaseException)
|
|
|
|
def test_baseexception_during_cancel(self):
|
|
|
|
def gen():
|
|
when = yield
|
|
self.assertAlmostEqual(10.0, when)
|
|
yield 0
|
|
|
|
loop = test_utils.TestLoop(gen)
|
|
self.addCleanup(loop.close)
|
|
|
|
@asyncio.coroutine
|
|
def sleeper():
|
|
yield from asyncio.sleep(10, loop=loop)
|
|
|
|
base_exc = BaseException()
|
|
|
|
@asyncio.coroutine
|
|
def notmutch():
|
|
try:
|
|
yield from sleeper()
|
|
except asyncio.CancelledError:
|
|
raise base_exc
|
|
|
|
task = asyncio.Task(notmutch(), loop=loop)
|
|
test_utils.run_briefly(loop)
|
|
|
|
task.cancel()
|
|
self.assertFalse(task.done())
|
|
|
|
self.assertRaises(BaseException, test_utils.run_briefly, loop)
|
|
|
|
self.assertTrue(task.done())
|
|
self.assertFalse(task.cancelled())
|
|
self.assertIs(task.exception(), base_exc)
|
|
|
|
def test_iscoroutinefunction(self):
|
|
def fn():
|
|
pass
|
|
|
|
self.assertFalse(asyncio.iscoroutinefunction(fn))
|
|
|
|
def fn1():
|
|
yield
|
|
self.assertFalse(asyncio.iscoroutinefunction(fn1))
|
|
|
|
@asyncio.coroutine
|
|
def fn2():
|
|
yield
|
|
self.assertTrue(asyncio.iscoroutinefunction(fn2))
|
|
|
|
def test_yield_vs_yield_from(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
yield fut
|
|
|
|
task = wait_for_future()
|
|
with self.assertRaises(RuntimeError):
|
|
self.loop.run_until_complete(task)
|
|
|
|
self.assertFalse(fut.done())
|
|
|
|
def test_yield_vs_yield_from_generator(self):
|
|
@asyncio.coroutine
|
|
def coro():
|
|
yield
|
|
|
|
@asyncio.coroutine
|
|
def wait_for_future():
|
|
gen = coro()
|
|
try:
|
|
yield gen
|
|
finally:
|
|
gen.close()
|
|
|
|
task = wait_for_future()
|
|
self.assertRaises(
|
|
RuntimeError,
|
|
self.loop.run_until_complete, task)
|
|
|
|
def test_coroutine_non_gen_function(self):
|
|
@asyncio.coroutine
|
|
def func():
|
|
return 'test'
|
|
|
|
self.assertTrue(asyncio.iscoroutinefunction(func))
|
|
|
|
coro = func()
|
|
self.assertTrue(asyncio.iscoroutine(coro))
|
|
|
|
res = self.loop.run_until_complete(coro)
|
|
self.assertEqual(res, 'test')
|
|
|
|
def test_coroutine_non_gen_function_return_future(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def func():
|
|
return fut
|
|
|
|
@asyncio.coroutine
|
|
def coro():
|
|
fut.set_result('test')
|
|
|
|
t1 = asyncio.Task(func(), loop=self.loop)
|
|
t2 = asyncio.Task(coro(), loop=self.loop)
|
|
res = self.loop.run_until_complete(t1)
|
|
self.assertEqual(res, 'test')
|
|
self.assertIsNone(t2.result())
|
|
|
|
def test_current_task(self):
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
@asyncio.coroutine
|
|
def coro(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task)
|
|
|
|
task = asyncio.Task(coro(self.loop), loop=self.loop)
|
|
self.loop.run_until_complete(task)
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
def test_current_task_with_interleaving_tasks(self):
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
fut1 = asyncio.Future(loop=self.loop)
|
|
fut2 = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def coro1(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
|
|
yield from fut1
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task1)
|
|
fut2.set_result(True)
|
|
|
|
@asyncio.coroutine
|
|
def coro2(loop):
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
|
|
fut1.set_result(True)
|
|
yield from fut2
|
|
self.assertTrue(asyncio.Task.current_task(loop=loop) is task2)
|
|
|
|
task1 = asyncio.Task(coro1(self.loop), loop=self.loop)
|
|
task2 = asyncio.Task(coro2(self.loop), loop=self.loop)
|
|
|
|
self.loop.run_until_complete(asyncio.wait((task1, task2),
|
|
loop=self.loop))
|
|
self.assertIsNone(asyncio.Task.current_task(loop=self.loop))
|
|
|
|
# Some thorough tests for cancellation propagation through
|
|
# coroutines, tasks and wait().
|
|
|
|
def test_yield_future_passes_cancel(self):
|
|
# Cancelling outer() cancels inner() cancels waiter.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
try:
|
|
yield from waiter
|
|
except asyncio.CancelledError:
|
|
proof += 1
|
|
raise
|
|
else:
|
|
self.fail('got past sleep() in inner()')
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
try:
|
|
yield from inner()
|
|
except asyncio.CancelledError:
|
|
proof += 100 # Expect this path.
|
|
else:
|
|
proof += 10
|
|
|
|
f = asyncio.async(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
self.loop.run_until_complete(f)
|
|
self.assertEqual(proof, 101)
|
|
self.assertTrue(waiter.cancelled())
|
|
|
|
def test_yield_wait_does_not_shield_cancel(self):
|
|
# Cancelling outer() makes wait() return early, leaves inner()
|
|
# running.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
d, p = yield from asyncio.wait([inner()], loop=self.loop)
|
|
proof += 100
|
|
|
|
f = asyncio.async(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
self.assertRaises(
|
|
asyncio.CancelledError, self.loop.run_until_complete, f)
|
|
waiter.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(proof, 1)
|
|
|
|
def test_shield_result(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
inner.set_result(42)
|
|
res = self.loop.run_until_complete(outer)
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_shield_exception(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
test_utils.run_briefly(self.loop)
|
|
exc = RuntimeError('expected')
|
|
inner.set_exception(exc)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIs(outer.exception(), exc)
|
|
|
|
def test_shield_cancel(self):
|
|
inner = asyncio.Future(loop=self.loop)
|
|
outer = asyncio.shield(inner)
|
|
test_utils.run_briefly(self.loop)
|
|
inner.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(outer.cancelled())
|
|
|
|
def test_shield_shortcut(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
fut.set_result(42)
|
|
res = self.loop.run_until_complete(asyncio.shield(fut))
|
|
self.assertEqual(res, 42)
|
|
|
|
def test_shield_effect(self):
|
|
# Cancelling outer() does not affect inner().
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof
|
|
yield from asyncio.shield(inner(), loop=self.loop)
|
|
proof += 100
|
|
|
|
f = asyncio.async(outer(), loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
f.cancel()
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.loop.run_until_complete(f)
|
|
waiter.set_result(None)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(proof, 1)
|
|
|
|
def test_shield_gather(self):
|
|
child1 = asyncio.Future(loop=self.loop)
|
|
child2 = asyncio.Future(loop=self.loop)
|
|
parent = asyncio.gather(child1, child2, loop=self.loop)
|
|
outer = asyncio.shield(parent, loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
outer.cancel()
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertTrue(outer.cancelled())
|
|
child1.set_result(1)
|
|
child2.set_result(2)
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertEqual(parent.result(), [1, 2])
|
|
|
|
def test_gather_shield(self):
|
|
child1 = asyncio.Future(loop=self.loop)
|
|
child2 = asyncio.Future(loop=self.loop)
|
|
inner1 = asyncio.shield(child1, loop=self.loop)
|
|
inner2 = asyncio.shield(child2, loop=self.loop)
|
|
parent = asyncio.gather(inner1, inner2, loop=self.loop)
|
|
test_utils.run_briefly(self.loop)
|
|
parent.cancel()
|
|
# This should cancel inner1 and inner2 but bot child1 and child2.
|
|
test_utils.run_briefly(self.loop)
|
|
self.assertIsInstance(parent.exception(), asyncio.CancelledError)
|
|
self.assertTrue(inner1.cancelled())
|
|
self.assertTrue(inner2.cancelled())
|
|
child1.set_result(1)
|
|
child2.set_result(2)
|
|
test_utils.run_briefly(self.loop)
|
|
|
|
def test_as_completed_invalid_args(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
# as_completed() expects a list of futures, not a future instance
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.as_completed(fut, loop=self.loop))
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.as_completed(coroutine_function(), loop=self.loop))
|
|
|
|
def test_wait_invalid_args(self):
|
|
fut = asyncio.Future(loop=self.loop)
|
|
|
|
# wait() expects a list of futures, not a future instance
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.wait(fut, loop=self.loop))
|
|
self.assertRaises(TypeError, self.loop.run_until_complete,
|
|
asyncio.wait(coroutine_function(), loop=self.loop))
|
|
|
|
# wait() expects at least a future
|
|
self.assertRaises(ValueError, self.loop.run_until_complete,
|
|
asyncio.wait([], loop=self.loop))
|
|
|
|
class GatherTestsBase:
|
|
|
|
def setUp(self):
|
|
self.one_loop = test_utils.TestLoop()
|
|
self.other_loop = test_utils.TestLoop()
|
|
|
|
def tearDown(self):
|
|
self.one_loop.close()
|
|
self.other_loop.close()
|
|
|
|
def _run_loop(self, loop):
|
|
while loop._ready:
|
|
test_utils.run_briefly(loop)
|
|
|
|
def _check_success(self, **kwargs):
|
|
a, b, c = [asyncio.Future(loop=self.one_loop) for i in range(3)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
b.set_result(1)
|
|
a.set_result(2)
|
|
self._run_loop(self.one_loop)
|
|
self.assertEqual(cb.called, False)
|
|
self.assertFalse(fut.done())
|
|
c.set_result(3)
|
|
self._run_loop(self.one_loop)
|
|
cb.assert_called_once_with(fut)
|
|
self.assertEqual(fut.result(), [2, 1, 3])
|
|
|
|
def test_success(self):
|
|
self._check_success()
|
|
self._check_success(return_exceptions=False)
|
|
|
|
def test_result_exception_success(self):
|
|
self._check_success(return_exceptions=True)
|
|
|
|
def test_one_exception(self):
|
|
a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
exc = ZeroDivisionError()
|
|
a.set_result(1)
|
|
b.set_exception(exc)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertIs(fut.exception(), exc)
|
|
# Does nothing
|
|
c.set_result(3)
|
|
d.cancel()
|
|
e.set_exception(RuntimeError())
|
|
e.exception()
|
|
|
|
def test_return_exceptions(self):
|
|
a, b, c, d = [asyncio.Future(loop=self.one_loop) for i in range(4)]
|
|
fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
|
|
return_exceptions=True)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
exc = ZeroDivisionError()
|
|
exc2 = RuntimeError()
|
|
b.set_result(1)
|
|
c.set_exception(exc)
|
|
a.set_result(3)
|
|
self._run_loop(self.one_loop)
|
|
self.assertFalse(fut.done())
|
|
d.set_exception(exc2)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertEqual(fut.result(), [3, 1, exc, exc2])
|
|
|
|
def test_env_var_debug(self):
|
|
path = os.path.dirname(asyncio.__file__)
|
|
path = os.path.normpath(os.path.join(path, '..'))
|
|
code = '\n'.join((
|
|
'import sys',
|
|
'sys.path.insert(0, %r)' % path,
|
|
'import asyncio.tasks',
|
|
'print(asyncio.tasks._DEBUG)'))
|
|
|
|
# Test with -E to not fail if the unit test was run with
|
|
# PYTHONASYNCIODEBUG set to a non-empty string
|
|
sts, stdout, stderr = assert_python_ok('-E', '-c', code)
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-c', code,
|
|
PYTHONASYNCIODEBUG='')
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-c', code,
|
|
PYTHONASYNCIODEBUG='1')
|
|
self.assertEqual(stdout.rstrip(), b'True')
|
|
|
|
sts, stdout, stderr = assert_python_ok('-E', '-c', code,
|
|
PYTHONASYNCIODEBUG='1')
|
|
self.assertEqual(stdout.rstrip(), b'False')
|
|
|
|
|
|
class FutureGatherTests(GatherTestsBase, unittest.TestCase):
|
|
|
|
def wrap_futures(self, *futures):
|
|
return futures
|
|
|
|
def _check_empty_sequence(self, seq_or_iter):
|
|
asyncio.set_event_loop(self.one_loop)
|
|
self.addCleanup(asyncio.set_event_loop, None)
|
|
fut = asyncio.gather(*seq_or_iter)
|
|
self.assertIsInstance(fut, asyncio.Future)
|
|
self.assertIs(fut._loop, self.one_loop)
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
self.assertEqual(fut.result(), [])
|
|
fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
|
|
def test_constructor_empty_sequence(self):
|
|
self._check_empty_sequence([])
|
|
self._check_empty_sequence(())
|
|
self._check_empty_sequence(set())
|
|
self._check_empty_sequence(iter(""))
|
|
|
|
def test_constructor_heterogenous_futures(self):
|
|
fut1 = asyncio.Future(loop=self.one_loop)
|
|
fut2 = asyncio.Future(loop=self.other_loop)
|
|
with self.assertRaises(ValueError):
|
|
asyncio.gather(fut1, fut2)
|
|
with self.assertRaises(ValueError):
|
|
asyncio.gather(fut1, loop=self.other_loop)
|
|
|
|
def test_constructor_homogenous_futures(self):
|
|
children = [asyncio.Future(loop=self.other_loop) for i in range(3)]
|
|
fut = asyncio.gather(*children)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
self._run_loop(self.other_loop)
|
|
self.assertFalse(fut.done())
|
|
fut = asyncio.gather(*children, loop=self.other_loop)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
self._run_loop(self.other_loop)
|
|
self.assertFalse(fut.done())
|
|
|
|
def test_one_cancellation(self):
|
|
a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)]
|
|
fut = asyncio.gather(a, b, c, d, e)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
a.set_result(1)
|
|
b.cancel()
|
|
self._run_loop(self.one_loop)
|
|
self.assertTrue(fut.done())
|
|
cb.assert_called_once_with(fut)
|
|
self.assertFalse(fut.cancelled())
|
|
self.assertIsInstance(fut.exception(), asyncio.CancelledError)
|
|
# Does nothing
|
|
c.set_result(3)
|
|
d.cancel()
|
|
e.set_exception(RuntimeError())
|
|
e.exception()
|
|
|
|
def test_result_exception_one_cancellation(self):
|
|
a, b, c, d, e, f = [asyncio.Future(loop=self.one_loop)
|
|
for i in range(6)]
|
|
fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
|
|
cb = test_utils.MockCallback()
|
|
fut.add_done_callback(cb)
|
|
a.set_result(1)
|
|
zde = ZeroDivisionError()
|
|
b.set_exception(zde)
|
|
c.cancel()
|
|
self._run_loop(self.one_loop)
|
|
self.assertFalse(fut.done())
|
|
d.set_result(3)
|
|
e.cancel()
|
|
rte = RuntimeError()
|
|
f.set_exception(rte)
|
|
res = self.one_loop.run_until_complete(fut)
|
|
self.assertIsInstance(res[2], asyncio.CancelledError)
|
|
self.assertIsInstance(res[4], asyncio.CancelledError)
|
|
res[2] = res[4] = None
|
|
self.assertEqual(res, [1, zde, None, 3, None, rte])
|
|
cb.assert_called_once_with(fut)
|
|
|
|
|
|
class CoroutineGatherTests(GatherTestsBase, unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
super().setUp()
|
|
asyncio.set_event_loop(self.one_loop)
|
|
|
|
def tearDown(self):
|
|
asyncio.set_event_loop(None)
|
|
super().tearDown()
|
|
|
|
def wrap_futures(self, *futures):
|
|
coros = []
|
|
for fut in futures:
|
|
@asyncio.coroutine
|
|
def coro(fut=fut):
|
|
return (yield from fut)
|
|
coros.append(coro())
|
|
return coros
|
|
|
|
def test_constructor_loop_selection(self):
|
|
@asyncio.coroutine
|
|
def coro():
|
|
return 'abc'
|
|
gen1 = coro()
|
|
gen2 = coro()
|
|
fut = asyncio.gather(gen1, gen2)
|
|
self.assertIs(fut._loop, self.one_loop)
|
|
gen1.close()
|
|
gen2.close()
|
|
gen3 = coro()
|
|
gen4 = coro()
|
|
fut = asyncio.gather(gen3, gen4, loop=self.other_loop)
|
|
self.assertIs(fut._loop, self.other_loop)
|
|
gen3.close()
|
|
gen4.close()
|
|
|
|
def test_duplicate_coroutines(self):
|
|
@asyncio.coroutine
|
|
def coro(s):
|
|
return s
|
|
c = coro('abc')
|
|
fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
|
|
self._run_loop(self.one_loop)
|
|
self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
|
|
|
|
def test_cancellation_broadcast(self):
|
|
# Cancelling outer() cancels all children.
|
|
proof = 0
|
|
waiter = asyncio.Future(loop=self.one_loop)
|
|
|
|
@asyncio.coroutine
|
|
def inner():
|
|
nonlocal proof
|
|
yield from waiter
|
|
proof += 1
|
|
|
|
child1 = asyncio.async(inner(), loop=self.one_loop)
|
|
child2 = asyncio.async(inner(), loop=self.one_loop)
|
|
gatherer = None
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
nonlocal proof, gatherer
|
|
gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
|
|
yield from gatherer
|
|
proof += 100
|
|
|
|
f = asyncio.async(outer(), loop=self.one_loop)
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertTrue(f.cancel())
|
|
with self.assertRaises(asyncio.CancelledError):
|
|
self.one_loop.run_until_complete(f)
|
|
self.assertFalse(gatherer.cancel())
|
|
self.assertTrue(waiter.cancelled())
|
|
self.assertTrue(child1.cancelled())
|
|
self.assertTrue(child2.cancelled())
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertEqual(proof, 0)
|
|
|
|
def test_exception_marking(self):
|
|
# Test for the first line marked "Mark exception retrieved."
|
|
|
|
@asyncio.coroutine
|
|
def inner(f):
|
|
yield from f
|
|
raise RuntimeError('should not be ignored')
|
|
|
|
a = asyncio.Future(loop=self.one_loop)
|
|
b = asyncio.Future(loop=self.one_loop)
|
|
|
|
@asyncio.coroutine
|
|
def outer():
|
|
yield from asyncio.gather(inner(a), inner(b), loop=self.one_loop)
|
|
|
|
f = asyncio.async(outer(), loop=self.one_loop)
|
|
test_utils.run_briefly(self.one_loop)
|
|
a.set_result(None)
|
|
test_utils.run_briefly(self.one_loop)
|
|
b.set_result(None)
|
|
test_utils.run_briefly(self.one_loop)
|
|
self.assertIsInstance(f.exception(), RuntimeError)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|