mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
#2621 rename test.test_support to test.support
This commit is contained in:
parent
6a654814ea
commit
ee8712cda4
358 changed files with 1308 additions and 1307 deletions
|
@ -2,7 +2,7 @@
|
|||
|
||||
import sys
|
||||
import unittest
|
||||
from test import test_support
|
||||
from test import support
|
||||
import socket
|
||||
import select
|
||||
import errno
|
||||
|
@ -25,27 +25,27 @@ try:
|
|||
except ImportError:
|
||||
skip_expected = True
|
||||
|
||||
HOST = test_support.HOST
|
||||
HOST = support.HOST
|
||||
CERTFILE = None
|
||||
SVN_PYTHON_ORG_ROOT_CERT = None
|
||||
|
||||
def handle_error(prefix):
|
||||
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(prefix + exc_format)
|
||||
|
||||
|
||||
class BasicTests(unittest.TestCase):
|
||||
|
||||
def testSSLconnect(self):
|
||||
if not test_support.is_resource_enabled('network'):
|
||||
if not support.is_resource_enabled('network'):
|
||||
return
|
||||
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
|
||||
cert_reqs=ssl.CERT_NONE)
|
||||
s.connect(("svn.python.org", 443))
|
||||
c = s.getpeercert()
|
||||
if c:
|
||||
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
raise support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
s.close()
|
||||
|
||||
# this should fail because we have no verification certs
|
||||
|
@ -69,7 +69,7 @@ class BasicTests(unittest.TestCase):
|
|||
|
||||
def testRAND(self):
|
||||
v = ssl.RAND_status()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n RAND_status is %d (%s)\n"
|
||||
% (v, (v and "sufficient randomness") or
|
||||
"insufficient randomness"))
|
||||
|
@ -86,7 +86,7 @@ class BasicTests(unittest.TestCase):
|
|||
# provided solely for this test, to exercise the certificate
|
||||
# parsing code
|
||||
p = ssl._ssl._test_decode_cert(CERTFILE, False)
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
|
||||
|
||||
def testDERtoPEM(self):
|
||||
|
@ -96,7 +96,7 @@ class BasicTests(unittest.TestCase):
|
|||
p2 = ssl.DER_cert_to_PEM_cert(d1)
|
||||
d2 = ssl.PEM_cert_to_DER_cert(p2)
|
||||
if (d1 != d2):
|
||||
raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
|
||||
raise support.TestFailed("PEM-to-DER or DER-to-PEM translation failed")
|
||||
|
||||
class NetworkedTests(unittest.TestCase):
|
||||
|
||||
|
@ -106,7 +106,7 @@ class NetworkedTests(unittest.TestCase):
|
|||
s.connect(("svn.python.org", 443))
|
||||
c = s.getpeercert()
|
||||
if c:
|
||||
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
raise support.TestFailed("Peer cert %s shouldn't be here!")
|
||||
s.close()
|
||||
|
||||
# this should fail because we have no verification certs
|
||||
|
@ -126,7 +126,7 @@ class NetworkedTests(unittest.TestCase):
|
|||
try:
|
||||
s.connect(("svn.python.org", 443))
|
||||
except ssl.SSLError as x:
|
||||
raise test_support.TestFailed("Unexpected exception %s" % x)
|
||||
raise support.TestFailed("Unexpected exception %s" % x)
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
|
@ -151,14 +151,14 @@ class NetworkedTests(unittest.TestCase):
|
|||
else:
|
||||
raise
|
||||
s.close()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
|
||||
|
||||
def testFetchServerCert(self):
|
||||
|
||||
pem = ssl.get_server_certificate(("svn.python.org", 443))
|
||||
if not pem:
|
||||
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
raise support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
|
||||
return
|
||||
|
||||
|
@ -166,15 +166,15 @@ class NetworkedTests(unittest.TestCase):
|
|||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
|
||||
except ssl.SSLError as x:
|
||||
#should fail
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("%s\n" % x)
|
||||
else:
|
||||
raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
|
||||
raise support.TestFailed("Got server certificate %s for svn.python.org!" % pem)
|
||||
|
||||
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
|
||||
if not pem:
|
||||
raise test_support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
if test_support.verbose:
|
||||
raise support.TestFailed("No server certificate on svn.python.org:443!")
|
||||
if support.verbose:
|
||||
sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
|
||||
|
||||
|
||||
|
@ -227,13 +227,13 @@ else:
|
|||
else:
|
||||
if self.server.certreqs == ssl.CERT_REQUIRED:
|
||||
cert = self.sslconn.getpeercert()
|
||||
if test_support.verbose and self.server.chatty:
|
||||
if support.verbose and self.server.chatty:
|
||||
sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
|
||||
cert_binary = self.sslconn.getpeercert(True)
|
||||
if test_support.verbose and self.server.chatty:
|
||||
if support.verbose and self.server.chatty:
|
||||
sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
|
||||
cipher = self.sslconn.cipher()
|
||||
if test_support.verbose and self.server.chatty:
|
||||
if support.verbose and self.server.chatty:
|
||||
sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
|
||||
return True
|
||||
|
||||
|
@ -269,19 +269,19 @@ else:
|
|||
self.running = False
|
||||
self.close()
|
||||
elif amsg.strip() == 'over':
|
||||
if test_support.verbose and self.server.connectionchatty:
|
||||
if support.verbose and self.server.connectionchatty:
|
||||
sys.stdout.write(" server: client closed connection\n")
|
||||
self.close()
|
||||
return
|
||||
elif (self.server.starttls_server and
|
||||
amsg.strip() == 'STARTTLS'):
|
||||
if test_support.verbose and self.server.connectionchatty:
|
||||
if support.verbose and self.server.connectionchatty:
|
||||
sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
|
||||
self.write("OK\n".encode("ASCII", "strict"))
|
||||
if not self.wrap_conn():
|
||||
return
|
||||
else:
|
||||
if (test_support.verbose and
|
||||
if (support.verbose and
|
||||
self.server.connectionchatty):
|
||||
ctype = (self.sslconn and "encrypted") or "unencrypted"
|
||||
sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
|
||||
|
@ -314,7 +314,7 @@ else:
|
|||
self.connectionchatty = connectionchatty
|
||||
self.starttls_server = starttls_server
|
||||
self.sock = socket.socket()
|
||||
self.port = test_support.bind_port(self.sock)
|
||||
self.port = support.bind_port(self.sock)
|
||||
self.flag = None
|
||||
self.active = False
|
||||
threading.Thread.__init__(self)
|
||||
|
@ -334,7 +334,7 @@ else:
|
|||
while self.active:
|
||||
try:
|
||||
newconn, connaddr = self.sock.accept()
|
||||
if test_support.verbose and self.chatty:
|
||||
if support.verbose and self.chatty:
|
||||
sys.stdout.write(' server: new connection from '
|
||||
+ repr(connaddr) + '\n')
|
||||
handler = self.ConnectionHandler(self, newconn, connaddr)
|
||||
|
@ -457,7 +457,7 @@ else:
|
|||
|
||||
# we override this to suppress logging unless "verbose"
|
||||
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
|
||||
(self.server.server_address,
|
||||
self.server.server_port,
|
||||
|
@ -470,7 +470,7 @@ else:
|
|||
self.flag = None
|
||||
self.active = False
|
||||
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
|
||||
self.port = test_support.find_unused_port()
|
||||
self.port = support.find_unused_port()
|
||||
self.server = self.HTTPSServer(
|
||||
(HOST, self.port), self.RootedHTTPRequestHandler, certfile)
|
||||
threading.Thread.__init__(self)
|
||||
|
@ -522,7 +522,7 @@ else:
|
|||
|
||||
def handle_read(self):
|
||||
data = self.recv(1024)
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" server: read %s from client\n" % repr(data))
|
||||
if not data:
|
||||
self.close()
|
||||
|
@ -530,7 +530,7 @@ else:
|
|||
self.send(str(data, 'ASCII', 'strict').lower().encode('ASCII', 'strict'))
|
||||
|
||||
def handle_close(self):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" server: closed connection %s\n" % self.socket)
|
||||
|
||||
def handle_error(self):
|
||||
|
@ -546,7 +546,7 @@ else:
|
|||
|
||||
def handle_accept(self):
|
||||
sock_obj, addr = self.accept()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" server: new connection from %s:%s\n" %addr)
|
||||
self.ConnectionHandler(sock_obj, self.certfile)
|
||||
|
||||
|
@ -556,7 +556,7 @@ else:
|
|||
def __init__(self, certfile):
|
||||
self.flag = None
|
||||
self.active = False
|
||||
self.port = test_support.find_unused_port()
|
||||
self.port = support.find_unused_port()
|
||||
self.server = self.EchoServer(self.port, certfile)
|
||||
threading.Thread.__init__(self)
|
||||
self.setDaemon(True)
|
||||
|
@ -599,10 +599,10 @@ else:
|
|||
ssl_version=ssl.PROTOCOL_TLSv1)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError as x:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\nSSLError is %s\n" % x)
|
||||
else:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Use of invalid cert should have failed!")
|
||||
finally:
|
||||
server.stop()
|
||||
|
@ -635,28 +635,28 @@ else:
|
|||
ssl_version=client_protocol)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError as x:
|
||||
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
raise support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
except Exception as x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
raise support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(indata)))
|
||||
s.write(indata.encode('ASCII', 'strict'))
|
||||
outdata = s.read()
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
outdata = str(outdata, 'ASCII', 'strict')
|
||||
if outdata != indata.lower():
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
|
||||
repr(indata[:min(len(indata),20)].lower()), len(indata)))
|
||||
s.write("over\n".encode("ASCII", "strict"))
|
||||
if connectionchatty:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
finally:
|
||||
|
@ -677,7 +677,7 @@ else:
|
|||
certtype = "CERT_OPTIONAL"
|
||||
elif certsreqs == ssl.CERT_REQUIRED:
|
||||
certtype = "CERT_REQUIRED"
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
formatstr = (expectedToWork and " %s->%s %s\n") or " {%s->%s} %s\n"
|
||||
sys.stdout.write(formatstr %
|
||||
(ssl.get_protocol_name(client_protocol),
|
||||
|
@ -687,12 +687,12 @@ else:
|
|||
serverParamsTest(CERTFILE, server_protocol, certsreqs,
|
||||
CERTFILE, CERTFILE, client_protocol,
|
||||
chatty=False, connectionchatty=False)
|
||||
except test_support.TestFailed:
|
||||
except support.TestFailed:
|
||||
if expectedToWork:
|
||||
raise
|
||||
else:
|
||||
if not expectedToWork:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Client protocol %s succeeded with server protocol %s!"
|
||||
% (ssl.get_protocol_name(client_protocol),
|
||||
ssl.get_protocol_name(server_protocol)))
|
||||
|
@ -702,7 +702,7 @@ else:
|
|||
|
||||
def testEcho (self):
|
||||
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
serverParamsTest(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
|
||||
CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
|
||||
|
@ -710,7 +710,7 @@ else:
|
|||
|
||||
def testReadCert(self):
|
||||
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
s2 = socket.socket()
|
||||
server = ThreadedEchoServer(CERTFILE,
|
||||
|
@ -732,30 +732,30 @@ else:
|
|||
ssl_version=ssl.PROTOCOL_SSLv23)
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError as x:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Unexpected SSL error: " + str(x))
|
||||
except Exception as x:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Unexpected exception: " + str(x))
|
||||
else:
|
||||
if not s:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Can't SSL-handshake with test server")
|
||||
cert = s.getpeercert()
|
||||
if not cert:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Can't get peer certificate.")
|
||||
cipher = s.cipher()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(pprint.pformat(cert) + '\n')
|
||||
sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
|
||||
if 'subject' not in cert:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"No subject field in certificate: %s." %
|
||||
pprint.pformat(cert))
|
||||
if ((('organizationName', 'Python Software Foundation'),)
|
||||
not in cert['subject']):
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Missing or invalid 'organizationName' field in certificate subject; "
|
||||
"should be 'Python Software Foundation'.")
|
||||
s.close()
|
||||
|
@ -777,7 +777,7 @@ else:
|
|||
|
||||
listener_ready = threading.Event()
|
||||
listener_gone = threading.Event()
|
||||
port = test_support.find_unused_port()
|
||||
port = support.find_unused_port()
|
||||
|
||||
# `listener` runs in a thread. It opens a socket listening on
|
||||
# PORT, and sits in an accept() until the main thread connects.
|
||||
|
@ -802,7 +802,7 @@ else:
|
|||
except IOError:
|
||||
pass
|
||||
else:
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
'connecting to closed SSL socket should have failed')
|
||||
|
||||
t = threading.Thread(target=listener)
|
||||
|
@ -811,7 +811,7 @@ else:
|
|||
t.join()
|
||||
|
||||
def testProtocolSSL2(self):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
|
||||
|
@ -821,13 +821,13 @@ else:
|
|||
tryProtocolCombo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
|
||||
|
||||
def testProtocolSSL23(self):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
try:
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
|
||||
except test_support.TestFailed as x:
|
||||
except support.TestFailed as x:
|
||||
# this fails on some older versions of OpenSSL (0.9.7l, for instance)
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
" SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
|
||||
% str(x))
|
||||
|
@ -844,7 +844,7 @@ else:
|
|||
tryProtocolCombo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
|
||||
|
||||
def testProtocolSSL3(self):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
|
||||
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
|
||||
|
@ -854,7 +854,7 @@ else:
|
|||
tryProtocolCombo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
|
||||
|
||||
def testProtocolTLS1(self):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
|
||||
tryProtocolCombo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
|
||||
|
@ -884,13 +884,13 @@ else:
|
|||
s.setblocking(1)
|
||||
s.connect((HOST, server.port))
|
||||
except Exception as x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
raise support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
for indata in msgs:
|
||||
msg = indata.encode('ASCII', 'replace')
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % repr(msg))
|
||||
if wrapped:
|
||||
|
@ -901,7 +901,7 @@ else:
|
|||
outdata = s.recv(1024)
|
||||
if (indata == "STARTTLS" and
|
||||
str(outdata, 'ASCII', 'replace').strip().lower().startswith("ok")):
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
msg = str(outdata, 'ASCII', 'replace')
|
||||
sys.stdout.write(
|
||||
" client: read %s from server, starting TLS...\n"
|
||||
|
@ -910,11 +910,11 @@ else:
|
|||
|
||||
wrapped = True
|
||||
else:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
msg = str(outdata, 'ASCII', 'replace')
|
||||
sys.stdout.write(
|
||||
" client: read %s from server\n" % repr(msg))
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
if wrapped:
|
||||
conn.write("over\n".encode("ASCII", "strict"))
|
||||
|
@ -937,7 +937,7 @@ else:
|
|||
flag.wait()
|
||||
# try to connect
|
||||
try:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write('\n')
|
||||
d1 = open(CERTFILE, 'rb').read()
|
||||
d2 = ''
|
||||
|
@ -948,33 +948,33 @@ else:
|
|||
dlen = f.info().getheader("content-length")
|
||||
if dlen and (int(dlen) > 0):
|
||||
d2 = f.read(int(dlen))
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: read %d bytes from remote server '%s'\n"
|
||||
% (len(d2), server))
|
||||
f.close()
|
||||
except:
|
||||
msg = ''.join(traceback.format_exception(*sys.exc_info()))
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write('\n' + msg)
|
||||
raise test_support.TestFailed(msg)
|
||||
raise support.TestFailed(msg)
|
||||
else:
|
||||
if not (d1 == d2):
|
||||
print("d1 is", len(d1), repr(d1))
|
||||
print("d2 is", len(d2), repr(d2))
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"Couldn't fetch data from HTTPS server")
|
||||
finally:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write('stopping server\n')
|
||||
server.stop()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write('joining thread\n')
|
||||
server.join()
|
||||
|
||||
def testAsyncoreServer(self):
|
||||
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write("\n")
|
||||
|
||||
indata="FOO\n"
|
||||
|
@ -988,25 +988,25 @@ else:
|
|||
s = ssl.wrap_socket(socket.socket())
|
||||
s.connect((HOST, server.port))
|
||||
except ssl.SSLError as x:
|
||||
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
raise support.TestFailed("Unexpected SSL error: " + str(x))
|
||||
except Exception as x:
|
||||
raise test_support.TestFailed("Unexpected exception: " + str(x))
|
||||
raise support.TestFailed("Unexpected exception: " + str(x))
|
||||
else:
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(
|
||||
" client: sending %s...\n" % (repr(indata)))
|
||||
s.sendall(indata.encode('ASCII', 'strict'))
|
||||
outdata = s.recv()
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" client: read %s\n" % repr(outdata))
|
||||
outdata = str(outdata, 'ASCII', 'strict')
|
||||
if outdata != indata.lower():
|
||||
raise test_support.TestFailed(
|
||||
raise support.TestFailed(
|
||||
"bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
|
||||
% (repr(outdata[:min(len(outdata),20)]), len(outdata),
|
||||
repr(indata[:min(len(indata),20)].lower()), len(indata)))
|
||||
s.write("over\n".encode("ASCII", "strict"))
|
||||
if test_support.verbose:
|
||||
if support.verbose:
|
||||
sys.stdout.write(" client: closing connection.\n")
|
||||
s.close()
|
||||
finally:
|
||||
|
@ -1015,7 +1015,7 @@ else:
|
|||
|
||||
def test_main(verbose=False):
|
||||
if skip_expected:
|
||||
raise test_support.TestSkipped("No SSL support")
|
||||
raise support.TestSkipped("No SSL support")
|
||||
|
||||
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
|
||||
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
|
||||
|
@ -1026,22 +1026,22 @@ def test_main(verbose=False):
|
|||
|
||||
if (not os.path.exists(CERTFILE) or
|
||||
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
|
||||
raise test_support.TestFailed("Can't read certificate files!")
|
||||
raise support.TestFailed("Can't read certificate files!")
|
||||
|
||||
tests = [BasicTests]
|
||||
|
||||
if test_support.is_resource_enabled('network'):
|
||||
if support.is_resource_enabled('network'):
|
||||
tests.append(NetworkedTests)
|
||||
|
||||
if _have_threads:
|
||||
thread_info = test_support.threading_setup()
|
||||
if thread_info and test_support.is_resource_enabled('network'):
|
||||
thread_info = support.threading_setup()
|
||||
if thread_info and support.is_resource_enabled('network'):
|
||||
tests.append(ThreadedTests)
|
||||
|
||||
test_support.run_unittest(*tests)
|
||||
support.run_unittest(*tests)
|
||||
|
||||
if _have_threads:
|
||||
test_support.threading_cleanup(*thread_info)
|
||||
support.threading_cleanup(*thread_info)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue