mirror of
https://github.com/python/cpython.git
synced 2025-09-12 03:37:09 +00:00

Completely get rid of StringIO.py and cStringIO.c. I had to fix a few tests and modules beyond what Christian did, and invent a few conventions. E.g. in elementtree, I chose to write/return Unicode strings when no encoding is given, but bytes when an explicit encoding is given. Also mimetools was made to always assume binary files.
346 lines
12 KiB
Python
346 lines
12 KiB
Python
import xml.sax
|
|
import xml.sax.handler
|
|
import types
|
|
|
|
# Event tokens: the first item of every (token, node) pair queued by PullDOM.
# Each token's value is simply its own name.

# Document boundaries.
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"

# Element boundaries.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"

# Text content.
CHARACTERS = "CHARACTERS"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"

# Miscellaneous markup.
COMMENT = "COMMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
|
|
|
|
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that records what it sees as a linked event list.

    Each event is a two-item list ``[(token, node), next_event]``.
    ``firstEvent`` is a sentinel head whose second slot refers to the oldest
    queued event; ``lastEvent`` is the most recently linked event (or the
    sentinel itself when nothing has been linked yet).
    """

    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Create the empty event list, element stack and namespace contexts.

        documentFactory is stored as given; when it is None, startDocument()
        replaces it with the minidom DOM implementation.
        """
        from xml.dom import XML_NAMESPACE

        self.documentFactory = documentFactory

        head = [None, None]
        self.firstEvent = head
        self.lastEvent = head

        # push/pop are bound list methods stored on the instance; the
        # instance-level ``pop`` shadows the class-level pop() below.
        stack = []
        self.elementStack = stack
        self.push = stack.append
        self.pop = stack.pop

        # Stack of uri -> prefix dicts; the last entry is the current one.
        self._ns_contexts = [{XML_NAMESPACE: 'xml'}]
        self._current_context = self._ns_contexts[-1]

        # Comments / processing instructions seen before a document exists.
        self.pending_events = []

    def pop(self):
        """Remove and return the top of the element stack."""
        top = self.elementStack[-1]
        del self.elementStack[-1]
        return top

    def _link_event(self, payload):
        """Append ``[payload, None]`` to the event list and make it the tail."""
        event = [payload, None]
        self.lastEvent[1] = event
        self.lastEvent = event

    def _qualified_name(self, uri, localname):
        """Return ``prefix:localname`` using the prefix mapped to *uri*.

        When the current context maps *uri* to a false value (the default
        namespace), the bare local name is returned.  Raises KeyError when
        *uri* is not in the current context.
        """
        prefix = self._current_context[uri]
        if prefix:
            return prefix + ":" + localname
        return localname

    def setDocumentLocator(self, locator):
        """Remember the locator supplied by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration and enter a new prefix context."""
        try:
            declared = self._xmlns_attrs
        except AttributeError:
            # The list is created on first use rather than in __init__.
            declared = self._xmlns_attrs = []
        declared.append((prefix or 'xmlns', uri))

        # Save a snapshot of the context as it was before this mapping;
        # endPrefixMapping() restores it.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the context saved by the matching startPrefixMapping()."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName , attrs):
        """Create an element node for a namespace-aware start tag.

        name is a ``(uri, localname)`` pair and tagName the qualified name,
        or None when the reader did not supply one.  The first element seen
        also triggers buildDocument().  A START_ELEMENT event is queued and
        the node is pushed on the element stack.
        """
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'

        # Fold the namespace declarations collected by startPrefixMapping()
        # into the attribute set so they show up as xmlns attributes.
        declared = getattr(self, '_xmlns_attrs', None)
        if declared is not None:
            for decl_name, decl_value in declared:
                attrs._attrs[(xmlns_uri, decl_name)] = decl_value
            self._xmlns_attrs = []

        uri, localname = name
        if uri:
            # The reader may or may not provide the original name; if it
            # did not, build *a* valid tagName from the current context.
            if tagName is None:
                tagName = self._qualified_name(uri, localname)
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        elif self.document:
            # An unprefixed tag name simply appears as the local name.
            node = self.document.createElement(localname)
        else:
            node = self.buildDocument(None, localname)

        for (a_uri, a_localname), value in attrs.items():
            if a_uri == xmlns_uri:
                if a_localname == 'xmlns':
                    qname = a_localname
                else:
                    qname = 'xmlns:' + a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            elif a_uri:
                qname = self._qualified_name(a_uri, a_localname)
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(a_localname)
                node.setAttributeNode(attr)
            attr.value = value

        self._link_event((START_ELEMENT, node))
        self.push(node)

    def endElementNS(self, name, tagName):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self._link_event((END_ELEMENT, self.pop()))

    def startElement(self, name, attrs):
        """Create an element node for a non-namespace start tag.

        The first element seen also triggers buildDocument().  A
        START_ELEMENT event is queued and the node is pushed on the stack.
        """
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for attr_name, attr_value in attrs.items():
            attr = self.document.createAttribute(attr_name)
            attr.value = attr_value
            node.setAttributeNode(attr)

        self._link_event((START_ELEMENT, node))
        self.push(node)

    def endElement(self, name):
        """Pop the current element and queue an END_ELEMENT event for it."""
        self._link_event((END_ELEMENT, self.pop()))

    def comment(self, s):
        """Queue a COMMENT event, deferring it if no document exists yet."""
        if not self.document:
            # No document to create the node with; buildDocument() turns
            # this raw entry into a real event later.
            self.pending_events.append([(COMMENT, s), None])
            return
        self._link_event((COMMENT, self.document.createComment(s)))

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, deferring it if needed."""
        if not self.document:
            # Same deferral as comment(): keep the raw target and data.
            self.pending_events.append(
                [(PROCESSING_INSTRUCTION, target, data), None])
            return
        node = self.document.createProcessingInstruction(target, data)
        self._link_event((PROCESSING_INSTRUCTION, node))

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event holding a new text node."""
        text = self.document.createTextNode(chars)
        self._link_event((IGNORABLE_WHITESPACE, text))

    def characters(self, chars):
        """Queue a CHARACTERS event holding a new text node."""
        text = self.document.createTextNode(chars)
        self._link_event((CHARACTERS, text))

    def startDocument(self):
        """Fall back to the minidom implementation if no factory was given."""
        if self.documentFactory is not None:
            return
        import xml.dom.minidom
        self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document for the root element and return that element.

        This cannot happen in startDocument() because the root tag name is
        needed.  A START_DOCUMENT event is queued, the document is pushed on
        the element stack, and every deferred comment / processing
        instruction is converted into a node-carrying event and linked in.
        """
        # XXX: obtain DocumentType
        doc = self.documentFactory.createDocument(uri, tagname, None)
        self.document = doc
        self._link_event((START_DOCUMENT, doc))
        self.push(doc)

        # Put everything we have seen so far into the event list.
        for pending in self.pending_events:
            kind = pending[0][0]
            if kind == PROCESSING_INSTRUCTION:
                _, target, data = pending[0]
                pi_node = self.document.createProcessingInstruction(
                    target, data)
                pending[0] = (PROCESSING_INSTRUCTION, pi_node)
            elif kind == COMMENT:
                comment_node = self.document.createComment(pending[0][1])
                pending[0] = (COMMENT, comment_node)
            else:
                raise AssertionError("Unknown pending event ", kind)
            self.lastEvent[1] = pending
            self.lastEvent = pending
        self.pending_events = None
        return doc.firstChild

    def endDocument(self):
        """Link the END_DOCUMENT event and pop the document off the stack."""
        # The tail pointer is deliberately left where it was: only the
        # link is written here, lastEvent itself is not advanced.
        self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
|
|
|
|
class ErrorHandler:
    """Minimal SAX error handler: print warnings, raise everything else."""

    def warning(self, exception):
        """Report a warning by printing it; processing continues."""
        print(exception)

    def error(self, exception):
        """Re-raise a recoverable error passed in by the caller."""
        raise exception

    def fatalError(self, exception):
        """Re-raise a fatal error passed in by the caller."""
        raise exception
|
|
|
|
class DOMEventStream:
    """Iterable stream of ``(token, node)`` events pulled from a SAX parser.

    A fresh PullDOM handler is attached to *parser*.  When the parser has a
    ``feed`` method, the stream is read ``bufsize`` units at a time and
    events are produced on demand.  Otherwise the whole stream is parsed on
    first use and the collected events are handed out afterwards.
    """

    def __init__(self, stream, parser, bufsize):
        """Store the collaborators and install the PullDOM handler.

        stream  -- object read with ``read(bufsize)`` or handed to
                   ``parser.parse()``
        parser  -- SAX parser; ``feed`` support selects incremental mode
        bufsize -- amount requested from ``stream.read()`` per step
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # No incremental interface: shadow getEvent() on this instance.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Attach a new PullDOM handler and switch on namespace support."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __getitem__(self, pos):
        """Return the next event; *pos* is ignored.

        Raises IndexError once no event is left, which ends iteration
        through the sequence protocol.
        """
        rc = self.getEvent()
        if rc:
            return rc
        raise IndexError

    def __next__(self):
        """Return the next event or raise StopIteration when exhausted."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        """The stream is its own iterator."""
        return self

    def expandNode(self, node):
        """Consume events up to the end of *node*, building its subtree.

        Every node delivered before the event that carries *node* itself is
        appended to its parent, so *node* ends up with its descendants
        attached.  Returns None; also stops if the events run out first.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next ``(token, node)`` pair, or None at end of input.

        Uses the parser's incremental ``feed`` interface so that only as
        much input is read as is needed to produce one more event.
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue drained: point the tail back at the sentinel head.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        # Everything is queued now; later calls only need to dequeue.
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None once the queue is empty, like getEvent().
        """
        event = self.pulldom.firstEvent[1]
        if not event:
            # Without this guard the final call subscripted None and the
            # iteration ended in TypeError instead of stopping cleanly.
            return None
        self.pulldom.firstEvent[1] = event[1]
        return event[0]

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
|
|
|
|
class SAX2DOM(PullDOM):
    """PullDOM variant that also links the nodes it creates into the tree.

    After delegating to PullDOM, each overridden handler appends the new
    node to its parent taken from the element stack.
    """

    def buildDocument(self, uri, tagname):
        """Build the document and attach processing instructions queued
        before the root element.

        PullDOM.buildDocument() turns the queued entries into node-carrying
        events but does not place the nodes in the tree; here the
        processing-instruction nodes are inserted ahead of the root
        element.  Returns the root element, as PullDOM.buildDocument() does.
        """
        # Keep a reference: PullDOM.buildDocument() drops the queue.
        queued = self.pending_events
        root = PullDOM.buildDocument(self, uri, tagname)
        for event in queued or ():
            token, node = event[0]
            if token == PROCESSING_INSTRUCTION:
                self.document.insertBefore(node, root)
        return root

    def startElementNS(self, name, tagName , attrs):
        """Create the element via PullDOM and append it to its parent."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element via PullDOM and append it to its parent."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Create the processing instruction and append it to the tree.

        Before the root element there is no document yet: PullDOM only
        queues the raw target and data, so there is neither a node nor a
        parent to attach to.  Attachment is then left to buildDocument().
        """
        had_document = bool(self.document)
        PullDOM.processingInstruction(self, target, data)
        if not had_document:
            # Previously this fell through and failed on the empty event
            # list / element stack.
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Create the whitespace text node and append it to the open element."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Create the text node and append it to the open element."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
|
|
|
|
|
|
# Buffer size used by parse() when the caller passes none: 16364.
default_bufsize = 2 ** 14 - 20
|
|
|
|
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream over a file name or an open stream.

    stream_or_string -- a str is treated as a file name and opened; any
                        other object is used as the stream directly
    parser           -- SAX parser to drive; defaults to
                        xml.sax.make_parser()
    bufsize          -- read size per step; defaults to default_bufsize
    """
    if bufsize is None:
        bufsize = default_bufsize
    # ``basestring`` no longer exists in Python 3; str is the only text type.
    if isinstance(stream_or_string, str):
        # Binary mode hands the raw bytes to the parser, so the document's
        # own encoding declaration is used rather than the locale default.
        stream = open(stream_or_string, 'rb')
    else:
        stream = stream_or_string
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(stream, parser, bufsize)
|
|
|
|
def parseString(string, parser=None):
    """Return a DOMEventStream over XML held in the text *string*.

    The buffer size is the length of the string, so the whole document is
    requested in a single read.  parser defaults to xml.sax.make_parser().
    """
    # One import is enough: the former ImportError fallback repeated the
    # very same statement and could never succeed where the first failed.
    from io import StringIO

    bufsize = len(string)
    buf = StringIO(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)
|