mirror of
https://github.com/python/cpython.git
synced 2025-07-24 19:54:21 +00:00

http://bugs.python.org/issue5542 If the socket is closed, the client has no chance to read the response from the server. EPIPE means that it isn't possible to write more data from the socket, but not that it is impossible to read. Also, various formatting changes.
381 lines
14 KiB
Python
381 lines
14 KiB
Python
import errno
|
|
from http import client
|
|
import io
|
|
import socket
|
|
|
|
from unittest import TestCase
|
|
|
|
from test import support
|
|
|
|
HOST = support.HOST
|
|
|
|
class FakeSocket:
|
|
def __init__(self, text, fileclass=io.BytesIO):
|
|
if isinstance(text, str):
|
|
text = text.encode("ascii")
|
|
self.text = text
|
|
self.fileclass = fileclass
|
|
self.data = b''
|
|
|
|
def sendall(self, data):
|
|
self.data += data
|
|
|
|
def makefile(self, mode, bufsize=None):
|
|
if mode != 'r' and mode != 'rb':
|
|
raise client.UnimplementedFileMode()
|
|
return self.fileclass(self.text)
|
|
|
|
class EPipeSocket(FakeSocket):
|
|
|
|
def __init__(self, text, pipe_trigger):
|
|
# When sendall() is called with pipe_trigger, raise EPIPE.
|
|
FakeSocket.__init__(self, text)
|
|
self.pipe_trigger = pipe_trigger
|
|
|
|
def sendall(self, data):
|
|
if self.pipe_trigger in data:
|
|
raise socket.error(errno.EPIPE, "gotcha")
|
|
self.data += data
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
class NoEOFStringIO(io.BytesIO):
|
|
"""Like StringIO, but raises AssertionError on EOF.
|
|
|
|
This is used below to test that http.client doesn't try to read
|
|
more from the underlying file than it should.
|
|
"""
|
|
def read(self, n=-1):
|
|
data = io.BytesIO.read(self, n)
|
|
if data == b'':
|
|
raise AssertionError('caller tried to read past EOF')
|
|
return data
|
|
|
|
def readline(self, length=None):
|
|
data = io.BytesIO.readline(self, length)
|
|
if data == b'':
|
|
raise AssertionError('caller tried to read past EOF')
|
|
return data
|
|
|
|
class HeaderTests(TestCase):
|
|
def test_auto_headers(self):
|
|
# Some headers are added automatically, but should not be added by
|
|
# .request() if they are explicitly set.
|
|
|
|
class HeaderCountingBuffer(list):
|
|
def __init__(self):
|
|
self.count = {}
|
|
def append(self, item):
|
|
kv = item.split(b':')
|
|
if len(kv) > 1:
|
|
# item is a 'Key: Value' header string
|
|
lcKey = kv[0].decode('ascii').lower()
|
|
self.count.setdefault(lcKey, 0)
|
|
self.count[lcKey] += 1
|
|
list.append(self, item)
|
|
|
|
for explicit_header in True, False:
|
|
for header in 'Content-length', 'Host', 'Accept-encoding':
|
|
conn = client.HTTPConnection('example.com')
|
|
conn.sock = FakeSocket('blahblahblah')
|
|
conn._buffer = HeaderCountingBuffer()
|
|
|
|
body = 'spamspamspam'
|
|
headers = {}
|
|
if explicit_header:
|
|
headers[header] = str(len(body))
|
|
conn.request('POST', '/', body, headers)
|
|
self.assertEqual(conn._buffer.count[header.lower()], 1)
|
|
|
|
class BasicTest(TestCase):
|
|
def test_status_lines(self):
|
|
# Test HTTP status lines
|
|
|
|
body = "HTTP/1.1 200 Ok\r\n\r\nText"
|
|
sock = FakeSocket(body)
|
|
resp = client.HTTPResponse(sock)
|
|
resp.begin()
|
|
self.assertEqual(resp.read(), b"Text")
|
|
self.assertTrue(resp.isclosed())
|
|
|
|
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
|
|
sock = FakeSocket(body)
|
|
resp = client.HTTPResponse(sock)
|
|
self.assertRaises(client.BadStatusLine, resp.begin)
|
|
|
|
def test_partial_reads(self):
|
|
# if we have a lenght, the system knows when to close itself
|
|
# same behaviour than when we read the whole thing with read()
|
|
body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
|
|
sock = FakeSocket(body)
|
|
resp = client.HTTPResponse(sock)
|
|
resp.begin()
|
|
self.assertEqual(resp.read(2), b'Te')
|
|
self.assertFalse(resp.isclosed())
|
|
self.assertEqual(resp.read(2), b'xt')
|
|
self.assertTrue(resp.isclosed())
|
|
|
|
def test_host_port(self):
|
|
# Check invalid host_port
|
|
|
|
for hp in ("www.python.org:abc", "www.python.org:"):
|
|
self.assertRaises(client.InvalidURL, client.HTTPConnection, hp)
|
|
|
|
for hp, h, p in (("[fe80::207:e9ff:fe9b]:8000",
|
|
"fe80::207:e9ff:fe9b", 8000),
|
|
("www.python.org:80", "www.python.org", 80),
|
|
("www.python.org", "www.python.org", 80),
|
|
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
|
|
c = client.HTTPConnection(hp)
|
|
self.assertEqual(h, c.host)
|
|
self.assertEqual(p, c.port)
|
|
|
|
def test_response_headers(self):
|
|
# test response with multiple message headers with the same field name.
|
|
text = ('HTTP/1.1 200 OK\r\n'
|
|
'Set-Cookie: Customer="WILE_E_COYOTE"; '
|
|
'Version="1"; Path="/acme"\r\n'
|
|
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
|
|
' Path="/acme"\r\n'
|
|
'\r\n'
|
|
'No body\r\n')
|
|
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
|
|
', '
|
|
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
|
|
s = FakeSocket(text)
|
|
r = client.HTTPResponse(s)
|
|
r.begin()
|
|
cookies = r.getheader("Set-Cookie")
|
|
self.assertEqual(cookies, hdr)
|
|
|
|
def test_read_head(self):
|
|
# Test that the library doesn't attempt to read any data
|
|
# from a HEAD request. (Tickles SF bug #622042.)
|
|
sock = FakeSocket(
|
|
'HTTP/1.1 200 OK\r\n'
|
|
'Content-Length: 14432\r\n'
|
|
'\r\n',
|
|
NoEOFStringIO)
|
|
resp = client.HTTPResponse(sock, method="HEAD")
|
|
resp.begin()
|
|
if resp.read():
|
|
self.fail("Did not expect response from HEAD request")
|
|
|
|
def test_send_file(self):
|
|
expected = (b'GET /foo HTTP/1.1\r\nHost: example.com\r\n'
|
|
b'Accept-Encoding: identity\r\nContent-Length:')
|
|
|
|
body = open(__file__, 'rb')
|
|
conn = client.HTTPConnection('example.com')
|
|
sock = FakeSocket(body)
|
|
conn.sock = sock
|
|
conn.request('GET', '/foo', body)
|
|
self.assertTrue(sock.data.startswith(expected), '%r != %r' %
|
|
(sock.data[:len(expected)], expected))
|
|
|
|
def test_chunked(self):
|
|
chunked_start = (
|
|
'HTTP/1.1 200 OK\r\n'
|
|
'Transfer-Encoding: chunked\r\n\r\n'
|
|
'a\r\n'
|
|
'hello worl\r\n'
|
|
'1\r\n'
|
|
'd\r\n'
|
|
)
|
|
sock = FakeSocket(chunked_start + '0\r\n')
|
|
resp = client.HTTPResponse(sock, method="GET")
|
|
resp.begin()
|
|
self.assertEquals(resp.read(), b'hello world')
|
|
resp.close()
|
|
|
|
for x in ('', 'foo\r\n'):
|
|
sock = FakeSocket(chunked_start + x)
|
|
resp = client.HTTPResponse(sock, method="GET")
|
|
resp.begin()
|
|
try:
|
|
resp.read()
|
|
except client.IncompleteRead as i:
|
|
self.assertEquals(i.partial, b'hello world')
|
|
self.assertEqual(repr(i),'IncompleteRead(11 bytes read)')
|
|
self.assertEqual(str(i),'IncompleteRead(11 bytes read)')
|
|
else:
|
|
self.fail('IncompleteRead expected')
|
|
finally:
|
|
resp.close()
|
|
|
|
def test_negative_content_length(self):
|
|
sock = FakeSocket(
|
|
'HTTP/1.1 200 OK\r\nContent-Length: -1\r\n\r\nHello\r\n')
|
|
resp = client.HTTPResponse(sock, method="GET")
|
|
resp.begin()
|
|
self.assertEquals(resp.read(), b'Hello\r\n')
|
|
resp.close()
|
|
|
|
def test_incomplete_read(self):
|
|
sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
|
|
resp = client.HTTPResponse(sock, method="GET")
|
|
resp.begin()
|
|
try:
|
|
resp.read()
|
|
except client.IncompleteRead as i:
|
|
self.assertEquals(i.partial, b'Hello\r\n')
|
|
self.assertEqual(repr(i),
|
|
"IncompleteRead(7 bytes read, 3 more expected)")
|
|
self.assertEqual(str(i),
|
|
"IncompleteRead(7 bytes read, 3 more expected)")
|
|
else:
|
|
self.fail('IncompleteRead expected')
|
|
finally:
|
|
resp.close()
|
|
|
|
def test_epipe(self):
|
|
sock = EPipeSocket(
|
|
"HTTP/1.0 401 Authorization Required\r\n"
|
|
"Content-type: text/html\r\n"
|
|
"WWW-Authenticate: Basic realm=\"example\"\r\n",
|
|
b"Content-Length")
|
|
conn = client.HTTPConnection("example.com")
|
|
conn.sock = sock
|
|
self.assertRaises(socket.error,
|
|
lambda: conn.request("PUT", "/url", "body"))
|
|
resp = conn.getresponse()
|
|
self.assertEqual(401, resp.status)
|
|
self.assertEqual("Basic realm=\"example\"",
|
|
resp.getheader("www-authenticate"))
|
|
|
|
class OfflineTest(TestCase):
|
|
def test_responses(self):
|
|
self.assertEquals(client.responses[client.NOT_FOUND], "Not Found")
|
|
|
|
class TimeoutTest(TestCase):
|
|
PORT = None
|
|
|
|
def setUp(self):
|
|
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
TimeoutTest.PORT = support.bind_port(self.serv)
|
|
self.serv.listen(5)
|
|
|
|
def tearDown(self):
|
|
self.serv.close()
|
|
self.serv = None
|
|
|
|
def testTimeoutAttribute(self):
|
|
# This will prove that the timeout gets through HTTPConnection
|
|
# and into the socket.
|
|
|
|
# default -- use global socket timeout
|
|
self.assert_(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(30)
|
|
try:
|
|
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT)
|
|
httpConn.connect()
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(httpConn.sock.gettimeout(), 30)
|
|
httpConn.close()
|
|
|
|
# no timeout -- do not use global socket default
|
|
self.assert_(socket.getdefaulttimeout() is None)
|
|
socket.setdefaulttimeout(30)
|
|
try:
|
|
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT,
|
|
timeout=None)
|
|
httpConn.connect()
|
|
finally:
|
|
socket.setdefaulttimeout(None)
|
|
self.assertEqual(httpConn.sock.gettimeout(), None)
|
|
httpConn.close()
|
|
|
|
# a value
|
|
httpConn = client.HTTPConnection(HOST, TimeoutTest.PORT, timeout=30)
|
|
httpConn.connect()
|
|
self.assertEqual(httpConn.sock.gettimeout(), 30)
|
|
httpConn.close()
|
|
|
|
class HTTPSTimeoutTest(TestCase):
|
|
# XXX Here should be tests for HTTPS, there isn't any right now!
|
|
|
|
def test_attributes(self):
|
|
# simple test to check it's storing it
|
|
if hasattr(client, 'HTTPSConnection'):
|
|
h = client.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
|
|
self.assertEqual(h.timeout, 30)
|
|
|
|
class RequestBodyTest(TestCase):
|
|
"""Test cases where a request includes a message body."""
|
|
|
|
def setUp(self):
|
|
self.conn = client.HTTPConnection('example.com')
|
|
self.conn.sock = self.sock = FakeSocket("")
|
|
self.conn.sock = self.sock
|
|
|
|
def get_headers_and_fp(self):
|
|
f = io.BytesIO(self.sock.data)
|
|
f.readline() # read the request line
|
|
message = client.parse_headers(f)
|
|
return message, f
|
|
|
|
def test_manual_content_length(self):
|
|
# Set an incorrect content-length so that we can verify that
|
|
# it will not be over-ridden by the library.
|
|
self.conn.request("PUT", "/url", "body",
|
|
{"Content-Length": "42"})
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("42", message.get("content-length"))
|
|
self.assertEqual(4, len(f.read()))
|
|
|
|
def test_ascii_body(self):
|
|
self.conn.request("PUT", "/url", "body")
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("text/plain", message.get_content_type())
|
|
self.assertEqual(None, message.get_charset())
|
|
self.assertEqual("4", message.get("content-length"))
|
|
self.assertEqual(b'body', f.read())
|
|
|
|
def test_latin1_body(self):
|
|
self.conn.request("PUT", "/url", "body\xc1")
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("text/plain", message.get_content_type())
|
|
self.assertEqual(None, message.get_charset())
|
|
self.assertEqual("5", message.get("content-length"))
|
|
self.assertEqual(b'body\xc1', f.read())
|
|
|
|
def test_bytes_body(self):
|
|
self.conn.request("PUT", "/url", b"body\xc1")
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("text/plain", message.get_content_type())
|
|
self.assertEqual(None, message.get_charset())
|
|
self.assertEqual("5", message.get("content-length"))
|
|
self.assertEqual(b'body\xc1', f.read())
|
|
|
|
def test_file_body(self):
|
|
f = open(support.TESTFN, "w")
|
|
f.write("body")
|
|
f.close()
|
|
f = open(support.TESTFN)
|
|
self.conn.request("PUT", "/url", f)
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("text/plain", message.get_content_type())
|
|
self.assertEqual(None, message.get_charset())
|
|
self.assertEqual("4", message.get("content-length"))
|
|
self.assertEqual(b'body', f.read())
|
|
|
|
def test_binary_file_body(self):
|
|
f = open(support.TESTFN, "wb")
|
|
f.write(b"body\xc1")
|
|
f.close()
|
|
f = open(support.TESTFN, "rb")
|
|
self.conn.request("PUT", "/url", f)
|
|
message, f = self.get_headers_and_fp()
|
|
self.assertEqual("text/plain", message.get_content_type())
|
|
self.assertEqual(None, message.get_charset())
|
|
self.assertEqual("5", message.get("content-length"))
|
|
self.assertEqual(b'body\xc1', f.read())
|
|
|
|
def test_main(verbose=None):
|
|
support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
|
|
HTTPSTimeoutTest, RequestBodyTest)
|
|
|
|
if __name__ == '__main__':
|
|
test_main()
|