mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
gh-91487: Optimize asyncio UDP speed (GH-91488)
Fix #91487 When transferring a small file, e.g. 256 KiB, the speed of this PR is comparable. However, if a large file, e.g. 65536 KiB, is transferred, asyncio UDP will be over 100 times faster than the original. The speed is presumably significantly faster if a larger file is transferred, e.g. 1048576 KiB. Automerge-Triggered-By: GH:gpshead
This commit is contained in:
parent
c9e231de85
commit
42fabc3ea7
3 changed files with 10 additions and 2 deletions
|
@ -459,6 +459,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport,
|
||||||
waiter=None, extra=None):
|
waiter=None, extra=None):
|
||||||
self._address = address
|
self._address = address
|
||||||
self._empty_waiter = None
|
self._empty_waiter = None
|
||||||
|
self._buffer_size = 0
|
||||||
# We don't need to call _protocol.connection_made() since our base
|
# We don't need to call _protocol.connection_made() since our base
|
||||||
# constructor does it for us.
|
# constructor does it for us.
|
||||||
super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)
|
super().__init__(loop, sock, protocol, waiter=waiter, extra=extra)
|
||||||
|
@ -471,7 +472,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport,
|
||||||
_set_socket_extra(self, sock)
|
_set_socket_extra(self, sock)
|
||||||
|
|
||||||
def get_write_buffer_size(self):
|
def get_write_buffer_size(self):
|
||||||
return sum(len(data) for data, _ in self._buffer)
|
return self._buffer_size
|
||||||
|
|
||||||
def abort(self):
|
def abort(self):
|
||||||
self._force_close(None)
|
self._force_close(None)
|
||||||
|
@ -496,6 +497,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport,
|
||||||
|
|
||||||
# Ensure that what we buffer is immutable.
|
# Ensure that what we buffer is immutable.
|
||||||
self._buffer.append((bytes(data), addr))
|
self._buffer.append((bytes(data), addr))
|
||||||
|
self._buffer_size += len(data)
|
||||||
|
|
||||||
if self._write_fut is None:
|
if self._write_fut is None:
|
||||||
# No current write operations are active, kick one off
|
# No current write operations are active, kick one off
|
||||||
|
@ -522,6 +524,7 @@ class _ProactorDatagramTransport(_ProactorBasePipeTransport,
|
||||||
return
|
return
|
||||||
|
|
||||||
data, addr = self._buffer.popleft()
|
data, addr = self._buffer.popleft()
|
||||||
|
self._buffer_size -= len(data)
|
||||||
if self._address is not None:
|
if self._address is not None:
|
||||||
self._write_fut = self._loop._proactor.send(self._sock,
|
self._write_fut = self._loop._proactor.send(self._sock,
|
||||||
data)
|
data)
|
||||||
|
|
|
@ -1131,6 +1131,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
||||||
waiter=None, extra=None):
|
waiter=None, extra=None):
|
||||||
super().__init__(loop, sock, protocol, extra)
|
super().__init__(loop, sock, protocol, extra)
|
||||||
self._address = address
|
self._address = address
|
||||||
|
self._buffer_size = 0
|
||||||
self._loop.call_soon(self._protocol.connection_made, self)
|
self._loop.call_soon(self._protocol.connection_made, self)
|
||||||
# only start reading when connection_made() has been called
|
# only start reading when connection_made() has been called
|
||||||
self._loop.call_soon(self._add_reader,
|
self._loop.call_soon(self._add_reader,
|
||||||
|
@ -1141,7 +1142,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
||||||
waiter, None)
|
waiter, None)
|
||||||
|
|
||||||
def get_write_buffer_size(self):
|
def get_write_buffer_size(self):
|
||||||
return sum(len(data) for data, _ in self._buffer)
|
return self._buffer_size
|
||||||
|
|
||||||
def _read_ready(self):
|
def _read_ready(self):
|
||||||
if self._conn_lost:
|
if self._conn_lost:
|
||||||
|
@ -1200,11 +1201,13 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
||||||
|
|
||||||
# Ensure that what we buffer is immutable.
|
# Ensure that what we buffer is immutable.
|
||||||
self._buffer.append((bytes(data), addr))
|
self._buffer.append((bytes(data), addr))
|
||||||
|
self._buffer_size += len(data)
|
||||||
self._maybe_pause_protocol()
|
self._maybe_pause_protocol()
|
||||||
|
|
||||||
def _sendto_ready(self):
|
def _sendto_ready(self):
|
||||||
while self._buffer:
|
while self._buffer:
|
||||||
data, addr = self._buffer.popleft()
|
data, addr = self._buffer.popleft()
|
||||||
|
self._buffer_size -= len(data)
|
||||||
try:
|
try:
|
||||||
if self._extra['peername']:
|
if self._extra['peername']:
|
||||||
self._sock.send(data)
|
self._sock.send(data)
|
||||||
|
@ -1212,6 +1215,7 @@ class _SelectorDatagramTransport(_SelectorTransport):
|
||||||
self._sock.sendto(data, addr)
|
self._sock.sendto(data, addr)
|
||||||
except (BlockingIOError, InterruptedError):
|
except (BlockingIOError, InterruptedError):
|
||||||
self._buffer.appendleft((data, addr)) # Try again later.
|
self._buffer.appendleft((data, addr)) # Try again later.
|
||||||
|
self._buffer_size += len(data)
|
||||||
break
|
break
|
||||||
except OSError as exc:
|
except OSError as exc:
|
||||||
self._protocol.error_received(exc)
|
self._protocol.error_received(exc)
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Optimize asyncio UDP speed, over 100 times faster when transferring a large file.
|
Loading…
Add table
Add a link
Reference in a new issue