mirror of
https://github.com/python/cpython.git
synced 2025-12-04 00:30:19 +00:00
parent
1144da5821
commit
6fe56a329d
5 changed files with 58 additions and 15 deletions
|
|
@ -36,7 +36,8 @@ class SMTPDServerTest(unittest.TestCase):
|
|||
smtpd.socket = asyncore.socket = mock_socket
|
||||
|
||||
def test_process_message_unimplemented(self):
|
||||
server = smtpd.SMTPServer('a', 'b', decode_data=True)
|
||||
server = smtpd.SMTPServer((support.HOST, 0), ('b', 0),
|
||||
decode_data=True)
|
||||
conn, addr = server.accept()
|
||||
channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
|
||||
|
||||
|
|
@ -52,19 +53,39 @@ class SMTPDServerTest(unittest.TestCase):
|
|||
|
||||
def test_decode_data_default_warns(self):
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
smtpd.SMTPServer('a', 'b')
|
||||
smtpd.SMTPServer((support.HOST, 0), ('b', 0))
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
asyncore.socket = smtpd.socket = socket
|
||||
|
||||
|
||||
class TestFamilyDetection(unittest.TestCase):
|
||||
def setUp(self):
|
||||
smtpd.socket = asyncore.socket = mock_socket
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
asyncore.socket = smtpd.socket = socket
|
||||
|
||||
@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
|
||||
def test_socket_uses_IPv6(self):
|
||||
server = smtpd.SMTPServer((support.HOSTv6, 0), (support.HOST, 0),
|
||||
decode_data=False)
|
||||
self.assertEqual(server.socket.family, socket.AF_INET6)
|
||||
|
||||
def test_socket_uses_IPv4(self):
|
||||
server = smtpd.SMTPServer((support.HOST, 0), (support.HOSTv6, 0),
|
||||
decode_data=False)
|
||||
self.assertEqual(server.socket.family, socket.AF_INET)
|
||||
|
||||
|
||||
class SMTPDChannelTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b')
|
||||
self.server = DummyServer((support.HOST, 0), ('b', 0))
|
||||
conn, addr = self.server.accept()
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
decode_data=True)
|
||||
|
|
@ -79,7 +100,9 @@ class SMTPDChannelTest(unittest.TestCase):
|
|||
self.channel.handle_read()
|
||||
|
||||
def test_broken_connect(self):
|
||||
self.assertRaises(DummyDispatcherBroken, BrokenDummyServer, 'a', 'b')
|
||||
self.assertRaises(
|
||||
DummyDispatcherBroken, BrokenDummyServer,
|
||||
(support.HOST, 0), ('b', 0))
|
||||
|
||||
def test_server_accept(self):
|
||||
self.server.handle_accept()
|
||||
|
|
@ -513,11 +536,21 @@ class SMTPDChannelTest(unittest.TestCase):
|
|||
self.channel._SMTPChannel__addr = 'spam'
|
||||
|
||||
def test_decode_data_default_warning(self):
|
||||
server = DummyServer('a', 'b')
|
||||
server = DummyServer((support.HOST, 0), ('b', 0))
|
||||
conn, addr = self.server.accept()
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
smtpd.SMTPChannel(server, conn, addr)
|
||||
|
||||
@unittest.skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
|
||||
class SMTPDChannelIPv6Test(SMTPDChannelTest):
|
||||
def setUp(self):
|
||||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer((support.HOSTv6, 0), ('b', 0))
|
||||
conn, addr = self.server.accept()
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
decode_data=True)
|
||||
|
||||
class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
|
||||
|
||||
|
|
@ -525,7 +558,7 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
|
|||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b')
|
||||
self.server = DummyServer((support.HOST, 0), ('b', 0))
|
||||
conn, addr = self.server.accept()
|
||||
# Set DATA size limit to 32 bytes for easy testing
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
|
||||
|
|
@ -576,7 +609,8 @@ class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
|
|||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b', decode_data=False)
|
||||
self.server = DummyServer((support.HOST, 0), ('b', 0),
|
||||
decode_data=False)
|
||||
conn, addr = self.server.accept()
|
||||
# Set decode_data to False
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
|
|
@ -620,7 +654,8 @@ class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
|
|||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b')
|
||||
self.server = DummyServer((support.HOST, 0), ('b', 0),
|
||||
decode_data=True)
|
||||
conn, addr = self.server.accept()
|
||||
# Set decode_data to True
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue