mirror of
https://github.com/python/cpython.git
synced 2025-08-31 05:58:33 +00:00
bpo-32015: Asyncio looping during simultaneously socket read/write an… (#4386)
* bpo-32015: Asyncio cycling during simultaneously socket read/write and reconnection * Tests fix * Tests fix * News add * Add new unit tests
This commit is contained in:
parent
56935a53b1
commit
e1d62e0b7c
3 changed files with 79 additions and 38 deletions
|
@ -370,25 +370,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
self._sock_recv(fut, False, sock, n)
|
||||
self._sock_recv(fut, None, sock, n)
|
||||
return fut
|
||||
|
||||
def _sock_recv(self, fut, registered, sock, n):
|
||||
def _sock_recv(self, fut, registered_fd, sock, n):
|
||||
# _sock_recv() can add itself as an I/O callback if the operation can't
|
||||
# be done immediately. Don't use it directly, call sock_recv().
|
||||
fd = sock.fileno()
|
||||
if registered:
|
||||
if registered_fd is not None:
|
||||
# Remove the callback early. It should be rare that the
|
||||
# selector says the fd is ready but the call still returns
|
||||
# EAGAIN, and I am willing to take a hit in that case in
|
||||
# order to simplify the common case.
|
||||
self.remove_reader(fd)
|
||||
self.remove_reader(registered_fd)
|
||||
if fut.cancelled():
|
||||
return
|
||||
try:
|
||||
data = sock.recv(n)
|
||||
except (BlockingIOError, InterruptedError):
|
||||
self.add_reader(fd, self._sock_recv, fut, True, sock, n)
|
||||
fd = sock.fileno()
|
||||
self.add_reader(fd, self._sock_recv, fut, fd, sock, n)
|
||||
except Exception as exc:
|
||||
fut.set_exception(exc)
|
||||
else:
|
||||
|
@ -405,25 +405,25 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
if self._debug and sock.gettimeout() != 0:
|
||||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
self._sock_recv_into(fut, False, sock, buf)
|
||||
self._sock_recv_into(fut, None, sock, buf)
|
||||
return fut
|
||||
|
||||
def _sock_recv_into(self, fut, registered, sock, buf):
|
||||
def _sock_recv_into(self, fut, registered_fd, sock, buf):
|
||||
# _sock_recv_into() can add itself as an I/O callback if the operation
|
||||
# can't be done immediately. Don't use it directly, call sock_recv_into().
|
||||
fd = sock.fileno()
|
||||
if registered:
|
||||
if registered_fd is not None:
|
||||
# Remove the callback early. It should be rare that the
|
||||
# selector says the fd is ready but the call still returns
|
||||
# EAGAIN, and I am willing to take a hit in that case in
|
||||
# order to simplify the common case.
|
||||
self.remove_reader(fd)
|
||||
self.remove_reader(registered_fd)
|
||||
if fut.cancelled():
|
||||
return
|
||||
try:
|
||||
nbytes = sock.recv_into(buf)
|
||||
except (BlockingIOError, InterruptedError):
|
||||
self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
|
||||
fd = sock.fileno()
|
||||
self.add_reader(fd, self._sock_recv_into, fut, fd, sock, buf)
|
||||
except Exception as exc:
|
||||
fut.set_exception(exc)
|
||||
else:
|
||||
|
@ -444,16 +444,14 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
raise ValueError("the socket must be non-blocking")
|
||||
fut = self.create_future()
|
||||
if data:
|
||||
self._sock_sendall(fut, False, sock, data)
|
||||
self._sock_sendall(fut, None, sock, data)
|
||||
else:
|
||||
fut.set_result(None)
|
||||
return fut
|
||||
|
||||
def _sock_sendall(self, fut, registered, sock, data):
|
||||
fd = sock.fileno()
|
||||
|
||||
if registered:
|
||||
self.remove_writer(fd)
|
||||
def _sock_sendall(self, fut, registered_fd, sock, data):
|
||||
if registered_fd is not None:
|
||||
self.remove_writer(registered_fd)
|
||||
if fut.cancelled():
|
||||
return
|
||||
|
||||
|
@ -470,7 +468,8 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
else:
|
||||
if n:
|
||||
data = data[n:]
|
||||
self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
|
||||
fd = sock.fileno()
|
||||
self.add_writer(fd, self._sock_sendall, fut, fd, sock, data)
|
||||
|
||||
@coroutine
|
||||
def sock_connect(self, sock, address):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue