mirror of
https://github.com/python/cpython.git
synced 2025-07-30 14:44:10 +00:00
[3.10] bpo-45238: Fix unittest.IsolatedAsyncioTestCase.debug() (GH-28449) (GH-28521)
It runs now asynchronous methods and callbacks.
If it fails, doCleanups() can be called for cleaning up.
(cherry picked from commit ecb6922ff2
)
Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
parent
8c1e1da565
commit
44396aaba9
4 changed files with 180 additions and 65 deletions
|
@ -72,15 +72,15 @@ class IsolatedAsyncioTestCase(TestCase):
|
|||
self._callMaybeAsync(function, *args, **kwargs)
|
||||
|
||||
def _callAsync(self, func, /, *args, **kwargs):
|
||||
assert self._asyncioTestLoop is not None
|
||||
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
|
||||
ret = func(*args, **kwargs)
|
||||
assert inspect.isawaitable(ret)
|
||||
assert inspect.isawaitable(ret), f'{func!r} returned non-awaitable'
|
||||
fut = self._asyncioTestLoop.create_future()
|
||||
self._asyncioCallsQueue.put_nowait((fut, ret))
|
||||
return self._asyncioTestLoop.run_until_complete(fut)
|
||||
|
||||
def _callMaybeAsync(self, func, /, *args, **kwargs):
|
||||
assert self._asyncioTestLoop is not None
|
||||
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
|
||||
ret = func(*args, **kwargs)
|
||||
if inspect.isawaitable(ret):
|
||||
fut = self._asyncioTestLoop.create_future()
|
||||
|
@ -109,7 +109,7 @@ class IsolatedAsyncioTestCase(TestCase):
|
|||
fut.set_exception(ex)
|
||||
|
||||
def _setupAsyncioLoop(self):
|
||||
assert self._asyncioTestLoop is None
|
||||
assert self._asyncioTestLoop is None, 'asyncio test loop already initialized'
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.set_debug(True)
|
||||
|
@ -119,7 +119,7 @@ class IsolatedAsyncioTestCase(TestCase):
|
|||
loop.run_until_complete(fut)
|
||||
|
||||
def _tearDownAsyncioLoop(self):
|
||||
assert self._asyncioTestLoop is not None
|
||||
assert self._asyncioTestLoop is not None, 'asyncio test loop is not initialized'
|
||||
loop = self._asyncioTestLoop
|
||||
self._asyncioTestLoop = None
|
||||
self._asyncioCallsQueue.put_nowait(None)
|
||||
|
@ -158,3 +158,12 @@ class IsolatedAsyncioTestCase(TestCase):
|
|||
return super().run(result)
|
||||
finally:
|
||||
self._tearDownAsyncioLoop()
|
||||
|
||||
def debug(self):
|
||||
self._setupAsyncioLoop()
|
||||
super().debug()
|
||||
self._tearDownAsyncioLoop()
|
||||
|
||||
def __del__(self):
|
||||
if self._asyncioTestLoop is not None:
|
||||
self._tearDownAsyncioLoop()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue