mirror of
https://github.com/python/cpython.git
synced 2025-12-04 00:30:19 +00:00
add tests for asyncio transport sockets (#100263)
This commit is contained in:
parent
3da71ff0f2
commit
5369bba8c5
1 changed files with 23 additions and 0 deletions
|
|
@ -823,6 +823,29 @@ class EventLoopTestsMixin:
|
|||
# close server
|
||||
server.close()
|
||||
|
||||
def test_create_server_trsock(self):
|
||||
proto = MyProto(self.loop)
|
||||
f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
|
||||
server = self.loop.run_until_complete(f)
|
||||
self.assertEqual(len(server.sockets), 1)
|
||||
sock = server.sockets[0]
|
||||
self.assertIsInstance(sock, asyncio.trsock.TransportSocket)
|
||||
host, port = sock.getsockname()
|
||||
self.assertEqual(host, '0.0.0.0')
|
||||
dup = sock.dup()
|
||||
self.addCleanup(dup.close)
|
||||
self.assertIsInstance(dup, socket.socket)
|
||||
self.assertFalse(sock.get_inheritable())
|
||||
with self.assertRaises(ValueError):
|
||||
sock.settimeout(1)
|
||||
sock.settimeout(0)
|
||||
self.assertEqual(sock.gettimeout(), 0)
|
||||
with self.assertRaises(ValueError):
|
||||
sock.setblocking(True)
|
||||
sock.setblocking(False)
|
||||
server.close()
|
||||
|
||||
|
||||
@unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
|
||||
def test_create_server_reuse_port(self):
|
||||
proto = MyProto(self.loop)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue