mirror of
https://github.com/python/cpython.git
synced 2025-08-30 21:48:47 +00:00
asyncio, Tulip issue 157: Improve test_events.py, avoid run_briefly() which is
not reliable
This commit is contained in:
parent
eeeebcd816
commit
e6a537976e
2 changed files with 71 additions and 73 deletions
|
@ -21,10 +21,11 @@ try:
|
|||
except ImportError: # pragma: no cover
|
||||
ssl = None
|
||||
|
||||
from . import tasks
|
||||
from . import base_events
|
||||
from . import events
|
||||
from . import futures
|
||||
from . import selectors
|
||||
from . import tasks
|
||||
|
||||
|
||||
if sys.platform == 'win32': # pragma: no cover
|
||||
|
@ -52,18 +53,14 @@ def run_briefly(loop):
|
|||
gen.close()
|
||||
|
||||
|
||||
def run_until(loop, pred, timeout=None):
|
||||
if timeout is not None:
|
||||
def run_until(loop, pred, timeout=30):
|
||||
deadline = time.time() + timeout
|
||||
while not pred():
|
||||
if timeout is not None:
|
||||
timeout = deadline - time.time()
|
||||
if timeout <= 0:
|
||||
return False
|
||||
loop.run_until_complete(tasks.sleep(timeout, loop=loop))
|
||||
else:
|
||||
run_briefly(loop)
|
||||
return True
|
||||
raise futures.TimeoutError()
|
||||
loop.run_until_complete(tasks.sleep(0.001, loop=loop))
|
||||
|
||||
|
||||
def run_once(loop):
|
||||
|
|
|
@ -56,6 +56,7 @@ SIGNING_CA = data_file('pycacert.pem')
|
|||
|
||||
|
||||
class MyBaseProto(asyncio.Protocol):
|
||||
connected = None
|
||||
done = None
|
||||
|
||||
def __init__(self, loop=None):
|
||||
|
@ -63,12 +64,15 @@ class MyBaseProto(asyncio.Protocol):
|
|||
self.state = 'INITIAL'
|
||||
self.nbytes = 0
|
||||
if loop is not None:
|
||||
self.connected = asyncio.Future(loop=loop)
|
||||
self.done = asyncio.Future(loop=loop)
|
||||
|
||||
def connection_made(self, transport):
|
||||
self.transport = transport
|
||||
assert self.state == 'INITIAL', self.state
|
||||
self.state = 'CONNECTED'
|
||||
if self.connected:
|
||||
self.connected.set_result(None)
|
||||
|
||||
def data_received(self, data):
|
||||
assert self.state == 'CONNECTED', self.state
|
||||
|
@ -330,7 +334,8 @@ class EventLoopTestsMixin:
|
|||
|
||||
def test_reader_callback(self):
|
||||
r, w = test_utils.socketpair()
|
||||
bytes_read = []
|
||||
r.setblocking(False)
|
||||
bytes_read = bytearray()
|
||||
|
||||
def reader():
|
||||
try:
|
||||
|
@ -340,37 +345,40 @@ class EventLoopTestsMixin:
|
|||
# at least on Linux -- see man select.
|
||||
return
|
||||
if data:
|
||||
bytes_read.append(data)
|
||||
bytes_read.extend(data)
|
||||
else:
|
||||
self.assertTrue(self.loop.remove_reader(r.fileno()))
|
||||
r.close()
|
||||
|
||||
self.loop.add_reader(r.fileno(), reader)
|
||||
self.loop.call_soon(w.send, b'abc')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
|
||||
self.loop.call_soon(w.send, b'def')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
|
||||
self.loop.call_soon(w.close)
|
||||
self.loop.call_soon(self.loop.stop)
|
||||
self.loop.run_forever()
|
||||
self.assertEqual(b''.join(bytes_read), b'abcdef')
|
||||
self.assertEqual(bytes_read, b'abcdef')
|
||||
|
||||
def test_writer_callback(self):
|
||||
r, w = test_utils.socketpair()
|
||||
w.setblocking(False)
|
||||
self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024))
|
||||
test_utils.run_briefly(self.loop)
|
||||
|
||||
def remove_writer():
|
||||
self.assertTrue(self.loop.remove_writer(w.fileno()))
|
||||
def writer(data):
|
||||
w.send(data)
|
||||
self.loop.stop()
|
||||
|
||||
self.loop.call_soon(remove_writer)
|
||||
self.loop.call_soon(self.loop.stop)
|
||||
data = b'x' * 1024
|
||||
self.loop.add_writer(w.fileno(), writer, data)
|
||||
self.loop.run_forever()
|
||||
|
||||
self.assertTrue(self.loop.remove_writer(w.fileno()))
|
||||
self.assertFalse(self.loop.remove_writer(w.fileno()))
|
||||
|
||||
w.close()
|
||||
data = r.recv(256*1024)
|
||||
read = r.recv(len(data) * 2)
|
||||
r.close()
|
||||
self.assertGreaterEqual(len(data), 200)
|
||||
self.assertEqual(read, data)
|
||||
|
||||
def _basetest_sock_client_ops(self, httpd, sock):
|
||||
sock.setblocking(False)
|
||||
|
@ -464,10 +472,10 @@ class EventLoopTestsMixin:
|
|||
self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
|
||||
# Now set a handler and handle it.
|
||||
self.loop.add_signal_handler(signal.SIGINT, my_handler)
|
||||
test_utils.run_briefly(self.loop)
|
||||
|
||||
os.kill(os.getpid(), signal.SIGINT)
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.assertEqual(caught, 1)
|
||||
test_utils.run_until(self.loop, lambda: caught)
|
||||
|
||||
# Removing it should restore the default handler.
|
||||
self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
|
||||
self.assertEqual(signal.getsignal(signal.SIGINT),
|
||||
|
@ -623,7 +631,7 @@ class EventLoopTestsMixin:
|
|||
self.assertIn(str(httpd.address), cm.exception.strerror)
|
||||
|
||||
def test_create_server(self):
|
||||
proto = MyProto()
|
||||
proto = MyProto(self.loop)
|
||||
f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
|
||||
server = self.loop.run_until_complete(f)
|
||||
self.assertEqual(len(server.sockets), 1)
|
||||
|
@ -633,14 +641,11 @@ class EventLoopTestsMixin:
|
|||
client = socket.socket()
|
||||
client.connect(('127.0.0.1', port))
|
||||
client.sendall(b'xxx')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: proto is not None, 10)
|
||||
self.assertIsInstance(proto, MyProto)
|
||||
self.assertEqual('INITIAL', proto.state)
|
||||
test_utils.run_briefly(self.loop)
|
||||
|
||||
self.loop.run_until_complete(proto.connected)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
|
||||
timeout=10)
|
||||
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
|
||||
self.assertEqual(3, proto.nbytes)
|
||||
|
||||
# extra info is available
|
||||
|
@ -650,7 +655,7 @@ class EventLoopTestsMixin:
|
|||
|
||||
# close connection
|
||||
proto.transport.close()
|
||||
test_utils.run_briefly(self.loop) # windows iocp
|
||||
self.loop.run_until_complete(proto.done)
|
||||
|
||||
self.assertEqual('CLOSED', proto.state)
|
||||
|
||||
|
@ -672,27 +677,22 @@ class EventLoopTestsMixin:
|
|||
|
||||
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
|
||||
def test_create_unix_server(self):
|
||||
proto = MyProto()
|
||||
proto = MyProto(loop=self.loop)
|
||||
server, path = self._make_unix_server(lambda: proto)
|
||||
self.assertEqual(len(server.sockets), 1)
|
||||
|
||||
client = socket.socket(socket.AF_UNIX)
|
||||
client.connect(path)
|
||||
client.sendall(b'xxx')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: proto is not None, 10)
|
||||
|
||||
self.assertIsInstance(proto, MyProto)
|
||||
self.assertEqual('INITIAL', proto.state)
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.loop.run_until_complete(proto.connected)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
|
||||
timeout=10)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
|
||||
self.assertEqual(3, proto.nbytes)
|
||||
|
||||
# close connection
|
||||
proto.transport.close()
|
||||
test_utils.run_briefly(self.loop) # windows iocp
|
||||
self.loop.run_until_complete(proto.done)
|
||||
|
||||
self.assertEqual('CLOSED', proto.state)
|
||||
|
||||
|
@ -735,12 +735,10 @@ class EventLoopTestsMixin:
|
|||
client, pr = self.loop.run_until_complete(f_c)
|
||||
|
||||
client.write(b'xxx')
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.assertIsInstance(proto, MyProto)
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.loop.run_until_complete(proto.connected)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
|
||||
timeout=10)
|
||||
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
|
||||
self.assertEqual(3, proto.nbytes)
|
||||
|
||||
# extra info is available
|
||||
|
@ -774,12 +772,9 @@ class EventLoopTestsMixin:
|
|||
client, pr = self.loop.run_until_complete(f_c)
|
||||
|
||||
client.write(b'xxx')
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.assertIsInstance(proto, MyProto)
|
||||
test_utils.run_briefly(self.loop)
|
||||
self.loop.run_until_complete(proto.connected)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
|
||||
timeout=10)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
|
||||
self.assertEqual(3, proto.nbytes)
|
||||
|
||||
# close connection
|
||||
|
@ -1044,15 +1039,9 @@ class EventLoopTestsMixin:
|
|||
|
||||
self.assertEqual('INITIALIZED', client.state)
|
||||
transport.sendto(b'xxx')
|
||||
for _ in range(1000):
|
||||
if server.nbytes:
|
||||
break
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: server.nbytes)
|
||||
self.assertEqual(3, server.nbytes)
|
||||
for _ in range(1000):
|
||||
if client.nbytes:
|
||||
break
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: client.nbytes)
|
||||
|
||||
# received
|
||||
self.assertEqual(8, client.nbytes)
|
||||
|
@ -1097,11 +1086,11 @@ class EventLoopTestsMixin:
|
|||
self.loop.run_until_complete(connect())
|
||||
|
||||
os.write(wpipe, b'1')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
|
||||
self.assertEqual(1, proto.nbytes)
|
||||
|
||||
os.write(wpipe, b'2345')
|
||||
test_utils.run_briefly(self.loop)
|
||||
test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
|
||||
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
|
||||
self.assertEqual(5, proto.nbytes)
|
||||
|
||||
|
@ -1166,14 +1155,19 @@ class EventLoopTestsMixin:
|
|||
self.assertEqual('CONNECTED', proto.state)
|
||||
|
||||
transport.write(b'1')
|
||||
test_utils.run_briefly(self.loop)
|
||||
data = os.read(rpipe, 1024)
|
||||
|
||||
data = bytearray()
|
||||
def reader(data):
|
||||
chunk = os.read(rpipe, 1024)
|
||||
data += chunk
|
||||
return len(data)
|
||||
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 1)
|
||||
self.assertEqual(b'1', data)
|
||||
|
||||
transport.write(b'2345')
|
||||
test_utils.run_briefly(self.loop)
|
||||
data = os.read(rpipe, 1024)
|
||||
self.assertEqual(b'2345', data)
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 5)
|
||||
self.assertEqual(b'12345', data)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
|
||||
os.close(rpipe)
|
||||
|
@ -1225,14 +1219,21 @@ class EventLoopTestsMixin:
|
|||
self.assertEqual('CONNECTED', proto.state)
|
||||
|
||||
transport.write(b'1')
|
||||
test_utils.run_briefly(self.loop)
|
||||
data = os.read(master, 1024)
|
||||
|
||||
data = bytearray()
|
||||
def reader(data):
|
||||
chunk = os.read(master, 1024)
|
||||
data += chunk
|
||||
return len(data)
|
||||
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 1,
|
||||
timeout=10)
|
||||
self.assertEqual(b'1', data)
|
||||
|
||||
transport.write(b'2345')
|
||||
test_utils.run_briefly(self.loop)
|
||||
data = os.read(master, 1024)
|
||||
self.assertEqual(b'2345', data)
|
||||
test_utils.run_until(self.loop, lambda: reader(data) >= 5,
|
||||
timeout=10)
|
||||
self.assertEqual(b'12345', data)
|
||||
self.assertEqual('CONNECTED', proto.state)
|
||||
|
||||
os.close(master)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue