mirror of
https://github.com/python/cpython.git
synced 2025-10-17 20:28:43 +00:00
bpo-31855: unittest.mock.mock_open() results now respects the argument of read([size]) (GH-11521)
unittest.mock.mock_open() results now respects the argument of read([size]) Co-Authored-By: remilapeyre <remi.lapeyre@henki.fr>
This commit is contained in:
parent
ad4ed87241
commit
11a8832c98
3 changed files with 21 additions and 25 deletions
|
@ -25,6 +25,7 @@ __all__ = (
|
||||||
__version__ = '1.0'
|
__version__ = '1.0'
|
||||||
|
|
||||||
|
|
||||||
|
import io
|
||||||
import inspect
|
import inspect
|
||||||
import pprint
|
import pprint
|
||||||
import sys
|
import sys
|
||||||
|
@ -2318,25 +2319,12 @@ MethodWrapperTypes = (
|
||||||
|
|
||||||
file_spec = None
|
file_spec = None
|
||||||
|
|
||||||
def _iterate_read_data(read_data):
|
|
||||||
# Helper for mock_open:
|
|
||||||
# Retrieve lines from read_data via a generator so that separate calls to
|
|
||||||
# readline, read, and readlines are properly interleaved
|
|
||||||
sep = b'\n' if isinstance(read_data, bytes) else '\n'
|
|
||||||
data_as_list = [l + sep for l in read_data.split(sep)]
|
|
||||||
|
|
||||||
if data_as_list[-1] == sep:
|
def _to_stream(read_data):
|
||||||
# If the last line ended in a newline, the list comprehension will have an
|
if isinstance(read_data, bytes):
|
||||||
# extra entry that's just a newline. Remove this.
|
return io.BytesIO(read_data)
|
||||||
data_as_list = data_as_list[:-1]
|
|
||||||
else:
|
else:
|
||||||
# If there wasn't an extra newline by itself, then the file being
|
return io.StringIO(read_data)
|
||||||
# emulated doesn't have a newline to end the last line remove the
|
|
||||||
# newline that our naive format() added
|
|
||||||
data_as_list[-1] = data_as_list[-1][:-1]
|
|
||||||
|
|
||||||
for line in data_as_list:
|
|
||||||
yield line
|
|
||||||
|
|
||||||
|
|
||||||
def mock_open(mock=None, read_data=''):
|
def mock_open(mock=None, read_data=''):
|
||||||
|
@ -2351,20 +2339,23 @@ def mock_open(mock=None, read_data=''):
|
||||||
`read_data` is a string for the `read`, `readline` and `readlines` of the
|
`read_data` is a string for the `read`, `readline` and `readlines` of the
|
||||||
file handle to return. This is an empty string by default.
|
file handle to return. This is an empty string by default.
|
||||||
"""
|
"""
|
||||||
|
_read_data = _to_stream(read_data)
|
||||||
|
_state = [_read_data, None]
|
||||||
|
|
||||||
def _readlines_side_effect(*args, **kwargs):
|
def _readlines_side_effect(*args, **kwargs):
|
||||||
if handle.readlines.return_value is not None:
|
if handle.readlines.return_value is not None:
|
||||||
return handle.readlines.return_value
|
return handle.readlines.return_value
|
||||||
return list(_state[0])
|
return _state[0].readlines(*args, **kwargs)
|
||||||
|
|
||||||
def _read_side_effect(*args, **kwargs):
|
def _read_side_effect(*args, **kwargs):
|
||||||
if handle.read.return_value is not None:
|
if handle.read.return_value is not None:
|
||||||
return handle.read.return_value
|
return handle.read.return_value
|
||||||
return type(read_data)().join(_state[0])
|
return _state[0].read(*args, **kwargs)
|
||||||
|
|
||||||
def _readline_side_effect():
|
def _readline_side_effect(*args, **kwargs):
|
||||||
yield from _iter_side_effect()
|
yield from _iter_side_effect()
|
||||||
while True:
|
while True:
|
||||||
yield type(read_data)()
|
yield _state[0].readline(*args, **kwargs)
|
||||||
|
|
||||||
def _iter_side_effect():
|
def _iter_side_effect():
|
||||||
if handle.readline.return_value is not None:
|
if handle.readline.return_value is not None:
|
||||||
|
@ -2384,8 +2375,6 @@ def mock_open(mock=None, read_data=''):
|
||||||
handle = MagicMock(spec=file_spec)
|
handle = MagicMock(spec=file_spec)
|
||||||
handle.__enter__.return_value = handle
|
handle.__enter__.return_value = handle
|
||||||
|
|
||||||
_state = [_iterate_read_data(read_data), None]
|
|
||||||
|
|
||||||
handle.write.return_value = None
|
handle.write.return_value = None
|
||||||
handle.read.return_value = None
|
handle.read.return_value = None
|
||||||
handle.readline.return_value = None
|
handle.readline.return_value = None
|
||||||
|
@ -2398,7 +2387,7 @@ def mock_open(mock=None, read_data=''):
|
||||||
handle.__iter__.side_effect = _iter_side_effect
|
handle.__iter__.side_effect = _iter_side_effect
|
||||||
|
|
||||||
def reset_data(*args, **kwargs):
|
def reset_data(*args, **kwargs):
|
||||||
_state[0] = _iterate_read_data(read_data)
|
_state[0] = _to_stream(read_data)
|
||||||
if handle.readline.side_effect == _state[1]:
|
if handle.readline.side_effect == _state[1]:
|
||||||
# Only reset the side effect if the user hasn't overridden it.
|
# Only reset the side effect if the user hasn't overridden it.
|
||||||
_state[1] = _readline_side_effect()
|
_state[1] = _readline_side_effect()
|
||||||
|
|
|
@ -283,7 +283,12 @@ class TestMockOpen(unittest.TestCase):
|
||||||
# for mocks returned by mock_open
|
# for mocks returned by mock_open
|
||||||
some_data = 'foo\nbar\nbaz'
|
some_data = 'foo\nbar\nbaz'
|
||||||
mock = mock_open(read_data=some_data)
|
mock = mock_open(read_data=some_data)
|
||||||
self.assertEqual(mock().read(10), some_data)
|
self.assertEqual(mock().read(10), some_data[:10])
|
||||||
|
self.assertEqual(mock().read(10), some_data[:10])
|
||||||
|
|
||||||
|
f = mock()
|
||||||
|
self.assertEqual(f.read(10), some_data[:10])
|
||||||
|
self.assertEqual(f.read(10), some_data[10:])
|
||||||
|
|
||||||
|
|
||||||
def test_interleaved_reads(self):
|
def test_interleaved_reads(self):
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
:func:`unittest.mock.mock_open` results now respects the argument of read([size]).
|
||||||
|
Patch contributed by Rémi Lapeyre.
|
Loading…
Add table
Add a link
Reference in a new issue