gh-97983: Revert "Lay the foundation for further work in asyncio.test_streams: port server cases to IsolatedAsyncioTestCase" (#98015)

This PR reverts gh-93369 and gh-97896 because they've made asyncio tests unstable. After these PRs were merged, random GitHub action jobs of random commits started to fail unrelated tests and test framework methods.

The reverting is necessary because such shrapnel failures are a symptom of some underlying bug that must be found and fixed first.

I had a hope that it's a server overload because we already have extremely rare disc access errors. However, one and a half day passed, and the failures continue to emerge both in PRs and commits.

Affected issue: gh-93357.
First reported in https://github.com/python/cpython/pull/97940#issuecomment-1270004134.

* Revert "gh-93357: Port test cases to IsolatedAsyncioTestCase, part 2 (#97896)"

This reverts commit 09aea94d29.

* Revert "gh-93357: Start porting asyncio server test cases to IsolatedAsyncioTestCase (#93369)"

This reverts commit ce8fc186ac.
This commit is contained in:
Oleg Iarygin 2022-10-07 18:14:28 +04:00 committed by GitHub
parent 6592a62ec2
commit f99bb20cde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -566,10 +566,46 @@ class StreamTests(test_utils.TestCase):
test_utils.run_briefly(self.loop) test_utils.run_briefly(self.loop)
self.assertIs(stream._waiter, None) self.assertIs(stream._waiter, None)
def test_start_server(self):
class NewStreamTests(unittest.IsolatedAsyncioTestCase): class MyServer:
async def test_start_server(self): def __init__(self, loop):
self.server = None
self.loop = loop
async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()
def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()
def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))
def start_callback(self):
sock = socket.create_server(('127.0.0.1', 0))
addr = sock.getsockname()
sock.close()
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client_callback,
host=addr[0], port=addr[1]))
return addr
def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None
async def client(addr): async def client(addr):
reader, writer = await asyncio.open_connection(*addr) reader, writer = await asyncio.open_connection(*addr)
@ -581,43 +617,61 @@ class NewStreamTests(unittest.IsolatedAsyncioTestCase):
await writer.wait_closed() await writer.wait_closed()
return msgback return msgback
async def handle_client(client_reader, client_writer): messages = []
data = await client_reader.readline() self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()
with self.subTest(msg="coroutine"): # test the server variant with a coroutine as client handler
server = await asyncio.start_server( server = MyServer(self.loop)
handle_client, addr = server.start()
host=socket_helper.HOSTv4 msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
) server.stop()
addr = server.sockets[0].getsockname() self.assertEqual(msg, b"hello world!\n")
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
with self.subTest(msg="callback"): # test the server variant with a callback as client handler
async def handle_client_callback(client_reader, client_writer): server = MyServer(self.loop)
asyncio.get_running_loop().create_task( addr = server.start_callback()
handle_client(client_reader, client_writer) msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
) server.stop()
self.assertEqual(msg, b"hello world!\n")
server = await asyncio.start_server( self.assertEqual(messages, [])
handle_client_callback,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
reader, writer = await asyncio.open_connection(*addr)
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
@socket_helper.skip_unless_bind_unix_socket @socket_helper.skip_unless_bind_unix_socket
async def test_start_unix_server(self): def test_start_unix_server(self):
class MyServer:
def __init__(self, loop, path):
self.server = None
self.loop = loop
self.path = path
async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()
def start(self):
self.server = self.loop.run_until_complete(
asyncio.start_unix_server(self.handle_client,
path=self.path))
def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))
def start_callback(self):
start = asyncio.start_unix_server(self.handle_client_callback,
path=self.path)
self.server = self.loop.run_until_complete(start)
def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None
async def client(path): async def client(path):
reader, writer = await asyncio.open_unix_connection(path) reader, writer = await asyncio.open_unix_connection(path)
@ -629,42 +683,64 @@ class NewStreamTests(unittest.IsolatedAsyncioTestCase):
await writer.wait_closed() await writer.wait_closed()
return msgback return msgback
async def handle_client(client_reader, client_writer): messages = []
data = await client_reader.readline() self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()
with self.subTest(msg="coroutine"): # test the server variant with a coroutine as client handler
with test_utils.unix_socket_path() as path: with test_utils.unix_socket_path() as path:
server = await asyncio.start_unix_server( server = MyServer(self.loop, path)
handle_client, server.start()
path=path msg = self.loop.run_until_complete(
) self.loop.create_task(client(path)))
msg = await client(path) server.stop()
server.close() self.assertEqual(msg, b"hello world!\n")
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
with self.subTest(msg="callback"): # test the server variant with a callback as client handler
async def handle_client_callback(client_reader, client_writer): with test_utils.unix_socket_path() as path:
asyncio.get_running_loop().create_task( server = MyServer(self.loop, path)
handle_client(client_reader, client_writer) server.start_callback()
) msg = self.loop.run_until_complete(
self.loop.create_task(client(path)))
server.stop()
self.assertEqual(msg, b"hello world!\n")
with test_utils.unix_socket_path() as path: self.assertEqual(messages, [])
server = await asyncio.start_unix_server(
handle_client_callback,
path=path
)
msg = await client(path)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
@unittest.skipIf(ssl is None, 'No ssl module') @unittest.skipIf(ssl is None, 'No ssl module')
async def test_start_tls(self): def test_start_tls(self):
class MyServer:
def __init__(self, loop):
self.server = None
self.loop = loop
async def handle_client(self, client_reader, client_writer):
data1 = await client_reader.readline()
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None
data2 = await client_reader.readline()
client_writer.write(data2)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()
def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()
def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None
async def client(addr): async def client(addr):
reader, writer = await asyncio.open_connection(*addr) reader, writer = await asyncio.open_connection(*addr)
@ -681,49 +757,18 @@ class NewStreamTests(unittest.IsolatedAsyncioTestCase):
await writer.wait_closed() await writer.wait_closed()
return msgback1, msgback2 return msgback1, msgback2
async def handle_client(client_reader, client_writer): messages = []
data1 = await client_reader.readline() self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None
data2 = await client_reader.readline() server = MyServer(self.loop)
client_writer.write(data2) addr = server.start()
await client_writer.drain() msg1, msg2 = self.loop.run_until_complete(client(addr))
client_writer.close() server.stop()
await client_writer.wait_closed()
server = await asyncio.start_server( self.assertEqual(messages, [])
handle_client,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
msg1, msg2 = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg1, b"hello world 1!\n") self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n") self.assertEqual(msg2, b"hello world 2!\n")
class StreamTests2(test_utils.TestCase):
def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
def tearDown(self):
# just in case if we have transport close callbacks
test_utils.run_briefly(self.loop)
self.loop.close()
gc.collect()
super().tearDown()
@unittest.skipIf(sys.platform == 'win32', "Don't have pipes") @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self): def test_read_all_from_pipe_reader(self):
# See asyncio issue 168. This test is derived from the example # See asyncio issue 168. This test is derived from the example
@ -941,32 +986,36 @@ os.close(fd)
self.assertEqual(str(e), str(e2)) self.assertEqual(str(e), str(e2))
self.assertEqual(e.consumed, e2.consumed) self.assertEqual(e.consumed, e2.consumed)
class NewStreamTests2(unittest.IsolatedAsyncioTestCase): def test_wait_closed_on_close(self):
async def test_wait_closed_on_close(self):
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
rd, wr = await asyncio.open_connection(*httpd.address) rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))
wr.write(b'GET / HTTP/1.0\r\n\r\n') wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline() f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read() f = rd.read()
data = self.loop.run_until_complete(f)
self.assertTrue(data.endswith(b'\r\n\r\nTest message')) self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
self.assertFalse(wr.is_closing()) self.assertFalse(wr.is_closing())
wr.close() wr.close()
self.assertTrue(wr.is_closing()) self.assertTrue(wr.is_closing())
await wr.wait_closed() self.loop.run_until_complete(wr.wait_closed())
async def test_wait_closed_on_close_with_unread_data(self): def test_wait_closed_on_close_with_unread_data(self):
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
rd, wr = await asyncio.open_connection(*httpd.address) rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))
wr.write(b'GET / HTTP/1.0\r\n\r\n') wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline() f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
wr.close() wr.close()
await wr.wait_closed() self.loop.run_until_complete(wr.wait_closed())
async def test_async_writer_api(self): def test_async_writer_api(self):
async def inner(httpd): async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address) rd, wr = await asyncio.open_connection(*httpd.address)
@ -978,10 +1027,15 @@ class NewStreamTests2(unittest.IsolatedAsyncioTestCase):
wr.close() wr.close()
await wr.wait_closed() await wr.wait_closed()
with test_utils.run_test_server() as httpd: messages = []
await inner(httpd) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
async def test_async_writer_api_exception_after_close(self): with test_utils.run_test_server() as httpd:
self.loop.run_until_complete(inner(httpd))
self.assertEqual(messages, [])
def test_async_writer_api_exception_after_close(self):
async def inner(httpd): async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address) rd, wr = await asyncio.open_connection(*httpd.address)
@ -995,19 +1049,33 @@ class NewStreamTests2(unittest.IsolatedAsyncioTestCase):
wr.write(b'data') wr.write(b'data')
await wr.drain() await wr.drain()
with test_utils.run_test_server() as httpd: messages = []
await inner(httpd) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
async def test_eof_feed_when_closing_writer(self):
# See http://bugs.python.org/issue35065
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
rd, wr = await asyncio.open_connection(*httpd.address) self.loop.run_until_complete(inner(httpd))
self.assertEqual(messages, [])
def test_eof_feed_when_closing_writer(self):
# See http://bugs.python.org/issue35065
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))
wr.close() wr.close()
await wr.wait_closed() f = wr.wait_closed()
self.loop.run_until_complete(f)
self.assertTrue(rd.at_eof()) self.assertTrue(rd.at_eof())
data = await rd.read() f = rd.read()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'') self.assertEqual(data, b'')
self.assertEqual(messages, [])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()