mirror of https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
bpo-34866: Adding max_num_fields to cgi.FieldStorage (GH-9660)
Adding `max_num_fields` to `cgi.FieldStorage` to make DOS attacks harder by limiting the number of `MiniFieldStorage` objects created by `FieldStorage`.
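
For orientation, here is a minimal sketch of how a CGI script could use the new parameter; the limit of 100 fields and the 400 response are illustrative choices for the example, not part of this patch:

#!/usr/bin/env python3
# Sketch only: a CGI handler that caps the number of form fields it will parse.
# max_num_fields is the parameter added by this commit; the limit of 100 and
# the 400 response are arbitrary choices for the example.
import sys
import cgi

try:
    form = cgi.FieldStorage(max_num_fields=100)
except ValueError:
    # The limit was exceeded, so the request is rejected instead of building
    # an arbitrarily long list of MiniFieldStorage objects.
    sys.stdout.write("Status: 400 Bad Request\r\n")
    sys.stdout.write("Content-Type: text/plain\r\n\r\n")
    sys.stdout.write("Too many form fields\n")
    sys.exit(0)

sys.stdout.write("Content-Type: text/plain\r\n\r\n")
for key in form.keys():
    sys.stdout.write("%s = %s\n" % (key, form.getvalue(key)))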
This commit is contained in:
parent
f081fd8303
commit
209144831b
5 changed files with 102 additions and 12 deletions
Lib/cgi.py | 34
@@ -311,7 +311,8 @@ class FieldStorage:
     """
     def __init__(self, fp=None, headers=None, outerboundary=b'',
                  environ=os.environ, keep_blank_values=0, strict_parsing=0,
-                 limit=None, encoding='utf-8', errors='replace'):
+                 limit=None, encoding='utf-8', errors='replace',
+                 max_num_fields=None):
         """Constructor. Read multipart/* until last part.

         Arguments, all optional:
@@ -351,10 +352,14 @@ class FieldStorage:
             for the page sending the form (content-type : meta http-equiv or
             header)

+        max_num_fields: int. If set, then __init__ throws a ValueError
+            if there are more than n fields read by parse_qsl().
+
         """
         method = 'GET'
         self.keep_blank_values = keep_blank_values
         self.strict_parsing = strict_parsing
+        self.max_num_fields = max_num_fields
         if 'REQUEST_METHOD' in environ:
             method = environ['REQUEST_METHOD'].upper()
         self.qs_on_post = None
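
A quick, self-contained illustration of the behaviour documented above; it mirrors the test added to Lib/test/test_cgi.py further down, and the body and environment are made up for the example:

import cgi
from io import BytesIO

body = '&'.join(['a=a'] * 11)            # 11 urlencoded fields
environ = {
    'CONTENT_LENGTH': str(len(body)),
    'CONTENT_TYPE': 'application/x-www-form-urlencoded',
    'REQUEST_METHOD': 'POST',
}

try:
    cgi.FieldStorage(fp=BytesIO(body.encode()), environ=environ,
                     max_num_fields=10)
except ValueError as exc:
    print(exc)                           # Max number of fields exceeded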
@@ -578,12 +583,11 @@ class FieldStorage:
         qs = qs.decode(self.encoding, self.errors)
         if self.qs_on_post:
             qs += '&' + self.qs_on_post
-        self.list = []
         query = urllib.parse.parse_qsl(
             qs, self.keep_blank_values, self.strict_parsing,
-            encoding=self.encoding, errors=self.errors)
-        for key, value in query:
-            self.list.append(MiniFieldStorage(key, value))
+            encoding=self.encoding, errors=self.errors,
+            max_num_fields=self.max_num_fields)
+        self.list = [MiniFieldStorage(key, value) for key, value in query]
         self.skip_lines()

     FieldStorageClass = None
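
read_urlencoded now simply forwards the limit to urllib.parse.parse_qsl, which does the actual counting (see the Lib/urllib/parse.py hunks below). The same check can therefore be exercised without FieldStorage; a small sketch, assuming an interpreter with this change applied:

from urllib import parse

query = '&'.join('k%d=v' % i for i in range(11))      # 11 fields

print(parse.parse_qsl(query, max_num_fields=11)[:2])  # within the limit: parsed normally
try:
    parse.parse_qsl(query, max_num_fields=10)
except ValueError as exc:
    print(exc)                                        # Max number of fields exceeded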
@@ -597,9 +601,9 @@ class FieldStorage:
         if self.qs_on_post:
             query = urllib.parse.parse_qsl(
                 self.qs_on_post, self.keep_blank_values, self.strict_parsing,
-                encoding=self.encoding, errors=self.errors)
-            for key, value in query:
-                self.list.append(MiniFieldStorage(key, value))
+                encoding=self.encoding, errors=self.errors,
+                max_num_fields=self.max_num_fields)
+            self.list.extend(MiniFieldStorage(key, value) for key, value in query)

         klass = self.FieldStorageClass or self.__class__
         first_line = self.fp.readline() # bytes
@@ -633,11 +637,23 @@ class FieldStorage:
             if 'content-length' in headers:
                 del headers['content-length']

+            # Propagate max_num_fields into the sub class appropriately
+            sub_max_num_fields = self.max_num_fields
+            if sub_max_num_fields is not None:
+                sub_max_num_fields -= len(self.list)
+
             part = klass(self.fp, headers, ib, environ, keep_blank_values,
                          strict_parsing,self.limit-self.bytes_read,
-                         self.encoding, self.errors)
+                         self.encoding, self.errors, sub_max_num_fields)
+
+            max_num_fields = self.max_num_fields
+            if max_num_fields is not None and part.list:
+                max_num_fields -= len(part.list)
+
             self.bytes_read += part.bytes_read
             self.list.append(part)
+            if max_num_fields is not None and max_num_fields < len(self.list):
+                raise ValueError('Max number of fields exceeded')
             if part.done or self.bytes_read >= self.length > 0:
                 break
         self.skip_lines()
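
The bookkeeping above keeps one global budget for the whole request: each nested part receives whatever is left of the limit (sub_max_num_fields), and after a part is parsed the outer check subtracts its fields before comparing against len(self.list). The helper below is not code from the patch; it is a standalone model of just the outer check, using the multipart case from the new test (2 fields from QUERY_STRING, 2 top-level parts, the second an urlencoded body with 2 fields), and it ignores the budget handed to each sub-part, which that part enforces on its own:

# Hypothetical helper that replays the outer budget arithmetic by hand.
def multipart_exceeds(max_num_fields):
    list_len = 2                    # MiniFieldStorage objects from QUERY_STRING
    for part_fields in (0, 2):      # fields contributed by each top-level part
        budget = max_num_fields
        if part_fields:             # mirrors "if ... and part.list:"
            budget -= part_fields
        list_len += 1               # the part itself is appended to self.list
        if budget < list_len:
            return True             # read_multi would raise ValueError here
    return False

assert multipart_exceeds(5)         # the new test expects ValueError for 5
assert not multipart_exceeds(6)     # ...and success for 6 (2 + 2 + 2 entities)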
Lib/test/test_cgi.py

@@ -381,6 +381,55 @@ Larry
         v = gen_result(data, environ)
         self.assertEqual(self._qs_result, v)

+    def test_max_num_fields(self):
+        # For application/x-www-form-urlencoded
+        data = '&'.join(['a=a']*11)
+        environ = {
+            'CONTENT_LENGTH': str(len(data)),
+            'CONTENT_TYPE': 'application/x-www-form-urlencoded',
+            'REQUEST_METHOD': 'POST',
+        }
+
+        with self.assertRaises(ValueError):
+            cgi.FieldStorage(
+                fp=BytesIO(data.encode()),
+                environ=environ,
+                max_num_fields=10,
+            )
+
+        # For multipart/form-data
+        data = """---123
+Content-Disposition: form-data; name="a"
+
+a
+---123
+Content-Type: application/x-www-form-urlencoded
+
+a=a&a=a
+---123--
+"""
+        environ = {
+            'CONTENT_LENGTH': str(len(data)),
+            'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
+            'QUERY_STRING': 'a=a&a=a',
+            'REQUEST_METHOD': 'POST',
+        }
+
+        # 2 GET entities
+        # 2 top level POST entities
+        # 2 entities within the second POST entity
+        with self.assertRaises(ValueError):
+            cgi.FieldStorage(
+                fp=BytesIO(data.encode()),
+                environ=environ,
+                max_num_fields=5,
+            )
+        cgi.FieldStorage(
+            fp=BytesIO(data.encode()),
+            environ=environ,
+            max_num_fields=6,
+        )
+
     def testQSAndFormData(self):
         data = """---123
 Content-Disposition: form-data; name="key2"
Lib/test/test_urlparse.py

@@ -880,6 +880,13 @@ class UrlParseTestCase(unittest.TestCase):
                                               errors="ignore")
         self.assertEqual(result, [('key', '\u0141-')])

+    def test_parse_qsl_max_num_fields(self):
+        with self.assertRaises(ValueError):
+            urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10)
+        with self.assertRaises(ValueError):
+            urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10)
+        urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10)
+
     def test_urlencode_sequences(self):
         # Other tests incidentally urlencode things; test non-covered cases:
         # Sequence and object values.
Lib/urllib/parse.py

@@ -628,7 +628,7 @@ def unquote(string, encoding='utf-8', errors='replace'):


 def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
-             encoding='utf-8', errors='replace'):
+             encoding='utf-8', errors='replace', max_num_fields=None):
     """Parse a query given as a string argument.

     Arguments:
@@ -649,11 +649,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
     encoding and errors: specify how to decode percent-encoded sequences
         into Unicode characters, as accepted by the bytes.decode() method.

+    max_num_fields: int. If set, then throws a ValueError if there
+        are more than n fields read by parse_qsl().
+
     Returns a dictionary.
     """
     parsed_result = {}
     pairs = parse_qsl(qs, keep_blank_values, strict_parsing,
-                      encoding=encoding, errors=errors)
+                      encoding=encoding, errors=errors,
+                      max_num_fields=max_num_fields)
     for name, value in pairs:
         if name in parsed_result:
             parsed_result[name].append(value)
@@ -663,7 +667,7 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,


 def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
-              encoding='utf-8', errors='replace'):
+              encoding='utf-8', errors='replace', max_num_fields=None):
     """Parse a query given as a string argument.

     Arguments:
@@ -683,9 +687,21 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
     encoding and errors: specify how to decode percent-encoded sequences
         into Unicode characters, as accepted by the bytes.decode() method.

+    max_num_fields: int. If set, then throws a ValueError
+        if there are more than n fields read by parse_qsl().
+
     Returns a list, as G-d intended.
     """
     qs, _coerce_result = _coerce_args(qs)
+
+    # If max_num_fields is defined then check that the number of fields
+    # is less than max_num_fields. This prevents a memory exhaustion DOS
+    # attack via post bodies with many fields.
+    if max_num_fields is not None:
+        num_fields = 1 + qs.count('&') + qs.count(';')
+        if max_num_fields < num_fields:
+            raise ValueError('Max number of fields exceeded')
+
     pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
     r = []
     for name_value in pairs:
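
The guard counts separators before any splitting or decoding happens, so an oversized query is rejected after a single pass over the string; note that at the time of this change parse_qsl treats both '&' and ';' as separators, and that empty segments count toward the limit even though they are skipped later. A short check of the arithmetic, assuming an interpreter with this change applied:

from urllib import parse

qs = 'a=1&b=2;c=3'
num_fields = 1 + qs.count('&') + qs.count(';')       # 1 + 1 + 1 == 3
assert num_fields == 3

assert parse.parse_qsl(qs, max_num_fields=3) == [('a', '1'), ('b', '2'), ('c', '3')]
try:
    parse.parse_qsl(qs, max_num_fields=2)
except ValueError as exc:
    print(exc)                                       # Max number of fields exceeded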
Misc/NEWS.d entry (new file)

@@ -0,0 +1,2 @@
+Adding ``max_num_fields`` to ``cgi.FieldStorage`` to make DOS attacks harder by
+limiting the number of ``MiniFieldStorage`` objects created by ``FieldStorage``.