#22027: Add RFC6531 support to smtplib.

Initial patch by Milan Oberkirch.
This commit is contained in:
R David Murray 2015-05-16 13:58:14 -04:00
parent b907a513c8
commit cee7cf6026
5 changed files with 194 additions and 7 deletions

View file

@ -977,6 +977,125 @@ class SMTPSimTests(unittest.TestCase):
self.assertIsNone(smtp.sock)
self.assertEqual(self.serv._SMTPchannel.rcpt_count, 0)
def test_smtputf8_NotSupportedError_if_no_server_support(self):
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
smtp.ehlo()
self.assertTrue(smtp.does_esmtp)
self.assertFalse(smtp.has_extn('smtputf8'))
self.assertRaises(
smtplib.SMTPNotSupportedError,
smtp.sendmail,
'John', 'Sally', '', mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
self.assertRaises(
smtplib.SMTPNotSupportedError,
smtp.mail, 'John', options=['BODY=8BITMIME', 'SMTPUTF8'])
def test_send_unicode_without_SMTPUTF8(self):
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
class SimSMTPUTF8Server(SimSMTPServer):
def __init__(self, *args, **kw):
# The base SMTP server turns these on automatically, but our test
# server is set up to munge the EHLO response, so we need to provide
# them as well. And yes, the call is to SMTPServer not SimSMTPServer.
self._extra_features = ['SMTPUTF8', '8BITMIME']
smtpd.SMTPServer.__init__(self, *args, **kw)
def handle_accepted(self, conn, addr):
self._SMTPchannel = self.channel_class(
self._extra_features, self, conn, addr,
decode_data=self._decode_data,
enable_SMTPUTF8=self.enable_SMTPUTF8,
)
def process_message(self, peer, mailfrom, rcpttos, data, mail_options=None,
rcpt_options=None):
self.last_peer = peer
self.last_mailfrom = mailfrom
self.last_rcpttos = rcpttos
self.last_message = data
self.last_mail_options = mail_options
self.last_rcpt_options = rcpt_options
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPUTF8SimTests(unittest.TestCase):
def setUp(self):
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
# Pick a random unused port by passing 0 for the port number
self.serv = SimSMTPUTF8Server((HOST, 0), ('nowhere', -1),
decode_data=False,
enable_SMTPUTF8=True)
# Keep a note of what port was assigned
self.port = self.serv.socket.getsockname()[1]
serv_args = (self.serv, self.serv_evt, self.client_evt)
self.thread = threading.Thread(target=debugging_server, args=serv_args)
self.thread.start()
# wait until server thread has assigned a port number
self.serv_evt.wait()
self.serv_evt.clear()
def tearDown(self):
socket.getfqdn = self.real_getfqdn
# indicate that the client is finished
self.client_evt.set()
# wait for the server thread to terminate
self.serv_evt.wait()
self.thread.join()
def test_test_server_supports_extensions(self):
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
smtp.ehlo()
self.assertTrue(smtp.does_esmtp)
self.assertTrue(smtp.has_extn('smtputf8'))
def test_send_unicode_with_SMTPUTF8_via_sendmail(self):
m = '¡a test message containing unicode!'.encode('utf-8')
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
smtp.sendmail('Jőhn', 'Sálly', m,
mail_options=['BODY=8BITMIME', 'SMTPUTF8'])
self.assertEqual(self.serv.last_mailfrom, 'Jőhn')
self.assertEqual(self.serv.last_rcpttos, ['Sálly'])
self.assertEqual(self.serv.last_message, m)
self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
self.assertIn('SMTPUTF8', self.serv.last_mail_options)
self.assertEqual(self.serv.last_rcpt_options, [])
def test_send_unicode_with_SMTPUTF8_via_low_level_API(self):
m = '¡a test message containing unicode!'.encode('utf-8')
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
smtp.ehlo()
self.assertEqual(
smtp.mail('', options=['BODY=8BITMIME', 'SMTPUTF8']),
(250, b'OK'))
self.assertEqual(smtp.rcpt('János'), (250, b'OK'))
self.assertEqual(smtp.data(m), (250, b'OK'))
self.assertEqual(self.serv.last_mailfrom, '')
self.assertEqual(self.serv.last_rcpttos, ['János'])
self.assertEqual(self.serv.last_message, m)
self.assertIn('BODY=8BITMIME', self.serv.last_mail_options)
self.assertIn('SMTPUTF8', self.serv.last_mail_options)
self.assertEqual(self.serv.last_rcpt_options, [])
@support.reap_threads
def test_main(verbose=None):