mirror of
https://github.com/python/cpython.git
synced 2025-08-30 05:35:08 +00:00
asyncio: Synchronize with Tulip
* Issue #159: Fix windows_utils.socketpair() - Use "127.0.0.1" (IPv4) or "::1" (IPv6) host instead of "localhost", because "localhost" may be a different IP address - Reject also invalid arguments: only AF_INET/AF_INET6 with SOCK_STREAM (and proto=0) are supported * Reject add/remove reader/writer when event loop is closed. * Fix ResourceWarning warnings
This commit is contained in:
parent
c5cc5011ac
commit
eeeebcd816
4 changed files with 73 additions and 6 deletions
|
@ -1,8 +1,10 @@
|
|||
"""Tests for window_utils"""
|
||||
|
||||
import socket
|
||||
import sys
|
||||
import test.support
|
||||
import unittest
|
||||
from test.support import IPV6_ENABLED
|
||||
from unittest import mock
|
||||
|
||||
if sys.platform != 'win32':
|
||||
|
@ -16,23 +18,40 @@ from asyncio import _overlapped
|
|||
|
||||
class WinsocketpairTests(unittest.TestCase):
|
||||
|
||||
def test_winsocketpair(self):
|
||||
ssock, csock = windows_utils.socketpair()
|
||||
|
||||
def check_winsocketpair(self, ssock, csock):
|
||||
csock.send(b'xxx')
|
||||
self.assertEqual(b'xxx', ssock.recv(1024))
|
||||
|
||||
csock.close()
|
||||
ssock.close()
|
||||
|
||||
def test_winsocketpair(self):
|
||||
ssock, csock = windows_utils.socketpair()
|
||||
self.check_winsocketpair(ssock, csock)
|
||||
|
||||
@unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
|
||||
def test_winsocketpair_ipv6(self):
|
||||
ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
|
||||
self.check_winsocketpair(ssock, csock)
|
||||
|
||||
@mock.patch('asyncio.windows_utils.socket')
|
||||
def test_winsocketpair_exc(self, m_socket):
|
||||
m_socket.AF_INET = socket.AF_INET
|
||||
m_socket.SOCK_STREAM = socket.SOCK_STREAM
|
||||
m_socket.socket.return_value.getsockname.return_value = ('', 12345)
|
||||
m_socket.socket.return_value.accept.return_value = object(), object()
|
||||
m_socket.socket.return_value.connect.side_effect = OSError()
|
||||
|
||||
self.assertRaises(OSError, windows_utils.socketpair)
|
||||
|
||||
def test_winsocketpair_invalid_args(self):
|
||||
self.assertRaises(ValueError,
|
||||
windows_utils.socketpair, family=socket.AF_UNSPEC)
|
||||
self.assertRaises(ValueError,
|
||||
windows_utils.socketpair, type=socket.SOCK_DGRAM)
|
||||
self.assertRaises(ValueError,
|
||||
windows_utils.socketpair, proto=1)
|
||||
|
||||
|
||||
|
||||
class PipeTests(unittest.TestCase):
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue