bpo-32622: Native sendfile on windows (GH-5565)

* Support sendfile on Windows Proactor event loop naively.
(cherry picked from commit a19fb3c6aa)

Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
This commit is contained in:
Miss Islington (bot) 2018-02-25 09:10:58 -08:00 committed by GitHub
parent b6b6669cfd
commit 632c1cb571
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 431 additions and 93 deletions

View file

@ -6,11 +6,14 @@ proactor is only implemented on Windows with IOCP.
__all__ = 'BaseProactorEventLoop',
import io
import os
import socket
import warnings
from . import base_events
from . import constants
from . import events
from . import futures
from . import protocols
from . import sslproto
@ -107,6 +110,11 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self._force_close(exc)
def _force_close(self, exc):
if self._empty_waiter is not None:
if exc is None:
self._empty_waiter.set_result(None)
else:
self._empty_waiter.set_exception(exc)
if self._closing:
return
self._closing = True
@ -327,6 +335,10 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
_start_tls_compatible = True
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
self._empty_waiter = None
def write(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError(
@ -334,6 +346,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
f"not {type(data).__name__}")
if self._eof_written:
raise RuntimeError('write_eof() already called')
if self._empty_waiter is not None:
raise RuntimeError('unable to write; sendfile is in progress')
if not data:
return
@ -393,6 +407,8 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
self._maybe_pause_protocol()
else:
self._write_fut.add_done_callback(self._loop_writing)
if self._empty_waiter is not None and self._write_fut is None:
self._empty_waiter.set_result(None)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
@ -407,6 +423,17 @@ class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
def abort(self):
self._force_close(None)
def _make_empty_waiter(self):
if self._empty_waiter is not None:
raise RuntimeError("Empty waiter is already set")
self._empty_waiter = self._loop.create_future()
if self._write_fut is None:
self._empty_waiter.set_result(None)
return self._empty_waiter
def _reset_empty_waiter(self):
self._empty_waiter = None
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
def __init__(self, *args, **kw):
@ -447,7 +474,7 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport,
transports.Transport):
"""Transport for connected sockets."""
_sendfile_compatible = constants._SendfileMode.FALLBACK
_sendfile_compatible = constants._SendfileMode.TRY_NATIVE
def _set_extra(self, sock):
self._extra['socket'] = sock
@ -556,6 +583,47 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
async def sock_accept(self, sock):
return await self._proactor.accept(sock)
async def _sock_sendfile_native(self, sock, file, offset, count):
try:
fileno = file.fileno()
except (AttributeError, io.UnsupportedOperation) as err:
raise events.SendfileNotAvailableError("not a regular file")
try:
fsize = os.fstat(fileno).st_size
except OSError as err:
raise events.SendfileNotAvailableError("not a regular file")
blocksize = count if count else fsize
if not blocksize:
return 0 # empty file
blocksize = min(blocksize, 0xffff_ffff)
end_pos = min(offset + count, fsize) if count else fsize
offset = min(offset, fsize)
total_sent = 0
try:
while True:
blocksize = min(end_pos - offset, blocksize)
if blocksize <= 0:
return total_sent
await self._proactor.sendfile(sock, file, offset, blocksize)
offset += blocksize
total_sent += blocksize
finally:
if total_sent > 0:
file.seek(offset)
async def _sendfile_native(self, transp, file, offset, count):
resume_reading = transp.is_reading()
transp.pause_reading()
await transp._make_empty_waiter()
try:
return await self.sock_sendfile(transp._sock, file, offset, count,
fallback=False)
finally:
transp._reset_empty_waiter()
if resume_reading:
transp.resume_reading()
def _close_self_pipe(self):
if self._self_reading_future is not None:
self._self_reading_future.cancel()