This commit is contained in:
Tadej Magajna 2025-12-23 14:13:53 +05:30 committed by GitHub
commit 41e7838d30
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 99 additions and 14 deletions

View file

@ -110,6 +110,11 @@ DEFAULT_ERROR_MESSAGE = """\
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
def _validate_header_string(value):
"""Validate header values preventing CRLF injection."""
if isinstance(value, str) and ('\r' in value or '\n' in value):
raise ValueError('Invalid header name/value: contains CR or LF')
class HTTPServer(socketserver.TCPServer):
allow_reuse_address = True # Seems to make sense in testing environment
@ -545,12 +550,15 @@ class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
message = ''
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []
_validate_header_string(message)
self._headers_buffer.append(("%s %d %s\r\n" %
(self.protocol_version, code, message)).encode(
'latin-1', 'strict'))
def send_header(self, keyword, value):
"""Send a MIME header to the headers buffer."""
_validate_header_string(keyword)
_validate_header_string(value)
if self.request_version != 'HTTP/0.9':
if not hasattr(self, '_headers_buffer'):
self._headers_buffer = []

View file

@ -1052,6 +1052,19 @@ class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
handler.end_headers()
self.assertEqual(output.numWrites, 1)
def test_send_response_only_rejects_crlf_message(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'
with self.assertRaises(ValueError) as ctx:
handler.send_response_only(418, 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))
def test_header_buffering_of_send_header(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
@ -1068,6 +1081,32 @@ class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
self.assertEqual(output.numWrites, 1)
def test_crlf_injection_in_header_value(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'
with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Custom', 'value\r\nSet-Cookie: custom=true')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))
def test_crlf_injection_in_header_name(self):
input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
output = AuditableBytesIO()
handler = SocketlessRequestHandler()
handler.rfile = input
handler.wfile = output
handler.request_version = 'HTTP/1.1'
with self.assertRaises(ValueError) as ctx:
handler.send_header('X-Inj\r\nSet-Cookie', 'value')
self.assertIn('Invalid header name/value: contains CR or LF',
str(ctx.exception))
def test_header_unbuffered_when_continue(self):
def _readAndReseek(f):

View file

@ -503,6 +503,39 @@ class HeaderTests(TestCase):
'\r\n'
)
def test_crlf_rejection_in_setitem(self):
h = Headers()
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
with self.assertRaises(ValueError) as ctx:
h['X-Custom'] = f'value{crlf}Set-Cookie: test'
self.assertIn('CR or LF', str(ctx.exception))
def test_crlf_rejection_in_setdefault(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.setdefault('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))
def test_crlf_rejection_in_add_header(self):
for crlf in ('\r\n', '\r', '\n'):
with self.subTest(location='value', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('X-Custom', f'value{crlf}Set-Cookie: test')
self.assertIn('CR or LF', str(ctx.exception))
with self.subTest(location='param', crlf_repr=repr(crlf)):
h = Headers()
with self.assertRaises(ValueError) as ctx:
h.add_header('Content-Disposition',
'attachment',
filename=f'test{crlf}.txt')
self.assertIn('CR or LF', str(ctx.exception))
class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""

View file

@ -35,12 +35,14 @@ class Headers:
self._headers = headers
if __debug__:
for k, v in headers:
self._convert_string_type(k)
self._convert_string_type(v)
self._validate_header_string(k)
self._validate_header_string(v)
def _convert_string_type(self, value):
"""Convert/check value type."""
def _validate_header_string(self, value):
"""Validate header type and value."""
if type(value) is str:
if '\r' in value or '\n' in value:
raise ValueError('Invalid header name/value: contains CR or LF')
return value
raise AssertionError("Header names/values must be"
" of type str (got {0})".format(repr(value)))
@ -53,14 +55,15 @@ class Headers:
"""Set the value of a header."""
del self[name]
self._headers.append(
(self._convert_string_type(name), self._convert_string_type(val)))
(self._validate_header_string(name),
self._validate_header_string(val)))
def __delitem__(self,name):
"""Delete all occurrences of a header, if present.
Does *not* raise an exception if the header is missing.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]
def __getitem__(self,name):
@ -87,13 +90,13 @@ class Headers:
fields deleted and re-inserted are always appended to the header list.
If no fields exist with the given name, returns an empty list.
"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
return [kv[1] for kv in self._headers if kv[0].lower()==name]
def get(self,name,default=None):
"""Get the first header value for 'name', or return 'default'"""
name = self._convert_string_type(name.lower())
name = self._validate_header_string(name.lower())
for k,v in self._headers:
if k.lower()==name:
return v
@ -148,8 +151,8 @@ class Headers:
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((self._convert_string_type(name),
self._convert_string_type(value)))
self._headers.append((self._validate_header_string(name),
self._validate_header_string(value)))
return value
else:
return result
@ -172,13 +175,13 @@ class Headers:
"""
parts = []
if _value is not None:
_value = self._convert_string_type(_value)
_value = self._validate_header_string(_value)
parts.append(_value)
for k, v in _params.items():
k = self._convert_string_type(k)
k = self._validate_header_string(k)
if v is None:
parts.append(k.replace('_', '-'))
else:
v = self._convert_string_type(v)
v = self._validate_header_string(v)
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((self._convert_string_type(_name), "; ".join(parts)))
self._headers.append((self._validate_header_string(_name), "; ".join(parts)))

View file

@ -0,0 +1,2 @@
Reject CR/LF in HTTP headers in ``http.server`` and ``wsgiref.headers`` to
prevent CRLF injection attacks.