Merge the trunk changes in. Breaks socket.ssl for now.

Merged revisions 57392-57619 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r57395 | georg.brandl | 2007-08-24 19:23:23 +0200 (Fri, 24 Aug 2007) | 2 lines

  Bug #1011: fix rfc822.Message.getheader docs.
........
  r57397 | georg.brandl | 2007-08-24 19:38:49 +0200 (Fri, 24 Aug 2007) | 2 lines

  Patch #1006: port test_winreg to unittest.
........
  r57398 | georg.brandl | 2007-08-24 19:46:54 +0200 (Fri, 24 Aug 2007) | 2 lines

  Fix #1012: wrong URL to :mod:`site` in install/index.rst.
........
  r57399 | georg.brandl | 2007-08-24 20:07:52 +0200 (Fri, 24 Aug 2007) | 2 lines

  Patch #1008: port test_signal to unittest.
........
  r57400 | georg.brandl | 2007-08-24 20:22:54 +0200 (Fri, 24 Aug 2007) | 2 lines

  Port test_frozen to unittest.
........
  r57401 | georg.brandl | 2007-08-24 20:27:43 +0200 (Fri, 24 Aug 2007) | 2 lines

  Document new utility functions in test_support.
........
  r57402 | georg.brandl | 2007-08-24 20:30:06 +0200 (Fri, 24 Aug 2007) | 2 lines

  Remove test_rgbimg output file, there is no test_rgbimg.py.
........
  r57403 | georg.brandl | 2007-08-24 20:35:27 +0200 (Fri, 24 Aug 2007) | 2 lines

  Remove output file for test_ossaudiodev, also properly close the dsp object.
........
  r57404 | georg.brandl | 2007-08-24 20:46:27 +0200 (Fri, 24 Aug 2007) | 2 lines

  Convert test_linuxaudiodev to unittest. Fix a wrong finally clause in test_ossaudiodev.
........
  r57406 | collin.winter | 2007-08-24 21:13:58 +0200 (Fri, 24 Aug 2007) | 1 line

  Convert test_pkg to use unittest.
........
  r57408 | georg.brandl | 2007-08-24 21:22:34 +0200 (Fri, 24 Aug 2007) | 2 lines

  Catch the correct errors.
........
  r57409 | georg.brandl | 2007-08-24 21:33:53 +0200 (Fri, 24 Aug 2007) | 2 lines

  Port test_class to unittest. Patch #1671298.
........
  r57415 | collin.winter | 2007-08-24 23:09:42 +0200 (Fri, 24 Aug 2007) | 1 line

  Make test_structmembers pass when run with regrtests's -R flag.
........
  r57455 | nick.coghlan | 2007-08-25 06:32:07 +0200 (Sat, 25 Aug 2007) | 1 line

  Revert misguided attempt at fixing incompatibility between -m and -i switches (better fix coming soon)
........
  r57456 | nick.coghlan | 2007-08-25 06:35:54 +0200 (Sat, 25 Aug 2007) | 1 line

  Revert compile.c changes that shouldn't have been included in previous checkin
........
  r57461 | nick.coghlan | 2007-08-25 12:50:41 +0200 (Sat, 25 Aug 2007) | 1 line

  Fix bug 1764407 - the -i switch now does the right thing when using the -m switch
........
  r57464 | guido.van.rossum | 2007-08-25 17:08:43 +0200 (Sat, 25 Aug 2007) | 4 lines

  Server-side SSL and certificate validation, by Bill Janssen.
  While cleaning up Bill's C style, I may have cleaned up some code
  he didn't touch as well (in _ssl.c).
........
  r57465 | neal.norwitz | 2007-08-25 18:41:36 +0200 (Sat, 25 Aug 2007) | 3 lines

  Try to get this to build with Visual Studio by moving all the variable
  declarations to the beginning of a scope.
........
  r57466 | neal.norwitz | 2007-08-25 18:54:38 +0200 (Sat, 25 Aug 2007) | 1 line

  Fix test so it is skipped properly if there is no SSL support.
........
  r57467 | neal.norwitz | 2007-08-25 18:58:09 +0200 (Sat, 25 Aug 2007) | 2 lines

  Fix a few more variables to try to get this to compile with Visual Studio.
........
  r57473 | neal.norwitz | 2007-08-25 19:25:17 +0200 (Sat, 25 Aug 2007) | 1 line

  Try to get this test to pass for systems that do not have SO_REUSEPORT
........
  r57482 | gregory.p.smith | 2007-08-26 02:26:00 +0200 (Sun, 26 Aug 2007) | 7 lines

  keep setup.py from listing unneeded hash modules (_md5, _sha*) as
  missing when they were not built because _hashlib with openssl provided
  their functionality instead.

  don't build bsddb185 if bsddb was built.
........
  r57483 | neal.norwitz | 2007-08-26 03:08:16 +0200 (Sun, 26 Aug 2007) | 1 line

  Fix typo in docstring (missing c in reacquire)
........
  r57484 | neal.norwitz | 2007-08-26 03:42:03 +0200 (Sun, 26 Aug 2007) | 2 lines

  Spell check (also americanify behaviour, it's almost 3 times as common)
........
  r57503 | neal.norwitz | 2007-08-26 08:29:57 +0200 (Sun, 26 Aug 2007) | 4 lines

  Reap children before the test starts so hopefully SocketServer
  won't find any old children left around which causes an exception
  in collect_children() and the test to fail.
........
  r57510 | neal.norwitz | 2007-08-26 20:50:39 +0200 (Sun, 26 Aug 2007) | 1 line

  Fail gracefully if the cert files cannot be created
........
  r57513 | guido.van.rossum | 2007-08-26 21:35:09 +0200 (Sun, 26 Aug 2007) | 4 lines

  Bill Janssen wrote:
  Here's a patch which makes test_ssl a better player in the buildbots
  environment.  I deep-ended on "try-except-else" clauses.
........
  r57518 | neal.norwitz | 2007-08-26 23:40:16 +0200 (Sun, 26 Aug 2007) | 1 line

  Get the test passing by commenting out some writes (should they be removed?)
........
  r57522 | neal.norwitz | 2007-08-27 00:16:23 +0200 (Mon, 27 Aug 2007) | 3 lines

  Catch IOError for when the device file doesn't exist or the user doesn't have
  permission to write to the device.
........
  r57524 | neal.norwitz | 2007-08-27 00:20:03 +0200 (Mon, 27 Aug 2007) | 5 lines

  Another patch from Bill Janssen that:
  1)  Fixes the bug that two class names are initial-lower-case.
  2)  Replaces the poll waiting for the server to become ready with
      a threading.Event signal.
........
  r57536 | neal.norwitz | 2007-08-27 02:58:33 +0200 (Mon, 27 Aug 2007) | 1 line

  Stop using string.join (from the module) to ease upgrade to py3k
........
  r57537 | neal.norwitz | 2007-08-27 03:03:18 +0200 (Mon, 27 Aug 2007) | 1 line

  Make a utility function for handling (printing) an error
........
  r57538 | neal.norwitz | 2007-08-27 03:15:33 +0200 (Mon, 27 Aug 2007) | 4 lines

  If we can't create a certificate, print a warning, but don't fail the test.
  Modified patch from what Bill Janssen sent on python-3000.
........
  r57539 | facundo.batista | 2007-08-27 03:15:34 +0200 (Mon, 27 Aug 2007) | 7 lines


  Ignore test failures caused by 'resource temporarily unavailable'
  exceptions raised in the test server thread, since SimpleXMLRPCServer
  does not gracefully handle them.  Changed number of requests handled
  by tests server thread to one (was 2) because no tests require more
  than one request. [GSoC - Alan McIntyre]
........
  r57561 | guido.van.rossum | 2007-08-27 19:19:42 +0200 (Mon, 27 Aug 2007) | 8 lines

  > Regardless, building a fixed test certificate and checking it in sounds like
  > the better option.  Then the openssl command in the test code can be turned
  > into a comment describing how the test data was pregenerated.

  Here's a patch that does that.

  Bill
........
  r57568 | guido.van.rossum | 2007-08-27 20:42:23 +0200 (Mon, 27 Aug 2007) | 26 lines

  > Some of the code sets the error string in this directly before
  > returning NULL, and other pieces of the code call PySSL_SetError,
  > which creates the error string.  I think some of the places which set
  > the string directly probably shouldn't; instead, they should call
  > PySSL_SetError to cons up the error name directly from the err code.
  > However, PySSL_SetError only works after the construction of an ssl
  > object, which means it can't be used there...  I'll take a longer look
  > at it and see if there's a reasonable fix.

  Here's a patch which addresses this.  It also fixes the indentation in
  PySSL_SetError, bringing it into line with PEP 7, fixes a compile warning
  about one of the OpenSSL macros, and makes the namespace a bit more
  consistent.  I've tested it on FC 7 and OS X 10.4.

  % ./python ./Lib/test/regrtest.py -R :1: -u all test_ssl
  test_ssl
  beginning 6 repetitions
  123456
  ......
  1 test OK.
  [29244 refs]
  %

  [GvR: slightly edited to enforce 79-char line length, even if it required
   violating the style guide.]
........
  r57570 | guido.van.rossum | 2007-08-27 21:11:11 +0200 (Mon, 27 Aug 2007) | 2 lines

  Patch 10124 by Bill Janssen, docs for the new ssl code.
........
  r57574 | guido.van.rossum | 2007-08-27 22:51:00 +0200 (Mon, 27 Aug 2007) | 3 lines

  Patch # 1739906 by Christian Heimes -- add reduce to functools (importing
  it from __builtin__).
........
  r57575 | guido.van.rossum | 2007-08-27 22:52:10 +0200 (Mon, 27 Aug 2007) | 2 lines

  News about functools.reduce.
........
  r57611 | georg.brandl | 2007-08-28 10:29:08 +0200 (Tue, 28 Aug 2007) | 2 lines

  Document rev. 57574.
........
  r57612 | sean.reifschneider | 2007-08-28 11:07:54 +0200 (Tue, 28 Aug 2007) | 2 lines

  Adding basic imputil documentation.
........
  r57614 | georg.brandl | 2007-08-28 12:48:18 +0200 (Tue, 28 Aug 2007) | 2 lines

  Fix some glitches.
........
  r57616 | lars.gustaebel | 2007-08-28 14:31:09 +0200 (Tue, 28 Aug 2007) | 5 lines

  TarFile.__init__() no longer fails if no name argument is passed and
  the fileobj argument has no usable name attribute (e.g. StringIO).

  (will backport to 2.5)
........
  r57619 | thomas.wouters | 2007-08-28 17:28:19 +0200 (Tue, 28 Aug 2007) | 22 lines


  Improve extended slicing support in builtin types and classes. Specifically:

   - Specialcase extended slices that amount to a shallow copy the same way as
     is done for simple slices, in the tuple, string and unicode case.

   - Specialcase step-1 extended slices to optimize the common case for all
     involved types.

   - For lists, allow extended slice assignment of differing lengths as long
     as the step is 1. (Previously, 'l[:2:1] = []' failed even though
     'l[:2] = []' and 'l[:2:None] = []' do not.)

   - Implement extended slicing for buffer, array, structseq, mmap and
     UserString.UserString.

   - Implement slice-object support (but not non-step-1 slice assignment) for
     UserString.MutableString.

   - Add tests for all new functionality.
........
This commit is contained in:
Thomas Wouters 2007-08-28 21:37:11 +00:00
parent 9e7c8da61c
commit ed03b4121e
56 changed files with 3587 additions and 1146 deletions

View file

@ -185,15 +185,41 @@ class MutableString(UserString):
def __hash__(self):
raise TypeError, "unhashable type (it is mutable)"
def __setitem__(self, index, sub):
if index < 0:
index += len(self.data)
if index < 0 or index >= len(self.data): raise IndexError
self.data = self.data[:index] + sub + self.data[index+1:]
if isinstance(index, slice):
if isinstance(sub, UserString):
sub = sub.data
elif not isinstance(sub, basestring):
sub = str(sub)
start, stop, step = index.indices(len(self.data))
if step == -1:
start, stop = stop+1, start+1
sub = sub[::-1]
elif step != 1:
# XXX(twouters): I guess we should be reimplementing
# the extended slice assignment/deletion algorithm here...
raise TypeError, "invalid step in slicing assignment"
start = min(start, stop)
self.data = self.data[:start] + sub + self.data[stop:]
else:
if index < 0:
index += len(self.data)
if index < 0 or index >= len(self.data): raise IndexError
self.data = self.data[:index] + sub + self.data[index+1:]
def __delitem__(self, index):
if index < 0:
index += len(self.data)
if index < 0 or index >= len(self.data): raise IndexError
self.data = self.data[:index] + self.data[index+1:]
if isinstance(index, slice):
start, stop, step = index.indices(len(self.data))
if step == -1:
start, stop = stop+1, start+1
elif step != 1:
# XXX(twouters): see same block in __setitem__
raise TypeError, "invalid step in slicing deletion"
start = min(start, stop)
self.data = self.data[:start] + self.data[stop:]
else:
if index < 0:
index += len(self.data)
if index < 0 or index >= len(self.data): raise IndexError
self.data = self.data[:index] + self.data[index+1:]
def __setslice__(self, start, end, sub):
start = max(start, 0); end = max(end, 0)
if isinstance(sub, UserString):

View file

@ -336,7 +336,7 @@ class CDLL(object):
<obj>['qsort'] -> callable object
Calling the functions releases the Python GIL during the call and
reaquires it afterwards.
reacquires it afterwards.
"""
class _FuncPtr(_CFuncPtr):
_flags_ = _FUNCFLAG_CDECL

View file

@ -21,8 +21,9 @@ __all__ = [
]
def _run_code(code, run_globals, init_globals,
mod_name, mod_fname, mod_loader):
def _run_code(code, run_globals, init_globals=None,
mod_name=None, mod_fname=None,
mod_loader=None):
"""Helper for _run_module_code"""
if init_globals is not None:
run_globals.update(init_globals)
@ -33,21 +34,31 @@ def _run_code(code, run_globals, init_globals,
return run_globals
def _run_module_code(code, init_globals=None,
mod_name=None, mod_fname=None,
mod_loader=None, alter_sys=False):
mod_name=None, mod_fname=None,
mod_loader=None):
"""Helper for run_module"""
# Set up the top level namespace dictionary
if alter_sys:
# Modify sys.argv[0] and sys.modules[mod_name]
sys.argv[0] = mod_fname
module = imp.new_module(mod_name)
sys.modules[mod_name] = module
mod_globals = module.__dict__
else:
# Leave the sys module alone
mod_globals = {}
return _run_code(code, mod_globals, init_globals,
mod_name, mod_fname, mod_loader)
temp_module = imp.new_module(mod_name)
mod_globals = temp_module.__dict__
# Modify sys.argv[0] and sys.module[mod_name]
saved_argv0 = sys.argv[0]
restore_module = mod_name in sys.modules
if restore_module:
saved_module = sys.modules[mod_name]
sys.argv[0] = mod_fname
sys.modules[mod_name] = temp_module
try:
_run_code(code, mod_globals, init_globals,
mod_name, mod_fname, mod_loader)
finally:
sys.argv[0] = saved_argv0
if restore_module:
sys.modules[mod_name] = saved_module
else:
del sys.modules[mod_name]
# Copy the globals of the temporary module, as they
# may be cleared when the temporary module goes away
return mod_globals.copy()
# This helper is needed due to a missing component in the PEP 302
@ -60,13 +71,8 @@ def _get_filename(loader, mod_name):
else:
return get_filename(mod_name)
def run_module(mod_name, init_globals=None,
run_name=None, alter_sys=False):
"""Execute a module's code without importing it
Returns the resulting top level namespace dictionary
"""
# Helper to get the loader, code and filename for a module
def _get_module_details(mod_name):
loader = get_loader(mod_name)
if loader is None:
raise ImportError("No module named %s" % mod_name)
@ -77,10 +83,40 @@ def run_module(mod_name, init_globals=None,
if code is None:
raise ImportError("No code object available for %s" % mod_name)
filename = _get_filename(loader, mod_name)
return loader, code, filename
# XXX ncoghlan: Should this be documented and made public?
def _run_module_as_main(mod_name, set_argv0=True):
"""Runs the designated module in the __main__ namespace
These __*__ magic variables will be overwritten:
__file__
__loader__
"""
loader, code, fname = _get_module_details(mod_name)
main_globals = sys.modules["__main__"].__dict__
if set_argv0:
sys.argv[0] = fname
return _run_code(code, main_globals, None,
"__main__", fname, loader)
def run_module(mod_name, init_globals=None,
run_name=None, alter_sys=False):
"""Execute a module's code without importing it
Returns the resulting top level namespace dictionary
"""
loader, code, fname = _get_module_details(mod_name)
if run_name is None:
run_name = mod_name
return _run_module_code(code, init_globals, run_name,
filename, loader, alter_sys)
if alter_sys:
return _run_module_code(code, init_globals, run_name,
fname, loader)
else:
# Leave the sys module alone
return _run_code(code, {}, init_globals,
run_name, fname, loader)
if __name__ == "__main__":
@ -89,4 +125,4 @@ if __name__ == "__main__":
print("No module specified for execution", file=sys.stderr)
else:
del sys.argv[0] # Make the requested module sys.argv[0]
run_module(sys.argv[0], run_name="__main__", alter_sys=True)
_run_module_as_main(sys.argv[0])

View file

@ -65,6 +65,10 @@ __all__ = ["getfqdn"]
__all__.extend(os._get_exports_list(_socket))
if _have_ssl:
__all__.extend(os._get_exports_list(_ssl))
def ssl(sock, keyfile=None, certfile=None):
import ssl as realssl
return realssl.sslwrap_simple(sock, keyfile, certfile)
__all__.append("ssl")
# WSA error codes
if sys.platform.lower().startswith("win"):

253
Lib/ssl.py Normal file
View file

@ -0,0 +1,253 @@
# Wrapper module for _ssl, providing some additional facilities
# implemented in Python. Written by Bill Janssen.
"""\
This module provides some more Pythonic support for SSL.
Object types:
sslsocket -- subtype of socket.socket which does SSL over the socket
Exceptions:
sslerror -- exception raised for I/O errors
Functions:
cert_time_to_seconds -- convert time string used for certificate
notBefore and notAfter functions to integer
seconds past the Epoch (the time values
returned from time.time())
fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
by the server running on HOST at port PORT. No
validation of the certificate is performed.
Integer constants:
SSL_ERROR_ZERO_RETURN
SSL_ERROR_WANT_READ
SSL_ERROR_WANT_WRITE
SSL_ERROR_WANT_X509_LOOKUP
SSL_ERROR_SYSCALL
SSL_ERROR_SSL
SSL_ERROR_WANT_CONNECT
SSL_ERROR_EOF
SSL_ERROR_INVALID_ERROR_CODE
The following group define certificate requirements that one side is
allowing/requiring from the other side:
CERT_NONE - no certificates from the other side are required (or will
be looked at if provided)
CERT_OPTIONAL - certificates are not required, but if provided will be
validated, and if validation fails, the connection will
also fail
CERT_REQUIRED - certificates are required, and will be validated, and
if validation fails, the connection will also fail
The following constants identify various SSL protocol variants:
PROTOCOL_SSLv2
PROTOCOL_SSLv3
PROTOCOL_SSLv23
PROTOCOL_TLSv1
"""
import os, sys
import _ssl # if we can't import it, let the error propagate
from socket import socket
from _ssl import sslerror
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
# Root certs:
#
# The "ca_certs" argument to sslsocket() expects a file containing one or more
# certificates that are roots of various certificate signing chains. This file
# contains the certificates in PEM format (RFC ) where each certificate is
# encoded in base64 encoding and surrounded with a header and footer:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# The various certificates in the file are just concatenated together:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# -----BEGIN CERTIFICATE-----
# ... (a second CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
#
# Some "standard" root certificates are available at
#
# http://www.thawte.com/roots/ (for Thawte roots)
# http://www.verisign.com/support/roots.html (for Verisign)
class sslsocket (socket):
def __init__(self, sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=CERT_NONE,
ssl_version=PROTOCOL_SSLv23, ca_certs=None):
socket.__init__(self, _sock=sock._sock)
if certfile and not keyfile:
keyfile = certfile
if server_side:
self._sslobj = _ssl.sslwrap(self._sock, 1, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
else:
# see if it's connected
try:
socket.getpeername(self)
except:
# no, no connection yet
self._sslobj = None
else:
# yes, create the SSL object
self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
self.keyfile = keyfile
self.certfile = certfile
self.cert_reqs = cert_reqs
self.ssl_version = ssl_version
self.ca_certs = ca_certs
def read(self, len=1024):
return self._sslobj.read(len)
def write(self, data):
return self._sslobj.write(data)
def getpeercert(self):
return self._sslobj.peer_certificate()
def send (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to send() on %s" %
self.__class__)
return self._sslobj.write(data)
def send_to (self, data, addr, flags=0):
raise ValueError("send_to not allowed on instances of %s" %
self.__class__)
def sendall (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.write(data)
def recv (self, buflen=1024, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.read(data, buflen)
def recv_from (self, addr, buflen=1024, flags=0):
raise ValueError("recv_from not allowed on instances of %s" %
self.__class__)
def shutdown(self):
if self._sslobj:
self._sslobj.shutdown()
self._sslobj = None
else:
socket.shutdown(self)
def close(self):
if self._sslobj:
self.shutdown()
else:
socket.close(self)
def connect(self, addr):
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._sslobj or (self.getsockname()[1] != 0):
raise ValueError("attempt to connect already-connected sslsocket!")
socket.connect(self, addr)
self._sslobj = _ssl.sslwrap(self._sock, 0, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs)
def accept(self):
raise ValueError("accept() not supported on an sslsocket")
# some utility functions
def cert_time_to_seconds(cert_time):
import time
return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))
# a replacement for the old socket.ssl function
def sslwrap_simple (sock, keyfile=None, certfile=None):
return _ssl.sslwrap(sock._sock, 0, keyfile, certfile, CERT_NONE,
PROTOCOL_SSLv23, None)
# fetch the certificate that the server is providing in PEM form
def fetch_server_certificate (host, port):
import re, tempfile, os
def subproc(cmd):
from subprocess import Popen, PIPE, STDOUT
proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
status = proc.wait()
output = proc.stdout.read()
return status, output
def strip_to_x509_cert(certfile_contents, outfile=None):
m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n"
r".*[\r]*^[-]+END CERTIFICATE[-]+)$",
certfile_contents, re.MULTILINE | re.DOTALL)
if not m:
return None
else:
tn = tempfile.mktemp()
fp = open(tn, "w")
fp.write(m.group(1) + "\n")
fp.close()
try:
tn2 = (outfile or tempfile.mktemp())
status, output = subproc(r'openssl x509 -in "%s" -out "%s"' %
(tn, tn2))
if status != 0:
raise OperationError(status, tsig, output)
fp = open(tn2, 'rb')
data = fp.read()
fp.close()
os.unlink(tn2)
return data
finally:
os.unlink(tn)
if sys.platform.startswith("win"):
tfile = tempfile.mktemp()
fp = open(tfile, "w")
fp.write("quit\n")
fp.close()
try:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < "%s"' %
(host, port, tfile))
finally:
os.unlink(tfile)
else:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < /dev/null' %
(host, port))
if status != 0:
raise OSError(status)
certtext = strip_to_x509_cert(output)
if not certtext:
raise ValueError("Invalid response received from server at %s:%s" %
(host, port))
return certtext

View file

@ -1508,7 +1508,7 @@ class TarFile(object):
if hasattr(fileobj, "mode"):
self._mode = fileobj.mode
self._extfileobj = True
self.name = os.path.abspath(name)
self.name = os.path.abspath(name) if name else None
self.fileobj = fileobj
# Init attributes.

32
Lib/test/keycert.pem Normal file
View file

@ -0,0 +1,32 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----

View file

@ -179,8 +179,10 @@ class CommonTest(seq_tests.CommonTest):
self.assertEqual(a, self.type2test(range(10)))
self.assertRaises(TypeError, a.__setslice__, 0, 1, 5)
self.assertRaises(TypeError, a.__setitem__, slice(0, 1, 5))
self.assertRaises(TypeError, a.__setslice__)
self.assertRaises(TypeError, a.__setitem__)
def test_delslice(self):
a = self.type2test([0, 1])

View file

@ -1,64 +0,0 @@
test_class
__init__: ()
__add__: (1,)
__radd__: (1,)
__sub__: (1,)
__rsub__: (1,)
__mul__: (1,)
__rmul__: (1,)
__truediv__: (1,)
__rtruediv__: (1,)
__mod__: (1,)
__rmod__: (1,)
__divmod__: (1,)
__rdivmod__: (1,)
__pow__: (1,)
__rpow__: (1,)
__rshift__: (1,)
__rrshift__: (1,)
__lshift__: (1,)
__rlshift__: (1,)
__and__: (1,)
__rand__: (1,)
__or__: (1,)
__ror__: (1,)
__xor__: (1,)
__rxor__: (1,)
__contains__: (1,)
__getitem__: (1,)
__setitem__: (1, 1)
__delitem__: (1,)
__getslice__: (0, 42)
__setslice__: (0, 42, 'The Answer')
__delslice__: (0, 42)
__getitem__: (slice(2, 1024, 10),)
__setitem__: (slice(2, 1024, 10), 'A lot')
__delitem__: (slice(2, 1024, 10),)
__getitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),)
__setitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100), 'Strange')
__delitem__: ((slice(None, 42, None), Ellipsis, slice(None, 24, None), 24, 100),)
__getitem__: (slice(None, 42, None),)
__setitem__: (slice(None, 42, None), 'The Answer')
__delitem__: (slice(None, 42, None),)
__neg__: ()
__pos__: ()
__abs__: ()
__int__: ()
__int__: ()
__float__: ()
__index__: ()
__hash__: ()
__repr__: ()
__str__: ()
__eq__: (1,)
__lt__: (1,)
__gt__: (1,)
__ne__: (1,)
__eq__: (1,)
__gt__: (1,)
__lt__: (1,)
__ne__: (1,)
__del__: ()
__getattr__: ('spam',)
__setattr__: ('eggs', 'spam, spam, spam and ham')
__delattr__: ('cardinal',)

View file

@ -1,4 +0,0 @@
test_frozen
Hello world...
Hello world...
Hello world...

View file

@ -1,2 +0,0 @@
test_ossaudiodev
playing test sound file (expected running time: 2.93 sec)

View file

@ -1,2 +0,0 @@
test_signal
starting pause() loop...

View file

@ -1,3 +0,0 @@
test_winreg
Local registry tests worked
Remote registry calls can be tested using 'test_winreg.py --remote \\machine_name'

View file

@ -935,7 +935,6 @@ class MixinStrUnicodeUserStringTest:
self.checkequal('abc', 'abc', '__getitem__', slice(0, 1000))
self.checkequal('a', 'abc', '__getitem__', slice(0, 1))
self.checkequal('', 'abc', '__getitem__', slice(0, 0))
# FIXME What about negative indices? This is handled differently by [] and __getitem__(slice)
self.checkraises(TypeError, 'abc', '__getitem__', 'def')
@ -949,10 +948,21 @@ class MixinStrUnicodeUserStringTest:
self.checkequal('', 'abc', '__getslice__', 1000, 1000)
self.checkequal('', 'abc', '__getslice__', 2000, 1000)
self.checkequal('', 'abc', '__getslice__', 2, 1)
# FIXME What about negative indizes? This is handled differently by [] and __getslice__
self.checkraises(TypeError, 'abc', '__getslice__', 'def')
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing.
s = string.ascii_letters + string.digits
indices = (0, None, 1, 3, 41, -1, -2, -37)
for start in indices:
for stop in indices:
# Skip step 0 (invalid)
for step in indices[1:]:
L = list(s)[start:stop:step]
self.checkequal("".join(L), s, '__getitem__',
slice(start, stop, step))
def test_mul(self):
self.checkequal('', 'abc', '__mul__', -1)
self.checkequal('', 'abc', '__mul__', 0)

View file

@ -468,6 +468,18 @@ class BaseTest(unittest.TestCase):
array.array(self.typecode)
)
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing
# (Assumes list conversion works correctly, too)
a = array.array(self.typecode, self.example)
indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
for start in indices:
for stop in indices:
# Everything except the initial 0 (invalid step)
for step in indices[1:]:
self.assertEqual(list(a[start:stop:step]),
list(a)[start:stop:step])
def test_setslice(self):
a = array.array(self.typecode, self.example)
a[:1] = a
@ -551,12 +563,34 @@ class BaseTest(unittest.TestCase):
a = array.array(self.typecode, self.example)
self.assertRaises(TypeError, a.__setslice__, 0, 0, None)
self.assertRaises(TypeError, a.__setitem__, slice(0, 0), None)
self.assertRaises(TypeError, a.__setitem__, slice(0, 1), None)
b = array.array(self.badtypecode())
self.assertRaises(TypeError, a.__setslice__, 0, 0, b)
self.assertRaises(TypeError, a.__setitem__, slice(0, 0), b)
self.assertRaises(TypeError, a.__setitem__, slice(0, 1), b)
def test_extended_set_del_slice(self):
indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
for start in indices:
for stop in indices:
# Everything except the initial 0 (invalid step)
for step in indices[1:]:
a = array.array(self.typecode, self.example)
L = list(a)
# Make sure we have a slice of exactly the right length,
# but with (hopefully) different data.
data = L[start:stop:step]
data.reverse()
L[start:stop:step] = data
a[start:stop:step] = array.array(self.typecode, data)
self.assertEquals(a, array.array(self.typecode, L))
del L[start:stop:step]
del a[start:stop:step]
self.assertEquals(a, array.array(self.typecode, L))
def test_index(self):
example = 2*self.example
a = array.array(self.typecode, example)

View file

@ -37,6 +37,18 @@ class BufferTests(unittest.TestCase):
self.failIf(a == b)
self.assertRaises(TypeError, lambda: a < b)
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing.
s = bytes(range(255, -1, -1))
b = buffer(s)
indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
for start in indices:
for stop in indices:
# Skip step 0 (invalid)
for step in indices[1:]:
self.assertEqual(b[start:stop:step],
s[start:stop:step])
def test_main():
test_support.run_unittest(BufferTests)

View file

@ -1,6 +1,9 @@
"Test the functionality of Python classes implementing operators."
from test.test_support import TestFailed
import unittest
import sys
from test import test_support
testmeths = [
@ -53,340 +56,540 @@ testmeths = [
# "str",
# "repr",
# "int",
# "long",
# "float",
# "oct",
# "hex",
# These are separate because they can influence the test of other methods.
# "getattr",
# "setattr",
# "delattr",
class AllTests:
def __hash__(self, *args):
print("__hash__:", args)
return hash(id(self))
callLst = []
def trackCall(f):
def track(*args, **kwargs):
callLst.append((f.__name__, args))
return f(*args, **kwargs)
return track
def __str__(self, *args):
print("__str__:", args)
return "AllTests"
statictests = """
@trackCall
def __hash__(self, *args):
return hash(id(self))
def __repr__(self, *args):
print("__repr__:", args)
return "AllTests"
@trackCall
def __str__(self, *args):
return "AllTests"
def __int__(self, *args):
print("__int__:", args)
return 1
@trackCall
def __repr__(self, *args):
return "AllTests"
def __index__(self, *args):
print("__index__:", args)
return 1
@trackCall
def __int__(self, *args):
return 1
def __float__(self, *args):
print("__float__:", args)
return 1.0
@trackCall
def __index__(self, *args):
return 1
def __cmp__(self, *args):
print("__cmp__:", args)
return 0
@trackCall
def __float__(self, *args):
return 1.0
def __eq__(self, *args):
print("__eq__:", args)
return True
@trackCall
def __cmp__(self, *args):
return 0
def __ne__(self, *args):
print("__ne__:", args)
return False
@trackCall
def __eq__(self, *args):
return True
def __lt__(self, *args):
print("__lt__:", args)
return False
@trackCall
def __ne__(self, *args):
return False
def __le__(self, *args):
print("__le__:", args)
return True
@trackCall
def __lt__(self, *args):
return False
def __gt__(self, *args):
print("__gt__:", args)
return False
@trackCall
def __le__(self, *args):
return True
def __ge__(self, *args):
print("__ge__:", args)
return True
@trackCall
def __gt__(self, *args):
return False
def __del__(self, *args):
print("__del__:", args)
@trackCall
def __ge__(self, *args):
return True
"""
# Synthesize AllTests methods from the names in testmeths.
# Synthesize all the other AllTests methods from the names in testmeths.
method_template = """\
def __%(method)s__(self, *args):
print("__%(method)s__:", args)
@trackCall
def __%s__(self, *args):
pass
"""
d = {}
exec(statictests, globals(), d)
for method in testmeths:
exec(method_template % locals(), d)
for k in d:
setattr(AllTests, k, d[k])
del d, k
del method, method_template
exec(method_template % method, globals(), d)
AllTests = type("AllTests", (object,), d)
del d, statictests, method, method_template
# this also tests __init__ of course.
testme = AllTests()
class ClassTests(unittest.TestCase):
def setUp(self):
callLst[:] = []
# Binary operations
def assertCallStack(self, expected_calls):
actualCallList = callLst[:] # need to copy because the comparison below will add
# additional calls to callLst
if expected_calls != actualCallList:
self.fail("Expected call list:\n %s\ndoes not match actual call list\n %s" %
(expected_calls, actualCallList))
testme + 1
1 + testme
def testInit(self):
foo = AllTests()
self.assertCallStack([("__init__", (foo,))])
testme - 1
1 - testme
def testBinaryOps(self):
testme = AllTests()
# Binary operations
testme * 1
1 * testme
callLst[:] = []
testme + 1
self.assertCallStack([("__add__", (testme, 1))])
testme / 1
1 / testme
callLst[:] = []
1 + testme
self.assertCallStack([("__radd__", (testme, 1))])
testme % 1
1 % testme
callLst[:] = []
testme - 1
self.assertCallStack([("__sub__", (testme, 1))])
divmod(testme,1)
divmod(1, testme)
callLst[:] = []
1 - testme
self.assertCallStack([("__rsub__", (testme, 1))])
testme ** 1
1 ** testme
callLst[:] = []
testme * 1
self.assertCallStack([("__mul__", (testme, 1))])
testme >> 1
1 >> testme
callLst[:] = []
1 * testme
self.assertCallStack([("__rmul__", (testme, 1))])
testme << 1
1 << testme
testme & 1
1 & testme
testme | 1
1 | testme
testme ^ 1
1 ^ testme
if 1/2 == 0:
callLst[:] = []
testme / 1
self.assertCallStack([("__div__", (testme, 1))])
# List/dict operations
callLst[:] = []
1 / testme
self.assertCallStack([("__rdiv__", (testme, 1))])
class Empty: pass
callLst[:] = []
testme % 1
self.assertCallStack([("__mod__", (testme, 1))])
try:
1 in Empty()
print('failed, should have raised TypeError')
except TypeError:
pass
1 in testme
testme[1]
testme[1] = 1
del testme[1]
testme[:42]
testme[:42] = "The Answer"
del testme[:42]
testme[2:1024:10]
testme[2:1024:10] = "A lot"
del testme[2:1024:10]
testme[:42, ..., :24:, 24, 100]
testme[:42, ..., :24:, 24, 100] = "Strange"
del testme[:42, ..., :24:, 24, 100]
callLst[:] = []
1 % testme
self.assertCallStack([("__rmod__", (testme, 1))])
# Now remove the slice hooks to see if converting normal slices to slice
# object works.
callLst[:] = []
divmod(testme,1)
self.assertCallStack([("__divmod__", (testme, 1))])
del AllTests.__getslice__
del AllTests.__setslice__
del AllTests.__delslice__
callLst[:] = []
divmod(1, testme)
self.assertCallStack([("__rdivmod__", (testme, 1))])
import sys
if sys.platform[:4] != 'java':
testme[:42]
testme[:42] = "The Answer"
del testme[:42]
else:
# This works under Jython, but the actual slice values are
# different.
print("__getitem__: (slice(0, 42, None),)")
print("__setitem__: (slice(0, 42, None), 'The Answer')")
print("__delitem__: (slice(0, 42, None),)")
callLst[:] = []
testme ** 1
self.assertCallStack([("__pow__", (testme, 1))])
# Unary operations
callLst[:] = []
1 ** testme
self.assertCallStack([("__rpow__", (testme, 1))])
-testme
+testme
abs(testme)
int(testme)
int(testme)
float(testme)
oct(testme)
callLst[:] = []
testme >> 1
self.assertCallStack([("__rshift__", (testme, 1))])
# And the rest...
callLst[:] = []
1 >> testme
self.assertCallStack([("__rrshift__", (testme, 1))])
hash(testme)
repr(testme)
str(testme)
callLst[:] = []
testme << 1
self.assertCallStack([("__lshift__", (testme, 1))])
testme == 1
testme < 1
testme > 1
testme != 1
1 == testme
1 < testme
1 > testme
1 != testme
callLst[:] = []
1 << testme
self.assertCallStack([("__rlshift__", (testme, 1))])
# This test has to be last (duh.)
callLst[:] = []
testme & 1
self.assertCallStack([("__and__", (testme, 1))])
del testme
if sys.platform[:4] == 'java':
import java
java.lang.System.gc()
callLst[:] = []
1 & testme
self.assertCallStack([("__rand__", (testme, 1))])
# Interfering tests
callLst[:] = []
testme | 1
self.assertCallStack([("__or__", (testme, 1))])
class ExtraTests:
def __getattr__(self, *args):
print("__getattr__:", args)
return "SomeVal"
callLst[:] = []
1 | testme
self.assertCallStack([("__ror__", (testme, 1))])
def __setattr__(self, *args):
print("__setattr__:", args)
callLst[:] = []
testme ^ 1
self.assertCallStack([("__xor__", (testme, 1))])
def __delattr__(self, *args):
print("__delattr__:", args)
callLst[:] = []
1 ^ testme
self.assertCallStack([("__rxor__", (testme, 1))])
testme = ExtraTests()
testme.spam
testme.eggs = "spam, spam, spam and ham"
del testme.cardinal
def testListAndDictOps(self):
testme = AllTests()
# List/dict operations
class Empty: pass
try:
1 in Empty()
self.fail('failed, should have raised TypeError')
except TypeError:
pass
callLst[:] = []
1 in testme
self.assertCallStack([('__contains__', (testme, 1))])
callLst[:] = []
testme[1]
self.assertCallStack([('__getitem__', (testme, 1))])
callLst[:] = []
testme[1] = 1
self.assertCallStack([('__setitem__', (testme, 1, 1))])
callLst[:] = []
del testme[1]
self.assertCallStack([('__delitem__', (testme, 1))])
callLst[:] = []
testme[:42]
self.assertCallStack([('__getslice__', (testme, 0, 42))])
callLst[:] = []
testme[:42] = "The Answer"
self.assertCallStack([('__setslice__', (testme, 0, 42, "The Answer"))])
callLst[:] = []
del testme[:42]
self.assertCallStack([('__delslice__', (testme, 0, 42))])
callLst[:] = []
testme[2:1024:10]
self.assertCallStack([('__getitem__', (testme, slice(2, 1024, 10)))])
callLst[:] = []
testme[2:1024:10] = "A lot"
self.assertCallStack([('__setitem__', (testme, slice(2, 1024, 10),
"A lot"))])
callLst[:] = []
del testme[2:1024:10]
self.assertCallStack([('__delitem__', (testme, slice(2, 1024, 10)))])
callLst[:] = []
testme[:42, ..., :24:, 24, 100]
self.assertCallStack([('__getitem__', (testme, (slice(None, 42, None),
Ellipsis,
slice(None, 24, None),
24, 100)))])
callLst[:] = []
testme[:42, ..., :24:, 24, 100] = "Strange"
self.assertCallStack([('__setitem__', (testme, (slice(None, 42, None),
Ellipsis,
slice(None, 24, None),
24, 100), "Strange"))])
callLst[:] = []
del testme[:42, ..., :24:, 24, 100]
self.assertCallStack([('__delitem__', (testme, (slice(None, 42, None),
Ellipsis,
slice(None, 24, None),
24, 100)))])
# Now remove the slice hooks to see if converting normal slices to
# slice object works.
getslice = AllTests.__getslice__
del AllTests.__getslice__
setslice = AllTests.__setslice__
del AllTests.__setslice__
delslice = AllTests.__delslice__
del AllTests.__delslice__
# XXX when using new-style classes the slice testme[:42] produces
# slice(None, 42, None) instead of slice(0, 42, None). py3k will have
# to change this test.
callLst[:] = []
testme[0:42]
self.assertCallStack([('__getitem__', (testme, slice(0, 42, None)))])
callLst[:] = []
testme[:42] = "The Answer"
self.assertCallStack([('__setitem__', (testme, slice(None, 42, None),
"The Answer"))])
callLst[:] = []
del testme[0:42]
self.assertCallStack([('__delitem__', (testme, slice(0, 42, None)))])
# Restore the slice methods, or the tests will fail with regrtest -R.
AllTests.__getslice__ = getslice
AllTests.__setslice__ = setslice
AllTests.__delslice__ = delslice
# return values of some method are type-checked
class BadTypeClass:
def __int__(self):
return None
__float__ = __int__
__str__ = __int__
__repr__ = __int__
def testUnaryOps(self):
testme = AllTests()
def check_exc(stmt, exception):
"""Raise TestFailed if executing 'stmt' does not raise 'exception'
"""
try:
exec(stmt)
except exception:
pass
else:
raise TestFailed, "%s should raise %s" % (stmt, exception)
check_exc("int(BadTypeClass())", TypeError)
check_exc("float(BadTypeClass())", TypeError)
check_exc("str(BadTypeClass())", TypeError)
check_exc("repr(BadTypeClass())", TypeError)
check_exc("oct(BadTypeClass())", TypeError)
check_exc("hex(BadTypeClass())", TypeError)
# Test correct errors from hash() on objects with comparisons but no __hash__
class C0:
pass
hash(C0()) # This should work; the next two should raise TypeError
class C1:
def __cmp__(self, other): return 0
check_exc("hash(C1())", TypeError)
class C2:
def __eq__(self, other): return 1
check_exc("hash(C2())", TypeError)
# Test for SF bug 532646
class A:
pass
A.__call__ = A()
a = A()
try:
a() # This should not segfault
except RuntimeError:
pass
else:
raise TestFailed, "how could this not have overflowed the stack?"
callLst[:] = []
-testme
self.assertCallStack([('__neg__', (testme,))])
callLst[:] = []
+testme
self.assertCallStack([('__pos__', (testme,))])
callLst[:] = []
abs(testme)
self.assertCallStack([('__abs__', (testme,))])
callLst[:] = []
int(testme)
self.assertCallStack([('__int__', (testme,))])
callLst[:] = []
float(testme)
self.assertCallStack([('__float__', (testme,))])
callLst[:] = []
oct(testme)
self.assertCallStack([('__index__', (testme,))])
callLst[:] = []
hex(testme)
self.assertCallStack([('__index__', (testme,))])
# Tests for exceptions raised in instance_getattr2().
def testMisc(self):
testme = AllTests()
def booh(self):
raise AttributeError, "booh"
callLst[:] = []
hash(testme)
self.assertCallStack([('__hash__', (testme,))])
class A:
a = property(booh)
try:
A().a # Raised AttributeError: A instance has no attribute 'a'
except AttributeError as x:
if str(x) != "booh":
print("attribute error for A().a got masked:", str(x))
callLst[:] = []
repr(testme)
self.assertCallStack([('__repr__', (testme,))])
class E:
__eq__ = property(booh)
E() == E() # In debug mode, caused a C-level assert() to fail
callLst[:] = []
str(testme)
self.assertCallStack([('__str__', (testme,))])
class I:
__init__ = property(booh)
try:
I() # In debug mode, printed XXX undetected error and raises AttributeError
except AttributeError as x:
pass
else:
print("attribute error for I.__init__ got masked")
callLst[:] = []
testme == 1
self.assertCallStack([('__eq__', (testme, 1))])
callLst[:] = []
testme < 1
self.assertCallStack([('__lt__', (testme, 1))])
callLst[:] = []
testme > 1
self.assertCallStack([('__gt__', (testme, 1))])
callLst[:] = []
testme != 1
self.assertCallStack([('__ne__', (testme, 1))])
callLst[:] = []
1 == testme
self.assertCallStack([('__eq__', (1, testme))])
callLst[:] = []
1 < testme
self.assertCallStack([('__gt__', (1, testme))])
callLst[:] = []
1 > testme
self.assertCallStack([('__lt__', (1, testme))])
callLst[:] = []
1 != testme
self.assertCallStack([('__ne__', (1, testme))])
# Test comparison and hash of methods
class A:
def __init__(self, x):
self.x = x
def f(self):
pass
def g(self):
pass
def __eq__(self, other):
return self.x == other.x
def __hash__(self):
return self.x
class B(A):
pass
def testGetSetAndDel(self):
# Interfering tests
class ExtraTests(AllTests):
@trackCall
def __getattr__(self, *args):
return "SomeVal"
a1 = A(1)
a2 = A(2)
assert a1.f == a1.f
assert a1.f != a2.f
assert a1.f != a1.g
assert a1.f == A(1).f
assert hash(a1.f) == hash(a1.f)
assert hash(a1.f) == hash(A(1).f)
@trackCall
def __setattr__(self, *args):
pass
assert A.f != a1.f
assert A.f != A.g
assert B.f == A.f
assert hash(B.f) == hash(A.f)
@trackCall
def __delattr__(self, *args):
pass
# the following triggers a SystemError in 2.4
a = A(hash(A.f.im_func)^(-1))
hash(a.f)
testme = ExtraTests()
callLst[:] = []
testme.spam
self.assertCallStack([('__getattr__', (testme, "spam"))])
callLst[:] = []
testme.eggs = "spam, spam, spam and ham"
self.assertCallStack([('__setattr__', (testme, "eggs",
"spam, spam, spam and ham"))])
callLst[:] = []
del testme.cardinal
self.assertCallStack([('__delattr__', (testme, "cardinal"))])
def testDel(self):
x = []
class DelTest:
def __del__(self):
x.append("crab people, crab people")
testme = DelTest()
del testme
import gc
gc.collect()
self.assertEquals(["crab people, crab people"], x)
def testBadTypeReturned(self):
# return values of some method are type-checked
class BadTypeClass:
def __int__(self):
return None
__float__ = __int__
__str__ = __int__
__repr__ = __int__
__oct__ = __int__
__hex__ = __int__
for f in [int, float, str, repr, oct, hex]:
self.assertRaises(TypeError, f, BadTypeClass())
def testHashStuff(self):
# Test correct errors from hash() on objects with comparisons but
# no __hash__
class C0:
pass
hash(C0()) # This should work; the next two should raise TypeError
class C1:
def __cmp__(self, other): return 0
self.assertRaises(TypeError, hash, C1())
class C2:
def __eq__(self, other): return 1
self.assertRaises(TypeError, hash, C2())
def testSFBug532646(self):
# Test for SF bug 532646
class A:
pass
A.__call__ = A()
a = A()
try:
a() # This should not segfault
except RuntimeError:
pass
else:
self.fail("Failed to raise RuntimeError")
def testForExceptionsRaisedInInstanceGetattr2(self):
# Tests for exceptions raised in instance_getattr2().
def booh(self):
raise AttributeError("booh")
class A:
a = property(booh)
try:
A().a # Raised AttributeError: A instance has no attribute 'a'
except AttributeError as x:
if str(x) != "booh":
self.fail("attribute error for A().a got masked: %s" % x)
class E:
__eq__ = property(booh)
E() == E() # In debug mode, caused a C-level assert() to fail
class I:
__init__ = property(booh)
try:
# In debug mode, printed XXX undetected error and
# raises AttributeError
I()
except AttributeError as x:
pass
else:
self.fail("attribute error for I.__init__ got masked")
def testHashComparisonOfMethods(self):
# Test comparison and hash of methods
class A:
def __init__(self, x):
self.x = x
def f(self):
pass
def g(self):
pass
def __eq__(self, other):
return self.x == other.x
def __hash__(self):
return self.x
class B(A):
pass
a1 = A(1)
a2 = A(2)
self.assertEquals(a1.f, a1.f)
self.assertNotEquals(a1.f, a2.f)
self.assertNotEquals(a1.f, a1.g)
self.assertEquals(a1.f, A(1).f)
self.assertEquals(hash(a1.f), hash(a1.f))
self.assertEquals(hash(a1.f), hash(A(1).f))
self.assertNotEquals(A.f, a1.f)
self.assertNotEquals(A.f, A.g)
self.assertEquals(B.f, A.f)
self.assertEquals(hash(B.f), hash(A.f))
# the following triggers a SystemError in 2.4
a = A(hash(A.f.im_func)^(-1))
hash(a.f)
def test_main():
test_support.run_unittest(ClassTests)
if __name__=='__main__':
test_main()

View file

@ -3,18 +3,25 @@ import test.test_support, unittest
import sys
import subprocess
def _spawn_python(*args):
cmd_line = [sys.executable]
cmd_line.extend(args)
return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def _kill_python(p):
p.stdin.close()
data = p.stdout.read()
p.stdout.close()
# try to cleanup the child so we don't appear to leak when running
# with regrtest -R. This should be a no-op on Windows.
subprocess._cleanup()
return data
class CmdLineTest(unittest.TestCase):
def start_python(self, cmd_line):
cmd = '"%s" %s' % (sys.executable, cmd_line)
p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
p.stdin.close()
data = p.stdout.read()
p.stdout.close()
# try to cleanup the child so we don't appear to leak when running
# with regrtest -R. This should be a no-op on Windows.
subprocess._cleanup()
return data
def start_python(self, *args):
p = _spawn_python(*args)
return _kill_python(p)
def exit_code(self, *args):
cmd_line = [sys.executable]
@ -72,6 +79,17 @@ class CmdLineTest(unittest.TestCase):
self.exit_code('-m', 'timeit', '-n', '1'),
0)
def test_run_module_bug1764407(self):
# -m and -i need to play well together
# Runs the timeit module and checks the __main__
# namespace has been populated appropriately
p = _spawn_python('-i', '-m', 'timeit', '-n', '1')
p.stdin.write('Timer\n')
p.stdin.write('exit()\n')
data = _kill_python(p)
self.assertTrue(data.find(b'1 loop') != -1)
self.assertTrue(data.find(b'__main__.Timer') != -1)
def test_run_code(self):
# Test expected operation of the '-c' switch
# Switch needs an argument

View file

@ -1,27 +1,40 @@
# Test the frozen module defined in frozen.c.
from __future__ import with_statement
from test.test_support import TestFailed
from test.test_support import captured_stdout, run_unittest
import unittest
import sys, os
try:
import __hello__
except ImportError as x:
raise TestFailed, "import __hello__ failed:" + str(x)
class FrozenTests(unittest.TestCase):
def test_frozen(self):
try:
import __phello__
except ImportError as x:
raise TestFailed, "import __phello__ failed:" + str(x)
with captured_stdout() as stdout:
try:
import __hello__
except ImportError as x:
self.fail("import __hello__ failed:" + str(x))
try:
import __phello__.spam
except ImportError as x:
raise TestFailed, "import __phello__.spam failed:" + str(x)
try:
import __phello__
except ImportError as x:
self.fail("import __phello__ failed:" + str(x))
if sys.platform != "mac": # On the Mac this import does succeed.
try:
import __phello__.foo
except ImportError:
pass
else:
raise TestFailed, "import __phello__.foo should have failed"
try:
import __phello__.spam
except ImportError as x:
self.fail("import __phello__.spam failed:" + str(x))
if sys.platform != "mac": # On the Mac this import does succeed.
try:
import __phello__.foo
except ImportError:
pass
else:
self.fail("import __phello__.foo should have failed")
self.assertEquals(stdout.getvalue(),
'Hello world...\nHello world...\nHello world...\n')
def test_main():
run_unittest(FrozenTests)

View file

@ -305,6 +305,40 @@ class MmapTests(unittest.TestCase):
m[x] = b
self.assertEqual(m[x], b)
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing.
s = bytes(reversed(range(256)))
m = mmap.mmap(-1, len(s))
m[:] = s
self.assertEqual(m[:], s)
indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
for start in indices:
for stop in indices:
# Skip step 0 (invalid)
for step in indices[1:]:
self.assertEqual(m[start:stop:step],
s[start:stop:step])
def test_extended_set_del_slice(self):
# Test extended slicing by comparing with list slicing.
s = bytes(reversed(range(256)))
m = mmap.mmap(-1, len(s))
indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
for start in indices:
for stop in indices:
# Skip invalid step 0
for step in indices[1:]:
m[:] = s
self.assertEqual(m[:], s)
L = list(s)
# Make sure we have a slice of exactly the right length,
# but with different data.
data = L[start:stop:step]
data = bytes(reversed(data))
L[start:stop:step] = data
m[start:stop:step] = data
self.assertEquals(m[:], bytes(L))
def test_main():
run_unittest(MmapTests)

View file

@ -77,8 +77,7 @@ class OSSAudioDevTests(unittest.TestCase):
# set parameters based on .au file headers
dsp.setparameters(AFMT_S16_NE, nchannels, rate)
print ("playing test sound file (expected running time: %.2f sec)"
% expected_time)
self.assertEquals("%.2f" % expected_time, "2.93")
t1 = time.time()
dsp.write(data)
dsp.close()
@ -121,7 +120,6 @@ class OSSAudioDevTests(unittest.TestCase):
"setparameters%r: returned %r" % (config, result))
def set_bad_parameters(self, dsp):
# Now try some configurations that are presumably bogus: eg. 300
# channels currently exceeds even Hollywood's ambitions, and
# negative sampling rate is utter nonsense. setparameters() should
@ -166,7 +164,7 @@ class OSSAudioDevTests(unittest.TestCase):
def test_main():
try:
dsp = ossaudiodev.open('w')
except IOError as msg:
except (ossaudiodev.error, IOError) as msg:
if msg.args[0] in (errno.EACCES, errno.ENOENT,
errno.ENODEV, errno.EBUSY):
raise TestSkipped(msg)

View file

@ -155,7 +155,8 @@ class Test(unittest.TestCase):
("t4 sub.py", "raise RuntimeError('Shouldnt load sub.py')"),
("t4 sub", None),
("t4 sub __init__.py", ""),
("t4 sub subsub.py", "raise RuntimeError('Shouldnt load subsub.py')"),
("t4 sub subsub.py",
"raise RuntimeError('Shouldnt load subsub.py')"),
("t4 sub subsub", None),
("t4 sub subsub __init__.py", "spam = 1"),
]
@ -196,7 +197,8 @@ class Test(unittest.TestCase):
def test_6(self):
hier = [
("t6", None),
("t6 __init__.py", "__all__ = ['spam', 'ham', 'eggs']"),
("t6 __init__.py",
"__all__ = ['spam', 'ham', 'eggs']"),
("t6 spam.py", ""),
("t6 ham.py", ""),
("t6 eggs.py", ""),
@ -223,10 +225,11 @@ class Test(unittest.TestCase):
("t7.py", ""),
("t7", None),
("t7 __init__.py", ""),
("t7 sub.py", "raise RuntimeError('Shouldnt load sub.py')"),
("t7 sub.py",
"raise RuntimeError('Shouldnt load sub.py')"),
("t7 sub", None),
("t7 sub __init__.py", ""),
("t7 sub subsub.py",
("t7 sub .py",
"raise RuntimeError('Shouldnt load subsub.py')"),
("t7 sub subsub", None),
("t7 sub subsub __init__.py",

View file

@ -5,7 +5,7 @@ import os.path
import sys
import tempfile
from test.test_support import verbose, run_unittest, forget
from runpy import _run_module_code, run_module
from runpy import _run_code, _run_module_code, _run_module_as_main, run_module
# Set up the test code and expected results
@ -29,6 +29,16 @@ class RunModuleCodeTest(unittest.TestCase):
"nested = runpy._run_module_code('x=1\\n', mod_name='<run>')\n"
)
def test_run_code(self):
saved_argv0 = sys.argv[0]
d = _run_code(self.test_source, {})
self.failUnless(d["result"] == self.expected_result)
self.failUnless(d["__name__"] is None)
self.failUnless(d["__file__"] is None)
self.failUnless(d["__loader__"] is None)
self.failUnless(d["run_argv0"] is saved_argv0)
self.failUnless("run_name" not in d)
self.failUnless(sys.argv[0] is saved_argv0)
def test_run_module_code(self):
initial = object()
@ -37,44 +47,24 @@ class RunModuleCodeTest(unittest.TestCase):
loader = "Now you're just being silly"
d1 = dict(initial=initial)
saved_argv0 = sys.argv[0]
try:
d2 = _run_module_code(self.test_source,
d1,
name,
file,
loader,
alter_sys=True)
self.failUnless("result" not in d1)
self.failUnless(d2["initial"] is initial)
self.assertEqual(d2["result"], self.expected_result)
self.assertEqual(d2["nested"]["x"], 1)
self.assertEqual(d2["nested"]["__name__"], "<run>")
self.failUnless(d2["__name__"] is name)
self.failUnless(d2["__file__"] is file)
self.failUnless(d2["__loader__"] is loader)
self.failUnless(d2["run_argv0"] is file)
self.failUnless(d2["run_name_in_sys_modules"])
self.failUnless(d2["module_in_sys_modules"])
self.failUnless(sys.argv[0] is not saved_argv0)
self.failUnless(name in sys.modules)
finally:
sys.argv[0] = saved_argv0
if name in sys.modules:
del sys.modules[name]
def test_run_module_code_defaults(self):
saved_argv0 = sys.argv[0]
d = _run_module_code(self.test_source)
self.assertEqual(d["result"], self.expected_result)
self.failUnless(d["nested"]["x"] == 1)
self.failUnless(d["nested"]["__name__"] == "<run>")
self.failUnless(d["__name__"] is None)
self.failUnless(d["__file__"] is None)
self.failUnless(d["__loader__"] is None)
self.failUnless(d["run_argv0"] is saved_argv0)
self.failUnless(not d["run_name_in_sys_modules"])
d2 = _run_module_code(self.test_source,
d1,
name,
file,
loader)
self.failUnless("result" not in d1)
self.failUnless(d2["initial"] is initial)
self.assertEqual(d2["result"], self.expected_result)
self.assertEqual(d2["nested"]["x"], 1)
self.failUnless(d2["__name__"] is name)
self.failUnless(d2["run_name_in_sys_modules"])
self.failUnless(d2["module_in_sys_modules"])
self.failUnless(d2["__file__"] is file)
self.failUnless(d2["run_argv0"] is file)
self.failUnless(d2["__loader__"] is loader)
self.failUnless(sys.argv[0] is saved_argv0)
self.failUnless(None not in sys.modules)
self.failUnless(name not in sys.modules)
class RunModuleTest(unittest.TestCase):

View file

@ -1,167 +1,178 @@
# Test the signal module
from test.test_support import verbose, TestSkipped, TestFailed, vereq
import unittest
from test import test_support
import signal
import os, sys, time
if sys.platform[:3] in ('win', 'os2'):
raise TestSkipped, "Can't test signal on %s" % sys.platform
MAX_DURATION = 20 # Entire test should last at most 20 sec.
if verbose:
x = '-x'
else:
x = '+x'
pid = os.getpid()
if verbose:
print("test runner's pid is", pid)
# Shell script that will send us asynchronous signals
script = """
(
set %(x)s
sleep 2
kill -HUP %(pid)d
sleep 2
kill -USR1 %(pid)d
sleep 2
kill -USR2 %(pid)d
) &
""" % vars()
a_called = b_called = False
def handlerA(*args):
global a_called
a_called = True
if verbose:
print("handlerA invoked", args)
class HandlerBCalled(Exception):
pass
def handlerB(*args):
global b_called
b_called = True
if verbose:
print("handlerB invoked", args)
raise HandlerBCalled, args
class InterProcessSignalTests(unittest.TestCase):
MAX_DURATION = 20 # Entire test should last at most 20 sec.
# Set up a child to send signals to us (the parent) after waiting long
# enough to receive the alarm. It seems we miss the alarm for some
# reason. This will hopefully stop the hangs on Tru64/Alpha.
# Alas, it doesn't. Tru64 appears to miss all the signals at times, or
# seemingly random subsets of them, and nothing done in force_test_exit
# so far has actually helped.
def force_test_exit():
# Sigh, both imports seem necessary to avoid errors.
import os
fork_pid = os.fork()
if fork_pid:
# In parent.
return fork_pid
# Set up a child to send signals to us (the parent) after waiting
# long enough to receive the alarm. It seems we miss the alarm
# for some reason. This will hopefully stop the hangs on
# Tru64/Alpha. Alas, it doesn't. Tru64 appears to miss all the
# signals at times, or seemingly random subsets of them, and
# nothing done in force_test_exit so far has actually helped.
def spawn_force_test_exit_process(self, parent_pid):
# Sigh, both imports seem necessary to avoid errors.
import os
fork_pid = os.fork()
if fork_pid:
# In parent.
return fork_pid
# In child.
import os, time
try:
# Wait 5 seconds longer than the expected alarm to give enough
# time for the normal sequence of events to occur. This is
# just a stop-gap to try to prevent the test from hanging.
time.sleep(MAX_DURATION + 5)
print(' child should not have to kill parent', file=sys.__stdout__)
for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
os.kill(pid, getattr(signal, signame))
print(" child sent", signame, "to", pid, file=sys.__stdout__)
time.sleep(1)
finally:
os._exit(0)
# In child.
import os, time
try:
# Wait 5 seconds longer than the expected alarm to give enough
# time for the normal sequence of events to occur. This is
# just a stop-gap to try to prevent the test from hanging.
time.sleep(self.MAX_DURATION + 5)
print(" child should not have to kill parent",
file=sys.__stdout__)
for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
os.kill(parent_pid, getattr(signal, signame))
print(" child sent", signame, "to",
parent_pid, file=sys.__stdout__)
time.sleep(1)
finally:
os._exit(0)
# Install handlers.
hup = signal.signal(signal.SIGHUP, handlerA)
usr1 = signal.signal(signal.SIGUSR1, handlerB)
usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
alrm = signal.signal(signal.SIGALRM, signal.default_int_handler)
def handlerA(self, *args):
self.a_called = True
if test_support.verbose:
print("handlerA invoked", args)
try:
def handlerB(self, *args):
self.b_called = True
if test_support.verbose:
print("handlerB invoked", args)
raise HandlerBCalled(*args)
signal.alarm(MAX_DURATION)
vereq(signal.getsignal(signal.SIGHUP), handlerA)
vereq(signal.getsignal(signal.SIGUSR1), handlerB)
vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler)
def test_main(self):
self.assertEquals(signal.getsignal(signal.SIGHUP), self.handlerA)
self.assertEquals(signal.getsignal(signal.SIGUSR1), self.handlerB)
self.assertEquals(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
self.assertEquals(signal.getsignal(signal.SIGALRM),
signal.default_int_handler)
# Try to ensure this test exits even if there is some problem with alarm.
# Tru64/Alpha often hangs and is ultimately killed by the buildbot.
fork_pid = force_test_exit()
# Launch an external script to send us signals.
# We expect the external script to:
# send HUP, which invokes handlerA to set a_called
# send USR1, which invokes handlerB to set b_called and raise
# HandlerBCalled
# send USR2, which is ignored
#
# Then we expect the alarm to go off, and its handler raises
# KeyboardInterrupt, finally getting us out of the loop.
try:
signal.getsignal(4242)
raise TestFailed('expected ValueError for invalid signal # to '
'getsignal()')
except ValueError:
pass
if test_support.verbose:
verboseflag = '-x'
else:
verboseflag = '+x'
try:
signal.signal(4242, handlerB)
raise TestFailed('expected ValueError for invalid signal # to '
'signal()')
except ValueError:
pass
pid = self.pid
if test_support.verbose:
print("test runner's pid is", pid)
try:
signal.signal(signal.SIGUSR1, None)
raise TestFailed('expected TypeError for non-callable')
except TypeError:
pass
# Shell script that will send us asynchronous signals
script = """
(
set %(verboseflag)s
sleep 2
kill -HUP %(pid)d
sleep 2
kill -USR1 %(pid)d
sleep 2
kill -USR2 %(pid)d
) &
""" % vars()
# Launch an external script to send us signals.
# We expect the external script to:
# send HUP, which invokes handlerA to set a_called
# send USR1, which invokes handlerB to set b_called and raise
# HandlerBCalled
# send USR2, which is ignored
#
# Then we expect the alarm to go off, and its handler raises
# KeyboardInterrupt, finally getting us out of the loop.
os.system(script)
try:
print("starting pause() loop...")
while 1:
try:
if verbose:
print("call pause()...")
signal.pause()
if verbose:
print("pause() returned")
except HandlerBCalled:
if verbose:
print("HandlerBCalled exception caught")
signal.alarm(self.MAX_DURATION)
except KeyboardInterrupt:
if verbose:
print("KeyboardInterrupt (the alarm() went off)")
handler_b_exception_raised = False
if not a_called:
print('HandlerA not called')
os.system(script)
try:
if test_support.verbose:
print("starting pause() loop...")
while 1:
try:
if test_support.verbose:
print("call pause()...")
signal.pause()
if test_support.verbose:
print("pause() returned")
except HandlerBCalled:
handler_b_exception_raised = True
if test_support.verbose:
print("HandlerBCalled exception caught")
if not b_called:
print('HandlerB not called')
except KeyboardInterrupt:
if test_support.verbose:
print("KeyboardInterrupt (the alarm() went off)")
finally:
# Forcibly kill the child we created to ping us if there was a test error.
try:
# Make sure we don't kill ourself if there was a fork error.
if fork_pid > 0:
os.kill(fork_pid, signal.SIGKILL)
except:
# If the child killed us, it has probably exited. Killing a
# non-existent process will raise an error which we don't care about.
pass
self.assert_(self.a_called)
self.assert_(self.b_called)
self.assert_(handler_b_exception_raised)
# Restore handlers.
signal.alarm(0) # cancel alarm in case we died early
signal.signal(signal.SIGHUP, hup)
signal.signal(signal.SIGUSR1, usr1)
signal.signal(signal.SIGUSR2, usr2)
signal.signal(signal.SIGALRM, alrm)
def setUp(self):
# Install handlers.
self.hup = signal.signal(signal.SIGHUP, self.handlerA)
self.usr1 = signal.signal(signal.SIGUSR1, self.handlerB)
self.usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
self.alrm = signal.signal(signal.SIGALRM,
signal.default_int_handler)
self.a_called = False
self.b_called = False
self.pid = os.getpid()
self.fork_pid = self.spawn_force_test_exit_process(self.pid)
def tearDown(self):
# Forcibly kill the child we created to ping us if there was a
# test error.
try:
# Make sure we don't kill ourself if there was a fork
# error.
if self.fork_pid > 0:
os.kill(self.fork_pid, signal.SIGKILL)
except:
# If the child killed us, it has probably exited. Killing
# a non-existent process will raise an error which we
# don't care about.
pass
# Restore handlers.
signal.alarm(0) # cancel alarm in case we died early
signal.signal(signal.SIGHUP, self.hup)
signal.signal(signal.SIGUSR1, self.usr1)
signal.signal(signal.SIGUSR2, self.usr2)
signal.signal(signal.SIGALRM, self.alrm)
class BasicSignalTests(unittest.TestCase):
def test_out_of_range_signal_number_raises_error(self):
self.assertRaises(ValueError, signal.getsignal, 4242)
def trivial_signal_handler(*args):
pass
self.assertRaises(ValueError, signal.signal, 4242,
trivial_signal_handler)
def test_setting_signal_handler_to_none_raises_error(self):
self.assertRaises(TypeError, signal.signal,
signal.SIGUSR1, None)
def test_main():
if sys.platform[:3] in ('win', 'os2'):
raise test_support.TestSkipped("Can't test signal on %s" % \
sys.platform)
test_support.run_unittest(BasicSignalTests, InterProcessSignalTests)
if __name__ == "__main__":
test_main()

372
Lib/test/test_ssl.py Normal file
View file

@ -0,0 +1,372 @@
# Test the support for SSL and sockets
import sys
import unittest
from test import test_support
import socket
import errno
import threading
import subprocess
import time
import os
import pprint
import urllib
import shutil
import traceback
# Optionally test SSL support, if we have it in the tested platform
skip_expected = False
try:
import ssl
except ImportError:
skip_expected = True
CERTFILE = None
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
sys.stdout.write(prefix + exc_format)
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# Then it rudely closes the socket, and sets Event `listener_gone`
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def testSSLconnect(self):
import os
with test_support.transient_internet():
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("pop.gmail.com", 995))
c = s.getpeercert()
if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("pop.gmail.com", 995))
except ssl.sslerror:
pass
finally:
s.close()
class ConnectedTests(unittest.TestCase):
def testTLSecho (self):
s1 = socket.socket()
try:
s1.connect(('127.0.0.1', 10024))
except:
handle_error("connection failure:\n")
raise test_support.TestFailed("Can't connect to test server")
else:
try:
c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
except:
handle_error("SSL handshake failure:\n")
raise test_support.TestFailed("Can't SSL-handshake with test server")
else:
if not c1:
raise test_support.TestFailed("Can't SSL-handshake with test server")
indata = "FOO\n"
c1.write(indata)
outdata = c1.read()
if outdata != indata.lower():
raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower()))
c1.close()
def testReadCert(self):
s2 = socket.socket()
try:
s2.connect(('127.0.0.1', 10024))
except:
handle_error("connection failure:\n")
raise test_support.TestFailed("Can't connect to test server")
else:
try:
c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
except:
handle_error("SSL handshake failure:\n")
raise test_support.TestFailed("Can't SSL-handshake with test server")
else:
if not c2:
raise test_support.TestFailed("Can't SSL-handshake with test server")
cert = c2.getpeercert()
if not cert:
raise test_support.TestFailed("Can't get peer certificate.")
if not cert.has_key('subject'):
raise test_support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if not (cert['subject'].has_key('organizationName')):
raise test_support.TestFailed(
"No 'organizationName' field in certificate subject: %s." %
pprint.pformat(cert))
if (cert['subject']['organizationName'] !=
"Python Software Foundation"):
raise test_support.TestFailed(
"Invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.");
c2.close()
class ThreadedEchoServer(threading.Thread):
class ConnectionHandler(threading.Thread):
def __init__(self, server, connsock):
self.server = server
self.running = False
self.sock = connsock
threading.Thread.__init__(self)
self.setDaemon(True)
def run (self):
self.running = True
try:
sslconn = ssl.sslsocket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
cert_reqs=self.server.certreqs)
except:
# here, we want to stop the server, because this shouldn't
# happen in the context of our test case
handle_error("Test server failure:\n")
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
return
while self.running:
try:
msg = sslconn.read()
if not msg:
# eof, so quit this handler
self.running = False
sslconn.close()
elif msg.strip() == 'over':
sslconn.close()
self.server.stop()
self.running = False
else:
if test_support.verbose:
sys.stdout.write("\nserver: %s\n" % msg.strip().lower())
sslconn.write(msg.lower())
except ssl.sslerror:
handle_error("Test server failure:\n")
sslconn.close()
self.running = False
# normally, we'd just stop here, but for the test
# harness, we want to stop the server
self.server.stop()
except:
handle_error('')
def __init__(self, port, certificate, ssl_version=None,
certreqs=None, cacerts=None):
if ssl_version is None:
ssl_version = ssl.PROTOCOL_TLSv1
if certreqs is None:
certreqs = ssl.CERT_NONE
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.sock = socket.socket()
self.flag = None
if hasattr(socket, 'SO_REUSEADDR'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
def start (self, flag=None):
self.flag = flag
threading.Thread.start(self)
def run (self):
self.sock.settimeout(0.5)
self.sock.listen(5)
self.active = True
if self.flag:
# signal an event
self.flag.set()
while self.active:
try:
newconn, connaddr = self.sock.accept()
if test_support.verbose:
sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n')
handler = self.ConnectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
self.stop()
except:
handle_error("Test server failure:\n")
def stop (self):
self.active = False
self.sock.close()
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
[ req ]
default_bits = 1024
encrypt_key = yes
distinguished_name = req_dn
x509_extensions = cert_type
[ req_dn ]
countryName = Country Name (2 letter code)
countryName_default = US
countryName_min = 2
countryName_max = 2
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = %(state)s
localityName = Locality Name (eg, city)
localityName_default = %(city)s
0.organizationName = Organization Name (eg, company)
0.organizationName_default = %(organization)s
organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = %(unit)s
0.commonName = Common Name (FQDN of your server)
0.commonName_default = %(common-name)s
# To create a certificate for more than one name uncomment:
# 1.commonName = DNS alias of your server
# 2.commonName = DNS alias of your server
# ...
# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
# to see how Netscape understands commonName.
[ cert_type ]
nsCertType = server
"""
def create_cert_files(hostname=None):
"""This is the routine that was run to create the certificate
and private key contained in keycert.pem."""
import tempfile, socket, os
d = tempfile.mkdtemp()
# now create a configuration file for the CA signing cert
fqdn = hostname or socket.getfqdn()
crtfile = os.path.join(d, "cert.pem")
conffile = os.path.join(d, "ca.conf")
fp = open(conffile, "w")
fp.write(CERTFILE_CONFIG_TEMPLATE %
{'state': "Delaware",
'city': "Wilmington",
'organization': "Python Software Foundation",
'unit': "SSL",
'common-name': fqdn,
})
fp.close()
error = os.system(
"openssl req -batch -new -x509 -days 2000 -nodes -config %s "
"-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" %
(conffile, crtfile, crtfile))
# now we have a self-signed server cert in crtfile
os.unlink(conffile)
if (os.WEXITSTATUS(error) or
not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0):
if test_support.verbose:
sys.stdout.write("Unable to create certificate for test, "
+ "error status %d\n" % (error >> 8))
crtfile = None
elif test_support.verbose:
sys.stdout.write(open(crtfile, 'r').read() + '\n')
return d, crtfile
def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("socket module has no ssl support")
global CERTFILE
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
if not CERTFILE:
sys.__stdout__.write("Skipping test_ssl ConnectedTests; "
"couldn't create a certificate.\n")
tests = [BasicTests]
server = None
if CERTFILE and test_support.is_resource_enabled('network'):
server = ThreadedEchoServer(10024, CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
flag.wait()
tests.append(ConnectedTests)
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
finally:
if server is not None and server.active:
server.stop()
# wait for it to stop
server.join()
test_support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()

View file

@ -5,7 +5,7 @@ from _testcapi import test_structmembersType, \
LONG_MAX, LONG_MIN, ULONG_MAX, \
LLONG_MAX, LLONG_MIN, ULLONG_MAX
import warnings, unittest
import warnings, unittest, sys
from test import test_support
ts=test_structmembersType(1,2,3,4,5,6,7,8,9.99999,10.1010101010)
@ -59,7 +59,7 @@ class ReadWriteTests(unittest.TestCase):
class TestWarnings(unittest.TestCase):
def has_warned(self, w):
self.assert_(w.category is RuntimeWarning)
self.assertEqual(w.category, RuntimeWarning)
def test_byte_max(self):
with test_support.catch_warning() as w:
@ -94,10 +94,13 @@ class TestWarnings(unittest.TestCase):
def test_main(verbose=None):
test_support.run_unittest(
ReadWriteTests,
TestWarnings
)
# Obscure hack so that this test passes after reloads or repeated calls
# to test_main (regrtest -R).
if '__warningregistry__' in globals():
del globals()['__warningregistry__']
if hasattr(sys, '__warningregistry__'):
del sys.__warningregistry__
test_support.run_unittest(__name__)
if __name__ == "__main__":
test_main(verbose=True)

View file

@ -97,6 +97,18 @@ class StructSeqTest(unittest.TestCase):
t = time.gmtime()
x = t.__reduce__()
def test_extended_getslice(self):
# Test extended slicing by comparing with list slicing.
t = time.gmtime()
L = list(t)
indices = (0, None, 1, 3, 19, 300, -1, -2, -31, -300)
for start in indices:
for stop in indices:
# Skip step 0 (invalid)
for step in indices[1:]:
self.assertEqual(list(t[start:stop:step]),
L[start:stop:step])
def test_main():
test_support.run_unittest(StructSeqTest)

View file

@ -362,6 +362,22 @@ def transient_internet():
return contextlib.nested(time_out, socket_peer_reset, ioerror_peer_reset)
@contextlib.contextmanager
def captured_stdout():
"""Run the with statement body using a StringIO object as sys.stdout.
Example use::
with captured_stdout() as s:
print "hello"
assert s.getvalue() == "hello"
"""
import io
orig_stdout = sys.stdout
sys.stdout = io.StringIO()
yield sys.stdout
sys.stdout = orig_stdout
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

View file

@ -140,11 +140,25 @@ class UstarReadTest(ReadTest):
class MiscReadTest(ReadTest):
def test_no_filename(self):
def test_no_name_argument(self):
fobj = open(self.tarname, "rb")
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name))
def test_no_name_attribute(self):
data = open(self.tarname, "rb").read()
fobj = io.BytesIO(data)
self.assertRaises(AttributeError, getattr, fobj, "name")
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, None)
def test_empty_name_attribute(self):
data = open(self.tarname, "rb").read()
fobj = io.BytesIO(data)
fobj.name = ""
tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, None)
def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
if self.mode == "r:":

View file

@ -3,6 +3,7 @@
# UserString instances should behave similar to builtin string objects.
import unittest
import string
from test import test_support, string_tests
from UserString import UserString, MutableString
@ -86,6 +87,28 @@ class MutableStringTest(UserStringTest):
del s[-1:10]
self.assertEqual(s, "fo")
def test_extended_set_del_slice(self):
indices = (0, None, 1, 3, 19, 100, -1, -2, -31, -100)
orig = string.ascii_letters + string.digits
for start in indices:
for stop in indices:
# Use indices[1:] when MutableString can handle real
# extended slices
for step in (None, 1, -1):
s = self.type2test(orig)
L = list(orig)
# Make sure we have a slice of exactly the right length,
# but with (hopefully) different data.
data = L[start:stop:step]
data.reverse()
L[start:stop:step] = data
s[start:stop:step] = "".join(data)
self.assertEquals(s, "".join(L))
del L[start:stop:step]
del s[start:stop:step]
self.assertEquals(s, "".join(L))
def test_immutable(self):
s = self.type2test("foobar")
s2 = s.immutable()

View file

@ -3,8 +3,9 @@
from _winreg import *
import os, sys
import unittest
from test.test_support import verify
from test import test_support
test_key_name = "SOFTWARE\\Python Registry Test Key - Delete Me"
@ -13,137 +14,152 @@ test_data = [
("String Val", "A string value", REG_SZ),
("StringExpand", "The path is %path%", REG_EXPAND_SZ),
("Multi-string", ["Lots", "of", "string", "values"], REG_MULTI_SZ),
("Raw Data", bytes("binary"+chr(0)+"data"), REG_BINARY),
("Raw Data", b"binary\x00data", REG_BINARY),
("Big String", "x"*(2**14-1), REG_SZ),
("Big Binary", b"x"*(2**14), REG_BINARY),
]
def WriteTestData(root_key):
# Set the default value for this key.
SetValue(root_key, test_key_name, REG_SZ, "Default value")
key = CreateKey(root_key, test_key_name)
# Create a sub-key
sub_key = CreateKey(key, "sub_key")
# Give the sub-key some named values
for value_name, value_data, value_type in test_data:
SetValueEx(sub_key, value_name, 0, value_type, value_data)
# Check we wrote as many items as we thought.
nkeys, nvalues, since_mod = QueryInfoKey(key)
verify(nkeys==1, "Not the correct number of sub keys")
verify(nvalues==1, "Not the correct number of values")
nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
verify(nkeys==0, "Not the correct number of sub keys")
verify(nvalues==len(test_data), "Not the correct number of values")
# Close this key this way...
# (but before we do, copy the key as an integer - this allows
# us to test that the key really gets closed).
int_sub_key = int(sub_key)
CloseKey(sub_key)
try:
QueryInfoKey(int_sub_key)
raise RuntimeError, "It appears the CloseKey() function does not close the actual key!"
except EnvironmentError:
pass
# ... and close that key that way :-)
int_key = int(key)
key.Close()
try:
QueryInfoKey(int_key)
raise RuntimeError, "It appears the key.Close() function does not close the actual key!"
except EnvironmentError:
pass
def ReadTestData(root_key):
# Check we can get default value for this key.
val = QueryValue(root_key, test_key_name)
verify(type(val) is str and val=="Default value", "Registry didn't give back the correct value")
key = OpenKey(root_key, test_key_name)
# Read the sub-keys
sub_key = OpenKey(key, "sub_key")
# Check I can enumerate over the values.
index = 0
while 1:
try:
data = EnumValue(sub_key, index)
except EnvironmentError:
break
verify(data in test_data, "Didn't read back the correct test data")
index = index + 1
verify(index==len(test_data), "Didn't read the correct number of items")
# Check I can directly access each item
for value_name, value_data, value_type in test_data:
read_val, read_typ = QueryValueEx(sub_key, value_name)
verify(read_val==value_data and read_typ == value_type, \
"Could not directly read the value" )
sub_key.Close()
# Enumerate our main key.
read_val = EnumKey(key, 0)
verify(read_val == "sub_key", "Read subkey value wrong")
try:
EnumKey(key, 1)
verify(0, "Was able to get a second key when I only have one!")
except EnvironmentError:
pass
key.Close()
def DeleteTestData(root_key):
key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS)
sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS)
# It is not necessary to delete the values before deleting
# the key (although subkeys must not exist). We delete them
# manually just to prove we can :-)
for value_name, value_data, value_type in test_data:
DeleteValue(sub_key, value_name)
nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
verify(nkeys==0 and nvalues==0, "subkey not empty before delete")
sub_key.Close()
DeleteKey(key, "sub_key")
try:
# Shouldnt be able to delete it twice!
DeleteKey(key, "sub_key")
verify(0, "Deleting the key twice succeeded")
except EnvironmentError:
pass
key.Close()
DeleteKey(root_key, test_key_name)
# Opening should now fail!
try:
key = OpenKey(root_key, test_key_name)
verify(0, "Could open the non-existent key")
except WindowsError: # Use this error name this time
pass
def TestAll(root_key):
WriteTestData(root_key)
ReadTestData(root_key)
DeleteTestData(root_key)
# Test on my local machine.
TestAll(HKEY_CURRENT_USER)
print("Local registry tests worked")
try:
remote_name = sys.argv[sys.argv.index("--remote")+1]
except (IndexError, ValueError):
class WinregTests(unittest.TestCase):
remote_name = None
if remote_name is not None:
def WriteTestData(self, root_key):
# Set the default value for this key.
SetValue(root_key, test_key_name, REG_SZ, "Default value")
key = CreateKey(root_key, test_key_name)
# Create a sub-key
sub_key = CreateKey(key, "sub_key")
# Give the sub-key some named values
for value_name, value_data, value_type in test_data:
SetValueEx(sub_key, value_name, 0, value_type, value_data)
# Check we wrote as many items as we thought.
nkeys, nvalues, since_mod = QueryInfoKey(key)
self.assertEquals(nkeys, 1, "Not the correct number of sub keys")
self.assertEquals(nvalues, 1, "Not the correct number of values")
nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
self.assertEquals(nkeys, 0, "Not the correct number of sub keys")
self.assertEquals(nvalues, len(test_data),
"Not the correct number of values")
# Close this key this way...
# (but before we do, copy the key as an integer - this allows
# us to test that the key really gets closed).
int_sub_key = int(sub_key)
CloseKey(sub_key)
try:
QueryInfoKey(int_sub_key)
self.fail("It appears the CloseKey() function does "
"not close the actual key!")
except EnvironmentError:
pass
# ... and close that key that way :-)
int_key = int(key)
key.Close()
try:
QueryInfoKey(int_key)
self.fail("It appears the key.Close() function "
"does not close the actual key!")
except EnvironmentError:
pass
def ReadTestData(self, root_key):
# Check we can get default value for this key.
val = QueryValue(root_key, test_key_name)
self.assertEquals(val, "Default value",
"Registry didn't give back the correct value")
key = OpenKey(root_key, test_key_name)
# Read the sub-keys
sub_key = OpenKey(key, "sub_key")
# Check I can enumerate over the values.
index = 0
while 1:
try:
data = EnumValue(sub_key, index)
except EnvironmentError:
break
self.assertEquals(data in test_data, True,
"Didn't read back the correct test data")
index = index + 1
self.assertEquals(index, len(test_data),
"Didn't read the correct number of items")
# Check I can directly access each item
for value_name, value_data, value_type in test_data:
read_val, read_typ = QueryValueEx(sub_key, value_name)
self.assertEquals(read_val, value_data,
"Could not directly read the value")
self.assertEquals(read_typ, value_type,
"Could not directly read the value")
sub_key.Close()
# Enumerate our main key.
read_val = EnumKey(key, 0)
self.assertEquals(read_val, "sub_key", "Read subkey value wrong")
try:
EnumKey(key, 1)
self.fail("Was able to get a second key when I only have one!")
except EnvironmentError:
pass
key.Close()
def DeleteTestData(self, root_key):
key = OpenKey(root_key, test_key_name, 0, KEY_ALL_ACCESS)
sub_key = OpenKey(key, "sub_key", 0, KEY_ALL_ACCESS)
# It is not necessary to delete the values before deleting
# the key (although subkeys must not exist). We delete them
# manually just to prove we can :-)
for value_name, value_data, value_type in test_data:
DeleteValue(sub_key, value_name)
nkeys, nvalues, since_mod = QueryInfoKey(sub_key)
self.assertEquals(nkeys, 0, "subkey not empty before delete")
self.assertEquals(nvalues, 0, "subkey not empty before delete")
sub_key.Close()
DeleteKey(key, "sub_key")
try:
# Shouldnt be able to delete it twice!
DeleteKey(key, "sub_key")
self.fail("Deleting the key twice succeeded")
except EnvironmentError:
pass
key.Close()
DeleteKey(root_key, test_key_name)
# Opening should now fail!
try:
key = OpenKey(root_key, test_key_name)
self.fail("Could open the non-existent key")
except WindowsError: # Use this error name this time
pass
def TestAll(self, root_key):
self.WriteTestData(root_key)
self.ReadTestData(root_key)
self.DeleteTestData(root_key)
def testLocalMachineRegistryWorks(self):
self.TestAll(HKEY_CURRENT_USER)
def testConnectRegistryToLocalMachineWorks(self):
# perform minimal ConnectRegistry test which just invokes it
h = ConnectRegistry(None, HKEY_LOCAL_MACHINE)
h.Close()
def testRemoteMachineRegistryWorks(self):
if not self.remote_name:
raise test_support.TestSkipped("Remote machine name "
"not specified.")
remote_key = ConnectRegistry(self.remote_name, HKEY_CURRENT_USER)
self.TestAll(remote_key)
def test_main():
test_support.run_unittest(WinregTests)
if __name__ == "__main__":
try:
remote_key = ConnectRegistry(remote_name, HKEY_CURRENT_USER)
except EnvironmentError as exc:
print("Could not connect to the remote machine -", exc.strerror)
remote_key = None
if remote_key is not None:
TestAll(remote_key)
print("Remote registry tests worked")
else:
print("Remote registry calls can be tested using", end=' ')
print("'test_winreg.py --remote \\\\machine_name'")
# perform minimal ConnectRegistry test which just invokes it
h = ConnectRegistry(None, HKEY_LOCAL_MACHINE)
h.Close()
WinregTests.remote_name = sys.argv[sys.argv.index("--remote")+1]
except (IndexError, ValueError):
print("Remote registry calls can be tested using",
"'test_winreg.py --remote \\\\machine_name'")
WinregTests.remote_name = None
test_main()

View file

@ -284,6 +284,27 @@ def http_server(evt, numrequests):
evt.set()
def is_unavailable_exception(e):
'''Returns True if the given ProtocolError is the product of a server-side
exception caused by the 'temporarily unavailable' response sometimes
given by operations on non-blocking sockets.'''
# sometimes we get a -1 error code and/or empty headers
if e.errcode == -1 or e.headers is None:
return True
exc_mess = e.headers.get('X-exception')
if exc_mess and 'temporarily unavailable' in exc_mess.lower():
return True
return False
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.
class SimpleServerTestCase(unittest.TestCase):
def setUp(self):
# enable traceback reporting
@ -291,7 +312,7 @@ class SimpleServerTestCase(unittest.TestCase):
self.evt = threading.Event()
# start server thread to handle requests
serv_args = (self.evt, 2)
serv_args = (self.evt, 1)
threading.Thread(target=http_server, args=serv_args).start()
# wait for port to be assigned to server
@ -314,8 +335,10 @@ class SimpleServerTestCase(unittest.TestCase):
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
self.assertEqual(p.pow(6,8), 6**8)
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
def test_introspection1(self):
try:
@ -325,8 +348,10 @@ class SimpleServerTestCase(unittest.TestCase):
'system.methodHelp', 'system.methodSignature', 'system.multicall'])
self.assertEqual(set(meth), expected_methods)
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
def test_introspection2(self):
try:
@ -334,19 +359,23 @@ class SimpleServerTestCase(unittest.TestCase):
divhelp = p.system.methodHelp('div')
self.assertEqual(divhelp, 'This is the div function')
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
def test_introspection3(self):
# the SimpleXMLRPCServer doesn't support signatures, but
# at least check that we can try
# at least check that we can try making the call
try:
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
divsig = p.system.methodSignature('div')
self.assertEqual(divsig, 'signatures not supported')
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
def test_multicall(self):
try:
@ -360,8 +389,10 @@ class SimpleServerTestCase(unittest.TestCase):
self.assertEqual(pow_result, 6**8)
self.assertEqual(div_result, 127//42)
except xmlrpclib.ProtocolError as e:
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# ignore failures due to non-blocking socket 'unavailable' errors
if not is_unavailable_exception(e):
# protocol error; provide additional information in test output
self.fail("%s\n%s" % (e, e.headers))
# This is a contrived way to make a failure occur on the server side