mirror of
https://github.com/python/cpython.git
synced 2025-11-01 18:51:43 +00:00
Fix Issue #8797: Raise HTTPError on failed Basic Authentication immediately. Initial patch by Sam Bull.
This commit is contained in:
parent
f819ef74da
commit
b2e3a939bf
3 changed files with 88 additions and 18 deletions
|
|
@ -1,3 +1,4 @@
|
|||
import base64
|
||||
import os
|
||||
import email
|
||||
import urllib.parse
|
||||
|
|
@ -197,6 +198,48 @@ class DigestAuthHandler:
|
|||
return self._return_auth_challenge(request_handler)
|
||||
return True
|
||||
|
||||
|
||||
class BasicAuthHandler(http.server.SimpleHTTPRequestHandler):
|
||||
"""Handler for performing basic authentication."""
|
||||
# Server side values
|
||||
USER = 'testUser'
|
||||
PASSWD = 'testPass'
|
||||
REALM = 'Test'
|
||||
USER_PASSWD = "%s:%s" % (USER, PASSWD)
|
||||
ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
http.server.SimpleHTTPRequestHandler.__init__(self, *args, **kwargs)
|
||||
|
||||
def log_message(self, format, *args):
|
||||
# Suppress console log message
|
||||
pass
|
||||
|
||||
def do_HEAD(self):
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
|
||||
def do_AUTHHEAD(self):
|
||||
self.send_response(401)
|
||||
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
|
||||
def do_GET(self):
|
||||
if not self.headers.get("Authorization", ""):
|
||||
self.do_AUTHHEAD()
|
||||
self.wfile.write(b"No Auth header received")
|
||||
elif self.headers.get(
|
||||
"Authorization", "") == "Basic " + self.ENCODED_AUTH:
|
||||
http.server.SimpleHTTPRequestHandler.do_GET(self)
|
||||
else:
|
||||
self.do_AUTHHEAD()
|
||||
self.wfile.write(
|
||||
bytes(self.headers.get("Authorization", ""), "ascii"))
|
||||
self.wfile.write(b"Not Authenticated")
|
||||
|
||||
|
||||
# Proxy test infrastructure
|
||||
|
||||
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
|
||||
|
|
@ -232,6 +275,45 @@ class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
|
|||
|
||||
# Test cases
|
||||
|
||||
@unittest.skipUnless(threading, "Threading required for this test.")
|
||||
class BasicAuthTests(unittest.TestCase):
|
||||
USER = "testUser"
|
||||
PASSWD = "testPass"
|
||||
INCORRECT_PASSWD = "Incorrect"
|
||||
REALM = "Test"
|
||||
|
||||
def setUp(self):
|
||||
super(BasicAuthTests, self).setUp()
|
||||
# With Basic Authentication
|
||||
def http_server_with_basic_auth_handler(*args, **kwargs):
|
||||
return BasicAuthHandler(*args, **kwargs)
|
||||
self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
|
||||
self.server_url = 'http://127.0.0.1:%s' % self.server.port
|
||||
self.server.start()
|
||||
self.server.ready.wait()
|
||||
|
||||
def tearDown(self):
|
||||
self.server.stop()
|
||||
super(BasicAuthTests, self).tearDown()
|
||||
|
||||
def test_basic_auth_success(self):
|
||||
ah = urllib.request.HTTPBasicAuthHandler()
|
||||
ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
|
||||
urllib.request.install_opener(urllib.request.build_opener(ah))
|
||||
try:
|
||||
self.assertTrue(urllib.request.urlopen(self.server_url))
|
||||
except urllib.error.HTTPError:
|
||||
self.fail("Basic auth failed for the url: %s", self.server_url)
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
def test_basic_auth_httperror(self):
|
||||
ah = urllib.request.HTTPBasicAuthHandler()
|
||||
ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
|
||||
urllib.request.install_opener(urllib.request.build_opener(ah))
|
||||
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
|
||||
|
||||
|
||||
@unittest.skipUnless(threading, "Threading required for this test.")
|
||||
class ProxyAuthTests(unittest.TestCase):
|
||||
URL = "http://localhost"
|
||||
|
|
@ -245,6 +327,7 @@ class ProxyAuthTests(unittest.TestCase):
|
|||
self.digest_auth_handler = DigestAuthHandler()
|
||||
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
|
||||
self.digest_auth_handler.set_realm(self.REALM)
|
||||
# With Digest Authentication.
|
||||
def create_fake_proxy_handler(*args, **kwargs):
|
||||
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue