mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
asyncio: sync with Tulip
* Tulip issue 184: FlowControlMixin constructor now get the event loop if the loop parameter is not set. Add unit tests to ensure that constructor of StreamReader and StreamReaderProtocol classes get the event loop. * Remove outdated TODO/XXX
This commit is contained in:
parent
7eb10311be
commit
70db9e428a
6 changed files with 29 additions and 11 deletions
|
@ -20,7 +20,6 @@ _FINISHED = 'FINISHED'
|
||||||
|
|
||||||
_PY34 = sys.version_info >= (3, 4)
|
_PY34 = sys.version_info >= (3, 4)
|
||||||
|
|
||||||
# TODO: Do we really want to depend on concurrent.futures internals?
|
|
||||||
Error = concurrent.futures._base.Error
|
Error = concurrent.futures._base.Error
|
||||||
CancelledError = concurrent.futures.CancelledError
|
CancelledError = concurrent.futures.CancelledError
|
||||||
TimeoutError = concurrent.futures.TimeoutError
|
TimeoutError = concurrent.futures.TimeoutError
|
||||||
|
@ -30,7 +29,6 @@ STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging
|
||||||
|
|
||||||
class InvalidStateError(Error):
|
class InvalidStateError(Error):
|
||||||
"""The operation is not allowed in this state."""
|
"""The operation is not allowed in this state."""
|
||||||
# TODO: Show the future, its state, the method, and the required state.
|
|
||||||
|
|
||||||
|
|
||||||
class _TracebackLogger:
|
class _TracebackLogger:
|
||||||
|
|
|
@ -487,7 +487,8 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
||||||
self.call_soon(loop)
|
self.call_soon(loop)
|
||||||
|
|
||||||
def _process_events(self, event_list):
|
def _process_events(self, event_list):
|
||||||
pass # XXX hard work currently done in poll
|
# Events are processed in the IocpProactor._poll() method
|
||||||
|
pass
|
||||||
|
|
||||||
def _stop_accept_futures(self):
|
def _stop_accept_futures(self):
|
||||||
for future in self._accept_futures.values():
|
for future in self._accept_futures.values():
|
||||||
|
|
|
@ -145,7 +145,6 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
||||||
pass # False alarm.
|
pass # False alarm.
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
# There's nowhere to send the error, so just log it.
|
# There's nowhere to send the error, so just log it.
|
||||||
# TODO: Someone will want an error handler for this.
|
|
||||||
if exc.errno in (errno.EMFILE, errno.ENFILE,
|
if exc.errno in (errno.EMFILE, errno.ENFILE,
|
||||||
errno.ENOBUFS, errno.ENOMEM):
|
errno.ENOBUFS, errno.ENOMEM):
|
||||||
# Some platforms (e.g. Linux keep reporting the FD as
|
# Some platforms (e.g. Linux keep reporting the FD as
|
||||||
|
|
|
@ -145,7 +145,10 @@ class FlowControlMixin(protocols.Protocol):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, loop=None):
|
def __init__(self, loop=None):
|
||||||
self._loop = loop # May be None; we may never need it.
|
if loop is None:
|
||||||
|
self._loop = events.get_event_loop()
|
||||||
|
else:
|
||||||
|
self._loop = loop
|
||||||
self._paused = False
|
self._paused = False
|
||||||
self._drain_waiter = None
|
self._drain_waiter = None
|
||||||
self._connection_lost = False
|
self._connection_lost = False
|
||||||
|
@ -306,8 +309,9 @@ class StreamReader:
|
||||||
# it also doubles as half the buffer limit.
|
# it also doubles as half the buffer limit.
|
||||||
self._limit = limit
|
self._limit = limit
|
||||||
if loop is None:
|
if loop is None:
|
||||||
loop = events.get_event_loop()
|
self._loop = events.get_event_loop()
|
||||||
self._loop = loop
|
else:
|
||||||
|
self._loop = loop
|
||||||
self._buffer = bytearray()
|
self._buffer = bytearray()
|
||||||
self._eof = False # Whether we're done.
|
self._eof = False # Whether we're done.
|
||||||
self._waiter = None # A future.
|
self._waiter = None # A future.
|
||||||
|
|
|
@ -496,9 +496,6 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
|
||||||
def can_write_eof(self):
|
def can_write_eof(self):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# TODO: Make the relationships between write_eof(), close(),
|
|
||||||
# abort(), _fatal_error() and _close() more straightforward.
|
|
||||||
|
|
||||||
def write_eof(self):
|
def write_eof(self):
|
||||||
if self._closing:
|
if self._closing:
|
||||||
return
|
return
|
||||||
|
@ -897,7 +894,7 @@ class FastChildWatcher(BaseChildWatcher):
|
||||||
|
|
||||||
|
|
||||||
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
|
||||||
"""XXX"""
|
"""UNIX event loop policy with a watcher for child processes."""
|
||||||
_loop_factory = _UnixSelectorEventLoop
|
_loop_factory = _UnixSelectorEventLoop
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
|
@ -625,6 +625,25 @@ os.close(fd)
|
||||||
data = self.loop.run_until_complete(reader.read(-1))
|
data = self.loop.run_until_complete(reader.read(-1))
|
||||||
self.assertEqual(data, b'data')
|
self.assertEqual(data, b'data')
|
||||||
|
|
||||||
|
def test_streamreader_constructor(self):
|
||||||
|
self.addCleanup(asyncio.set_event_loop, None)
|
||||||
|
asyncio.set_event_loop(self.loop)
|
||||||
|
|
||||||
|
# Tulip issue #184: Ensure that StreamReaderProtocol constructor
|
||||||
|
# retrieves the current loop if the loop parameter is not set
|
||||||
|
reader = asyncio.StreamReader()
|
||||||
|
self.assertIs(reader._loop, self.loop)
|
||||||
|
|
||||||
|
def test_streamreaderprotocol_constructor(self):
|
||||||
|
self.addCleanup(asyncio.set_event_loop, None)
|
||||||
|
asyncio.set_event_loop(self.loop)
|
||||||
|
|
||||||
|
# Tulip issue #184: Ensure that StreamReaderProtocol constructor
|
||||||
|
# retrieves the current loop if the loop parameter is not set
|
||||||
|
reader = mock.Mock()
|
||||||
|
protocol = asyncio.StreamReaderProtocol(reader)
|
||||||
|
self.assertIs(protocol._loop, self.loop)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue