mirror of
https://github.com/python/cpython.git
synced 2025-07-13 22:35:18 +00:00

The new urllib package consists of code from urllib, urllib2, urlparse, and robotparser. The old modules have all been removed. The new package has five submodules: urllib.parse, urllib.request, urllib.response, urllib.error, and urllib.robotparser. The urllib.request.urlopen() function uses the URL opener from urllib2. Note that the unit tests have not been renamed for the beta, but they will be renamed in the future. Joint work with Senthil Kumaran.
386 lines
12 KiB
Python
386 lines
12 KiB
Python
"""Implementation of the DOM Level 3 'LS-Load' feature."""
|
|
|
|
import copy
|
|
import xml.dom
|
|
|
|
from xml.dom.NodeFilter import NodeFilter
|
|
|
|
|
|
__all__ = ["DOMBuilder", "DOMEntityResolver", "DOMInputSource"]
|
|
|
|
|
|
class Options:
    """Features object that has variables set for each DOMBuilder feature.

    The DOMBuilder class uses an instance of this class to pass settings to
    the ExpatBuilder class.

    Every feature is a class attribute holding its default value; setting an
    attribute on an instance overrides the default for that instance only.
    """

    # Note that the DOMBuilder class in LoadSave constrains which of these
    # values can be set using the DOM Level 3 LoadSave feature.

    # Namespace handling.
    namespaces = 1
    namespace_declarations = True

    # Validation and external entity loading.
    validation = False
    external_parameter_entities = True
    external_general_entities = True
    external_dtd_subset = True
    validate_if_schema = False
    validate = False
    datatype_normalization = False

    # Which node kinds are preserved in the resulting tree.
    create_entity_ref_nodes = True
    entities = True
    whitespace_in_element_content = True
    cdata_sections = True
    comments = True

    # Miscellaneous switches.
    charset_overrides_xml_encoding = True
    infoset = False
    supported_mediatypes_only = False

    # Collaborator objects; None means "not configured".
    errorHandler = None
    filter = None
|
|
|
|
|
|
class DOMBuilder:
    """Configurable entry point that parses XML input into a DOM tree.

    Feature settings live on a private Options instance; a copy of it is
    handed to xml.dom.expatbuilder each time a document is parsed.
    """

    entityResolver = None
    errorHandler = None
    filter = None

    ACTION_REPLACE = 1
    ACTION_APPEND_AS_CHILDREN = 2
    ACTION_INSERT_AFTER = 3
    ACTION_INSERT_BEFORE = 4

    _legal_actions = (ACTION_REPLACE, ACTION_APPEND_AS_CHILDREN,
                      ACTION_INSERT_AFTER, ACTION_INSERT_BEFORE)

    def __init__(self):
        self._options = Options()

    def _get_entityResolver(self):
        return self.entityResolver

    def _set_entityResolver(self, entityResolver):
        self.entityResolver = entityResolver

    def _get_errorHandler(self):
        return self.errorHandler

    def _set_errorHandler(self, errorHandler):
        self.errorHandler = errorHandler

    def _get_filter(self):
        return self.filter

    def _set_filter(self, filter):
        self.filter = filter

    def setFeature(self, name, state):
        """Turn the feature *name* on or off according to the truth of *state*.

        Raises xml.dom.NotFoundErr when the feature is unknown and
        xml.dom.NotSupportedErr when the feature is known but cannot be set
        to the requested state.
        """
        if not self.supportsFeature(name):
            raise xml.dom.NotFoundErr("unknown feature: " + repr(name))
        key = (_name_xform(name), 1 if state else 0)
        try:
            settings = self._settings[key]
        except KeyError:
            raise xml.dom.NotSupportedErr(
                "unsupported feature: %r" % (name,)) from None
        # One feature may imply several option changes (see _settings).
        for option, value in settings:
            setattr(self._options, option, value)

    def supportsFeature(self, name):
        """Return true if *name* corresponds to an attribute of the options."""
        return hasattr(self._options, _name_xform(name))

    def canSetFeature(self, name, state):
        """Return true if setFeature(name, state) has an entry in _settings."""
        key = (_name_xform(name), 1 if state else 0)
        return key in self._settings

    # This dictionary maps from (feature,value) to a list of
    # (option,value) pairs that should be set on the Options object.
    # If a (feature,value) setting is not in this dictionary, it is
    # not supported by the DOMBuilder.
    #
    _settings = {
        ("namespace_declarations", 0): [
            ("namespace_declarations", 0)],
        ("namespace_declarations", 1): [
            ("namespace_declarations", 1)],
        ("validation", 0): [
            ("validation", 0)],
        ("external_general_entities", 0): [
            ("external_general_entities", 0)],
        ("external_general_entities", 1): [
            ("external_general_entities", 1)],
        ("external_parameter_entities", 0): [
            ("external_parameter_entities", 0)],
        ("external_parameter_entities", 1): [
            ("external_parameter_entities", 1)],
        ("validate_if_schema", 0): [
            ("validate_if_schema", 0)],
        ("create_entity_ref_nodes", 0): [
            ("create_entity_ref_nodes", 0)],
        ("create_entity_ref_nodes", 1): [
            ("create_entity_ref_nodes", 1)],
        ("entities", 0): [
            ("create_entity_ref_nodes", 0),
            ("entities", 0)],
        ("entities", 1): [
            ("entities", 1)],
        ("whitespace_in_element_content", 0): [
            ("whitespace_in_element_content", 0)],
        ("whitespace_in_element_content", 1): [
            ("whitespace_in_element_content", 1)],
        ("cdata_sections", 0): [
            ("cdata_sections", 0)],
        ("cdata_sections", 1): [
            ("cdata_sections", 1)],
        ("comments", 0): [
            ("comments", 0)],
        ("comments", 1): [
            ("comments", 1)],
        ("charset_overrides_xml_encoding", 0): [
            ("charset_overrides_xml_encoding", 0)],
        ("charset_overrides_xml_encoding", 1): [
            ("charset_overrides_xml_encoding", 1)],
        ("infoset", 0): [],
        ("infoset", 1): [
            ("namespace_declarations", 0),
            ("validate_if_schema", 0),
            ("create_entity_ref_nodes", 0),
            ("entities", 0),
            ("cdata_sections", 0),
            ("datatype_normalization", 1),
            ("whitespace_in_element_content", 1),
            ("comments", 1),
            ("charset_overrides_xml_encoding", 1)],
        ("supported_mediatypes_only", 0): [
            ("supported_mediatypes_only", 0)],
        ("namespaces", 0): [
            ("namespaces", 0)],
        ("namespaces", 1): [
            ("namespaces", 1)],
    }

    def getFeature(self, name):
        """Return the current state of the feature *name*.

        "infoset" is derived from the options that setFeature("infoset", 1)
        sets, so it reads true only while all of them hold.  Raises
        xml.dom.NotFoundErr for a feature the options object does not have.
        """
        xname = _name_xform(name)
        options = self._options
        # "infoset" must be handled before the attribute lookup: the options
        # object carries a static ``infoset`` attribute that setFeature()
        # never updates, so reading it would always give the default.
        if xname == "infoset":
            return bool(
                options.datatype_normalization
                and options.whitespace_in_element_content
                and options.comments
                and options.charset_overrides_xml_encoding
                and not (options.namespace_declarations
                         or options.validate_if_schema
                         or options.create_entity_ref_nodes
                         or options.entities
                         or options.cdata_sections))
        try:
            return getattr(options, xname)
        except AttributeError:
            raise xml.dom.NotFoundErr(
                "feature %s not known" % repr(name)) from None

    def parseURI(self, uri):
        """Resolve *uri* to an input source and parse it.

        Uses self.entityResolver when one is set, otherwise a fresh
        DOMEntityResolver.
        """
        resolver = self.entityResolver or DOMEntityResolver()
        source = resolver.resolveEntity(None, uri)
        return self.parse(source)

    def parse(self, input):
        """Parse the document described by the input source *input*.

        Only input.byteStream and input.systemId are consulted.  When no
        byte stream is supplied, the system identifier is opened with
        urllib.request.urlopen().
        """
        # Work on a copy so per-parse collaborators never leak into the
        # builder's persistent feature settings.
        options = copy.copy(self._options)
        options.filter = self.filter
        options.errorHandler = self.errorHandler
        fp = input.byteStream
        # The system identifier belongs to the input source, not to the
        # options object (which has no such attribute).
        if fp is None and input.systemId:
            import urllib.request
            fp = urllib.request.urlopen(input.systemId)
        return self._parse_bytestream(fp, options)

    def parseWithContext(self, input, cnode, action):
        """Not implemented; validates *action* and then raises.

        Raises ValueError for an action outside the ACTION_* constants and
        NotImplementedError otherwise.
        """
        if action not in self._legal_actions:
            raise ValueError("not a legal action")
        raise NotImplementedError("Haven't written this yet...")

    def _parse_bytestream(self, stream, options):
        """Build a document from the file-like *stream* using *options*."""
        # Imported lazily, as in the original code.
        import xml.dom.expatbuilder
        builder = xml.dom.expatbuilder.makeBuilder(options)
        return builder.parseFile(stream)
|
|
|
|
|
|
def _name_xform(name):
|
|
return name.lower().replace('-', '_')
|
|
|
|
|
|
class DOMEntityResolver(object):
    """Resolve a system identifier into a populated DOMInputSource.

    The URL opener is created on first use and cached on the instance.
    """

    __slots__ = '_opener',

    def resolveEntity(self, publicId, systemId):
        """Open *systemId* and return a DOMInputSource describing it.

        The returned source carries the opened byte stream, the encoding
        reported by the transport (if any) and a base URI derived from
        *systemId*.
        """
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI if we can
        import posixpath
        import urllib.parse
        parts = urllib.parse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            # Strip the final path segment so the base names the directory.
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urllib.parse.urlunparse(parts)

        return source

    def _get_opener(self):
        """Return the cached opener, creating it on first access."""
        try:
            return self._opener
        except AttributeError:
            # The slot is unset until the first call.
            self._opener = self._create_opener()
            return self._opener

    def _create_opener(self):
        """Build the opener used to fetch entities."""
        import urllib.request
        return urllib.request.build_opener()

    def _guess_media_encoding(self, source):
        """Return the lower-cased charset from the response's Content-Type.

        Returns None when there is no Content-Type header or it carries no
        charset parameter.
        """
        # NOTE(review): assumes info() returns an email.message.Message (or
        # subclass), as urllib.request responses do; the mimetools-era
        # getplist() method does not exist on that class.
        info = source.byteStream.info()
        return info.get_content_charset()
|
|
|
|
|
|
class DOMInputSource(object):
    """Plain record describing where a document's content comes from.

    All fields start out as None.  The _get_*/_set_* methods are simple
    accessors for the attribute of the same name.
    """

    __slots__ = ('byteStream', 'characterStream', 'stringData',
                 'encoding', 'publicId', 'systemId', 'baseURI')

    def __init__(self):
        self.byteStream = None
        self.characterStream = None
        self.stringData = None
        self.encoding = None
        self.publicId = None
        self.systemId = None
        self.baseURI = None

    def _get_byteStream(self):
        return self.byteStream

    def _set_byteStream(self, byteStream):
        self.byteStream = byteStream

    def _get_characterStream(self):
        return self.characterStream

    def _set_characterStream(self, characterStream):
        self.characterStream = characterStream

    def _get_stringData(self):
        return self.stringData

    def _set_stringData(self, data):
        self.stringData = data

    def _get_encoding(self):
        return self.encoding

    def _set_encoding(self, encoding):
        self.encoding = encoding

    def _get_publicId(self):
        return self.publicId

    def _set_publicId(self, publicId):
        self.publicId = publicId

    def _get_systemId(self):
        return self.systemId

    def _set_systemId(self, systemId):
        self.systemId = systemId

    def _get_baseURI(self):
        return self.baseURI

    def _set_baseURI(self, uri):
        self.baseURI = uri
|
|
|
|
|
|
class DOMBuilderFilter:
    """Element filter which can be used to tailor construction of
    a DOM instance.

    The default implementation accepts everything.
    """

    # There's really no need for this class; concrete implementations
    # should just implement the endElement() and startElement()
    # methods as appropriate.  Using this makes it easy to only
    # implement one of them.

    FILTER_ACCEPT = 1
    FILTER_REJECT = 2
    FILTER_SKIP = 3
    FILTER_INTERRUPT = 4

    whatToShow = NodeFilter.SHOW_ALL

    def _get_whatToShow(self):
        return self.whatToShow

    def acceptNode(self, element):
        """Return FILTER_ACCEPT for every node."""
        return self.FILTER_ACCEPT

    def startContainer(self, element):
        """Return FILTER_ACCEPT for every container."""
        return self.FILTER_ACCEPT
|
|
|
|
del NodeFilter
|
|
|
|
|
|
class DocumentLS:
    """Mixin to create documents that conform to the load/save spec."""

    # "async" is a reserved keyword from Python 3.7 on, so the attribute
    # carries a trailing underscore; asynchronous loading is never enabled.
    async_ = False

    def _get_async(self):
        return False

    def _set_async(self, flag):
        """Reject any attempt to enable asynchronous loading.

        Raises xml.dom.NotSupportedErr when *flag* is true; a false value
        is accepted and ignored.
        """
        if flag:
            raise xml.dom.NotSupportedErr(
                "asynchronous document loading is not supported")

    def abort(self):
        # What does it mean to "clear" a document?  Does the
        # documentElement disappear?
        raise NotImplementedError(
            "haven't figured out what this means yet")

    def load(self, uri):
        raise NotImplementedError("haven't written this yet")

    def loadXML(self, source):
        raise NotImplementedError("haven't written this yet")

    def saveXML(self, snode):
        """Serialize *snode*, or this document when *snode* is None.

        Raises xml.dom.WrongDocumentErr when *snode* is owned by a
        different document.
        """
        if snode is None:
            snode = self
        elif snode.ownerDocument is not self:
            raise xml.dom.WrongDocumentErr()
        return snode.toxml()
|
|
|
|
|
|
class DOMImplementationLS:
    """Factory methods for the load/save objects defined in this module."""

    MODE_SYNCHRONOUS = 1
    MODE_ASYNCHRONOUS = 2

    def createDOMBuilder(self, mode, schemaType):
        """Return a new synchronous DOMBuilder.

        Raises xml.dom.NotSupportedErr when *schemaType* is not None or
        *mode* is MODE_ASYNCHRONOUS, and ValueError for any other mode that
        is not MODE_SYNCHRONOUS.
        """
        if schemaType is not None:
            raise xml.dom.NotSupportedErr(
                "schemaType not yet supported")
        if mode == self.MODE_SYNCHRONOUS:
            return DOMBuilder()
        if mode == self.MODE_ASYNCHRONOUS:
            raise xml.dom.NotSupportedErr(
                "asynchronous builders are not supported")
        raise ValueError("unknown value for mode")

    def createDOMWriter(self):
        raise NotImplementedError(
            "the writer interface hasn't been written yet!")

    def createDOMInputSource(self):
        """Return a new, empty DOMInputSource."""
        return DOMInputSource()
|