asyncio.transports: Make _ProactorBasePipeTransport use _FlowControlMixin

This commit is contained in:
Yury Selivanov 2014-02-18 18:41:13 -05:00
parent 569efa2e4b
commit 3cb9914488
4 changed files with 75 additions and 125 deletions

View file

@ -15,7 +15,8 @@ from . import transports
from .log import logger
class _ProactorBasePipeTransport(transports.BaseTransport):
class _ProactorBasePipeTransport(transports._FlowControlMixin,
transports.BaseTransport):
"""Base class for pipe and socket transports."""
def __init__(self, loop, sock, protocol, waiter=None,
@ -33,8 +34,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
self._conn_lost = 0
self._closing = False # Set when close() called.
self._eof_written = False
self._protocol_paused = False
self.set_write_buffer_limits()
if self._server is not None:
self._server.attach(self)
self._loop.call_soon(self._protocol.connection_made, self)
@ -94,56 +93,6 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
server.detach(self)
self._server = None
# XXX The next four methods are nearly identical to corresponding
# ones in _SelectorTransport. Maybe refactor buffer management to
# share the implementations? (Also these are really only needed
# by _ProactorWritePipeTransport but since _buffer is defined on
# the base class I am putting it here for now.)
def _maybe_pause_protocol(self):
size = self.get_write_buffer_size()
if size <= self._high_water:
return
if not self._protocol_paused:
self._protocol_paused = True
try:
self._protocol.pause_writing()
except Exception as exc:
self._loop.call_exception_handler({
'message': 'protocol.pause_writing() failed',
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self.get_write_buffer_size() <= self._low_water):
self._protocol_paused = False
try:
self._protocol.resume_writing()
except Exception as exc:
self._loop.call_exception_handler({
'message': 'protocol.resume_writing() failed',
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
if low is None:
high = 64*1024
else:
high = 4*low
if low is None:
low = high // 4
if not high >= low >= 0:
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
(high, low))
self._high_water = high
self._low_water = low
def get_write_buffer_size(self):
size = self._pending_write
if self._buffer is not None: