mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
[3.11] gh-98624 Add mutex to unittest.mock.NonCallableMock (GH-98688) (#98797)
(cherry picked from commit 0346eddbe9
)
Co-authored-by: noah-weingarden <33741795+noah-weingarden@users.noreply.github.com>
This commit is contained in:
parent
434943e0b4
commit
725efe4219
2 changed files with 39 additions and 27 deletions
|
@ -35,6 +35,7 @@ from asyncio import iscoroutinefunction
|
|||
from types import CodeType, ModuleType, MethodType
|
||||
from unittest.util import safe_repr
|
||||
from functools import wraps, partial
|
||||
from threading import RLock
|
||||
|
||||
|
||||
class InvalidSpecError(Exception):
|
||||
|
@ -402,6 +403,14 @@ class Base(object):
|
|||
class NonCallableMock(Base):
|
||||
"""A non-callable version of `Mock`"""
|
||||
|
||||
# Store a mutex as a class attribute in order to protect concurrent access
|
||||
# to mock attributes. Using a class attribute allows all NonCallableMock
|
||||
# instances to share the mutex for simplicity.
|
||||
#
|
||||
# See https://github.com/python/cpython/issues/98624 for why this is
|
||||
# necessary.
|
||||
_lock = RLock()
|
||||
|
||||
def __new__(cls, /, *args, **kw):
|
||||
# every instance has its own class
|
||||
# so we can create magic methods on the
|
||||
|
@ -644,35 +653,36 @@ class NonCallableMock(Base):
|
|||
f"{name!r} is not a valid assertion. Use a spec "
|
||||
f"for the mock if {name!r} is meant to be an attribute.")
|
||||
|
||||
result = self._mock_children.get(name)
|
||||
if result is _deleted:
|
||||
raise AttributeError(name)
|
||||
elif result is None:
|
||||
wraps = None
|
||||
if self._mock_wraps is not None:
|
||||
# XXXX should we get the attribute without triggering code
|
||||
# execution?
|
||||
wraps = getattr(self._mock_wraps, name)
|
||||
with NonCallableMock._lock:
|
||||
result = self._mock_children.get(name)
|
||||
if result is _deleted:
|
||||
raise AttributeError(name)
|
||||
elif result is None:
|
||||
wraps = None
|
||||
if self._mock_wraps is not None:
|
||||
# XXXX should we get the attribute without triggering code
|
||||
# execution?
|
||||
wraps = getattr(self._mock_wraps, name)
|
||||
|
||||
result = self._get_child_mock(
|
||||
parent=self, name=name, wraps=wraps, _new_name=name,
|
||||
_new_parent=self
|
||||
)
|
||||
self._mock_children[name] = result
|
||||
|
||||
elif isinstance(result, _SpecState):
|
||||
try:
|
||||
result = create_autospec(
|
||||
result.spec, result.spec_set, result.instance,
|
||||
result.parent, result.name
|
||||
result = self._get_child_mock(
|
||||
parent=self, name=name, wraps=wraps, _new_name=name,
|
||||
_new_parent=self
|
||||
)
|
||||
except InvalidSpecError:
|
||||
target_name = self.__dict__['_mock_name'] or self
|
||||
raise InvalidSpecError(
|
||||
f'Cannot autospec attr {name!r} from target '
|
||||
f'{target_name!r} as it has already been mocked out. '
|
||||
f'[target={self!r}, attr={result.spec!r}]')
|
||||
self._mock_children[name] = result
|
||||
self._mock_children[name] = result
|
||||
|
||||
elif isinstance(result, _SpecState):
|
||||
try:
|
||||
result = create_autospec(
|
||||
result.spec, result.spec_set, result.instance,
|
||||
result.parent, result.name
|
||||
)
|
||||
except InvalidSpecError:
|
||||
target_name = self.__dict__['_mock_name'] or self
|
||||
raise InvalidSpecError(
|
||||
f'Cannot autospec attr {name!r} from target '
|
||||
f'{target_name!r} as it has already been mocked out. '
|
||||
f'[target={self!r}, attr={result.spec!r}]')
|
||||
self._mock_children[name] = result
|
||||
|
||||
return result
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue