bpo-30064: Fix asyncio loop.sock_* race condition issue (GH-20369)

(cherry picked from commit 210a137396)

Co-authored-by: Fantix King <fantix.king@gmail.com>
This commit is contained in:
Miss Islington (bot) 2020-05-27 13:39:03 -07:00 committed by GitHub
parent c011d1b5be
commit 3a2667d91e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 16 deletions

View file

@ -1,4 +1,5 @@
import socket
import time
import asyncio
import sys
from asyncio import proactor_events
@ -122,6 +123,136 @@ class BaseSockTestsMixin:
sock = socket.socket()
self._basetest_sock_recv_into(httpd, sock)
async def _basetest_sock_recv_racing(self, httpd, sock):
sock.setblocking(False)
await self.loop.sock_connect(sock, httpd.address)
task = asyncio.create_task(self.loop.sock_recv(sock, 1024))
await asyncio.sleep(0)
task.cancel()
asyncio.create_task(
self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
data = await self.loop.sock_recv(sock, 1024)
# consume data
await self.loop.sock_recv(sock, 1024)
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
async def _basetest_sock_recv_into_racing(self, httpd, sock):
sock.setblocking(False)
await self.loop.sock_connect(sock, httpd.address)
data = bytearray(1024)
with memoryview(data) as buf:
task = asyncio.create_task(
self.loop.sock_recv_into(sock, buf[:1024]))
await asyncio.sleep(0)
task.cancel()
task = asyncio.create_task(
self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
nbytes = await self.loop.sock_recv_into(sock, buf[:1024])
# consume data
await self.loop.sock_recv_into(sock, buf[nbytes:])
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
await task
async def _basetest_sock_send_racing(self, listener, sock):
listener.bind(('127.0.0.1', 0))
listener.listen(1)
# make connection
sock.setblocking(False)
task = asyncio.create_task(
self.loop.sock_connect(sock, listener.getsockname()))
await asyncio.sleep(0)
server = listener.accept()[0]
server.setblocking(False)
with server:
await task
# fill the buffer
with self.assertRaises(BlockingIOError):
while True:
sock.send(b' ' * 5)
# cancel a blocked sock_sendall
task = asyncio.create_task(
self.loop.sock_sendall(sock, b'hello'))
await asyncio.sleep(0)
task.cancel()
# clear the buffer
async def recv_until():
data = b''
while not data:
data = await self.loop.sock_recv(server, 1024)
data = data.strip()
return data
task = asyncio.create_task(recv_until())
# immediately register another sock_sendall
await self.loop.sock_sendall(sock, b'world')
data = await task
# ProactorEventLoop could deliver hello
self.assertTrue(data.endswith(b'world'))
async def _basetest_sock_connect_racing(self, listener, sock):
listener.bind(('127.0.0.1', 0))
addr = listener.getsockname()
sock.setblocking(False)
task = asyncio.create_task(self.loop.sock_connect(sock, addr))
await asyncio.sleep(0)
task.cancel()
listener.listen(1)
i = 0
while True:
try:
await self.loop.sock_connect(sock, addr)
break
except ConnectionRefusedError: # on Linux we need another retry
await self.loop.sock_connect(sock, addr)
break
except OSError as e: # on Windows we need more retries
# A connect request was made on an already connected socket
if getattr(e, 'winerror', 0) == 10056:
break
# https://stackoverflow.com/a/54437602/3316267
if getattr(e, 'winerror', 0) != 10022:
raise
i += 1
if i >= 128:
raise # too many retries
# avoid touching event loop to maintain race condition
time.sleep(0.01)
def test_sock_client_racing(self):
with test_utils.run_test_server() as httpd:
sock = socket.socket()
with sock:
self.loop.run_until_complete(asyncio.wait_for(
self._basetest_sock_recv_racing(httpd, sock), 10))
sock = socket.socket()
with sock:
self.loop.run_until_complete(asyncio.wait_for(
self._basetest_sock_recv_into_racing(httpd, sock), 10))
listener = socket.socket()
sock = socket.socket()
with listener, sock:
self.loop.run_until_complete(asyncio.wait_for(
self._basetest_sock_send_racing(listener, sock), 10))
listener = socket.socket()
sock = socket.socket()
with listener, sock:
self.loop.run_until_complete(asyncio.wait_for(
self._basetest_sock_connect_racing(listener, sock), 10))
async def _basetest_huge_content(self, address):
sock = socket.socket()
sock.setblocking(False)