gh-93852: Add test.support.create_unix_domain_name() (#93914)

test_asyncio, test_logging, test_socket and test_socketserver now
create AF_UNIX domains in the current directory to no longer fail
with OSError("AF_UNIX path too long") if the temporary directory (the
TMPDIR environment variable) is too long.

Modify the following tests to use create_unix_domain_name():

* test_asyncio
* test_logging
* test_socket
* test_socketserver

test_asyncio.utils: remove unused time import.
This commit is contained in:
Victor Stinner 2022-06-17 13:16:51 +02:00 committed by GitHub
parent ffc228dd4e
commit c5b750dc0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 82 additions and 86 deletions

View file

@ -315,11 +315,15 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
self.loop.run_until_complete(coro)
def test_create_unix_server_existing_path_nonsock(self):
with tempfile.NamedTemporaryFile() as file:
coro = self.loop.create_unix_server(lambda: None, file.name)
with self.assertRaisesRegex(OSError,
'Address.*is already in use'):
self.loop.run_until_complete(coro)
path = test_utils.gen_unix_socket_path()
self.addCleanup(os_helper.unlink, path)
# create the file
open(path, "wb").close()
coro = self.loop.create_unix_server(lambda: None, path)
with self.assertRaisesRegex(OSError,
'Address.*is already in use'):
self.loop.run_until_complete(coro)
def test_create_unix_server_ssl_bool(self):
coro = self.loop.create_unix_server(lambda: None, path='spam',
@ -356,20 +360,18 @@ class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
'no socket.SOCK_NONBLOCK (linux only)')
@socket_helper.skip_unless_bind_unix_socket
def test_create_unix_server_path_stream_bittype(self):
sock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
with tempfile.NamedTemporaryFile() as file:
fn = file.name
try:
with sock:
sock.bind(fn)
coro = self.loop.create_unix_server(lambda: None, path=None,
sock=sock)
srv = self.loop.run_until_complete(coro)
srv.close()
self.loop.run_until_complete(srv.wait_closed())
finally:
os.unlink(fn)
fn = test_utils.gen_unix_socket_path()
self.addCleanup(os_helper.unlink, fn)
sock = socket.socket(socket.AF_UNIX,
socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
with sock:
sock.bind(fn)
coro = self.loop.create_unix_server(lambda: None, path=None,
sock=sock)
srv = self.loop.run_until_complete(coro)
srv.close()
self.loop.run_until_complete(srv.wait_closed())
def test_create_unix_server_ssl_timeout_with_plain_sock(self):
coro = self.loop.create_unix_server(lambda: None, path='spam',