mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 00:08:32 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1037 lines
		
	
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			1037 lines
		
	
	
	
		
			34 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #! /usr/local/bin/python
 | |
| 
 | |
| # NOTE: the above "/usr/local/bin/python" is NOT a mistake.  It is
 | |
| # intentionally NOT "/usr/bin/env python".  On many systems
 | |
| # (e.g. Solaris), /usr/local/bin is not in $PATH as passed to CGI
 | |
| # scripts, and /usr/local/bin is the default directory where Python is
 | |
| # installed, so /usr/bin/env would be unable to find python.  Granted,
 | |
| # binary installations by Linux vendors often install Python in
 | |
| # /usr/bin.  So let those vendors patch cgi.py to match their choice
 | |
| # of installation.
 | |
| 
 | |
| """Support module for CGI (Common Gateway Interface) scripts.
 | |
| 
 | |
| This module defines a number of utilities for use by CGI scripts
 | |
| written in Python.
 | |
| """
 | |
| 
 | |
| # History
 | |
| # -------
 | |
| #
 | |
| # Michael McLay started this module.  Steve Majewski changed the
 | |
| # interface to SvFormContentDict and FormContentDict.  The multipart
 | |
| # parsing was inspired by code submitted by Andreas Paepcke.  Guido van
 | |
| # Rossum rewrote, reformatted and documented the module and is currently
 | |
| # responsible for its maintenance.
 | |
| #
 | |
| 
 | |
| __version__ = "2.6"
 | |
| 
 | |
| 
 | |
| # Imports
 | |
| # =======
 | |
| 
 | |
| from io import StringIO, BytesIO, TextIOWrapper
 | |
| import sys
 | |
| import os
 | |
| import urllib.parse
 | |
| from email.parser import FeedParser
 | |
| from warnings import warn
 | |
| import html
 | |
| import locale
 | |
| import tempfile
 | |
| 
 | |
| __all__ = ["MiniFieldStorage", "FieldStorage",
 | |
|            "parse", "parse_qs", "parse_qsl", "parse_multipart",
 | |
|            "parse_header", "print_exception", "print_environ",
 | |
|            "print_form", "print_directory", "print_arguments",
 | |
|            "print_environ_usage", "escape"]
 | |
| 
 | |
| # Logging support
 | |
| # ===============
 | |
| 
 | |
# Module-level logging configuration, read and updated by initlog() and
# closelog().
logfile = ""            # Filename to log to, if not empty
logfp = None            # File object to log to, if not None
 | |
| 
 | |
def initlog(*allargs):
    """Open the log on first use, then write the message.

    Despite the name, callers should always go through the module-level
    name log.  log is initially bound to initlog; the first call rebinds
    it to dolog (a log file is available) or to nolog (logging is
    disabled) and forwards the message to whichever was chosen.

    The first argument is a format string; any remaining arguments are
    applied to it with the % operator, so e.g.
        log("%s: %s", "a", "b")
    writes "a: b" to the log file, followed by a newline.

    If the global logfp is not None, it should be a file object to
    which log data is written.

    If the global logfp is None, the global logfile may be a string
    giving a filename to open, in append mode.  This file should be
    world writable!!!  If the file can't be opened, logging is silently
    disabled (since there is no safe place where we could send an error
    message).

    """
    global log, logfile, logfp
    if logfile and not logfp:
        try:
            logfp = open(logfile, "a")
        except IOError:
            # Deliberately best effort: fall through with logfp unset.
            pass
    log = dolog if logfp else nolog
    log(*allargs)
 | |
| 
 | |
def dolog(fmt, *args):
    """Format a message and append it, newline-terminated, to the log file.

    fmt is a %-style format string and args are its arguments.  The
    destination is the module-level file object logfp.  See initlog()
    for the full logging protocol.
    """
    message = fmt % args
    logfp.write(message + "\n")
 | |
| 
 | |
def nolog(*allargs):
    """Discard a log message; bound to log while logging is disabled."""
    return None
 | |
| 
 | |
def closelog():
    """Close the log file and reset logging to its initial state.

    Clears the configured filename, closes and forgets the open file
    object (if any), and rebinds log to initlog.
    """
    global log, logfile, logfp
    logfile = ''
    handle = logfp
    if handle:
        handle.close()
        logfp = None
    log = initlog
 | |
| 
 | |
# The current logging function.  Starts as initlog, which rebinds this name
# to dolog or nolog on its first call; closelog() resets it to initlog.
log = initlog
 | |
| 
 | |
| 
 | |
| # Parsing functions
 | |
| # =================
 | |
| 
 | |
# Maximum input we will accept when REQUEST_METHOD is POST
# 0 ==> unlimited input
# Declared content lengths larger than this make parse(), parse_multipart()
# and FieldStorage raise ValueError.
maxlen = 0
 | |
| 
 | |
def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
    """Parse a query in the environment or from a file (default stdin).

    Arguments, all optional:

    fp              : file pointer; default: sys.stdin.  A TextIOWrapper
        is replaced by its underlying binary buffer, because the POST
        body is read as bytes and then decoded.

    environ         : environment dictionary; default: os.environ.
        Note that this mapping is modified: REQUEST_METHOD is set to
        'GET' when missing, and QUERY_STRING may be (re)written.

    keep_blank_values: flag indicating whether blank values in
        percent-encoded forms should be treated as blank strings.
        A true value indicates that blanks should be retained as
        blank strings.  The default false value indicates that
        blank values are to be ignored and treated as if they were
        not included.

    strict_parsing: flag indicating what to do with parsing errors.
        If false (the default), errors are silently ignored.
        If true, errors raise a ValueError exception.

    Returns the dictionary produced by urllib.parse.parse_qs(), or the
    result of parse_multipart() for a multipart/form-data POST.  Raises
    ValueError when a urlencoded POST declares a CONTENT_LENGTH larger
    than a non-zero maxlen.
    """
    if fp is None:
        fp = sys.stdin

    # Field keys and values (except for files) are returned as strings,
    # so an encoding is needed to decode the bytes read from fp.
    encoding = fp.encoding if hasattr(fp, 'encoding') else 'latin-1'

    # fp.read() must return bytes
    if isinstance(fp, TextIOWrapper):
        fp = fp.buffer

    if 'REQUEST_METHOD' not in environ:
        environ['REQUEST_METHOD'] = 'GET'       # For testing stand-alone

    if environ['REQUEST_METHOD'] == 'POST':
        ctype, pdict = parse_header(environ['CONTENT_TYPE'])
        if ctype == 'multipart/form-data':
            return parse_multipart(fp, pdict)
        if ctype == 'application/x-www-form-urlencoded':
            clength = int(environ['CONTENT_LENGTH'])
            if maxlen and clength > maxlen:
                raise ValueError('Maximum content length exceeded')
            qs = fp.read(clength).decode(encoding)
        else:
            qs = ''                     # Unknown content-type

        # Append the URL query string (or the first command-line
        # argument) to whatever came in the request body.
        if 'QUERY_STRING' in environ:
            extra = environ['QUERY_STRING']
        elif sys.argv[1:]:
            extra = sys.argv[1]
        else:
            extra = None
        if extra is not None:
            if qs:
                qs = qs + '&'
            qs = qs + extra
        environ['QUERY_STRING'] = qs    # XXX Shouldn't, really
    elif 'QUERY_STRING' in environ:
        qs = environ['QUERY_STRING']
    else:
        qs = sys.argv[1] if sys.argv[1:] else ""
        environ['QUERY_STRING'] = qs    # XXX Shouldn't, really

    return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing,
                                 encoding=encoding)
 | |
| 
 | |
| 
 | |
| # parse query string function called from urlparse,
 | |
# this is done in order to maintain backward compatibility.
 | |
| 
 | |
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """Deprecated alias for urllib.parse.parse_qs().

    Emits a DeprecationWarning attributed to the caller, then returns
    the dictionary produced by urllib.parse.parse_qs() for the query
    string qs.
    """
    warn("cgi.parse_qs is deprecated, use urllib.parse.parse_qs instead",
         category=DeprecationWarning, stacklevel=2)
    return urllib.parse.parse_qs(qs,
                                 keep_blank_values=keep_blank_values,
                                 strict_parsing=strict_parsing)
 | |
| 
 | |
def parse_qsl(qs, keep_blank_values=0, strict_parsing=0):
    """Deprecated alias for urllib.parse.parse_qsl().

    Emits a DeprecationWarning attributed to the caller, then returns
    the list of (name, value) pairs produced by
    urllib.parse.parse_qsl() for the query string qs.
    """
    warn("cgi.parse_qsl is deprecated, use urllib.parse.parse_qsl instead",
         category=DeprecationWarning, stacklevel=2)
    return urllib.parse.parse_qsl(qs,
                                  keep_blank_values=keep_blank_values,
                                  strict_parsing=strict_parsing)
 | |
| 
 | |
def parse_multipart(fp, pdict):
    """Parse multipart input.

    Arguments:
    fp   : input file
    pdict: dictionary containing other parameters of content-type header
           (the 'boundary' entry is the one used here)

    Returns a dictionary just like parse_qs(): keys are the field names, each
    value is a list of values for that field.  This is easy to use but not
    much good if you are expecting megabytes to be uploaded -- in that case,
    use the FieldStorage class instead which is much more flexible.

    Raises ValueError for an invalid boundary, or when a part declares a
    content-length larger than a non-zero maxlen.

    XXX This does not parse nested multipart parts -- use FieldStorage for
    that.

    XXX This should really be subsumed by FieldStorage altogether -- no
    point in having two implementations of the same parsing algorithm.
    Also, FieldStorage protects itself better against certain DoS attacks
    by limiting the size of the data read in one chunk.  The API here
    does not support that kind of protection.  This also affects parse()
    since it can call parse_multipart().

    NOTE(review): every boundary and line-ending comparison below uses str
    literals, while parse() hands this function a binary stream -- confirm
    which kind of fp is actually supported.

    """
    import http.client

    boundary = pdict['boundary'] if 'boundary' in pdict else ""
    if not valid_boundary(boundary):
        raise ValueError('Invalid boundary in multipart form: %r'
                            % (boundary,))

    nextpart = "--" + boundary
    lastpart = nextpart + "--"
    partdict = {}
    terminator = ""

    while terminator != lastpart:
        nbytes = -1
        data = None
        if terminator:
            # At start of next part.  Read headers first.
            headers = http.client.parse_headers(fp)
            clength = headers.get('content-length')
            if clength:
                try:
                    nbytes = int(clength)
                except ValueError:
                    # Unparsable length: treat it as not given.
                    pass
            if nbytes > 0:
                if maxlen and nbytes > maxlen:
                    raise ValueError('Maximum content length exceeded')
                data = fp.read(nbytes)
            else:
                data = ""

        # Read lines until end of part.
        lines = []
        while True:
            line = fp.readline()
            if not line:
                terminator = lastpart # End outer loop
                break
            if line.startswith("--"):
                terminator = line.rstrip()
                if terminator in (nextpart, lastpart):
                    break
            lines.append(line)

        # Done with part.  data is None only for the text that precedes
        # the first boundary, which is discarded.
        if data is None:
            continue
        if nbytes < 0 and lines:
            # No declared length: the body is the collected lines, minus
            # the final line terminator.
            tail = lines[-1]
            if tail[-2:] == "\r\n":
                tail = tail[:-2]
            elif tail[-1:] == "\n":
                tail = tail[:-1]
            lines[-1] = tail
            data = "".join(lines)

        disposition = headers['content-disposition']
        if not disposition:
            continue
        key, params = parse_header(disposition)
        if key != 'form-data' or 'name' not in params:
            continue
        partdict.setdefault(params['name'], []).append(data)

    return partdict
 | |
| 
 | |
| 
 | |
| def _parseparam(s):
 | |
|     while s[:1] == ';':
 | |
|         s = s[1:]
 | |
|         end = s.find(';')
 | |
|         while end > 0 and (s.count('"', 0, end) - s.count('\\"', 0, end)) % 2:
 | |
|             end = s.find(';', end + 1)
 | |
|         if end < 0:
 | |
|             end = len(s)
 | |
|         f = s[:end]
 | |
|         yield f.strip()
 | |
|         s = s[end:]
 | |
| 
 | |
def parse_header(line):
    """Parse a Content-type like header.

    Return the main content-type and a dictionary of options.  Option
    names are lower-cased; a value wrapped in double quotes has the
    quotes removed and its backslash escapes undone.  Segments without
    an '=' are ignored.

    """
    segments = _parseparam(';' + line)
    key = next(segments)
    pdict = {}
    for segment in segments:
        name, sep, value = segment.partition('=')
        if not sep:
            continue
        name = name.strip().lower()
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] == '"':
            value = value[1:-1].replace('\\\\', '\\').replace('\\"', '"')
        pdict[name] = value
    return key, pdict
 | |
| 
 | |
| 
 | |
| # Classes for field storage
 | |
| # =========================
 | |
| 
 | |
class MiniFieldStorage:

    """Like FieldStorage, for use when no file uploads are possible.

    Holds a single field name and its value.
    """

    # Dummy attributes, present so instances expose the same attribute
    # names as FieldStorage.
    filename = None
    list = None
    type = None
    file = None
    type_options = {}
    disposition = None
    disposition_options = {}
    headers = {}

    def __init__(self, name, value):
        """Constructor from field name and value."""
        self.name, self.value = name, value

    def __repr__(self):
        """Return printable representation."""
        fields = (self.name, self.value)
        return "MiniFieldStorage(%r, %r)" % fields
 | |
| 
 | |
| 
 | |
| class FieldStorage:
 | |
| 
 | |
|     """Store a sequence of fields, reading multipart/form-data.
 | |
| 
 | |
|     This class provides naming, typing, files stored on disk, and
 | |
|     more.  At the top level, it is accessible like a dictionary, whose
 | |
|     keys are the field names.  (Note: None can occur as a field name.)
 | |
|     The items are either a Python list (if there's multiple values) or
 | |
|     another FieldStorage or MiniFieldStorage object.  If it's a single
 | |
|     object, it has the following attributes:
 | |
| 
 | |
|     name: the field name, if specified; otherwise None
 | |
| 
 | |
|     filename: the filename, if specified; otherwise None; this is the
 | |
|         client side filename, *not* the file name on which it is
 | |
|         stored (that's a temporary file you don't deal with)
 | |
| 
 | |
|     value: the value as a *string*; for file uploads, this
 | |
|         transparently reads the file every time you request the value
 | |
|         and returns *bytes*
 | |
| 
 | |
|     file: the file(-like) object from which you can read the data *as
 | |
        bytes* ; None if the data is stored as a simple string
 | |
| 
 | |
|     type: the content-type, or None if not specified
 | |
| 
 | |
|     type_options: dictionary of options specified on the content-type
 | |
|         line
 | |
| 
 | |
|     disposition: content-disposition, or None if not specified
 | |
| 
 | |
|     disposition_options: dictionary of corresponding options
 | |
| 
 | |
|     headers: a dictionary(-like) object (sometimes email.message.Message or a
 | |
|         subclass thereof) containing *all* headers
 | |
| 
 | |
|     The class is subclassable, mostly for the purpose of overriding
 | |
|     the make_file() method, which is called internally to come up with
 | |
|     a file open for reading and writing.  This makes it possible to
 | |
|     override the default choice of storing all files in a temporary
 | |
|     directory and unlinking them as soon as they have been opened.
 | |
| 
 | |
|     """
 | |
|     def __init__(self, fp=None, headers=None, outerboundary=b'',
 | |
|                  environ=os.environ, keep_blank_values=0, strict_parsing=0,
 | |
|                  limit=None, encoding='utf-8', errors='replace'):
 | |
|         """Constructor.  Read multipart/* until last part.
 | |
| 
 | |
|         Arguments, all optional:
 | |
| 
 | |
|         fp              : file pointer; default: sys.stdin.buffer
 | |
|             (not used when the request method is GET)
 | |
|             Can be :
 | |
|             1. a TextIOWrapper object
 | |
|             2. an object whose read() and readline() methods return bytes
 | |
| 
 | |
|         headers         : header dictionary-like object; default:
 | |
|             taken from environ as per CGI spec
 | |
| 
 | |
|         outerboundary   : terminating multipart boundary
 | |
|             (for internal use only)
 | |
| 
 | |
|         environ         : environment dictionary; default: os.environ
 | |
| 
 | |
|         keep_blank_values: flag indicating whether blank values in
 | |
|             percent-encoded forms should be treated as blank strings.
 | |
|             A true value indicates that blanks should be retained as
 | |
|             blank strings.  The default false value indicates that
 | |
|             blank values are to be ignored and treated as if they were
 | |
|             not included.
 | |
| 
 | |
|         strict_parsing: flag indicating what to do with parsing errors.
 | |
|             If false (the default), errors are silently ignored.
 | |
|             If true, errors raise a ValueError exception.
 | |
| 
 | |
|         limit : used internally to read parts of multipart/form-data forms,
 | |
|             to exit from the reading loop when reached. It is the difference
 | |
|             between the form content-length and the number of bytes already
 | |
|             read
 | |
| 
 | |
|         encoding, errors : the encoding and error handler used to decode the
 | |
|             binary stream to strings. Must be the same as the charset defined
 | |
|             for the page sending the form (content-type : meta http-equiv or
 | |
|             header)
 | |
| 
 | |
|         """
 | |
|         method = 'GET'
 | |
|         self.keep_blank_values = keep_blank_values
 | |
|         self.strict_parsing = strict_parsing
 | |
|         if 'REQUEST_METHOD' in environ:
 | |
|             method = environ['REQUEST_METHOD'].upper()
 | |
|         self.qs_on_post = None
 | |
|         if method == 'GET' or method == 'HEAD':
 | |
|             if 'QUERY_STRING' in environ:
 | |
|                 qs = environ['QUERY_STRING']
 | |
|             elif sys.argv[1:]:
 | |
|                 qs = sys.argv[1]
 | |
|             else:
 | |
|                 qs = ""
 | |
|             qs = qs.encode(locale.getpreferredencoding(), 'surrogateescape')
 | |
|             fp = BytesIO(qs)
 | |
|             if headers is None:
 | |
|                 headers = {'content-type':
 | |
|                            "application/x-www-form-urlencoded"}
 | |
|         if headers is None:
 | |
|             headers = {}
 | |
|             if method == 'POST':
 | |
|                 # Set default content-type for POST to what's traditional
 | |
|                 headers['content-type'] = "application/x-www-form-urlencoded"
 | |
|             if 'CONTENT_TYPE' in environ:
 | |
|                 headers['content-type'] = environ['CONTENT_TYPE']
 | |
|             if 'QUERY_STRING' in environ:
 | |
|                 self.qs_on_post = environ['QUERY_STRING']
 | |
|             if 'CONTENT_LENGTH' in environ:
 | |
|                 headers['content-length'] = environ['CONTENT_LENGTH']
 | |
|         if fp is None:
 | |
|             self.fp = sys.stdin.buffer
 | |
|         # self.fp.read() must return bytes
 | |
|         elif isinstance(fp, TextIOWrapper):
 | |
|             self.fp = fp.buffer
 | |
|         else:
 | |
|             self.fp = fp
 | |
| 
 | |
|         self.encoding = encoding
 | |
|         self.errors = errors
 | |
| 
 | |
|         self.headers = headers
 | |
|         if not isinstance(outerboundary, bytes):
 | |
|             raise TypeError('outerboundary must be bytes, not %s'
 | |
|                             % type(outerboundary).__name__)
 | |
|         self.outerboundary = outerboundary
 | |
| 
 | |
|         self.bytes_read = 0
 | |
|         self.limit = limit
 | |
| 
 | |
|         # Process content-disposition header
 | |
|         cdisp, pdict = "", {}
 | |
|         if 'content-disposition' in self.headers:
 | |
|             cdisp, pdict = parse_header(self.headers['content-disposition'])
 | |
|         self.disposition = cdisp
 | |
|         self.disposition_options = pdict
 | |
|         self.name = None
 | |
|         if 'name' in pdict:
 | |
|             self.name = pdict['name']
 | |
|         self.filename = None
 | |
|         if 'filename' in pdict:
 | |
|             self.filename = pdict['filename']
 | |
|         self._binary_file = self.filename is not None
 | |
| 
 | |
|         # Process content-type header
 | |
|         #
 | |
|         # Honor any existing content-type header.  But if there is no
 | |
|         # content-type header, use some sensible defaults.  Assume
 | |
|         # outerboundary is "" at the outer level, but something non-false
 | |
|         # inside a multi-part.  The default for an inner part is text/plain,
 | |
|         # but for an outer part it should be urlencoded.  This should catch
 | |
|         # bogus clients which erroneously forget to include a content-type
 | |
|         # header.
 | |
|         #
 | |
|         # See below for what we do if there does exist a content-type header,
 | |
|         # but it happens to be something we don't understand.
 | |
|         if 'content-type' in self.headers:
 | |
|             ctype, pdict = parse_header(self.headers['content-type'])
 | |
|         elif self.outerboundary or method != 'POST':
 | |
|             ctype, pdict = "text/plain", {}
 | |
|         else:
 | |
|             ctype, pdict = 'application/x-www-form-urlencoded', {}
 | |
|         self.type = ctype
 | |
|         self.type_options = pdict
 | |
|         if 'boundary' in pdict:
 | |
|             self.innerboundary = pdict['boundary'].encode(self.encoding)
 | |
|         else:
 | |
|             self.innerboundary = b""
 | |
| 
 | |
|         clen = -1
 | |
|         if 'content-length' in self.headers:
 | |
|             try:
 | |
|                 clen = int(self.headers['content-length'])
 | |
|             except ValueError:
 | |
|                 pass
 | |
|             if maxlen and clen > maxlen:
 | |
|                 raise ValueError('Maximum content length exceeded')
 | |
|         self.length = clen
 | |
|         if self.limit is None and clen:
 | |
|             self.limit = clen
 | |
| 
 | |
|         self.list = self.file = None
 | |
|         self.done = 0
 | |
|         if ctype == 'application/x-www-form-urlencoded':
 | |
|             self.read_urlencoded()
 | |
|         elif ctype[:10] == 'multipart/':
 | |
|             self.read_multi(environ, keep_blank_values, strict_parsing)
 | |
|         else:
 | |
|             self.read_single()
 | |
| 
 | |
|     def __repr__(self):
 | |
|         """Return a printable representation."""
 | |
|         return "FieldStorage(%r, %r, %r)" % (
 | |
|                 self.name, self.filename, self.value)
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return iter(self.keys())
 | |
| 
 | |
|     def __getattr__(self, name):
 | |
|         if name != 'value':
 | |
|             raise AttributeError(name)
 | |
|         if self.file:
 | |
|             self.file.seek(0)
 | |
|             value = self.file.read()
 | |
|             self.file.seek(0)
 | |
|         elif self.list is not None:
 | |
|             value = self.list
 | |
|         else:
 | |
|             value = None
 | |
|         return value
 | |
| 
 | |
|     def __getitem__(self, key):
 | |
|         """Dictionary style indexing."""
 | |
|         if self.list is None:
 | |
|             raise TypeError("not indexable")
 | |
|         found = []
 | |
|         for item in self.list:
 | |
|             if item.name == key: found.append(item)
 | |
|         if not found:
 | |
|             raise KeyError(key)
 | |
|         if len(found) == 1:
 | |
|             return found[0]
 | |
|         else:
 | |
|             return found
 | |
| 
 | |
|     def getvalue(self, key, default=None):
 | |
|         """Dictionary style get() method, including 'value' lookup."""
 | |
|         if key in self:
 | |
|             value = self[key]
 | |
|             if isinstance(value, list):
 | |
|                 return [x.value for x in value]
 | |
|             else:
 | |
|                 return value.value
 | |
|         else:
 | |
|             return default
 | |
| 
 | |
|     def getfirst(self, key, default=None):
 | |
|         """ Return the first value received."""
 | |
|         if key in self:
 | |
|             value = self[key]
 | |
|             if isinstance(value, list):
 | |
|                 return value[0].value
 | |
|             else:
 | |
|                 return value.value
 | |
|         else:
 | |
|             return default
 | |
| 
 | |
|     def getlist(self, key):
 | |
|         """ Return list of received values."""
 | |
|         if key in self:
 | |
|             value = self[key]
 | |
|             if isinstance(value, list):
 | |
|                 return [x.value for x in value]
 | |
|             else:
 | |
|                 return [value.value]
 | |
|         else:
 | |
|             return []
 | |
| 
 | |
|     def keys(self):
 | |
|         """Dictionary style keys() method."""
 | |
|         if self.list is None:
 | |
|             raise TypeError("not indexable")
 | |
|         return list(set(item.name for item in self.list))
 | |
| 
 | |
|     def __contains__(self, key):
 | |
|         """Dictionary style __contains__ method."""
 | |
|         if self.list is None:
 | |
|             raise TypeError("not indexable")
 | |
|         return any(item.name == key for item in self.list)
 | |
| 
 | |
|     def __len__(self):
 | |
|         """Dictionary style len(x) support."""
 | |
|         return len(self.keys())
 | |
| 
 | |
|     def __nonzero__(self):
 | |
|         return bool(self.list)
 | |
| 
 | |
|     def read_urlencoded(self):
 | |
|         """Internal: read data in query string format."""
 | |
|         qs = self.fp.read(self.length)
 | |
|         if not isinstance(qs, bytes):
 | |
|             raise ValueError("%s should return bytes, got %s" \
 | |
|                              % (self.fp, type(qs).__name__))
 | |
|         qs = qs.decode(self.encoding, self.errors)
 | |
|         if self.qs_on_post:
 | |
|             qs += '&' + self.qs_on_post
 | |
|         self.list = []
 | |
|         query = urllib.parse.parse_qsl(
 | |
|             qs, self.keep_blank_values, self.strict_parsing,
 | |
|             encoding=self.encoding, errors=self.errors)
 | |
|         for key, value in query:
 | |
|             self.list.append(MiniFieldStorage(key, value))
 | |
|         self.skip_lines()
 | |
| 
 | |
    # Class used by read_multi() to build each nested part; when left as
    # None, the instance's own class is used.
    FieldStorageClass = None
 | |
| 
 | |
|     def read_multi(self, environ, keep_blank_values, strict_parsing):
 | |
|         """Internal: read a part that is itself multipart."""
 | |
|         ib = self.innerboundary
 | |
|         if not valid_boundary(ib):
 | |
|             raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
 | |
|         self.list = []
 | |
|         if self.qs_on_post:
 | |
|             query = urllib.parse.parse_qsl(
 | |
|                 self.qs_on_post, self.keep_blank_values, self.strict_parsing,
 | |
|                 encoding=self.encoding, errors=self.errors)
 | |
|             for key, value in query:
 | |
|                 self.list.append(MiniFieldStorage(key, value))
 | |
|             FieldStorageClass = None
 | |
| 
 | |
|         klass = self.FieldStorageClass or self.__class__
 | |
|         first_line = self.fp.readline() # bytes
 | |
|         if not isinstance(first_line, bytes):
 | |
|             raise ValueError("%s should return bytes, got %s" \
 | |
|                              % (self.fp, type(first_line).__name__))
 | |
|         self.bytes_read += len(first_line)
 | |
|         # first line holds boundary ; ignore it, or check that
 | |
|         # b"--" + ib == first_line.strip() ?
 | |
|         while True:
 | |
|             parser = FeedParser()
 | |
|             hdr_text = b""
 | |
|             while True:
 | |
|                 data = self.fp.readline()
 | |
|                 hdr_text += data
 | |
|                 if not data.strip():
 | |
|                     break
 | |
|             if not hdr_text:
 | |
|                 break
 | |
|             # parser takes strings, not bytes
 | |
|             self.bytes_read += len(hdr_text)
 | |
|             parser.feed(hdr_text.decode(self.encoding, self.errors))
 | |
|             headers = parser.close()
 | |
|             part = klass(self.fp, headers, ib, environ, keep_blank_values,
 | |
|                          strict_parsing,self.limit-self.bytes_read,
 | |
|                          self.encoding, self.errors)
 | |
|             self.bytes_read += part.bytes_read
 | |
|             self.list.append(part)
 | |
|             if self.bytes_read >= self.length:
 | |
|                 break
 | |
|         self.skip_lines()
 | |
| 
 | |
|     def read_single(self):
 | |
|         """Internal: read an atomic part."""
 | |
|         if self.length >= 0:
 | |
|             self.read_binary()
 | |
|             self.skip_lines()
 | |
|         else:
 | |
|             self.read_lines()
 | |
|         self.file.seek(0)
 | |
| 
 | |
    # Largest chunk read_binary() requests from the input in one read() call.
    bufsize = 8*1024            # I/O buffering size for copy to file
 | |
| 
 | |
|     def read_binary(self):
 | |
|         """Internal: read binary data."""
 | |
|         self.file = self.make_file()
 | |
|         todo = self.length
 | |
|         if todo >= 0:
 | |
|             while todo > 0:
 | |
|                 data = self.fp.read(min(todo, self.bufsize)) # bytes
 | |
|                 if not isinstance(data, bytes):
 | |
|                     raise ValueError("%s should return bytes, got %s"
 | |
|                                      % (self.fp, type(data).__name__))
 | |
|                 self.bytes_read += len(data)
 | |
|                 if not data:
 | |
|                     self.done = -1
 | |
|                     break
 | |
|                 self.file.write(data)
 | |
|                 todo = todo - len(data)
 | |
| 
 | |
|     def read_lines(self):
 | |
|         """Internal: read lines until EOF or outerboundary."""
 | |
|         if self._binary_file:
 | |
|             self.file = self.__file = BytesIO() # store data as bytes for files
 | |
|         else:
 | |
|             self.file = self.__file = StringIO() # as strings for other fields
 | |
|         if self.outerboundary:
 | |
|             self.read_lines_to_outerboundary()
 | |
|         else:
 | |
|             self.read_lines_to_eof()
 | |
| 
 | |
|     def __write(self, line):
 | |
|         """line is always bytes, not string"""
 | |
|         if self.__file is not None:
 | |
|             if self.__file.tell() + len(line) > 1000:
 | |
|                 self.file = self.make_file()
 | |
|                 data = self.__file.getvalue()
 | |
|                 self.file.write(data)
 | |
|                 self.__file = None
 | |
|         if self._binary_file:
 | |
|             # keep bytes
 | |
|             self.file.write(line)
 | |
|         else:
 | |
|             # decode to string
 | |
|             self.file.write(line.decode(self.encoding, self.errors))
 | |
| 
 | |
|     def read_lines_to_eof(self):
 | |
|         """Internal: read lines until EOF."""
 | |
|         while 1:
 | |
|             line = self.fp.readline(1<<16) # bytes
 | |
|             self.bytes_read += len(line)
 | |
|             if not line:
 | |
|                 self.done = -1
 | |
|                 break
 | |
|             self.__write(line)
 | |
| 
 | |
|     def read_lines_to_outerboundary(self):
 | |
|         """Internal: read lines until outerboundary.
 | |
|         Data is read as bytes: boundaries and line ends must be converted
 | |
|         to bytes for comparisons.
 | |
|         """
 | |
|         next_boundary = b"--" + self.outerboundary
 | |
|         last_boundary = next_boundary + b"--"
 | |
|         delim = b""
 | |
|         last_line_lfend = True
 | |
|         _read = 0
 | |
|         while 1:
 | |
|             if _read >= self.limit:
 | |
|                 break
 | |
|             line = self.fp.readline(1<<16) # bytes
 | |
|             self.bytes_read += len(line)
 | |
|             _read += len(line)
 | |
|             if not line:
 | |
|                 self.done = -1
 | |
|                 break
 | |
|             if line.startswith(b"--") and last_line_lfend:
 | |
|                 strippedline = line.rstrip()
 | |
|                 if strippedline == next_boundary:
 | |
|                     break
 | |
|                 if strippedline == last_boundary:
 | |
|                     self.done = 1
 | |
|                     break
 | |
|             odelim = delim
 | |
|             if line.endswith(b"\r\n"):
 | |
|                 delim = b"\r\n"
 | |
|                 line = line[:-2]
 | |
|                 last_line_lfend = True
 | |
|             elif line.endswith(b"\n"):
 | |
|                 delim = b"\n"
 | |
|                 line = line[:-1]
 | |
|                 last_line_lfend = True
 | |
|             else:
 | |
|                 delim = b""
 | |
|                 last_line_lfend = False
 | |
|             self.__write(odelim + line)
 | |
| 
 | |
|     def skip_lines(self):
 | |
|         """Internal: skip lines until outer boundary if defined."""
 | |
|         if not self.outerboundary or self.done:
 | |
|             return
 | |
|         next_boundary = b"--" + self.outerboundary
 | |
|         last_boundary = next_boundary + b"--"
 | |
|         last_line_lfend = True
 | |
|         while True:
 | |
|             line = self.fp.readline(1<<16)
 | |
|             self.bytes_read += len(line)
 | |
|             if not line:
 | |
|                 self.done = -1
 | |
|                 break
 | |
|             if line.endswith(b"--") and last_line_lfend:
 | |
|                 strippedline = line.strip()
 | |
|                 if strippedline == next_boundary:
 | |
|                     break
 | |
|                 if strippedline == last_boundary:
 | |
|                     self.done = 1
 | |
|                     break
 | |
|             last_line_lfend = line.endswith(b'\n')
 | |
| 
 | |
|     def make_file(self):
 | |
|         """Overridable: return a readable & writable file.
 | |
| 
 | |
|         The file will be used as follows:
 | |
|         - data is written to it
 | |
|         - seek(0)
 | |
|         - data is read from it
 | |
| 
 | |
|         The file is opened in binary mode for files, in text mode
 | |
|         for other fields
 | |
| 
 | |
|         This version opens a temporary file for reading and writing,
 | |
|         and immediately deletes (unlinks) it.  The trick (on Unix!) is
 | |
|         that the file can still be used, but it can't be opened by
 | |
|         another process, and it will automatically be deleted when it
 | |
|         is closed or when the current process terminates.
 | |
| 
 | |
|         If you want a more permanent file, you derive a class which
 | |
|         overrides this method.  If you want a visible temporary file
 | |
|         that is nevertheless automatically deleted when the script
 | |
|         terminates, try defining a __del__ method in a derived class
 | |
|         which unlinks the temporary files you have created.
 | |
| 
 | |
|         """
 | |
|         if self._binary_file:
 | |
|             return tempfile.TemporaryFile("wb+")
 | |
|         else:
 | |
|             return tempfile.TemporaryFile("w+",
 | |
|                 encoding=self.encoding, newline = '\n')
 | |
| 
 | |
| 
 | |
| # Test/debug code
 | |
| # ===============
 | |
| 
 | |
def test(environ=os.environ):
    """Robust test CGI script, usable as main program.

    Write minimal HTTP headers and dump all information provided to
    the script in HTML form.

    The dump is produced twice: once as-is, and once after the module
    global ``maxlen`` has been set to 50.  An exception raised during
    either pass is rendered with print_exception() instead of
    propagating to the caller.

    """
    print("Content-type: text/html")
    print()
    # From here on, anything written to sys.stderr lands in the response
    # body.  Note that the redirection is not undone by this function.
    sys.stderr = sys.stdout
    try:
        form = FieldStorage()   # Replace with other classes to test those
        print_directory()
        print_arguments()
        print_form(form)
        print_environ(environ)
        print_environ_usage()
        def f():
            # The argument is not valid Python source, so exec() raises.
            # This is deliberate: it exercises print_exception() below.
            exec("testing print_exception() -- <I>italics?</I>")
        def g(f=f):
            # Extra call level so the printed traceback has several frames.
            f()
        print("<H3>What follows is a test, not an actual exception:</H3>")
        g()
    except:
        # Catch-all on purpose: the script must always produce a page.
        print_exception()

    print("<H1>Second try with a small maxlen...</H1>")

    # NOTE(review): maxlen is presumably the module-level request size
    # cap read by the parsing code -- confirm against its definition.
    global maxlen
    maxlen = 50
    try:
        form = FieldStorage()   # Replace with other classes to test those
        print_directory()
        print_arguments()
        print_form(form)
        print_environ(environ)
    except:
        print_exception()
 | |
| 
 | |
def print_exception(type=None, value=None, tb=None, limit=None):
    """Print an exception and its traceback as HTML.

    When *type* is None the exception currently being handled
    (sys.exc_info()) is used.  The traceback entries, limited to *limit*
    frames when given, are HTML-escaped and printed inside <PRE>; the
    final "ExceptionType: message" line is emphasised with <B>.
    """
    if type is None:
        type, value, tb = sys.exc_info()
    import traceback
    print()
    print("<H3>Traceback (most recent call last):</H3>")
    frame_lines = traceback.format_tb(tb, limit)
    summary_lines = traceback.format_exception_only(type, value)
    entries = frame_lines + summary_lines
    leading = html.escape("".join(entries[:-1]))
    final = html.escape(entries[-1])
    print("<PRE>%s<B>%s</B></PRE>" % (leading, final))
    # Drop the local reference to the traceback object.
    del tb
 | |
| 
 | |
def print_environ(environ=os.environ):
    """Dump the shell environment as HTML.

    Variables are listed in sorted name order as a definition list;
    both names and values are HTML-escaped.
    """
    names = sorted(environ.keys())
    print()
    print("<H3>Shell Environment:</H3>")
    print("<DL>")
    for name in names:
        escaped_name = html.escape(name)
        escaped_value = html.escape(environ[name])
        print("<DT> %s <DD> %s" % (escaped_name, escaped_value))
    print("</DL>")
    print()
 | |
| 
 | |
def print_form(form):
    """Dump the contents of a form as HTML.

    *form* only needs keys() and item access.  For every key, in sorted
    order, the key, the type of its value and the value's repr are
    printed, all HTML-escaped.  An empty form yields a short notice.
    """
    field_names = sorted(form.keys())
    print()
    print("<H3>Form Contents:</H3>")
    if not field_names:
        print("<P>No form fields.")
    print("<DL>")
    for name in field_names:
        print("<DT>%s:" % html.escape(name), end=' ')
        field = form[name]
        print("<i>%s</i>" % html.escape(repr(type(field))))
        print("<DD>%s" % html.escape(repr(field)))
    print("</DL>")
    print()
 | |
| 
 | |
def print_directory():
    """Dump the current directory as HTML.

    If the working directory cannot be determined, the (escaped) error
    message is printed in its place.
    """
    print()
    print("<H3>Current Working Directory:</H3>")
    try:
        cwd = os.getcwd()
    except OSError as exc:      # os.error is an alias of OSError
        print("os.error:", html.escape(str(exc)))
    else:
        print(html.escape(cwd))
    print()
 | |
| 
 | |
def print_arguments():
    """Dump the command line arguments (sys.argv) as HTML.

    The textual form of the argument list is HTML-escaped before it is
    printed, as the other print_*() helpers do for their values, so that
    argument text cannot be interpreted as markup by the browser.
    """
    print()
    print("<H3>Command Line Arguments:</H3>")
    print()
    # str() gives the same text print() would have produced for the list.
    print(html.escape(str(sys.argv)))
    print()
 | |
| 
 | |
def print_environ_usage():
    """Dump a list of environment variables used by CGI as HTML.

    The text is fixed: one bullet list of CGI variables followed by a
    second list of common HTTP_* header variables.
    """
    usage_html = """
<H3>These environment variables could have been set:</H3>
<UL>
<LI>AUTH_TYPE
<LI>CONTENT_LENGTH
<LI>CONTENT_TYPE
<LI>DATE_GMT
<LI>DATE_LOCAL
<LI>DOCUMENT_NAME
<LI>DOCUMENT_ROOT
<LI>DOCUMENT_URI
<LI>GATEWAY_INTERFACE
<LI>LAST_MODIFIED
<LI>PATH
<LI>PATH_INFO
<LI>PATH_TRANSLATED
<LI>QUERY_STRING
<LI>REMOTE_ADDR
<LI>REMOTE_HOST
<LI>REMOTE_IDENT
<LI>REMOTE_USER
<LI>REQUEST_METHOD
<LI>SCRIPT_NAME
<LI>SERVER_NAME
<LI>SERVER_PORT
<LI>SERVER_PROTOCOL
<LI>SERVER_ROOT
<LI>SERVER_SOFTWARE
</UL>
In addition, HTTP headers sent by the server may be passed in the
environment as well.  Here are some common variable names:
<UL>
<LI>HTTP_ACCEPT
<LI>HTTP_CONNECTION
<LI>HTTP_HOST
<LI>HTTP_PRAGMA
<LI>HTTP_REFERER
<LI>HTTP_USER_AGENT
</UL>
"""
    print(usage_html)
 | |
| 
 | |
| 
 | |
| # Utilities
 | |
| # =========
 | |
| 
 | |
def escape(s, quote=None):
    """Deprecated API.

    Return *s* with the characters '&', '<' and '>' replaced by their
    HTML entities.  If *quote* is true, the double quote character is
    replaced by "&quot;" as well.

    Emits a PendingDeprecationWarning on every call; use html.escape
    instead.
    """
    warn("cgi.escape is deprecated, use html.escape instead",
         PendingDeprecationWarning, stacklevel=2)
    # '&' must be handled first, otherwise the ampersands introduced by
    # the later replacements would be escaped a second time.
    s = s.replace("&", "&amp;") # Must be done first!
    s = s.replace("<", "&lt;")
    s = s.replace(">", "&gt;")
    if quote:
        s = s.replace('"', "&quot;")
    return s
 | |
| 
 | |
| 
 | |
def valid_boundary(s, _vb_pattern=None):
    """Return a match object if *s* is an acceptable boundary, else None.

    *s* may be str or bytes.  It is accepted when it consists of 1 to 201
    printable ASCII characters (space through '~') and does not end with
    a space.

    The *_vb_pattern* argument is accepted for backward compatibility
    only; its value is ignored.
    """
    import re
    # \Z anchors at the very end of the string.  "$" would also match
    # just before a trailing newline and so accept e.g. "abc\n".
    if isinstance(s, bytes):
        pattern = br"^[ -~]{0,200}[!-~]\Z"
    else:
        pattern = r"^[ -~]{0,200}[!-~]\Z"
    return re.match(pattern, s)
 | |
| 
 | |
| # Invoke mainline
 | |
| # ===============
 | |
| 
 | |
# Call test() when this file is run as a script (not imported as a module).
# test() prints a complete CGI response to stdout.
if __name__ == '__main__':
    test()
 | 
