mirror of
https://github.com/python/cpython.git
synced 2025-08-28 04:35:02 +00:00
- Issue #15014: SMTP.auth() and SMTP.login() now support RFC 4954's optional
initial-response argument to the SMTP AUTH command.
This commit is contained in:
parent
b85b427507
commit
c5ea754e48
4 changed files with 153 additions and 39 deletions
|
@ -1,6 +1,7 @@
|
|||
import asyncore
|
||||
import email.mime.text
|
||||
from email.message import EmailMessage
|
||||
from email.base64mime import body_encode as encode_base64
|
||||
import email.utils
|
||||
import socket
|
||||
import smtpd
|
||||
|
@ -814,11 +815,11 @@ class SMTPSimTests(unittest.TestCase):
|
|||
def testVRFY(self):
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
|
||||
for email, name in sim_users.items():
|
||||
for addr_spec, name in sim_users.items():
|
||||
expected_known = (250, bytes('%s %s' %
|
||||
(name, smtplib.quoteaddr(email)),
|
||||
(name, smtplib.quoteaddr(addr_spec)),
|
||||
"ascii"))
|
||||
self.assertEqual(smtp.vrfy(email), expected_known)
|
||||
self.assertEqual(smtp.vrfy(addr_spec), expected_known)
|
||||
|
||||
u = 'nobody@nowhere.com'
|
||||
expected_unknown = (550, ('No such user: %s' % u).encode('ascii'))
|
||||
|
@ -851,7 +852,7 @@ class SMTPSimTests(unittest.TestCase):
|
|||
def testAUTH_PLAIN(self):
|
||||
self.serv.add_feature("AUTH PLAIN")
|
||||
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
|
||||
try: smtp.login(sim_auth[0], sim_auth[1])
|
||||
try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_plain, str(err))
|
||||
smtp.close()
|
||||
|
@ -892,7 +893,7 @@ class SMTPSimTests(unittest.TestCase):
|
|||
'LOGIN': smtp.auth_login,
|
||||
}
|
||||
for mechanism, method in supported.items():
|
||||
try: smtp.auth(mechanism, method)
|
||||
try: smtp.auth(mechanism, method, initial_response_ok=False)
|
||||
except smtplib.SMTPAuthenticationError as err:
|
||||
self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
|
||||
str(err))
|
||||
|
@ -1142,12 +1143,85 @@ class SMTPUTF8SimTests(unittest.TestCase):
|
|||
smtp.send_message(msg))
|
||||
|
||||
|
||||
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
|
||||
|
||||
class SimSMTPAUTHInitialResponseChannel(SimSMTPChannel):
|
||||
def smtp_AUTH(self, arg):
|
||||
# RFC 4954's AUTH command allows for an optional initial-response.
|
||||
# Not all AUTH methods support this; some require a challenge. AUTH
|
||||
# PLAIN does those, so test that here. See issue #15014.
|
||||
args = arg.split()
|
||||
if args[0].lower() == 'plain':
|
||||
if len(args) == 2:
|
||||
# AUTH PLAIN <initial-response> with the response base 64
|
||||
# encoded. Hard code the expected response for the test.
|
||||
if args[1] == EXPECTED_RESPONSE:
|
||||
self.push('235 Ok')
|
||||
return
|
||||
self.push('571 Bad authentication')
|
||||
|
||||
class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
|
||||
channel_class = SimSMTPAUTHInitialResponseChannel
|
||||
|
||||
|
||||
@unittest.skipUnless(threading, 'Threading required for this test.')
|
||||
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.real_getfqdn = socket.getfqdn
|
||||
socket.getfqdn = mock_socket.getfqdn
|
||||
self.serv_evt = threading.Event()
|
||||
self.client_evt = threading.Event()
|
||||
# Pick a random unused port by passing 0 for the port number
|
||||
self.serv = SimSMTPAUTHInitialResponseServer(
|
||||
(HOST, 0), ('nowhere', -1), decode_data=True)
|
||||
# Keep a note of what port was assigned
|
||||
self.port = self.serv.socket.getsockname()[1]
|
||||
serv_args = (self.serv, self.serv_evt, self.client_evt)
|
||||
self.thread = threading.Thread(target=debugging_server, args=serv_args)
|
||||
self.thread.start()
|
||||
|
||||
# wait until server thread has assigned a port number
|
||||
self.serv_evt.wait()
|
||||
self.serv_evt.clear()
|
||||
|
||||
def tearDown(self):
|
||||
socket.getfqdn = self.real_getfqdn
|
||||
# indicate that the client is finished
|
||||
self.client_evt.set()
|
||||
# wait for the server thread to terminate
|
||||
self.serv_evt.wait()
|
||||
self.thread.join()
|
||||
|
||||
def testAUTH_PLAIN_initial_response_login(self):
|
||||
self.serv.add_feature('AUTH PLAIN')
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
local_hostname='localhost', timeout=15)
|
||||
smtp.login('psu', 'doesnotexist')
|
||||
smtp.close()
|
||||
|
||||
def testAUTH_PLAIN_initial_response_auth(self):
|
||||
self.serv.add_feature('AUTH PLAIN')
|
||||
smtp = smtplib.SMTP(HOST, self.port,
|
||||
local_hostname='localhost', timeout=15)
|
||||
smtp.user = 'psu'
|
||||
smtp.password = 'doesnotexist'
|
||||
code, response = smtp.auth('plain', smtp.auth_plain)
|
||||
smtp.close()
|
||||
self.assertEqual(code, 235)
|
||||
|
||||
|
||||
@support.reap_threads
|
||||
def test_main(verbose=None):
|
||||
support.run_unittest(GeneralTests, DebuggingServerTests,
|
||||
NonConnectingTests,
|
||||
BadHELOServerTests, SMTPSimTests,
|
||||
TooLongLineTests)
|
||||
support.run_unittest(
|
||||
BadHELOServerTests,
|
||||
DebuggingServerTests,
|
||||
GeneralTests,
|
||||
NonConnectingTests,
|
||||
SMTPAUTHInitialResponseSimTests,
|
||||
SMTPSimTests,
|
||||
TooLongLineTests,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue