mirror of
https://github.com/python/cpython.git
synced 2025-08-31 05:58:33 +00:00
#19662: add decode_data to smtpd so you can get at DATA in bytes form.
Otherwise smtpd is restricted to 7bit clean data, since even if the incoming data is actually utf-8, it will often break things to decode it before parsing the message. Patch by Maciej Szulik, with some adjustments (mostly the warning support).
This commit is contained in:
parent
38ee9afb34
commit
554bcbf1b9
4 changed files with 185 additions and 18 deletions
|
@ -7,13 +7,18 @@ import asyncore
|
|||
|
||||
|
||||
class DummyServer(smtpd.SMTPServer):
|
||||
def __init__(self, localaddr, remoteaddr):
|
||||
smtpd.SMTPServer.__init__(self, localaddr, remoteaddr)
|
||||
def __init__(self, localaddr, remoteaddr, decode_data=True):
|
||||
smtpd.SMTPServer.__init__(self, localaddr, remoteaddr,
|
||||
decode_data=decode_data)
|
||||
self.messages = []
|
||||
if decode_data:
|
||||
self.return_status = 'return status'
|
||||
else:
|
||||
self.return_status = b'return status'
|
||||
|
||||
def process_message(self, peer, mailfrom, rcpttos, data):
|
||||
self.messages.append((peer, mailfrom, rcpttos, data))
|
||||
if data == 'return status':
|
||||
if data == self.return_status:
|
||||
return '250 Okish'
|
||||
|
||||
|
||||
|
@ -31,9 +36,9 @@ class SMTPDServerTest(unittest.TestCase):
|
|||
smtpd.socket = asyncore.socket = mock_socket
|
||||
|
||||
def test_process_message_unimplemented(self):
|
||||
server = smtpd.SMTPServer('a', 'b')
|
||||
server = smtpd.SMTPServer('a', 'b', decode_data=True)
|
||||
conn, addr = server.accept()
|
||||
channel = smtpd.SMTPChannel(server, conn, addr)
|
||||
channel = smtpd.SMTPChannel(server, conn, addr, decode_data=True)
|
||||
|
||||
def write_line(line):
|
||||
channel.socket.queue_recv(line)
|
||||
|
@ -45,6 +50,10 @@ class SMTPDServerTest(unittest.TestCase):
|
|||
write_line(b'DATA')
|
||||
self.assertRaises(NotImplementedError, write_line, b'spam\r\n.\r\n')
|
||||
|
||||
def test_decode_data_default_warns(self):
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
smtpd.SMTPServer('a', 'b')
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
asyncore.socket = smtpd.socket = socket
|
||||
|
@ -57,7 +66,8 @@ class SMTPDChannelTest(unittest.TestCase):
|
|||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b')
|
||||
conn, addr = self.server.accept()
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr)
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
decode_data=True)
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
|
@ -502,6 +512,12 @@ class SMTPDChannelTest(unittest.TestCase):
|
|||
with support.check_warnings(('', DeprecationWarning)):
|
||||
self.channel._SMTPChannel__addr = 'spam'
|
||||
|
||||
def test_decode_data_default_warning(self):
|
||||
server = DummyServer('a', 'b')
|
||||
conn, addr = self.server.accept()
|
||||
with self.assertWarns(DeprecationWarning):
|
||||
smtpd.SMTPChannel(server, conn, addr)
|
||||
|
||||
|
||||
class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
|
||||
|
||||
|
@ -512,7 +528,8 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
|
|||
self.server = DummyServer('a', 'b')
|
||||
conn, addr = self.server.accept()
|
||||
# Set DATA size limit to 32 bytes for easy testing
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32)
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr, 32,
|
||||
decode_data=True)
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
|
@ -553,5 +570,92 @@ class SMTPDChannelWithDataSizeLimitTest(unittest.TestCase):
|
|||
b'552 Error: Too much mail data\r\n')
|
||||
|
||||
|
||||
class SMTPDChannelWithDecodeDataFalse(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b', decode_data=False)
|
||||
conn, addr = self.server.accept()
|
||||
# Set decode_data to False
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
decode_data=False)
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
asyncore.socket = smtpd.socket = socket
|
||||
smtpd.DEBUGSTREAM = self.old_debugstream
|
||||
|
||||
def write_line(self, line):
|
||||
self.channel.socket.queue_recv(line)
|
||||
self.channel.handle_read()
|
||||
|
||||
def test_ascii_data(self):
|
||||
self.write_line(b'HELO example')
|
||||
self.write_line(b'MAIL From:eggs@example')
|
||||
self.write_line(b'RCPT To:spam@example')
|
||||
self.write_line(b'DATA')
|
||||
self.write_line(b'plain ascii text')
|
||||
self.write_line(b'.')
|
||||
self.assertEqual(self.channel.received_data, b'plain ascii text')
|
||||
|
||||
def test_utf8_data(self):
|
||||
self.write_line(b'HELO example')
|
||||
self.write_line(b'MAIL From:eggs@example')
|
||||
self.write_line(b'RCPT To:spam@example')
|
||||
self.write_line(b'DATA')
|
||||
self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
|
||||
self.write_line(b'and some plain ascii')
|
||||
self.write_line(b'.')
|
||||
self.assertEqual(
|
||||
self.channel.received_data,
|
||||
b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87\n'
|
||||
b'and some plain ascii')
|
||||
|
||||
|
||||
class SMTPDChannelWithDecodeDataTrue(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
smtpd.socket = asyncore.socket = mock_socket
|
||||
self.old_debugstream = smtpd.DEBUGSTREAM
|
||||
self.debug = smtpd.DEBUGSTREAM = io.StringIO()
|
||||
self.server = DummyServer('a', 'b')
|
||||
conn, addr = self.server.accept()
|
||||
# Set decode_data to True
|
||||
self.channel = smtpd.SMTPChannel(self.server, conn, addr,
|
||||
decode_data=True)
|
||||
|
||||
def tearDown(self):
|
||||
asyncore.close_all()
|
||||
asyncore.socket = smtpd.socket = socket
|
||||
smtpd.DEBUGSTREAM = self.old_debugstream
|
||||
|
||||
def write_line(self, line):
|
||||
self.channel.socket.queue_recv(line)
|
||||
self.channel.handle_read()
|
||||
|
||||
def test_ascii_data(self):
|
||||
self.write_line(b'HELO example')
|
||||
self.write_line(b'MAIL From:eggs@example')
|
||||
self.write_line(b'RCPT To:spam@example')
|
||||
self.write_line(b'DATA')
|
||||
self.write_line(b'plain ascii text')
|
||||
self.write_line(b'.')
|
||||
self.assertEqual(self.channel.received_data, 'plain ascii text')
|
||||
|
||||
def test_utf8_data(self):
|
||||
self.write_line(b'HELO example')
|
||||
self.write_line(b'MAIL From:eggs@example')
|
||||
self.write_line(b'RCPT To:spam@example')
|
||||
self.write_line(b'DATA')
|
||||
self.write_line(b'utf8 enriched text: \xc5\xbc\xc5\xba\xc4\x87')
|
||||
self.write_line(b'and some plain ascii')
|
||||
self.write_line(b'.')
|
||||
self.assertEqual(
|
||||
self.channel.received_data,
|
||||
'utf8 enriched text: żźć\nand some plain ascii')
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue