An honest attempt to make this work on Unix, Windows, and even

Macintosh (the latter untested).

This closes Bug #110839.
This commit is contained in:
Guido van Rossum 2000-09-19 04:01:01 +00:00
parent d9a8e96543
commit e7d6b0a22e

View file

@ -3,28 +3,31 @@
This module builds on SimpleHTTPServer by implementing GET and POST This module builds on SimpleHTTPServer by implementing GET and POST
requests to cgi-bin scripts. requests to cgi-bin scripts.
If the os.fork() function is not present, this module will not work; If the os.fork() function is not present (e.g. on Windows),
SystemError will be raised instead. os.popen2() is used as a fallback, with slightly altered semantics; if
that function is not present either (e.g. on Macintosh), only Python
scripts are supported, and they are executed by the current process.
In all cases, the implementation is intentionally naive -- all
requests are executed sychronously.
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
-- it may execute arbitrary Python code or external programs.
""" """
__version__ = "0.3" __version__ = "0.4"
import os import os
import sys
import string import string
import urllib import urllib
import BaseHTTPServer import BaseHTTPServer
import SimpleHTTPServer import SimpleHTTPServer
try:
os.fork
except AttributeError:
raise SystemError, __name__ + " requires os.fork()"
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""Complete HTTP server with GET, HEAD and POST commands. """Complete HTTP server with GET, HEAD and POST commands.
@ -35,6 +38,10 @@ class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
""" """
# Determine platform specifics
have_fork = hasattr(os, 'fork')
have_popen2 = hasattr(os, 'popen2')
# Make rfile unbuffered -- we need to read one line and then pass # Make rfile unbuffered -- we need to read one line and then pass
# the rest to a subprocess, so we can't use buffered input. # the rest to a subprocess, so we can't use buffered input.
rbufsize = 0 rbufsize = 0
@ -59,9 +66,9 @@ class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self) return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
def is_cgi(self): def is_cgi(self):
"""test whether PATH corresponds to a CGI script. """Test whether self.path corresponds to a CGI script.
Return a tuple (dir, rest) if PATH requires running a Return a tuple (dir, rest) if self.path requires running a
CGI script, None if not. Note that rest begins with a CGI script, None if not. Note that rest begins with a
slash if it is not empty. slash if it is not empty.
@ -83,6 +90,15 @@ class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
cgi_directories = ['/cgi-bin', '/htbin'] cgi_directories = ['/cgi-bin', '/htbin']
def is_executable(self, path):
"""Test whether argument path is an executable file."""
return executable(path)
def is_python(self, path):
"""Test whether argument path is a Python script."""
head, tail = os.path.splitext(path)
return tail.lower() in (".py", ".pyw")
def run_cgi(self): def run_cgi(self):
"""Execute a CGI script.""" """Execute a CGI script."""
dir, rest = self.cgi_info dir, rest = self.cgi_info
@ -105,79 +121,152 @@ class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
self.send_error(403, "CGI script is not a plain file (%s)" % self.send_error(403, "CGI script is not a plain file (%s)" %
`scriptname`) `scriptname`)
return return
if not executable(scriptfile): ispy = self.is_python(scriptname)
self.send_error(403, "CGI script is not executable (%s)" % if not ispy:
`scriptname`) if not (self.have_fork or self.have_popen2):
return self.send_error(403, "CGI script is not a Python script (%s)" %
nobody = nobody_uid() `scriptname`)
self.send_response(200, "Script output follows") return
self.wfile.flush() # Always flush before forking if not self.is_executable(scriptfile):
pid = os.fork() self.send_error(403, "CGI script is not executable (%s)" %
if pid != 0: `scriptname`)
# Parent return
pid, sts = os.waitpid(pid, 0)
if sts: # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
self.log_error("CGI script exit status x%x" % sts) # XXX Much of the following could be prepared ahead of time!
return env = {}
# Child env['SERVER_SOFTWARE'] = self.version_string()
try: env['SERVER_NAME'] = self.server.server_name
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html env['GATEWAY_INTERFACE'] = 'CGI/1.1'
# XXX Much of the following could be prepared ahead of time! env['SERVER_PROTOCOL'] = self.protocol_version
env = {} env['SERVER_PORT'] = str(self.server.server_port)
env['SERVER_SOFTWARE'] = self.version_string() env['REQUEST_METHOD'] = self.command
env['SERVER_NAME'] = self.server.server_name uqrest = urllib.unquote(rest)
env['GATEWAY_INTERFACE'] = 'CGI/1.1' env['PATH_INFO'] = uqrest
env['SERVER_PROTOCOL'] = self.protocol_version env['PATH_TRANSLATED'] = self.translate_path(uqrest)
env['SERVER_PORT'] = str(self.server.server_port) env['SCRIPT_NAME'] = scriptname
env['REQUEST_METHOD'] = self.command if query:
uqrest = urllib.unquote(rest) env['QUERY_STRING'] = query
env['PATH_INFO'] = uqrest host = self.address_string()
env['PATH_TRANSLATED'] = self.translate_path(uqrest) if host != self.client_address[0]:
env['SCRIPT_NAME'] = scriptname env['REMOTE_HOST'] = host
if query: env['REMOTE_ADDR'] = self.client_address[0]
env['QUERY_STRING'] = query # XXX AUTH_TYPE
host = self.address_string() # XXX REMOTE_USER
if host != self.client_address[0]: # XXX REMOTE_IDENT
env['REMOTE_HOST'] = host if self.headers.typeheader is None:
env['REMOTE_ADDR'] = self.client_address[0] env['CONTENT_TYPE'] = self.headers.type
# AUTH_TYPE else:
# REMOTE_USER env['CONTENT_TYPE'] = self.headers.typeheader
# REMOTE_IDENT length = self.headers.getheader('content-length')
if self.headers.typeheader is None: if length:
env['CONTENT_TYPE'] = self.headers.type env['CONTENT_LENGTH'] = length
accept = []
for line in self.headers.getallmatchingheaders('accept'):
if line[:1] in string.whitespace:
accept.append(string.strip(line))
else: else:
env['CONTENT_TYPE'] = self.headers.typeheader accept = accept + string.split(line[7:], ',')
length = self.headers.getheader('content-length') env['HTTP_ACCEPT'] = string.joinfields(accept, ',')
if length: ua = self.headers.getheader('user-agent')
env['CONTENT_LENGTH'] = length if ua:
accept = [] env['HTTP_USER_AGENT'] = ua
for line in self.headers.getallmatchingheaders('accept'): co = filter(None, self.headers.getheaders('cookie'))
if line[:1] in string.whitespace: if co:
accept.append(string.strip(line)) env['HTTP_COOKIE'] = string.join(co, ', ')
else: # XXX Other HTTP_* headers
accept = accept + string.split(line[7:], ',') if not self.have_fork:
env['HTTP_ACCEPT'] = string.joinfields(accept, ',') # Since we're setting the env in the parent, provide empty
ua = self.headers.getheader('user-agent') # values to override previously set values
if ua: for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
env['HTTP_USER_AGENT'] = ua 'HTTP_USER_AGENT', 'HTTP_COOKIE'):
co = filter(None, self.headers.getheaders('cookie')) env.setdefault(k, "")
if co:
env['HTTP_COOKIE'] = string.join(co, ', ') self.send_response(200, "Script output follows")
# XXX Other HTTP_* headers
decoded_query = string.replace(query, '+', ' ') decoded_query = string.replace(query, '+', ' ')
if self.have_fork:
# Unix -- fork as we should
args = [script]
if '=' not in decoded_query:
args.append(decoded_query)
nobody = nobody_uid()
self.wfile.flush() # Always flush before forking
pid = os.fork()
if pid != 0:
# Parent
pid, sts = os.waitpid(pid, 0)
if sts:
self.log_error("CGI script exit status %#x", sts)
return
# Child
try: try:
os.setuid(nobody) try:
except os.error: os.setuid(nobody)
pass except os.error:
os.dup2(self.rfile.fileno(), 0) pass
os.dup2(self.wfile.fileno(), 1) os.dup2(self.rfile.fileno(), 0)
print scriptfile, script, decoded_query os.dup2(self.wfile.fileno(), 1)
os.execve(scriptfile, os.execve(scriptfile, args, env)
[script, decoded_query], except:
env) self.server.handle_error(self.request, self.client_address)
except: os._exit(127)
self.server.handle_error(self.request, self.client_address)
os._exit(127) elif self.have_popen2:
# Windows -- use popen2 to create a subprocess
import shutil
os.environ.update(env)
cmdline = scriptfile
if self.is_python(scriptfile):
interp = sys.executable
if interp.lower().endswith("w.exe"):
# On Windows, use python.exe, not python.exe
interp = interp[:-5] = interp[-4:]
cmdline = "%s %s" % (interp, cmdline)
if '=' not in query and '"' not in query:
cmdline = '%s "%s"' % (cmdline, query)
self.log_error("command: %s", cmdline)
try:
nbytes = int(length)
except:
nbytes = 0
fi, fo = os.popen2(cmdline)
if self.command.lower() == "post" and nbytes > 0:
data = self.rfile.read(nbytes)
fi.write(data)
fi.close()
shutil.copyfileobj(fo, self.wfile)
sts = fo.close()
if sts:
self.log_error("CGI script exit status %#x", sts)
else:
self.log_error("CGI script exited OK")
else:
# Other O.S. -- execute script in this process
os.environ.update(env)
save_argv = sys.argv
save_stdin = sys.stdin
save_stdout = sys.stdout
save_stderr = sys.stderr
try:
try:
sys.argv = [scriptfile]
if '=' not in decoded_query:
sys.argv.append(decoded_query)
sys.stdout = self.wfile
sys.stdin = self.rfile
execfile(scriptfile, {"__name__": "__main__"})
finally:
sys.argv = save_argv
sys.stdin = save_stdin
sys.stdout = save_stdout
sys.stderr = save_stderr
except SystemExit, sts:
self.log_error("CGI script exit status %s", str(sts))
else:
self.log_error("CGI script exited OK")
nobody = None nobody = None
@ -187,7 +276,10 @@ def nobody_uid():
global nobody global nobody
if nobody: if nobody:
return nobody return nobody
import pwd try:
import pwd
except ImportError:
return -1
try: try:
nobody = pwd.getpwnam('nobody')[2] nobody = pwd.getpwnam('nobody')[2]
except KeyError: except KeyError: