cpython/Lib/unittest/test/testmock/testasync.py
Elizabeth Uselton d6a9d17d8b bpo-37555: Update _CallList.__contains__ to respect ANY (#14700)
* Flip equality to use mock calls' __eq__

* bpo-37555: Regression test demonstrating assert_has_calls not working with ANY and spec_set

Co-authored-by: Neal Finne <neal@nealfinne.com>

* Revert "Flip equality to use mock calls' __eq__"

This reverts commit 94ddf54c5a.

* bpo-37555: Add regression tests for mock ANY ordering issues

Add regression tests for whether __eq__ is order agnostic on _Call and _CallList, which is useful for comparisons involving ANY, especially if the ANY comparison is to a class not defaulting __eq__ to NotImplemented.

Co-authored-by: Neal Finne <neal@nealfinne.com>

* bpo-37555: Fix _CallList and _Call order sensitivity

_Call and _CallList depend on ordering to correctly process that an object being compared to ANY with __eq__ should return True. This fix updates the comparison to check both a == b and b == a and return True if either condition is met, fixing situations from the tests in the previous two commits where assertEqual would not be commutative if checking _Call or _CallList objects. This seems like a reasonable fix considering that the Python data model specifies that if an object doesn't know how to compare itself to another object it should return NotImplemented, and that on getting NotImplemented from a == b, it should try b == a, implying that good behavior for __eq__ is commutative. This also flips the order of comparison in _CallList's __contains__ method, guaranteeing ANY will be on the left and have it's __eq__ called for equality checking, fixing the interaction between assert_has_calls and ANY.

Co-author: Neal Finne <neal@neal.finne.com>

* bpo-37555: Ensure _call_matcher returns _Call object

* Adding ACK and news entry

* bpo-37555: Replacing __eq__ with == to sidestep NotImplemented

bool(NotImplemented) returns True, so it's necessary to use ==
instead of __eq__ in this comparison.

* bpo-37555: cleaning up changes unnecessary to the final product

* bpo-37555: Fixed call on bound arguments to respect args and kwargs

* Revert "bpo-37555: Add regression tests for mock ANY ordering issues"

This reverts commit 49c5310ad4.

* Revert "bpo-37555: cleaning up changes unnecessary to the final product"

This reverts commit 18e964ba01.

* Revert "bpo-37555: Replacing __eq__ with == to sidestep NotImplemented"

This reverts commit f295eaca5b.

* Revert "bpo-37555: Fix _CallList and _Call order sensitivity"

This reverts commit 874fb697b8.

* Updated NEWS.d

* bpo-37555: Add tests checking every function using _call_matcher both with and without spec

* bpo-37555: Ensure all assert methods using _call_matcher are actually passing calls

* Remove AnyCompare and use call objects everywhere.

* Revert "Remove AnyCompare and use call objects everywhere."

This reverts commit 24973c0b32.

* Check for exception in assert_any_await
2019-09-13 16:54:32 +01:00

662 lines
22 KiB
Python

import asyncio
import inspect
import unittest
from unittest.mock import (ANY, call, AsyncMock, patch, MagicMock,
create_autospec, _AwaitEvent)
def tearDownModule():
asyncio.set_event_loop_policy(None)
class AsyncClass:
def __init__(self):
pass
async def async_method(self):
pass
def normal_method(self):
pass
class AwaitableClass:
def __await__(self):
yield
async def async_func():
pass
async def async_func_args(a, b, *, c):
pass
def normal_func():
pass
class NormalClass(object):
def a(self):
pass
async_foo_name = f'{__name__}.AsyncClass'
normal_foo_name = f'{__name__}.NormalClass'
class AsyncPatchDecoratorTest(unittest.TestCase):
def test_is_coroutine_function_patch(self):
@patch.object(AsyncClass, 'async_method')
def test_async(mock_method):
self.assertTrue(asyncio.iscoroutinefunction(mock_method))
test_async()
def test_is_async_patch(self):
@patch.object(AsyncClass, 'async_method')
def test_async(mock_method):
m = mock_method()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
@patch(f'{async_foo_name}.async_method')
def test_no_parent_attribute(mock_method):
m = mock_method()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
test_async()
test_no_parent_attribute()
def test_is_AsyncMock_patch(self):
@patch.object(AsyncClass, 'async_method')
def test_async(mock_method):
self.assertIsInstance(mock_method, AsyncMock)
test_async()
def test_async_def_patch(self):
@patch(f"{__name__}.async_func", AsyncMock())
async def test_async():
self.assertIsInstance(async_func, AsyncMock)
asyncio.run(test_async())
self.assertTrue(inspect.iscoroutinefunction(async_func))
class AsyncPatchCMTest(unittest.TestCase):
def test_is_async_function_cm(self):
def test_async():
with patch.object(AsyncClass, 'async_method') as mock_method:
self.assertTrue(asyncio.iscoroutinefunction(mock_method))
test_async()
def test_is_async_cm(self):
def test_async():
with patch.object(AsyncClass, 'async_method') as mock_method:
m = mock_method()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
test_async()
def test_is_AsyncMock_cm(self):
def test_async():
with patch.object(AsyncClass, 'async_method') as mock_method:
self.assertIsInstance(mock_method, AsyncMock)
test_async()
def test_async_def_cm(self):
async def test_async():
with patch(f"{__name__}.async_func", AsyncMock()):
self.assertIsInstance(async_func, AsyncMock)
self.assertTrue(inspect.iscoroutinefunction(async_func))
asyncio.run(test_async())
class AsyncMockTest(unittest.TestCase):
def test_iscoroutinefunction_default(self):
mock = AsyncMock()
self.assertTrue(asyncio.iscoroutinefunction(mock))
def test_iscoroutinefunction_function(self):
async def foo(): pass
mock = AsyncMock(foo)
self.assertTrue(asyncio.iscoroutinefunction(mock))
self.assertTrue(inspect.iscoroutinefunction(mock))
def test_isawaitable(self):
mock = AsyncMock()
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
self.assertIn('assert_awaited', dir(mock))
def test_iscoroutinefunction_normal_function(self):
def foo(): pass
mock = AsyncMock(foo)
self.assertTrue(asyncio.iscoroutinefunction(mock))
self.assertTrue(inspect.iscoroutinefunction(mock))
def test_future_isfuture(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
fut = asyncio.Future()
loop.stop()
loop.close()
mock = AsyncMock(fut)
self.assertIsInstance(mock, asyncio.Future)
class AsyncAutospecTest(unittest.TestCase):
def test_is_AsyncMock_patch(self):
@patch(async_foo_name, autospec=True)
def test_async(mock_method):
self.assertIsInstance(mock_method.async_method, AsyncMock)
self.assertIsInstance(mock_method, MagicMock)
@patch(async_foo_name, autospec=True)
def test_normal_method(mock_method):
self.assertIsInstance(mock_method.normal_method, MagicMock)
test_async()
test_normal_method()
def test_create_autospec_instance(self):
with self.assertRaises(RuntimeError):
create_autospec(async_func, instance=True)
def test_create_autospec_awaitable_class(self):
awaitable_mock = create_autospec(spec=AwaitableClass())
self.assertIsInstance(create_autospec(awaitable_mock), AsyncMock)
def test_create_autospec(self):
spec = create_autospec(async_func_args)
awaitable = spec(1, 2, c=3)
async def main():
await awaitable
self.assertEqual(spec.await_count, 0)
self.assertIsNone(spec.await_args)
self.assertEqual(spec.await_args_list, [])
self.assertIsInstance(spec.awaited, _AwaitEvent)
spec.assert_not_awaited()
asyncio.run(main())
self.assertTrue(asyncio.iscoroutinefunction(spec))
self.assertTrue(asyncio.iscoroutine(awaitable))
self.assertEqual(spec.await_count, 1)
self.assertEqual(spec.await_args, call(1, 2, c=3))
self.assertEqual(spec.await_args_list, [call(1, 2, c=3)])
spec.assert_awaited_once()
spec.assert_awaited_once_with(1, 2, c=3)
spec.assert_awaited_with(1, 2, c=3)
spec.assert_awaited()
with self.assertRaises(AssertionError):
spec.assert_any_await(e=1)
def test_patch_with_autospec(self):
async def test_async():
with patch(f"{__name__}.async_func_args", autospec=True) as mock_method:
awaitable = mock_method(1, 2, c=3)
self.assertIsInstance(mock_method.mock, AsyncMock)
self.assertTrue(asyncio.iscoroutinefunction(mock_method))
self.assertTrue(asyncio.iscoroutine(awaitable))
self.assertTrue(inspect.isawaitable(awaitable))
# Verify the default values during mock setup
self.assertEqual(mock_method.await_count, 0)
self.assertEqual(mock_method.await_args_list, [])
self.assertIsNone(mock_method.await_args)
self.assertIsInstance(mock_method.awaited, _AwaitEvent)
mock_method.assert_not_awaited()
await awaitable
self.assertEqual(mock_method.await_count, 1)
self.assertEqual(mock_method.await_args, call(1, 2, c=3))
self.assertEqual(mock_method.await_args_list, [call(1, 2, c=3)])
mock_method.assert_awaited_once()
mock_method.assert_awaited_once_with(1, 2, c=3)
mock_method.assert_awaited_with(1, 2, c=3)
mock_method.assert_awaited()
mock_method.reset_mock()
self.assertEqual(mock_method.await_count, 0)
self.assertIsNone(mock_method.await_args)
self.assertEqual(mock_method.await_args_list, [])
asyncio.run(test_async())
class AsyncSpecTest(unittest.TestCase):
def test_spec_as_async_positional_magicmock(self):
mock = MagicMock(async_func)
self.assertIsInstance(mock, MagicMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_as_async_kw_magicmock(self):
mock = MagicMock(spec=async_func)
self.assertIsInstance(mock, MagicMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_as_async_kw_AsyncMock(self):
mock = AsyncMock(spec=async_func)
self.assertIsInstance(mock, AsyncMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_as_async_positional_AsyncMock(self):
mock = AsyncMock(async_func)
self.assertIsInstance(mock, AsyncMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_as_normal_kw_AsyncMock(self):
mock = AsyncMock(spec=normal_func)
self.assertIsInstance(mock, AsyncMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_as_normal_positional_AsyncMock(self):
mock = AsyncMock(normal_func)
self.assertIsInstance(mock, AsyncMock)
m = mock()
self.assertTrue(inspect.isawaitable(m))
asyncio.run(m)
def test_spec_async_mock(self):
@patch.object(AsyncClass, 'async_method', spec=True)
def test_async(mock_method):
self.assertIsInstance(mock_method, AsyncMock)
test_async()
def test_spec_parent_not_async_attribute_is(self):
@patch(async_foo_name, spec=True)
def test_async(mock_method):
self.assertIsInstance(mock_method, MagicMock)
self.assertIsInstance(mock_method.async_method, AsyncMock)
test_async()
def test_target_async_spec_not(self):
@patch.object(AsyncClass, 'async_method', spec=NormalClass.a)
def test_async_attribute(mock_method):
self.assertIsInstance(mock_method, MagicMock)
self.assertFalse(inspect.iscoroutine(mock_method))
self.assertFalse(inspect.isawaitable(mock_method))
test_async_attribute()
def test_target_not_async_spec_is(self):
@patch.object(NormalClass, 'a', spec=async_func)
def test_attribute_not_async_spec_is(mock_async_func):
self.assertIsInstance(mock_async_func, AsyncMock)
test_attribute_not_async_spec_is()
def test_spec_async_attributes(self):
@patch(normal_foo_name, spec=AsyncClass)
def test_async_attributes_coroutines(MockNormalClass):
self.assertIsInstance(MockNormalClass.async_method, AsyncMock)
self.assertIsInstance(MockNormalClass, MagicMock)
test_async_attributes_coroutines()
class AsyncSpecSetTest(unittest.TestCase):
def test_is_AsyncMock_patch(self):
@patch.object(AsyncClass, 'async_method', spec_set=True)
def test_async(async_method):
self.assertIsInstance(async_method, AsyncMock)
def test_is_async_AsyncMock(self):
mock = AsyncMock(spec_set=AsyncClass.async_method)
self.assertTrue(asyncio.iscoroutinefunction(mock))
self.assertIsInstance(mock, AsyncMock)
def test_is_child_AsyncMock(self):
mock = MagicMock(spec_set=AsyncClass)
self.assertTrue(asyncio.iscoroutinefunction(mock.async_method))
self.assertFalse(asyncio.iscoroutinefunction(mock.normal_method))
self.assertIsInstance(mock.async_method, AsyncMock)
self.assertIsInstance(mock.normal_method, MagicMock)
self.assertIsInstance(mock, MagicMock)
def test_magicmock_lambda_spec(self):
mock_obj = MagicMock()
mock_obj.mock_func = MagicMock(spec=lambda x: x)
with patch.object(mock_obj, "mock_func") as cm:
self.assertIsInstance(cm, MagicMock)
class AsyncArguments(unittest.TestCase):
def test_add_return_value(self):
async def addition(self, var):
return var + 1
mock = AsyncMock(addition, return_value=10)
output = asyncio.run(mock(5))
self.assertEqual(output, 10)
def test_add_side_effect_exception(self):
async def addition(var):
return var + 1
mock = AsyncMock(addition, side_effect=Exception('err'))
with self.assertRaises(Exception):
asyncio.run(mock(5))
def test_add_side_effect_function(self):
async def addition(var):
return var + 1
mock = AsyncMock(side_effect=addition)
result = asyncio.run(mock(5))
self.assertEqual(result, 6)
def test_add_side_effect_iterable(self):
vals = [1, 2, 3]
mock = AsyncMock(side_effect=vals)
for item in vals:
self.assertEqual(item, asyncio.run(mock()))
with self.assertRaises(RuntimeError) as e:
asyncio.run(mock())
self.assertEqual(
e.exception,
RuntimeError('coroutine raised StopIteration')
)
class AsyncContextManagerTest(unittest.TestCase):
class WithAsyncContextManager:
async def __aenter__(self, *args, **kwargs):
return self
async def __aexit__(self, *args, **kwargs):
pass
def test_magic_methods_are_async_mocks(self):
mock = MagicMock(self.WithAsyncContextManager())
self.assertIsInstance(mock.__aenter__, AsyncMock)
self.assertIsInstance(mock.__aexit__, AsyncMock)
def test_mock_supports_async_context_manager(self):
called = False
instance = self.WithAsyncContextManager()
mock_instance = MagicMock(instance)
async def use_context_manager():
nonlocal called
async with mock_instance as result:
called = True
return result
result = asyncio.run(use_context_manager())
self.assertTrue(called)
self.assertTrue(mock_instance.__aenter__.called)
self.assertTrue(mock_instance.__aexit__.called)
self.assertIsNot(mock_instance, result)
self.assertIsInstance(result, AsyncMock)
def test_mock_customize_async_context_manager(self):
instance = self.WithAsyncContextManager()
mock_instance = MagicMock(instance)
expected_result = object()
mock_instance.__aenter__.return_value = expected_result
async def use_context_manager():
async with mock_instance as result:
return result
self.assertIs(asyncio.run(use_context_manager()), expected_result)
def test_mock_customize_async_context_manager_with_coroutine(self):
enter_called = False
exit_called = False
async def enter_coroutine(*args):
nonlocal enter_called
enter_called = True
async def exit_coroutine(*args):
nonlocal exit_called
exit_called = True
instance = self.WithAsyncContextManager()
mock_instance = MagicMock(instance)
mock_instance.__aenter__ = enter_coroutine
mock_instance.__aexit__ = exit_coroutine
async def use_context_manager():
async with mock_instance:
pass
asyncio.run(use_context_manager())
self.assertTrue(enter_called)
self.assertTrue(exit_called)
def test_context_manager_raise_exception_by_default(self):
async def raise_in(context_manager):
async with context_manager:
raise TypeError()
instance = self.WithAsyncContextManager()
mock_instance = MagicMock(instance)
with self.assertRaises(TypeError):
asyncio.run(raise_in(mock_instance))
class AsyncIteratorTest(unittest.TestCase):
class WithAsyncIterator(object):
def __init__(self):
self.items = ["foo", "NormalFoo", "baz"]
def __aiter__(self):
return self
async def __anext__(self):
try:
return self.items.pop()
except IndexError:
pass
raise StopAsyncIteration
def test_mock_aiter_and_anext(self):
instance = self.WithAsyncIterator()
mock_instance = MagicMock(instance)
self.assertEqual(asyncio.iscoroutine(instance.__aiter__),
asyncio.iscoroutine(mock_instance.__aiter__))
self.assertEqual(asyncio.iscoroutine(instance.__anext__),
asyncio.iscoroutine(mock_instance.__anext__))
iterator = instance.__aiter__()
if asyncio.iscoroutine(iterator):
iterator = asyncio.run(iterator)
mock_iterator = mock_instance.__aiter__()
if asyncio.iscoroutine(mock_iterator):
mock_iterator = asyncio.run(mock_iterator)
self.assertEqual(asyncio.iscoroutine(iterator.__aiter__),
asyncio.iscoroutine(mock_iterator.__aiter__))
self.assertEqual(asyncio.iscoroutine(iterator.__anext__),
asyncio.iscoroutine(mock_iterator.__anext__))
def test_mock_async_for(self):
async def iterate(iterator):
accumulator = []
async for item in iterator:
accumulator.append(item)
return accumulator
expected = ["FOO", "BAR", "BAZ"]
with self.subTest("iterate through default value"):
mock_instance = MagicMock(self.WithAsyncIterator())
self.assertEqual([], asyncio.run(iterate(mock_instance)))
with self.subTest("iterate through set return_value"):
mock_instance = MagicMock(self.WithAsyncIterator())
mock_instance.__aiter__.return_value = expected[:]
self.assertEqual(expected, asyncio.run(iterate(mock_instance)))
with self.subTest("iterate through set return_value iterator"):
mock_instance = MagicMock(self.WithAsyncIterator())
mock_instance.__aiter__.return_value = iter(expected[:])
self.assertEqual(expected, asyncio.run(iterate(mock_instance)))
class AsyncMockAssert(unittest.TestCase):
def setUp(self):
self.mock = AsyncMock()
async def _runnable_test(self, *args):
if not args:
await self.mock()
else:
await self.mock(*args)
def test_assert_awaited(self):
with self.assertRaises(AssertionError):
self.mock.assert_awaited()
asyncio.run(self._runnable_test())
self.mock.assert_awaited()
def test_assert_awaited_once(self):
with self.assertRaises(AssertionError):
self.mock.assert_awaited_once()
asyncio.run(self._runnable_test())
self.mock.assert_awaited_once()
asyncio.run(self._runnable_test())
with self.assertRaises(AssertionError):
self.mock.assert_awaited_once()
def test_assert_awaited_with(self):
asyncio.run(self._runnable_test())
msg = 'expected await not found'
with self.assertRaisesRegex(AssertionError, msg):
self.mock.assert_awaited_with('foo')
asyncio.run(self._runnable_test('foo'))
self.mock.assert_awaited_with('foo')
asyncio.run(self._runnable_test('SomethingElse'))
with self.assertRaises(AssertionError):
self.mock.assert_awaited_with('foo')
def test_assert_awaited_once_with(self):
with self.assertRaises(AssertionError):
self.mock.assert_awaited_once_with('foo')
asyncio.run(self._runnable_test('foo'))
self.mock.assert_awaited_once_with('foo')
asyncio.run(self._runnable_test('foo'))
with self.assertRaises(AssertionError):
self.mock.assert_awaited_once_with('foo')
def test_assert_any_wait(self):
with self.assertRaises(AssertionError):
self.mock.assert_any_await('NormalFoo')
asyncio.run(self._runnable_test('foo'))
with self.assertRaises(AssertionError):
self.mock.assert_any_await('NormalFoo')
asyncio.run(self._runnable_test('NormalFoo'))
self.mock.assert_any_await('NormalFoo')
asyncio.run(self._runnable_test('SomethingElse'))
self.mock.assert_any_await('NormalFoo')
def test_assert_has_awaits_no_order(self):
calls = [call('NormalFoo'), call('baz')]
with self.assertRaises(AssertionError) as cm:
self.mock.assert_has_awaits(calls)
self.assertEqual(len(cm.exception.args), 1)
asyncio.run(self._runnable_test('foo'))
with self.assertRaises(AssertionError):
self.mock.assert_has_awaits(calls)
asyncio.run(self._runnable_test('NormalFoo'))
with self.assertRaises(AssertionError):
self.mock.assert_has_awaits(calls)
asyncio.run(self._runnable_test('baz'))
self.mock.assert_has_awaits(calls)
asyncio.run(self._runnable_test('SomethingElse'))
self.mock.assert_has_awaits(calls)
def test_awaits_asserts_with_any(self):
class Foo:
def __eq__(self, other): pass
asyncio.run(self._runnable_test(Foo(), 1))
self.mock.assert_has_awaits([call(ANY, 1)])
self.mock.assert_awaited_with(ANY, 1)
self.mock.assert_any_await(ANY, 1)
def test_awaits_asserts_with_spec_and_any(self):
class Foo:
def __eq__(self, other): pass
mock_with_spec = AsyncMock(spec=Foo)
async def _custom_mock_runnable_test(*args):
await mock_with_spec(*args)
asyncio.run(_custom_mock_runnable_test(Foo(), 1))
mock_with_spec.assert_has_awaits([call(ANY, 1)])
mock_with_spec.assert_awaited_with(ANY, 1)
mock_with_spec.assert_any_await(ANY, 1)
def test_assert_has_awaits_ordered(self):
calls = [call('NormalFoo'), call('baz')]
with self.assertRaises(AssertionError):
self.mock.assert_has_awaits(calls, any_order=True)
asyncio.run(self._runnable_test('baz'))
with self.assertRaises(AssertionError):
self.mock.assert_has_awaits(calls, any_order=True)
asyncio.run(self._runnable_test('foo'))
with self.assertRaises(AssertionError):
self.mock.assert_has_awaits(calls, any_order=True)
asyncio.run(self._runnable_test('NormalFoo'))
self.mock.assert_has_awaits(calls, any_order=True)
asyncio.run(self._runnable_test('qux'))
self.mock.assert_has_awaits(calls, any_order=True)
def test_assert_not_awaited(self):
self.mock.assert_not_awaited()
asyncio.run(self._runnable_test())
with self.assertRaises(AssertionError):
self.mock.assert_not_awaited()