bpo-34270: Make it possible to name asyncio tasks (GH-8547)

Co-authored-by: Antti Haapala <antti.haapala@anttipatterns.com>
This commit is contained in:
Alex Grönholm 2018-08-09 00:06:47 +03:00 committed by Yury Selivanov
parent 52dee687af
commit cca4eec3c0
13 changed files with 266 additions and 28 deletions

View file

@ -825,6 +825,34 @@ class BaseEventLoopTests(test_utils.TestCase):
task._log_destroy_pending = False
coro.close()
def test_create_named_task_with_default_factory(self):
async def test():
pass
loop = asyncio.new_event_loop()
task = loop.create_task(test(), name='test_task')
try:
self.assertEqual(task.get_name(), 'test_task')
finally:
loop.run_until_complete(task)
loop.close()
def test_create_named_task_with_custom_factory(self):
def task_factory(loop, coro):
return asyncio.Task(coro, loop=loop)
async def test():
pass
loop = asyncio.new_event_loop()
loop.set_task_factory(task_factory)
task = loop.create_task(test(), name='test_task')
try:
self.assertEqual(task.get_name(), 'test_task')
finally:
loop.run_until_complete(task)
loop.close()
def test_run_forever_keyboard_interrupt(self):
# Python issue #22601: ensure that the temporary task created by
# run_forever() consumes the KeyboardInterrupt and so don't log