Server-side SSL and certificate validation, by Bill Janssen.

While cleaning up Bill's C style, I may have cleaned up some code
he didn't touch as well (in _ssl.c).
This commit is contained in:
Guido van Rossum 2007-08-25 15:08:43 +00:00
parent 1a42ece0c7
commit 4f2c3ddca4
7 changed files with 1037 additions and 97 deletions

View file

@ -774,9 +774,18 @@ SSL objects have the following methods.
.. method:: SSL.server() .. method:: SSL.server()
Returns a string describing the server's certificate. Useful for debugging Returns a string describing the server's certificate. Useful for
purposes; do not parse the content of this string because its format can't be debugging purposes; do not parse the content of this string because
parsed unambiguously. its format can't be parsed unambiguously. And don't *trust* the
content of this string, because certificates aren't validated if you
use the function :func:`ssl` to create an SSL binding. If you need to
see the content of a peer certificate, you should use the
:func:`sslsocket` function in the :mod:`ssl` module to create the SSL
object, specifying the parameter `cert_req` as :const:`CERT_REQUIRED`,
and passing the name of a file containing a collection of certificates
to use to validate the peer certificate as the value of the `ca_certs`
parameter. Then use the :meth:`getpeercert` method on that instance
to retrieve the contents of the certificate.
.. method:: SSL.issuer() .. method:: SSL.issuer()

View file

@ -68,11 +68,10 @@ if _have_ssl:
_realsocket = socket _realsocket = socket
if _have_ssl: if _have_ssl:
_realssl = ssl
def ssl(sock, keyfile=None, certfile=None): def ssl(sock, keyfile=None, certfile=None):
if hasattr(sock, "_sock"): import ssl as realssl
sock = sock._sock return realssl.sslwrap_simple(sock, keyfile, certfile)
return _realssl(sock, keyfile, certfile) __all__.append("ssl")
# WSA error codes # WSA error codes
if sys.platform.lower().startswith("win"): if sys.platform.lower().startswith("win"):

252
Lib/ssl.py Normal file
View file

@ -0,0 +1,252 @@
# Wrapper module for _ssl, providing some additional facilities
# implemented in Python. Written by Bill Janssen.
"""\
This module provides some more Pythonic support for SSL.
Object types:
sslsocket -- subtype of socket.socket which does SSL over the socket
Exceptions:
sslerror -- exception raised for I/O errors
Functions:
cert_time_to_seconds -- convert time string used for certificate
notBefore and notAfter functions to integer
seconds past the Epoch (the time values
returned from time.time())
fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
by the server running on HOST at port PORT. No
validation of the certificate is performed.
Integer constants:
SSL_ERROR_ZERO_RETURN
SSL_ERROR_WANT_READ
SSL_ERROR_WANT_WRITE
SSL_ERROR_WANT_X509_LOOKUP
SSL_ERROR_SYSCALL
SSL_ERROR_SSL
SSL_ERROR_WANT_CONNECT
SSL_ERROR_EOF
SSL_ERROR_INVALID_ERROR_CODE
The following group define certificate requirements that one side is
allowing/requiring from the other side:
CERT_NONE - no certificates from the other side are required (or will
be looked at if provided)
CERT_OPTIONAL - certificates are not required, but if provided will be
validated, and if validation fails, the connection will
also fail
CERT_REQUIRED - certificates are required, and will be validated, and
if validation fails, the connection will also fail
The following constants identify various SSL protocol variants:
PROTOCOL_SSLv2
PROTOCOL_SSLv3
PROTOCOL_SSLv23
PROTOCOL_TLSv1
"""
import os, sys
import _ssl # if we can't import it, let the error propagate
from socket import socket
from _ssl import sslerror
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import PROTOCOL_SSLv2, PROTOCOL_SSLv3, PROTOCOL_SSLv23, PROTOCOL_TLSv1
# Root certs:
#
# The "ca_certs" argument to sslsocket() expects a file containing one or more
# certificates that are roots of various certificate signing chains. This file
# contains the certificates in PEM format (RFC ) where each certificate is
# encoded in base64 encoding and surrounded with a header and footer:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# The various certificates in the file are just concatenated together:
# -----BEGIN CERTIFICATE-----
# ... (CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
# -----BEGIN CERTIFICATE-----
# ... (a second CA certificate in base64 encoding) ...
# -----END CERTIFICATE-----
#
# Some "standard" root certificates are available at
#
# http://www.thawte.com/roots/ (for Thawte roots)
# http://www.verisign.com/support/roots.html (for Verisign)
class sslsocket (socket):
def __init__(self, sock, keyfile=None, certfile=None,
server_side=False, cert_reqs=CERT_NONE,
ssl_version=PROTOCOL_SSLv23, ca_certs=None):
socket.__init__(self, _sock=sock._sock)
if certfile and not keyfile:
keyfile = certfile
if server_side:
self._sslobj = _ssl.sslwrap(self._sock, 1, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
else:
# see if it's connected
try:
socket.getpeername(self)
# yes
self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile,
cert_reqs, ssl_version, ca_certs)
except:
# no
self._sslobj = None
self.keyfile = keyfile
self.certfile = certfile
self.cert_reqs = cert_reqs
self.ssl_version = ssl_version
self.ca_certs = ca_certs
def read(self, len=1024):
return self._sslobj.read(len)
def write(self, data):
return self._sslobj.write(data)
def getpeercert(self):
return self._sslobj.peer_certificate()
def send (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to send() on %s" %
self.__class__)
return self._sslobj.write(data)
def send_to (self, data, addr, flags=0):
raise ValueError("send_to not allowed on instances of %s" %
self.__class__)
def sendall (self, data, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.write(data)
def recv (self, buflen=1024, flags=0):
if flags != 0:
raise ValueError(
"non-zero flags not allowed in calls to sendall() on %s" %
self.__class__)
return self._sslobj.read(data, buflen)
def recv_from (self, addr, buflen=1024, flags=0):
raise ValueError("recv_from not allowed on instances of %s" %
self.__class__)
def shutdown(self):
if self._sslobj:
self._sslobj.shutdown()
self._sslobj = None
else:
socket.shutdown(self)
def close(self):
if self._sslobj:
self.shutdown()
else:
socket.close(self)
def connect(self, addr):
# Here we assume that the socket is client-side, and not
# connected at the time of the call. We connect it, then wrap it.
if self._sslobj or (self.getsockname()[1] != 0):
raise ValueError("attempt to connect already-connected sslsocket!")
socket.connect(self, addr)
self._sslobj = _ssl.sslwrap(self._sock, 0, self.keyfile, self.certfile,
self.cert_reqs, self.ssl_version,
self.ca_certs)
def accept(self):
raise ValueError("accept() not supported on an sslsocket")
# some utility functions
def cert_time_to_seconds(cert_time):
import time
return time.mktime(time.strptime(cert_time, "%b %d %H:%M:%S %Y GMT"))
# a replacement for the old socket.ssl function
def sslwrap_simple (sock, keyfile=None, certfile=None):
return _ssl.sslwrap(sock._sock, 0, keyfile, certfile, CERT_NONE,
PROTOCOL_SSLv23, None)
# fetch the certificate that the server is providing in PEM form
def fetch_server_certificate (host, port):
import re, tempfile, os
def subproc(cmd):
from subprocess import Popen, PIPE, STDOUT
proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
status = proc.wait()
output = proc.stdout.read()
return status, output
def strip_to_x509_cert(certfile_contents, outfile=None):
m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n"
r".*[\r]*^[-]+END CERTIFICATE[-]+)$",
certfile_contents, re.MULTILINE | re.DOTALL)
if not m:
return None
else:
tn = tempfile.mktemp()
fp = open(tn, "w")
fp.write(m.group(1) + "\n")
fp.close()
try:
tn2 = (outfile or tempfile.mktemp())
status, output = subproc(r'openssl x509 -in "%s" -out "%s"' %
(tn, tn2))
if status != 0:
raise OperationError(status, tsig, output)
fp = open(tn2, 'rb')
data = fp.read()
fp.close()
os.unlink(tn2)
return data
finally:
os.unlink(tn)
if sys.platform.startswith("win"):
tfile = tempfile.mktemp()
fp = open(tfile, "w")
fp.write("quit\n")
fp.close()
try:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < "%s"' %
(host, port, tfile))
finally:
os.unlink(tfile)
else:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < /dev/null' %
(host, port))
if status != 0:
raise OSError(status)
certtext = strip_to_x509_cert(output)
if not certtext:
raise ValueError("Invalid response received from server at %s:%s" %
(host, port))
return certtext

304
Lib/test/test_ssl.py Normal file
View file

@ -0,0 +1,304 @@
# Test the support for SSL and sockets
import sys
import unittest
from test import test_support
import socket
import errno
import threading
import subprocess
import time
import os
import pprint
import urllib
import shutil
import string
import traceback
# Optionally test SSL support, if we have it in the tested platform
skip_expected = False
try:
import ssl
except ImportError:
skip_expected = True
CERTFILE = None
GMAIL_POP_CERTFILE = None
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# Then it rudely closes the socket, and sets Event `listener_gone`
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def testSSLconnect(self):
import os
with test_support.transient_internet():
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_NONE)
s.connect(("pop.gmail.com", 995))
c = s.getpeercert()
if c:
raise test_support.TestFailed("Peer cert %s shouldn't be here!")
s.close()
# this should fail because we have no verification certs
s = ssl.sslsocket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED)
try:
s.connect(("pop.gmail.com", 995))
except ssl.sslerror:
pass
finally:
s.close()
class ConnectedTests(unittest.TestCase):
def testTLSecho (self):
s1 = socket.socket()
s1.connect(('127.0.0.1', 10024))
c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1)
indata = "FOO\n"
c1.write(indata)
outdata = c1.read()
if outdata != indata.lower():
sys.stderr.write("bad data <<%s>> received\n" % data)
c1.close()
def testReadCert(self):
s2 = socket.socket()
s2.connect(('127.0.0.1', 10024))
c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1,
cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE)
cert = c2.getpeercert()
if not cert:
raise test_support.TestFailed("Can't get peer certificate.")
if not cert.has_key('subject'):
raise test_support.TestFailed(
"No subject field in certificate: %s." %
pprint.pformat(cert))
if not (cert['subject'].has_key('organizationName')):
raise test_support.TestFailed(
"No 'organizationName' field in certificate subject: %s." %
pprint.pformat(cert))
if (cert['subject']['organizationName'] !=
"Python Software Foundation"):
raise test_support.TestFailed(
"Invalid 'organizationName' field in certificate subject; "
"should be 'Python Software Foundation'.");
c2.close()
class threadedEchoServer(threading.Thread):
class connectionHandler(threading.Thread):
def __init__(self, server, connsock):
self.server = server
self.running = False
self.sock = connsock
threading.Thread.__init__(self)
self.setDaemon(True)
def run (self):
self.running = True
sslconn = ssl.sslsocket(self.sock, server_side=True,
certfile=self.server.certificate,
ssl_version=self.server.protocol,
cert_reqs=self.server.certreqs)
while self.running:
try:
msg = sslconn.read()
if not msg:
# eof, so quit this handler
self.running = False
sslconn.close()
elif msg.strip() == 'over':
sslconn.close()
self.server.stop()
self.running = False
else:
# print "server:", msg.strip().lower()
sslconn.write(msg.lower())
except ssl.sslerror:
sys.stderr.write(string.join(
traceback.format_exception(*sys.exc_info())))
sslconn.close()
self.running = False
except:
sys.stderr.write(string.join(
traceback.format_exception(*sys.exc_info())))
def __init__(self, port, certificate, ssl_version=ssl.PROTOCOL_TLSv1,
certreqs=ssl.CERT_NONE, cacerts=None):
self.certificate = certificate
self.protocol = ssl_version
self.certreqs = certreqs
self.cacerts = cacerts
self.sock = socket.socket()
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
def run (self):
self.sock.settimeout(0.5)
self.sock.listen(5)
self.active = True
while self.active:
try:
newconn, connaddr = self.sock.accept()
# sys.stderr.write('new connection from ' + str(connaddr))
handler = self.connectionHandler(self, newconn)
handler.start()
except socket.timeout:
pass
except KeyboardInterrupt:
self.active = False
except:
sys.stderr.write(string.join(
traceback.format_exception(*sys.exc_info())))
def stop (self):
self.active = False
CERTFILE_CONFIG_TEMPLATE = """
# create RSA certs - Server
[ req ]
default_bits = 1024
encrypt_key = yes
distinguished_name = req_dn
x509_extensions = cert_type
[ req_dn ]
countryName = Country Name (2 letter code)
countryName_default = US
countryName_min = 2
countryName_max = 2
stateOrProvinceName = State or Province Name (full name)
stateOrProvinceName_default = %(state)s
localityName = Locality Name (eg, city)
localityName_default = %(city)s
0.organizationName = Organization Name (eg, company)
0.organizationName_default = %(organization)s
organizationalUnitName = Organizational Unit Name (eg, section)
organizationalUnitName_default = %(unit)s
0.commonName = Common Name (FQDN of your server)
0.commonName_default = %(common-name)s
# To create a certificate for more than one name uncomment:
# 1.commonName = DNS alias of your server
# 2.commonName = DNS alias of your server
# ...
# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html
# to see how Netscape understands commonName.
[ cert_type ]
nsCertType = server
"""
def create_cert_files():
import tempfile, socket, os
d = tempfile.mkdtemp()
# now create a configuration file for the CA signing cert
fqdn = socket.getfqdn()
crtfile = os.path.join(d, "cert.pem")
conffile = os.path.join(d, "ca.conf")
fp = open(conffile, "w")
fp.write(CERTFILE_CONFIG_TEMPLATE %
{'state': "Delaware",
'city': "Wilmington",
'organization': "Python Software Foundation",
'unit': "SSL",
'common-name': fqdn,
})
fp.close()
os.system(
"openssl req -batch -new -x509 -days 10 -nodes -config %s "
"-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" %
(conffile, crtfile, crtfile))
# now we have a self-signed server cert in crtfile
os.unlink(conffile)
#sf_certfile = os.path.join(d, "sourceforge-imap.pem")
#sf_cert = ssl.fetch_server_certificate('pop.gmail.com', 995)
#open(sf_certfile, 'w').write(sf_cert)
#return d, crtfile, sf_certfile
# sys.stderr.write(open(crtfile, 'r').read() + '\n')
return d, crtfile
def test_main():
if skip_expected:
raise test_support.TestSkipped("socket module has no ssl support")
global CERTFILE
tdir, CERTFILE = create_cert_files()
tests = [BasicTests]
server = None
if test_support.is_resource_enabled('network'):
server = threadedEchoServer(10024, CERTFILE)
server.start()
time.sleep(1)
tests.append(ConnectedTests)
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
finally:
if server is not None and server.active:
server.stop()
# wait for it to stop
server.join()
shutil.rmtree(tdir)
test_support.threading_cleanup(*thread_info)
if __name__ == "__main__":
test_main()

View file

@ -240,6 +240,8 @@ Core and builtins
Library Library
------- -------
- Server-side SSL support and cert verification added, by Bill Janssen.
- uuid creation is now threadsafe. - uuid creation is now threadsafe.
- EUC-KR codec now handles the cheot-ga-keut composed make-up hangul - EUC-KR codec now handles the cheot-ga-keut composed make-up hangul

View file

@ -1,4 +1,4 @@
/* SSL socket module /* SSL socket module
SSL support based on patches by Brian E Gallew and Laszlo Kovacs. SSL support based on patches by Brian E Gallew and Laszlo Kovacs.
@ -8,25 +8,44 @@
*/ */
#include "Python.h" #include "Python.h"
enum py_ssl_error { enum py_ssl_error {
/* these mirror ssl.h */ /* these mirror ssl.h */
PY_SSL_ERROR_NONE, PY_SSL_ERROR_NONE,
PY_SSL_ERROR_SSL, PY_SSL_ERROR_SSL,
PY_SSL_ERROR_WANT_READ, PY_SSL_ERROR_WANT_READ,
PY_SSL_ERROR_WANT_WRITE, PY_SSL_ERROR_WANT_WRITE,
PY_SSL_ERROR_WANT_X509_LOOKUP, PY_SSL_ERROR_WANT_X509_LOOKUP,
PY_SSL_ERROR_SYSCALL, /* look at error stack/return value/errno */ PY_SSL_ERROR_SYSCALL, /* look at error stack/return value/errno */
PY_SSL_ERROR_ZERO_RETURN, PY_SSL_ERROR_ZERO_RETURN,
PY_SSL_ERROR_WANT_CONNECT, PY_SSL_ERROR_WANT_CONNECT,
/* start of non ssl.h errorcodes */ /* start of non ssl.h errorcodes */
PY_SSL_ERROR_EOF, /* special case of SSL_ERROR_SYSCALL */ PY_SSL_ERROR_EOF, /* special case of SSL_ERROR_SYSCALL */
PY_SSL_ERROR_INVALID_ERROR_CODE PY_SSL_ERROR_INVALID_ERROR_CODE
}; };
enum py_ssl_server_or_client {
PY_SSL_CLIENT,
PY_SSL_SERVER
};
enum py_ssl_cert_requirements {
PY_SSL_CERT_NONE,
PY_SSL_CERT_OPTIONAL,
PY_SSL_CERT_REQUIRED
};
enum py_ssl_version {
PY_SSL_VERSION_SSL2,
PY_SSL_VERSION_SSL3,
PY_SSL_VERSION_SSL23,
PY_SSL_VERSION_TLS1,
};
/* Include symbols from _socket module */ /* Include symbols from _socket module */
#include "socketmodule.h" #include "socketmodule.h"
#if defined(HAVE_POLL_H) #if defined(HAVE_POLL_H)
#include <poll.h> #include <poll.h>
#elif defined(HAVE_SYS_POLL_H) #elif defined(HAVE_SYS_POLL_H)
#include <sys/poll.h> #include <sys/poll.h>
@ -58,10 +77,10 @@ static PyObject *PySSLErrorObject;
typedef struct { typedef struct {
PyObject_HEAD PyObject_HEAD
PySocketSockObject *Socket; /* Socket on which we're layered */ PySocketSockObject *Socket; /* Socket on which we're layered */
SSL_CTX* ctx; SSL_CTX* ctx;
SSL* ssl; SSL* ssl;
X509* server_cert; X509* peer_cert;
char server[X509_NAME_MAXLEN]; char server[X509_NAME_MAXLEN];
char issuer[X509_NAME_MAXLEN]; char issuer[X509_NAME_MAXLEN];
} PySSLObject; } PySSLObject;
@ -69,8 +88,10 @@ typedef struct {
static PyTypeObject PySSL_Type; static PyTypeObject PySSL_Type;
static PyObject *PySSL_SSLwrite(PySSLObject *self, PyObject *args); static PyObject *PySSL_SSLwrite(PySSLObject *self, PyObject *args);
static PyObject *PySSL_SSLread(PySSLObject *self, PyObject *args); static PyObject *PySSL_SSLread(PySSLObject *self, PyObject *args);
static int check_socket_and_wait_for_timeout(PySocketSockObject *s, static int check_socket_and_wait_for_timeout(PySocketSockObject *s,
int writing); int writing);
static PyObject *PySSL_peercert(PySSLObject *self);
#define PySSLObject_Check(v) (Py_Type(v) == &PySSL_Type) #define PySSLObject_Check(v) (Py_Type(v) == &PySSL_Type)
@ -83,21 +104,27 @@ typedef enum {
SOCKET_OPERATION_OK SOCKET_OPERATION_OK
} timeout_state; } timeout_state;
/* Wrap error strings with filename and line # */
#define STRINGIFY1(x) #x
#define STRINGIFY2(x) STRINGIFY1(x)
#define ERRSTR1(x,y,z) (x ":" y ": " z)
#define ERRSTR(x) ERRSTR1("_ssl.c", STRINGIFY2(__LINE__), x)
/* XXX It might be helpful to augment the error message generated /* XXX It might be helpful to augment the error message generated
below with the name of the SSL function that generated the error. below with the name of the SSL function that generated the error.
I expect it's obvious most of the time. I expect it's obvious most of the time.
*/ */
static PyObject * static PyObject *
PySSL_SetError(PySSLObject *obj, int ret) PySSL_SetError(PySSLObject *obj, int ret, char *filename, int lineno)
{ {
PyObject *v, *n, *s; PyObject *v;
char *errstr; char *errstr;
int err; int err;
enum py_ssl_error p; enum py_ssl_error p;
assert(ret <= 0); assert(ret <= 0);
err = SSL_get_error(obj->ssl, ret); err = SSL_get_error(obj->ssl, ret);
switch (err) { switch (err) {
@ -141,12 +168,12 @@ PySSL_SetError(PySSLObject *obj, int ret)
errstr = ERR_error_string(e, NULL); errstr = ERR_error_string(e, NULL);
} }
break; break;
} }
case SSL_ERROR_SSL: case SSL_ERROR_SSL:
{ {
unsigned long e = ERR_get_error(); unsigned long e = ERR_get_error();
p = PY_SSL_ERROR_SSL; p = PY_SSL_ERROR_SSL;
if (e != 0) if (e != 0)
/* XXX Protected by global interpreter lock */ /* XXX Protected by global interpreter lock */
errstr = ERR_error_string(e, NULL); errstr = ERR_error_string(e, NULL);
else { /* possible? */ else { /* possible? */
@ -158,29 +185,23 @@ PySSL_SetError(PySSLObject *obj, int ret)
p = PY_SSL_ERROR_INVALID_ERROR_CODE; p = PY_SSL_ERROR_INVALID_ERROR_CODE;
errstr = "Invalid error code"; errstr = "Invalid error code";
} }
n = PyInt_FromLong((long) p);
if (n == NULL)
return NULL;
v = PyTuple_New(2);
if (v == NULL) {
Py_DECREF(n);
return NULL;
}
s = PyString_FromString(errstr); char buf[2048];
if (s == NULL) { PyOS_snprintf(buf, sizeof(buf), "_ssl.c:%d: %s", lineno, errstr);
v = Py_BuildValue("(is)", p, buf);
if (v != NULL) {
PyErr_SetObject(PySSLErrorObject, v);
Py_DECREF(v); Py_DECREF(v);
Py_DECREF(n);
} }
PyTuple_SET_ITEM(v, 0, n);
PyTuple_SET_ITEM(v, 1, s);
PyErr_SetObject(PySSLErrorObject, v);
Py_DECREF(v);
return NULL; return NULL;
} }
static PySSLObject * static PySSLObject *
newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file) newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file,
enum py_ssl_server_or_client socket_type,
enum py_ssl_cert_requirements certreq,
enum py_ssl_version proto_version,
char *cacerts_file)
{ {
PySSLObject *self; PySSLObject *self;
char *errstr = NULL; char *errstr = NULL;
@ -193,31 +214,60 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
return NULL; return NULL;
memset(self->server, '\0', sizeof(char) * X509_NAME_MAXLEN); memset(self->server, '\0', sizeof(char) * X509_NAME_MAXLEN);
memset(self->issuer, '\0', sizeof(char) * X509_NAME_MAXLEN); memset(self->issuer, '\0', sizeof(char) * X509_NAME_MAXLEN);
self->server_cert = NULL; self->peer_cert = NULL;
self->ssl = NULL; self->ssl = NULL;
self->ctx = NULL; self->ctx = NULL;
self->Socket = NULL; self->Socket = NULL;
if ((key_file && !cert_file) || (!key_file && cert_file)) { if ((key_file && !cert_file) || (!key_file && cert_file)) {
errstr = "Both the key & certificate files must be specified"; errstr = ERRSTR("Both the key & certificate files must be specified");
goto fail;
}
if ((socket_type == PY_SSL_SERVER) &&
((key_file == NULL) || (cert_file == NULL))) {
errstr = ERRSTR("Both the key & certificate files must be specified for server-side operation");
goto fail; goto fail;
} }
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
self->ctx = SSL_CTX_new(SSLv23_method()); /* Set up context */ if (proto_version == PY_SSL_VERSION_TLS1)
self->ctx = SSL_CTX_new(TLSv1_method()); /* Set up context */
else if (proto_version == PY_SSL_VERSION_SSL3)
self->ctx = SSL_CTX_new(SSLv3_method()); /* Set up context */
else if (proto_version == PY_SSL_VERSION_SSL2)
self->ctx = SSL_CTX_new(SSLv2_method()); /* Set up context */
else
self->ctx = SSL_CTX_new(SSLv23_method()); /* Set up context */
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (self->ctx == NULL) { if (self->ctx == NULL) {
errstr = "SSL_CTX_new error"; errstr = ERRSTR("Invalid SSL protocol variant specified.");
goto fail; goto fail;
} }
if (certreq != PY_SSL_CERT_NONE) {
if (cacerts_file == NULL) {
errstr = ERRSTR("No root certificates specified for verification of other-side certificates.");
goto fail;
} else {
Py_BEGIN_ALLOW_THREADS
ret = SSL_CTX_load_verify_locations(self->ctx,
cacerts_file, NULL);
Py_END_ALLOW_THREADS
if (ret < 1) {
errstr = ERRSTR("SSL_CTX_load_verify_locations");
goto fail;
}
}
}
if (key_file) { if (key_file) {
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
ret = SSL_CTX_use_PrivateKey_file(self->ctx, key_file, ret = SSL_CTX_use_PrivateKey_file(self->ctx, key_file,
SSL_FILETYPE_PEM); SSL_FILETYPE_PEM);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if (ret < 1) { if (ret < 1) {
errstr = "SSL_CTX_use_PrivateKey_file error"; errstr = ERRSTR("SSL_CTX_use_PrivateKey_file error");
goto fail; goto fail;
} }
@ -225,16 +275,23 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
ret = SSL_CTX_use_certificate_chain_file(self->ctx, ret = SSL_CTX_use_certificate_chain_file(self->ctx,
cert_file); cert_file);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
SSL_CTX_set_options(self->ctx, SSL_OP_ALL); /* ssl compatibility */
if (ret < 1) { if (ret < 1) {
errstr = "SSL_CTX_use_certificate_chain_file error"; errstr = ERRSTR("SSL_CTX_use_certificate_chain_file error") ;
goto fail; goto fail;
} }
SSL_CTX_set_options(self->ctx, SSL_OP_ALL); /* ssl compatibility */
} }
int verification_mode = SSL_VERIFY_NONE;
if (certreq == PY_SSL_CERT_OPTIONAL)
verification_mode = SSL_VERIFY_PEER;
else if (certreq == PY_SSL_CERT_REQUIRED)
verification_mode = (SSL_VERIFY_PEER |
SSL_VERIFY_FAIL_IF_NO_PEER_CERT);
SSL_CTX_set_verify(self->ctx, verification_mode,
NULL); /* set verify lvl */
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
SSL_CTX_set_verify(self->ctx,
SSL_VERIFY_NONE, NULL); /* set verify lvl */
self->ssl = SSL_new(self->ctx); /* New ssl struct */ self->ssl = SSL_new(self->ctx); /* New ssl struct */
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
SSL_set_fd(self->ssl, Sock->sock_fd); /* Set the socket for SSL */ SSL_set_fd(self->ssl, Sock->sock_fd); /* Set the socket for SSL */
@ -249,7 +306,10 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
} }
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
SSL_set_connect_state(self->ssl); if (socket_type == PY_SSL_CLIENT)
SSL_set_connect_state(self->ssl);
else
SSL_set_accept_state(self->ssl);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
/* Actually negotiate SSL connection */ /* Actually negotiate SSL connection */
@ -257,11 +317,14 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
sockstate = 0; sockstate = 0;
do { do {
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
ret = SSL_connect(self->ssl); if (socket_type == PY_SSL_CLIENT)
ret = SSL_connect(self->ssl);
else
ret = SSL_accept(self->ssl);
err = SSL_get_error(self->ssl, ret); err = SSL_get_error(self->ssl, ret);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
if(PyErr_CheckSignals()) { if(PyErr_CheckSignals()) {
goto fail; goto fail;
} }
if (err == SSL_ERROR_WANT_READ) { if (err == SSL_ERROR_WANT_READ) {
sockstate = check_socket_and_wait_for_timeout(Sock, 0); sockstate = check_socket_and_wait_for_timeout(Sock, 0);
@ -270,30 +333,33 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
} else { } else {
sockstate = SOCKET_OPERATION_OK; sockstate = SOCKET_OPERATION_OK;
} }
if (sockstate == SOCKET_HAS_TIMED_OUT) { if (sockstate == SOCKET_HAS_TIMED_OUT) {
PyErr_SetString(PySSLErrorObject, "The connect operation timed out"); PyErr_SetString(PySSLErrorObject,
ERRSTR("The connect operation timed out"));
goto fail; goto fail;
} else if (sockstate == SOCKET_HAS_BEEN_CLOSED) { } else if (sockstate == SOCKET_HAS_BEEN_CLOSED) {
PyErr_SetString(PySSLErrorObject, "Underlying socket has been closed."); PyErr_SetString(PySSLErrorObject,
ERRSTR("Underlying socket has been closed."));
goto fail; goto fail;
} else if (sockstate == SOCKET_TOO_LARGE_FOR_SELECT) { } else if (sockstate == SOCKET_TOO_LARGE_FOR_SELECT) {
PyErr_SetString(PySSLErrorObject, "Underlying socket too large for select()."); PyErr_SetString(PySSLErrorObject,
ERRSTR("Underlying socket too large for select()."));
goto fail; goto fail;
} else if (sockstate == SOCKET_IS_NONBLOCKING) { } else if (sockstate == SOCKET_IS_NONBLOCKING) {
break; break;
} }
} while (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE); } while (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE);
if (ret <= 0) { if (ret < 1) {
PySSL_SetError(self, ret); PySSL_SetError(self, ret, __FILE__, __LINE__);
goto fail; goto fail;
} }
self->ssl->debug = 1; self->ssl->debug = 1;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
if ((self->server_cert = SSL_get_peer_certificate(self->ssl))) { if ((self->peer_cert = SSL_get_peer_certificate(self->ssl))) {
X509_NAME_oneline(X509_get_subject_name(self->server_cert), X509_NAME_oneline(X509_get_subject_name(self->peer_cert),
self->server, X509_NAME_MAXLEN); self->server, X509_NAME_MAXLEN);
X509_NAME_oneline(X509_get_issuer_name(self->server_cert), X509_NAME_oneline(X509_get_issuer_name(self->peer_cert),
self->issuer, X509_NAME_MAXLEN); self->issuer, X509_NAME_MAXLEN);
} }
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
@ -310,25 +376,39 @@ newPySSLObject(PySocketSockObject *Sock, char *key_file, char *cert_file)
static PyObject * static PyObject *
PySocket_ssl(PyObject *self, PyObject *args) PySocket_ssl(PyObject *self, PyObject *args)
{ {
PySSLObject *rv;
PySocketSockObject *Sock; PySocketSockObject *Sock;
int server_side = 0;
int verification_mode = PY_SSL_CERT_NONE;
int protocol = PY_SSL_VERSION_SSL23;
char *key_file = NULL; char *key_file = NULL;
char *cert_file = NULL; char *cert_file = NULL;
char *cacerts_file = NULL;
if (!PyArg_ParseTuple(args, "O!|zz:ssl", if (!PyArg_ParseTuple(args, "O!i|zziiz:sslwrap",
PySocketModule.Sock_Type, PySocketModule.Sock_Type,
&Sock, &Sock,
&key_file, &cert_file)) &server_side,
&key_file, &cert_file,
&verification_mode, &protocol,
&cacerts_file))
return NULL; return NULL;
rv = newPySSLObject(Sock, key_file, cert_file); /*
if (rv == NULL) fprintf(stderr,
return NULL; "server_side is %d, keyfile %p, certfile %p, verify_mode %d, "
return (PyObject *)rv; "protocol %d, certs %p\n",
server_side, key_file, cert_file, verification_mode,
protocol, cacerts_file);
*/
return (PyObject *) newPySSLObject(Sock, key_file, cert_file,
server_side, verification_mode,
protocol, cacerts_file);
} }
PyDoc_STRVAR(ssl_doc, PyDoc_STRVAR(ssl_doc,
"ssl(socket, [keyfile, certfile]) -> sslobject"); "sslwrap(socket, server_side, [keyfile, certfile, certs_mode, protocol,\n"
" cacertsfile]) -> sslobject");
/* SSL object methods */ /* SSL object methods */
@ -344,15 +424,153 @@ PySSL_issuer(PySSLObject *self)
return PyString_FromString(self->issuer); return PyString_FromString(self->issuer);
} }
static PyObject *
_create_dict_for_X509_NAME (X509_NAME *xname)
{
PyObject *pd = PyDict_New();
int index_counter;
for (index_counter = 0;
index_counter < X509_NAME_entry_count(xname);
index_counter++)
{
char namebuf[X509_NAME_MAXLEN];
int buflen;
X509_NAME_ENTRY *entry = X509_NAME_get_entry(xname,
index_counter);
ASN1_OBJECT *name = X509_NAME_ENTRY_get_object(entry);
buflen = OBJ_obj2txt(namebuf, sizeof(namebuf), name, 0);
if (buflen < 0)
goto fail0;
PyObject *name_obj = PyString_FromStringAndSize(namebuf,
buflen);
if (name_obj == NULL)
goto fail0;
ASN1_STRING *value = X509_NAME_ENTRY_get_data(entry);
unsigned char *valuebuf = NULL;
buflen = ASN1_STRING_to_UTF8(&valuebuf, value);
if (buflen < 0) {
Py_DECREF(name_obj);
goto fail0;
}
PyObject *value_obj = PyUnicode_DecodeUTF8((char *) valuebuf,
buflen, "strict");
OPENSSL_free(valuebuf);
if (value_obj == NULL) {
Py_DECREF(name_obj);
goto fail0;
}
if (PyDict_SetItem(pd, name_obj, value_obj) < 0) {
Py_DECREF(name_obj);
Py_DECREF(value_obj);
goto fail0;
}
Py_DECREF(name_obj);
Py_DECREF(value_obj);
}
return pd;
fail0:
Py_XDECREF(pd);
return NULL;
}
static PyObject *
PySSL_peercert(PySSLObject *self)
{
PyObject *retval = NULL;
BIO *biobuf = NULL;
if (!self->peer_cert)
Py_RETURN_NONE;
retval = PyDict_New();
if (retval == NULL)
return NULL;
int verification = SSL_CTX_get_verify_mode(self->ctx);
if ((verification & SSL_VERIFY_PEER) == 0)
return retval;
PyObject *peer = _create_dict_for_X509_NAME(
X509_get_subject_name(self->peer_cert));
if (peer == NULL)
goto fail0;
if (PyDict_SetItemString(retval, (const char *) "subject", peer) < 0) {
Py_DECREF(peer);
goto fail0;
}
Py_DECREF(peer);
PyObject *issuer = _create_dict_for_X509_NAME(
X509_get_issuer_name(self->peer_cert));
if (issuer == NULL)
goto fail0;
if (PyDict_SetItemString(retval, (const char *) "issuer", issuer) < 0) {
Py_DECREF(issuer);
goto fail0;
}
Py_DECREF(issuer);
PyObject *version = PyInt_FromLong(X509_get_version(self->peer_cert));
if (PyDict_SetItemString(retval, "version", version) < 0) {
Py_DECREF(version);
goto fail0;
}
Py_DECREF(version);
char buf[2048];
int len;
/* get a memory buffer */
biobuf = BIO_new(BIO_s_mem());
ASN1_TIME *notBefore = X509_get_notBefore(self->peer_cert);
ASN1_TIME_print(biobuf, notBefore);
len = BIO_gets(biobuf, buf, sizeof(buf)-1);
PyObject *pnotBefore = PyString_FromStringAndSize(buf, len);
if (pnotBefore == NULL)
goto fail1;
if (PyDict_SetItemString(retval, "notBefore", pnotBefore) < 0) {
Py_DECREF(pnotBefore);
goto fail1;
}
Py_DECREF(pnotBefore);
BIO_reset(biobuf);
ASN1_TIME *notAfter = X509_get_notAfter(self->peer_cert);
ASN1_TIME_print(biobuf, notAfter);
len = BIO_gets(biobuf, buf, sizeof(buf)-1);
BIO_free(biobuf);
PyObject *pnotAfter = PyString_FromStringAndSize(buf, len);
if (pnotAfter == NULL)
goto fail0;
if (PyDict_SetItemString(retval, "notAfter", pnotAfter) < 0) {
Py_DECREF(pnotAfter);
goto fail0;
}
Py_DECREF(pnotAfter);
return retval;
fail1:
if (biobuf != NULL)
BIO_free(biobuf);
fail0:
Py_XDECREF(retval);
return NULL;
}
static void PySSL_dealloc(PySSLObject *self) static void PySSL_dealloc(PySSLObject *self)
{ {
if (self->server_cert) /* Possible not to have one? */ if (self->peer_cert) /* Possible not to have one? */
X509_free (self->server_cert); X509_free (self->peer_cert);
if (self->ssl) if (self->ssl)
SSL_free(self->ssl); SSL_free(self->ssl);
if (self->ctx) if (self->ctx)
SSL_CTX_free(self->ctx); SSL_CTX_free(self->ctx);
Py_XDECREF(self->Socket); Py_XDECREF(self->Socket);
PyObject_Del(self); PyObject_Del(self);
} }
@ -463,7 +681,7 @@ static PyObject *PySSL_SSLwrite(PySSLObject *self, PyObject *args)
} else { } else {
sockstate = SOCKET_OPERATION_OK; sockstate = SOCKET_OPERATION_OK;
} }
if (sockstate == SOCKET_HAS_TIMED_OUT) { if (sockstate == SOCKET_HAS_TIMED_OUT) {
PyErr_SetString(PySSLErrorObject, "The write operation timed out"); PyErr_SetString(PySSLErrorObject, "The write operation timed out");
return NULL; return NULL;
} else if (sockstate == SOCKET_HAS_BEEN_CLOSED) { } else if (sockstate == SOCKET_HAS_BEEN_CLOSED) {
@ -476,7 +694,7 @@ static PyObject *PySSL_SSLwrite(PySSLObject *self, PyObject *args)
if (len > 0) if (len > 0)
return PyInt_FromLong(len); return PyInt_FromLong(len);
else else
return PySSL_SetError(self, len); return PySSL_SetError(self, len, __FILE__, __LINE__);
} }
PyDoc_STRVAR(PySSL_SSLwrite_doc, PyDoc_STRVAR(PySSL_SSLwrite_doc,
@ -498,7 +716,7 @@ static PyObject *PySSL_SSLread(PySSLObject *self, PyObject *args)
if (!(buf = PyString_FromStringAndSize((char *) 0, len))) if (!(buf = PyString_FromStringAndSize((char *) 0, len)))
return NULL; return NULL;
/* first check if there are bytes ready to be read */ /* first check if there are bytes ready to be read */
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
count = SSL_pending(self->ssl); count = SSL_pending(self->ssl);
@ -507,12 +725,28 @@ static PyObject *PySSL_SSLread(PySSLObject *self, PyObject *args)
if (!count) { if (!count) {
sockstate = check_socket_and_wait_for_timeout(self->Socket, 0); sockstate = check_socket_and_wait_for_timeout(self->Socket, 0);
if (sockstate == SOCKET_HAS_TIMED_OUT) { if (sockstate == SOCKET_HAS_TIMED_OUT) {
PyErr_SetString(PySSLErrorObject, "The read operation timed out"); PyErr_SetString(PySSLErrorObject,
"The read operation timed out");
Py_DECREF(buf); Py_DECREF(buf);
return NULL; return NULL;
} else if (sockstate == SOCKET_TOO_LARGE_FOR_SELECT) { } else if (sockstate == SOCKET_TOO_LARGE_FOR_SELECT) {
PyErr_SetString(PySSLErrorObject, "Underlying socket too large for select()."); PyErr_SetString(PySSLErrorObject,
"Underlying socket too large for select().");
Py_DECREF(buf);
return NULL; return NULL;
} else if (sockstate == SOCKET_HAS_BEEN_CLOSED) {
if (SSL_get_shutdown(self->ssl) !=
SSL_RECEIVED_SHUTDOWN)
{
Py_DECREF(buf);
PyErr_SetString(PySSLErrorObject,
"Socket closed without SSL shutdown handshake");
return NULL;
} else {
/* should contain a zero-length string */
_PyString_Resize(&buf, 0);
return buf;
}
} }
} }
do { do {
@ -526,23 +760,32 @@ static PyObject *PySSL_SSLread(PySSLObject *self, PyObject *args)
return NULL; return NULL;
} }
if (err == SSL_ERROR_WANT_READ) { if (err == SSL_ERROR_WANT_READ) {
sockstate = check_socket_and_wait_for_timeout(self->Socket, 0); sockstate =
check_socket_and_wait_for_timeout(self->Socket, 0);
} else if (err == SSL_ERROR_WANT_WRITE) { } else if (err == SSL_ERROR_WANT_WRITE) {
sockstate = check_socket_and_wait_for_timeout(self->Socket, 1); sockstate =
check_socket_and_wait_for_timeout(self->Socket, 1);
} else if ((err == SSL_ERROR_ZERO_RETURN) &&
(SSL_get_shutdown(self->ssl) ==
SSL_RECEIVED_SHUTDOWN))
{
_PyString_Resize(&buf, 0);
return buf;
} else { } else {
sockstate = SOCKET_OPERATION_OK; sockstate = SOCKET_OPERATION_OK;
} }
if (sockstate == SOCKET_HAS_TIMED_OUT) { if (sockstate == SOCKET_HAS_TIMED_OUT) {
PyErr_SetString(PySSLErrorObject, "The read operation timed out"); PyErr_SetString(PySSLErrorObject,
"The read operation timed out");
Py_DECREF(buf); Py_DECREF(buf);
return NULL; return NULL;
} else if (sockstate == SOCKET_IS_NONBLOCKING) { } else if (sockstate == SOCKET_IS_NONBLOCKING) {
break; break;
} }
} while (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE); } while (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE);
if (count <= 0) { if (count <= 0) {
Py_DECREF(buf); Py_DECREF(buf);
return PySSL_SetError(self, count); return PySSL_SetError(self, count, __FILE__, __LINE__);
} }
if (count != len) if (count != len)
_PyString_Resize(&buf, count); _PyString_Resize(&buf, count);
@ -554,13 +797,48 @@ PyDoc_STRVAR(PySSL_SSLread_doc,
\n\ \n\
Read up to len bytes from the SSL socket."); Read up to len bytes from the SSL socket.");
static PyObject *PySSL_SSLshutdown(PySSLObject *self, PyObject *args)
{
int err;
/* Guard against closed socket */
if (self->Socket->sock_fd < 0) {
PyErr_SetString(PySSLErrorObject,
"Underlying socket has been closed.");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
err = SSL_shutdown(self->ssl);
if (err == 0) {
/* we need to call it again to finish the shutdown */
err = SSL_shutdown(self->ssl);
}
Py_END_ALLOW_THREADS
if (err < 0)
return PySSL_SetError(self, err, __FILE__, __LINE__);
else {
Py_INCREF(self->Socket);
return (PyObject *) (self->Socket);
}
}
PyDoc_STRVAR(PySSL_SSLshutdown_doc,
"shutdown(s) -> socket\n\
\n\
Does the SSL shutdown handshake with the remote end, and returns\n\
the underlying socket object.");
static PyMethodDef PySSLMethods[] = { static PyMethodDef PySSLMethods[] = {
{"write", (PyCFunction)PySSL_SSLwrite, METH_VARARGS, {"write", (PyCFunction)PySSL_SSLwrite, METH_VARARGS,
PySSL_SSLwrite_doc}, PySSL_SSLwrite_doc},
{"read", (PyCFunction)PySSL_SSLread, METH_VARARGS, {"read", (PyCFunction)PySSL_SSLread, METH_VARARGS,
PySSL_SSLread_doc}, PySSL_SSLread_doc},
{"server", (PyCFunction)PySSL_server, METH_NOARGS}, {"server", (PyCFunction)PySSL_server, METH_NOARGS},
{"issuer", (PyCFunction)PySSL_issuer, METH_NOARGS}, {"issuer", (PyCFunction)PySSL_issuer, METH_NOARGS},
{"peer_certificate", (PyCFunction)PySSL_peercert, METH_NOARGS},
{"shutdown", (PyCFunction)PySSL_SSLshutdown, METH_NOARGS, PySSL_SSLshutdown_doc},
{NULL, NULL} {NULL, NULL}
}; };
@ -654,17 +932,17 @@ if it does provide enough data to seed PRNG.");
/* List of functions exported by this module. */ /* List of functions exported by this module. */
static PyMethodDef PySSL_methods[] = { static PyMethodDef PySSL_methods[] = {
{"ssl", PySocket_ssl, {"sslwrap", PySocket_ssl,
METH_VARARGS, ssl_doc}, METH_VARARGS, ssl_doc},
#ifdef HAVE_OPENSSL_RAND #ifdef HAVE_OPENSSL_RAND
{"RAND_add", PySSL_RAND_add, METH_VARARGS, {"RAND_add", PySSL_RAND_add, METH_VARARGS,
PySSL_RAND_add_doc}, PySSL_RAND_add_doc},
{"RAND_egd", PySSL_RAND_egd, METH_O, {"RAND_egd", PySSL_RAND_egd, METH_O,
PySSL_RAND_egd_doc}, PySSL_RAND_egd_doc},
{"RAND_status", (PyCFunction)PySSL_RAND_status, METH_NOARGS, {"RAND_status", (PyCFunction)PySSL_RAND_status, METH_NOARGS,
PySSL_RAND_status_doc}, PySSL_RAND_status_doc},
#endif #endif
{NULL, NULL} /* Sentinel */ {NULL, NULL} /* Sentinel */
}; };
@ -686,7 +964,7 @@ init_ssl(void)
/* Load _socket module and its C API */ /* Load _socket module and its C API */
if (PySocketModule_ImportModuleAndAPI()) if (PySocketModule_ImportModuleAndAPI())
return; return;
/* Init OpenSSL */ /* Init OpenSSL */
SSL_load_error_strings(); SSL_load_error_strings();
@ -694,11 +972,12 @@ init_ssl(void)
/* Add symbols to module dict */ /* Add symbols to module dict */
PySSLErrorObject = PyErr_NewException("socket.sslerror", PySSLErrorObject = PyErr_NewException("socket.sslerror",
PySocketModule.error, PySocketModule.error,
NULL); NULL);
if (PySSLErrorObject == NULL) if (PySSLErrorObject == NULL)
return; return;
PyDict_SetItemString(d, "sslerror", PySSLErrorObject); if (PyDict_SetItemString(d, "sslerror", PySSLErrorObject) != 0)
return;
if (PyDict_SetItemString(d, "SSLType", if (PyDict_SetItemString(d, "SSLType",
(PyObject *)&PySSL_Type) != 0) (PyObject *)&PySSL_Type) != 0)
return; return;
@ -721,5 +1000,21 @@ init_ssl(void)
PY_SSL_ERROR_EOF); PY_SSL_ERROR_EOF);
PyModule_AddIntConstant(m, "SSL_ERROR_INVALID_ERROR_CODE", PyModule_AddIntConstant(m, "SSL_ERROR_INVALID_ERROR_CODE",
PY_SSL_ERROR_INVALID_ERROR_CODE); PY_SSL_ERROR_INVALID_ERROR_CODE);
/* cert requirements */
PyModule_AddIntConstant(m, "CERT_NONE",
PY_SSL_CERT_NONE);
PyModule_AddIntConstant(m, "CERT_OPTIONAL",
PY_SSL_CERT_OPTIONAL);
PyModule_AddIntConstant(m, "CERT_REQUIRED",
PY_SSL_CERT_REQUIRED);
/* protocol versions */
PyModule_AddIntConstant(m, "PROTOCOL_SSLv2",
PY_SSL_VERSION_SSL2);
PyModule_AddIntConstant(m, "PROTOCOL_SSLv3",
PY_SSL_VERSION_SSL3);
PyModule_AddIntConstant(m, "PROTOCOL_SSLv23",
PY_SSL_VERSION_SSL23);
PyModule_AddIntConstant(m, "PROTOCOL_TLSv1",
PY_SSL_VERSION_TLS1);
} }

View file

@ -0,0 +1,79 @@
#!/usr/bin/env python
#
# fetch the certificate that the server(s) are providing in PEM form
#
# args are HOST:PORT [, HOST:PORT...]
#
# By Bill Janssen.
import sys, os
def fetch_server_certificate (host, port):
import re, tempfile, os, ssl
def subproc(cmd):
from subprocess import Popen, PIPE, STDOUT
proc = Popen(cmd, stdout=PIPE, stderr=STDOUT, shell=True)
status = proc.wait()
output = proc.stdout.read()
return status, output
def strip_to_x509_cert(certfile_contents, outfile=None):
m = re.search(r"^([-]+BEGIN CERTIFICATE[-]+[\r]*\n"
r".*[\r]*^[-]+END CERTIFICATE[-]+)$",
certfile_contents, re.MULTILINE | re.DOTALL)
if not m:
return None
else:
tn = tempfile.mktemp()
fp = open(tn, "w")
fp.write(m.group(1) + "\n")
fp.close()
try:
tn2 = (outfile or tempfile.mktemp())
status, output = subproc(r'openssl x509 -in "%s" -out "%s"' %
(tn, tn2))
if status != 0:
raise OperationError(status, tsig, output)
fp = open(tn2, 'rb')
data = fp.read()
fp.close()
os.unlink(tn2)
return data
finally:
os.unlink(tn)
if sys.platform.startswith("win"):
tfile = tempfile.mktemp()
fp = open(tfile, "w")
fp.write("quit\n")
fp.close()
try:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < "%s"' %
(host, port, tfile))
finally:
os.unlink(tfile)
else:
status, output = subproc(
'openssl s_client -connect "%s:%s" -showcerts < /dev/null' %
(host, port))
if status != 0:
raise OSError(status)
certtext = strip_to_x509_cert(output)
if not certtext:
raise ValueError("Invalid response received from server at %s:%s" %
(host, port))
return certtext
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.stderr.write(
"Usage: %s HOSTNAME:PORTNUMBER [, HOSTNAME:PORTNUMBER...]\n" %
sys.argv[0])
sys.exit(1)
for arg in sys.argv[1:]:
host, port = arg.split(":")
sys.stdout.write(fetch_server_certificate(host, int(port)))
sys.exit(0)