This commit is contained in:
Kristjan Valur Jonsson 2011-03-30 11:39:24 +00:00
commit 3c136e19b9
10 changed files with 600 additions and 57 deletions

View file

@ -102,7 +102,7 @@ as a string. :class:`HeaderParser` has the same API as the :class:`Parser`
class. class.
.. class:: Parser(_class=email.message.Message, strict=None) .. class:: Parser(_class=email.message.Message)
The constructor for the :class:`Parser` class takes an optional argument The constructor for the :class:`Parser` class takes an optional argument
*_class*. This must be a callable factory (such as a function or a class), and *_class*. This must be a callable factory (such as a function or a class), and
@ -110,13 +110,8 @@ class.
:class:`~email.message.Message` (see :mod:`email.message`). The factory will :class:`~email.message.Message` (see :mod:`email.message`). The factory will
be called without arguments. be called without arguments.
The optional *strict* flag is ignored. .. versionchanged:: 3.2
Removed the *strict* argument that was deprecated in 2.4.
.. deprecated:: 2.4
Because the :class:`Parser` class is a backward compatible API wrapper
around the new-in-Python 2.4 :class:`FeedParser`, *all* parsing is
effectively non-strict. You should simply stop passing a *strict* flag to
the :class:`Parser` constructor.
The other public :class:`Parser` methods are: The other public :class:`Parser` methods are:

View file

@ -650,6 +650,10 @@ HTTPRedirectHandler Objects
is the case, :exc:`HTTPError` is raised. See :rfc:`2616` for details of the is the case, :exc:`HTTPError` is raised. See :rfc:`2616` for details of the
precise meanings of the various redirection codes. precise meanings of the various redirection codes.
An :class:`HTTPError` exception raised as a security consideration if the
HTTPRedirectHandler is presented with a redirected url which is not an HTTP,
HTTPS or FTP url.
.. method:: HTTPRedirectHandler.redirect_request(req, fp, code, msg, hdrs, newurl) .. method:: HTTPRedirectHandler.redirect_request(req, fp, code, msg, hdrs, newurl)

View file

@ -15,7 +15,7 @@ from email.message import Message
class Parser: class Parser:
def __init__(self, *args, **kws): def __init__(self, _class=Message):
"""Parser of RFC 2822 and MIME email messages. """Parser of RFC 2822 and MIME email messages.
Creates an in-memory object tree representing the email message, which Creates an in-memory object tree representing the email message, which
@ -31,27 +31,7 @@ class Parser:
must be created. This class must have a constructor that can take must be created. This class must have a constructor that can take
zero arguments. Default is Message.Message. zero arguments. Default is Message.Message.
""" """
if len(args) >= 1: self._class = _class
if '_class' in kws:
raise TypeError("Multiple values for keyword arg '_class'")
kws['_class'] = args[0]
if len(args) == 2:
if 'strict' in kws:
raise TypeError("Multiple values for keyword arg 'strict'")
kws['strict'] = args[1]
if len(args) > 2:
raise TypeError('Too many arguments')
if '_class' in kws:
self._class = kws['_class']
del kws['_class']
else:
self._class = Message
if 'strict' in kws:
warnings.warn("'strict' argument is deprecated (and ignored)",
DeprecationWarning, 2)
del kws['strict']
if kws:
raise TypeError('Unexpected keyword arguments')
def parse(self, fp, headersonly=False): def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a file. """Create a message structure from the data in a file.

View file

@ -1,4 +1,4 @@
# Copyright 2001-2010 by Vinay Sajip. All Rights Reserved. # Copyright 2001-2011 by Vinay Sajip. All Rights Reserved.
# #
# Permission to use, copy, modify, and distribute this software and its # Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted, # documentation for any purpose and without fee is hereby granted,
@ -18,7 +18,7 @@
Logging package for Python. Based on PEP 282 and comments thereto in Logging package for Python. Based on PEP 282 and comments thereto in
comp.lang.python, and influenced by Apache's log4j system. comp.lang.python, and influenced by Apache's log4j system.
Copyright (C) 2001-2010 Vinay Sajip. All Rights Reserved. Copyright (C) 2001-2011 Vinay Sajip. All Rights Reserved.
To use, simply 'import logging' and log away! To use, simply 'import logging' and log away!
""" """
@ -1826,10 +1826,10 @@ class NullHandler(Handler):
package. package.
""" """
def handle(self, record): def handle(self, record):
pass """Stub."""
def emit(self, record): def emit(self, record):
pass """Stub."""
def createLock(self): def createLock(self):
self.lock = None self.lock = None

View file

@ -40,7 +40,7 @@ from socketserver import ThreadingTCPServer, StreamRequestHandler
import struct import struct
import sys import sys
import tempfile import tempfile
from test.support import captured_stdout, run_with_locale, run_unittest from test.support import captured_stdout, run_with_locale, run_unittest, patch
import textwrap import textwrap
import unittest import unittest
import warnings import warnings
@ -1082,28 +1082,39 @@ class WarningsTest(BaseTest):
def test_warnings(self): def test_warnings(self):
with warnings.catch_warnings(): with warnings.catch_warnings():
logging.captureWarnings(True) logging.captureWarnings(True)
try: self.addCleanup(lambda: logging.captureWarnings(False))
warnings.filterwarnings("always", category=UserWarning) warnings.filterwarnings("always", category=UserWarning)
file = io.StringIO() stream = io.StringIO()
h = logging.StreamHandler(file) h = logging.StreamHandler(stream)
logger = logging.getLogger("py.warnings") logger = logging.getLogger("py.warnings")
logger.addHandler(h) logger.addHandler(h)
warnings.warn("I'm warning you...") warnings.warn("I'm warning you...")
logger.removeHandler(h) logger.removeHandler(h)
s = file.getvalue() s = stream.getvalue()
h.close() h.close()
self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0) self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)
#See if an explicit file uses the original implementation #See if an explicit file uses the original implementation
file = io.StringIO() a_file = io.StringIO()
warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
file, "Dummy line") a_file, "Dummy line")
s = file.getvalue() s = a_file.getvalue()
file.close() a_file.close()
self.assertEqual(s, self.assertEqual(s,
"dummy.py:42: UserWarning: Explicit\n Dummy line\n") "dummy.py:42: UserWarning: Explicit\n Dummy line\n")
finally:
logging.captureWarnings(False) def test_warnings_no_handlers(self):
with warnings.catch_warnings():
logging.captureWarnings(True)
self.addCleanup(lambda: logging.captureWarnings(False))
# confirm our assumption: no loggers are set
logger = logging.getLogger("py.warnings")
assert logger.handlers == []
warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
self.assertTrue(len(logger.handlers) == 1)
self.assertIsInstance(logger.handlers[0], logging.NullHandler)
def formatFunc(format, datefmt=None): def formatFunc(format, datefmt=None):
@ -2007,6 +2018,11 @@ class ManagerTest(BaseTest):
self.assertEqual(logged, ['should appear in logged']) self.assertEqual(logged, ['should appear in logged'])
def test_set_log_record_factory(self):
man = logging.Manager(None)
expected = object()
man.setLogRecordFactory(expected)
self.assertEqual(man.logRecordFactory, expected)
class ChildLoggerTest(BaseTest): class ChildLoggerTest(BaseTest):
def test_child_loggers(self): def test_child_loggers(self):
@ -2198,6 +2214,479 @@ class LastResortTest(BaseTest):
logging.raiseExceptions = old_raise_exceptions logging.raiseExceptions = old_raise_exceptions
class FakeHandler:
def __init__(self, identifier, called):
for method in ('acquire', 'flush', 'close', 'release'):
setattr(self, method, self.record_call(identifier, method, called))
def record_call(self, identifier, method_name, called):
def inner():
called.append('{} - {}'.format(identifier, method_name))
return inner
class RecordingHandler(logging.NullHandler):
def __init__(self, *args, **kwargs):
super(RecordingHandler, self).__init__(*args, **kwargs)
self.records = []
def handle(self, record):
"""Keep track of all the emitted records."""
self.records.append(record)
class ShutdownTest(BaseTest):
"""Tets suite for the shutdown method."""
def setUp(self):
super(ShutdownTest, self).setUp()
self.called = []
raise_exceptions = logging.raiseExceptions
self.addCleanup(lambda: setattr(logging, 'raiseExceptions', raise_exceptions))
def raise_error(self, error):
def inner():
raise error()
return inner
def test_no_failure(self):
# create some fake handlers
handler0 = FakeHandler(0, self.called)
handler1 = FakeHandler(1, self.called)
handler2 = FakeHandler(2, self.called)
# create live weakref to those handlers
handlers = map(logging.weakref.ref, [handler0, handler1, handler2])
logging.shutdown(handlerList=list(handlers))
expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
'1 - acquire', '1 - flush', '1 - close', '1 - release',
'0 - acquire', '0 - flush', '0 - close', '0 - release']
self.assertEqual(expected, self.called)
def _test_with_failure_in_method(self, method, error):
handler = FakeHandler(0, self.called)
setattr(handler, method, self.raise_error(error))
handlers = [logging.weakref.ref(handler)]
logging.shutdown(handlerList=list(handlers))
self.assertEqual('0 - release', self.called[-1])
def test_with_ioerror_in_acquire(self):
self._test_with_failure_in_method('acquire', IOError)
def test_with_ioerror_in_flush(self):
self._test_with_failure_in_method('flush', IOError)
def test_with_ioerror_in_close(self):
self._test_with_failure_in_method('close', IOError)
def test_with_valueerror_in_acquire(self):
self._test_with_failure_in_method('acquire', ValueError)
def test_with_valueerror_in_flush(self):
self._test_with_failure_in_method('flush', ValueError)
def test_with_valueerror_in_close(self):
self._test_with_failure_in_method('close', ValueError)
def test_with_other_error_in_acquire_without_raise(self):
logging.raiseExceptions = False
self._test_with_failure_in_method('acquire', IndexError)
def test_with_other_error_in_flush_without_raise(self):
logging.raiseExceptions = False
self._test_with_failure_in_method('flush', IndexError)
def test_with_other_error_in_close_without_raise(self):
logging.raiseExceptions = False
self._test_with_failure_in_method('close', IndexError)
def test_with_other_error_in_acquire_with_raise(self):
logging.raiseExceptions = True
self.assertRaises(IndexError, self._test_with_failure_in_method,
'acquire', IndexError)
def test_with_other_error_in_flush_with_raise(self):
logging.raiseExceptions = True
self.assertRaises(IndexError, self._test_with_failure_in_method,
'flush', IndexError)
def test_with_other_error_in_close_with_raise(self):
logging.raiseExceptions = True
self.assertRaises(IndexError, self._test_with_failure_in_method,
'close', IndexError)
class ModuleLevelMiscTest(BaseTest):
"""Tets suite for some module level methods."""
def test_disable(self):
old_disable = logging.root.manager.disable
# confirm our assumptions are correct
assert old_disable == 0
self.addCleanup(lambda: logging.disable(old_disable))
logging.disable(83)
self.assertEqual(logging.root.manager.disable, 83)
def _test_log(self, method, level=None):
called = []
patch(self, logging, 'basicConfig',
lambda *a, **kw: called.append(a, kw))
recording = RecordingHandler()
logging.root.addHandler(recording)
log_method = getattr(logging, method)
if level is not None:
log_method(level, "test me: %r", recording)
else:
log_method("test me: %r", recording)
self.assertEqual(len(recording.records), 1)
record = recording.records[0]
self.assertEqual(record.getMessage(), "test me: %r" % recording)
expected_level = level if level is not None else getattr(logging, method.upper())
self.assertEqual(record.levelno, expected_level)
# basicConfig was not called!
self.assertEqual(called, [])
def test_log(self):
self._test_log('log', logging.ERROR)
def test_debug(self):
self._test_log('debug')
def test_info(self):
self._test_log('info')
def test_warning(self):
self._test_log('warning')
def test_error(self):
self._test_log('error')
def test_critical(self):
self._test_log('critical')
def test_set_logger_class(self):
self.assertRaises(TypeError, logging.setLoggerClass, object)
class MyLogger(logging.Logger):
pass
logging.setLoggerClass(MyLogger)
self.assertEqual(logging.getLoggerClass(), MyLogger)
logging.setLoggerClass(logging.Logger)
self.assertEqual(logging.getLoggerClass(), logging.Logger)
class BasicConfigTest(unittest.TestCase):
"""Tets suite for logging.basicConfig."""
def setUp(self):
super(BasicConfigTest, self).setUp()
handlers = logging.root.handlers
self.addCleanup(lambda: setattr(logging.root, 'handlers', handlers))
logging.root.handlers = []
def tearDown(self):
logging.shutdown()
super(BasicConfigTest, self).tearDown()
def test_no_kwargs(self):
logging.basicConfig()
# handler defaults to a StreamHandler to sys.stderr
self.assertEqual(len(logging.root.handlers), 1)
handler = logging.root.handlers[0]
self.assertIsInstance(handler, logging.StreamHandler)
self.assertEqual(handler.stream, sys.stderr)
formatter = handler.formatter
# format defaults to logging.BASIC_FORMAT
self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
# datefmt defaults to None
self.assertIsNone(formatter.datefmt)
# style defaults to %
self.assertIsInstance(formatter._style, logging.PercentStyle)
# level is not explicitely set
self.assertEqual(logging.root.level, logging.WARNING)
def test_filename(self):
logging.basicConfig(filename='test.log')
self.assertEqual(len(logging.root.handlers), 1)
handler = logging.root.handlers[0]
self.assertIsInstance(handler, logging.FileHandler)
expected = logging.FileHandler('test.log', 'a')
self.addCleanup(expected.close)
self.assertEqual(handler.stream.mode, expected.stream.mode)
self.assertEqual(handler.stream.name, expected.stream.name)
def test_filemode(self):
logging.basicConfig(filename='test.log', filemode='wb')
handler = logging.root.handlers[0]
expected = logging.FileHandler('test.log', 'wb')
self.addCleanup(expected.close)
self.assertEqual(handler.stream.mode, expected.stream.mode)
def test_stream(self):
stream = io.StringIO()
self.addCleanup(stream.close)
logging.basicConfig(stream=stream)
self.assertEqual(len(logging.root.handlers), 1)
handler = logging.root.handlers[0]
self.assertIsInstance(handler, logging.StreamHandler)
self.assertEqual(handler.stream, stream)
def test_format(self):
logging.basicConfig(format='foo')
formatter = logging.root.handlers[0].formatter
self.assertEqual(formatter._style._fmt, 'foo')
def test_datefmt(self):
logging.basicConfig(datefmt='bar')
formatter = logging.root.handlers[0].formatter
self.assertEqual(formatter.datefmt, 'bar')
def test_style(self):
logging.basicConfig(style='$')
formatter = logging.root.handlers[0].formatter
self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
def test_level(self):
old_level = logging.root.level
self.addCleanup(lambda: logging.root.setLevel(old_level))
logging.basicConfig(level=57)
self.assertEqual(logging.root.level, 57)
def _test_log(self, method, level=None):
# logging.root has no handlers so basicConfig should be called
called = []
old_basic_config = logging.basicConfig
def my_basic_config(*a, **kw):
old_basic_config()
old_level = logging.root.level
logging.root.setLevel(100) # avoid having messages in stderr
self.addCleanup(lambda: logging.root.setLevel(old_level))
called.append((a, kw))
patch(self, logging, 'basicConfig', my_basic_config)
log_method = getattr(logging, method)
if level is not None:
log_method(level, "test me")
else:
log_method("test me")
# basicConfig was called with no arguments
self.assertEqual(called, [((), {})])
def test_log(self):
self._test_log('log', logging.WARNING)
def test_debug(self):
self._test_log('debug')
def test_info(self):
self._test_log('info')
def test_warning(self):
self._test_log('warning')
def test_error(self):
self._test_log('error')
def test_critical(self):
self._test_log('critical')
class LoggerAdapterTest(unittest.TestCase):
def setUp(self):
super(LoggerAdapterTest, self).setUp()
old_handler_list = logging._handlerList[:]
self.recording = RecordingHandler()
self.logger = logging.root
self.logger.addHandler(self.recording)
self.addCleanup(lambda: self.logger.removeHandler(self.recording))
self.addCleanup(self.recording.close)
def cleanup():
logging._handlerList[:] = old_handler_list
self.addCleanup(cleanup)
self.addCleanup(logging.shutdown)
self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)
def test_exception(self):
msg = 'testing exception: %r'
exc = None
try:
assert False
except AssertionError as e:
exc = e
self.adapter.exception(msg, self.recording)
self.assertEqual(len(self.recording.records), 1)
record = self.recording.records[0]
self.assertEqual(record.levelno, logging.ERROR)
self.assertEqual(record.msg, msg)
self.assertEqual(record.args, (self.recording,))
self.assertEqual(record.exc_info,
(exc.__class__, exc, exc.__traceback__))
def test_critical(self):
msg = 'critical test! %r'
self.adapter.critical(msg, self.recording)
self.assertEqual(len(self.recording.records), 1)
record = self.recording.records[0]
self.assertEqual(record.levelno, logging.CRITICAL)
self.assertEqual(record.msg, msg)
self.assertEqual(record.args, (self.recording,))
def test_is_enabled_for(self):
old_disable = self.adapter.logger.manager.disable
self.adapter.logger.manager.disable = 33
self.addCleanup(lambda: setattr(self.adapter.logger.manager,
'disable', old_disable))
self.assertFalse(self.adapter.isEnabledFor(32))
def test_has_handlers(self):
self.assertTrue(self.adapter.hasHandlers())
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
assert not self.logger.hasHandlers()
self.assertFalse(self.adapter.hasHandlers())
class LoggerTest(BaseTest):
def setUp(self):
super(LoggerTest, self).setUp()
self.recording = RecordingHandler()
self.logger = logging.Logger(name='blah')
self.logger.addHandler(self.recording)
self.addCleanup(lambda: self.logger.removeHandler(self.recording))
self.addCleanup(self.recording.close)
self.addCleanup(logging.shutdown)
def test_set_invalid_level(self):
self.assertRaises(TypeError, self.logger.setLevel, object())
def test_exception(self):
msg = 'testing exception: %r'
exc = None
try:
assert False
except AssertionError as e:
exc = e
self.logger.exception(msg, self.recording)
self.assertEqual(len(self.recording.records), 1)
record = self.recording.records[0]
self.assertEqual(record.levelno, logging.ERROR)
self.assertEqual(record.msg, msg)
self.assertEqual(record.args, (self.recording,))
self.assertEqual(record.exc_info,
(exc.__class__, exc, exc.__traceback__))
def test_log_invalid_level_with_raise(self):
old_raise = logging.raiseExceptions
self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
logging.raiseExceptions = True
self.assertRaises(TypeError, self.logger.log, '10', 'test message')
def test_log_invalid_level_no_raise(self):
old_raise = logging.raiseExceptions
self.addCleanup(lambda: setattr(logging, 'raiseExecptions', old_raise))
logging.raiseExceptions = False
self.logger.log('10', 'test message') # no exception happens
def test_find_caller_with_stack_info(self):
called = []
patch(self, logging.traceback, 'print_stack',
lambda f, file: called.append(file.getvalue()))
self.logger.findCaller(stack_info=True)
self.assertEqual(len(called), 1)
self.assertEqual('Stack (most recent call last):\n', called[0])
def test_make_record_with_extra_overwrite(self):
name = 'my record'
level = 13
fn = lno = msg = args = exc_info = func = sinfo = None
rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
exc_info, func, sinfo)
for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
extra = {key: 'some value'}
self.assertRaises(KeyError, self.logger.makeRecord, name, level,
fn, lno, msg, args, exc_info,
extra=extra, sinfo=sinfo)
def test_make_record_with_extra_no_overwrite(self):
name = 'my record'
level = 13
fn = lno = msg = args = exc_info = func = sinfo = None
extra = {'valid_key': 'some value'}
result = self.logger.makeRecord(name, level, fn, lno, msg, args,
exc_info, extra=extra, sinfo=sinfo)
self.assertIn('valid_key', result.__dict__)
def test_has_handlers(self):
self.assertTrue(self.logger.hasHandlers())
for handler in self.logger.handlers:
self.logger.removeHandler(handler)
assert not self.logger.hasHandlers()
self.assertFalse(self.logger.hasHandlers())
def test_has_handlers_no_propagate(self):
child_logger = logging.getLogger('blah.child')
child_logger.propagate = False
assert child_logger.handlers == []
self.assertFalse(child_logger.hasHandlers())
def test_is_enabled_for(self):
old_disable = self.logger.manager.disable
self.logger.manager.disable = 23
self.addCleanup(lambda: setattr(self.logger.manager,
'disable', old_disable))
self.assertFalse(self.logger.isEnabledFor(22))
class BaseFileTest(BaseTest): class BaseFileTest(BaseTest):
"Base class for handler tests that write log files" "Base class for handler tests that write log files"
@ -2319,6 +2808,8 @@ def test_main():
EncodingTest, WarningsTest, ConfigDictTest, ManagerTest, EncodingTest, WarningsTest, ConfigDictTest, ManagerTest,
FormatterTest, FormatterTest,
LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest,
ShutdownTest, ModuleLevelMiscTest, BasicConfigTest,
LoggerAdapterTest, LoggerTest,
RotatingFileHandlerTest, RotatingFileHandlerTest,
LastResortTest, LastResortTest,
TimedRotatingFileHandlerTest TimedRotatingFileHandlerTest

View file

@ -2,6 +2,7 @@
import urllib.parse import urllib.parse
import urllib.request import urllib.request
import urllib.error
import http.client import http.client
import email.message import email.message
import io import io
@ -206,6 +207,21 @@ Content-Type: text/html; charset=iso-8859-1
finally: finally:
self.unfakehttp() self.unfakehttp()
def test_invalid_redirect(self):
# urlopen() should raise IOError for many error codes.
self.fakehttp(b'''HTTP/1.1 302 Found
Date: Wed, 02 Jan 2008 03:03:54 GMT
Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
Location: file://guidocomputer.athome.com:/python/license
Connection: close
Content-Type: text/html; charset=iso-8859-1
''')
try:
self.assertRaises(urllib.error.HTTPError, urlopen,
"http://python.org/")
finally:
self.unfakehttp()
def test_empty_socket(self): def test_empty_socket(self):
# urlopen() raises IOError if the underlying socket does not send any # urlopen() raises IOError if the underlying socket does not send any
# data. (#1680230) # data. (#1680230)

View file

@ -10,6 +10,7 @@ import urllib.request
# The proxy bypass method imported below has logic specific to the OSX # The proxy bypass method imported below has logic specific to the OSX
# proxy config data structure but is testable on all platforms. # proxy config data structure but is testable on all platforms.
from urllib.request import Request, OpenerDirector, _proxy_bypass_macosx_sysconf from urllib.request import Request, OpenerDirector, _proxy_bypass_macosx_sysconf
import urllib.error
# XXX # XXX
# Request # Request
@ -1031,6 +1032,29 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(count, self.assertEqual(count,
urllib.request.HTTPRedirectHandler.max_redirections) urllib.request.HTTPRedirectHandler.max_redirections)
def test_invalid_redirect(self):
from_url = "http://example.com/a.html"
valid_schemes = ['http','https','ftp']
invalid_schemes = ['file','imap','ldap']
schemeless_url = "example.com/b.html"
h = urllib.request.HTTPRedirectHandler()
o = h.parent = MockOpener()
req = Request(from_url)
req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
for scheme in invalid_schemes:
invalid_url = scheme + '://' + schemeless_url
self.assertRaises(urllib.error.HTTPError, h.http_error_302,
req, MockFile(), 302, "Security Loophole",
MockHeaders({"location": invalid_url}))
for scheme in valid_schemes:
valid_url = scheme + '://' + schemeless_url
h.http_error_302(req, MockFile(), 302, "That's fine",
MockHeaders({"location": valid_url}))
self.assertEqual(o.req.get_full_url(), valid_url)
def test_cookie_redirect(self): def test_cookie_redirect(self):
# cookies shouldn't leak into redirected requests # cookies shouldn't leak into redirected requests
from http.cookiejar import CookieJar from http.cookiejar import CookieJar

View file

@ -545,6 +545,17 @@ class HTTPRedirectHandler(BaseHandler):
# fix a possible malformed URL # fix a possible malformed URL
urlparts = urlparse(newurl) urlparts = urlparse(newurl)
# For security reasons we don't allow redirection to anything other
# than http, https or ftp.
if not urlparts.scheme in ('http', 'https', 'ftp'):
raise HTTPError(newurl, code,
msg +
" - Redirection to url '%s' is not allowed" %
newurl,
headers, fp)
if not urlparts.path: if not urlparts.path:
urlparts = list(urlparts) urlparts = list(urlparts)
urlparts[2] = "/" urlparts[2] = "/"
@ -1903,8 +1914,24 @@ class FancyURLopener(URLopener):
return return
void = fp.read() void = fp.read()
fp.close() fp.close()
# In case the server sent a relative URL, join with original: # In case the server sent a relative URL, join with original:
newurl = urljoin(self.type + ":" + url, newurl) newurl = urljoin(self.type + ":" + url, newurl)
urlparts = urlparse(newurl)
# For security reasons, we don't allow redirection to anything other
# than http, https and ftp.
# We are using newer HTTPError with older redirect_internal method
# This older method will get deprecated in 3.3
if not urlparts.scheme in ('http', 'https', 'ftp'):
raise HTTPError(newurl, errcode,
errmsg +
" Redirection to url '%s' is not allowed." % newurl,
headers, fp)
return self.open(newurl) return self.open(newurl)
def http_error_301(self, url, fp, errcode, errmsg, headers, data=None): def http_error_301(self, url, fp, errcode, errmsg, headers, data=None):

View file

@ -87,6 +87,9 @@ Core and Builtins
Library Library
------- -------
- Removed the 'strict' argument to email.parser.Parser, which has been
deprecated since Python 2.4.
- Issue #11256: Fix inspect.getcallargs on functions that take only keyword - Issue #11256: Fix inspect.getcallargs on functions that take only keyword
arguments. arguments.
@ -136,6 +139,9 @@ Library
- Issue #11666: let help() display named tuple attributes and methods - Issue #11666: let help() display named tuple attributes and methods
that start with a leading underscore. that start with a leading underscore.
- Issue #11662: Make urllib and urllib2 ignore redirections if the
scheme is not HTTP, HTTPS or FTP (CVE-2011-1521).
- Issue #5537: Fix time2isoz() and time2netscape() functions of - Issue #5537: Fix time2isoz() and time2netscape() functions of
httplib.cookiejar for expiration year greater than 2038 on 32-bit systems. httplib.cookiejar for expiration year greater than 2038 on 32-bit systems.

View file

@ -1002,7 +1002,7 @@ static PyMethodDef deque_methods[] = {
PyDoc_STRVAR(deque_doc, PyDoc_STRVAR(deque_doc,
"deque(iterable[, maxlen]) --> deque object\n\ "deque(iterable[, maxlen]) --> deque object\n\
\n\ \n\
Build an ordered collection accessible from endpoints only."); Build an ordered collection with optimized access from its endpoints.");
static PyTypeObject deque_type = { static PyTypeObject deque_type = {
PyVarObject_HEAD_INIT(NULL, 0) PyVarObject_HEAD_INIT(NULL, 0)