mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 15:58:57 +00:00 
			
		
		
		
	 84a13fbda0
			
		
	
	
		84a13fbda0
		
			
		
	
	
	
	
		
			
			The __getitem__ methods of DOMEventStream, FileInput, and FileWrapper classes ignore their 'index' parameters and return the next item instead.
		
			
				
	
	
		
			349 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			349 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import xml.sax
 | |
| import xml.sax.handler
 | |
| 
 | |
| START_ELEMENT = "START_ELEMENT"
 | |
| END_ELEMENT = "END_ELEMENT"
 | |
| COMMENT = "COMMENT"
 | |
| START_DOCUMENT = "START_DOCUMENT"
 | |
| END_DOCUMENT = "END_DOCUMENT"
 | |
| PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
 | |
| IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
 | |
| CHARACTERS = "CHARACTERS"
 | |
| 
 | |
| class PullDOM(xml.sax.ContentHandler):
 | |
|     _locator = None
 | |
|     document = None
 | |
| 
 | |
|     def __init__(self, documentFactory=None):
 | |
|         from xml.dom import XML_NAMESPACE
 | |
|         self.documentFactory = documentFactory
 | |
|         self.firstEvent = [None, None]
 | |
|         self.lastEvent = self.firstEvent
 | |
|         self.elementStack = []
 | |
|         self.push = self.elementStack.append
 | |
|         try:
 | |
|             self.pop = self.elementStack.pop
 | |
|         except AttributeError:
 | |
|             # use class' pop instead
 | |
|             pass
 | |
|         self._ns_contexts = [{XML_NAMESPACE:'xml'}] # contains uri -> prefix dicts
 | |
|         self._current_context = self._ns_contexts[-1]
 | |
|         self.pending_events = []
 | |
| 
 | |
|     def pop(self):
 | |
|         result = self.elementStack[-1]
 | |
|         del self.elementStack[-1]
 | |
|         return result
 | |
| 
 | |
|     def setDocumentLocator(self, locator):
 | |
|         self._locator = locator
 | |
| 
 | |
|     def startPrefixMapping(self, prefix, uri):
 | |
|         if not hasattr(self, '_xmlns_attrs'):
 | |
|             self._xmlns_attrs = []
 | |
|         self._xmlns_attrs.append((prefix or 'xmlns', uri))
 | |
|         self._ns_contexts.append(self._current_context.copy())
 | |
|         self._current_context[uri] = prefix or None
 | |
| 
 | |
|     def endPrefixMapping(self, prefix):
 | |
|         self._current_context = self._ns_contexts.pop()
 | |
| 
 | |
|     def startElementNS(self, name, tagName , attrs):
 | |
|         # Retrieve xml namespace declaration attributes.
 | |
|         xmlns_uri = 'http://www.w3.org/2000/xmlns/'
 | |
|         xmlns_attrs = getattr(self, '_xmlns_attrs', None)
 | |
|         if xmlns_attrs is not None:
 | |
|             for aname, value in xmlns_attrs:
 | |
|                 attrs._attrs[(xmlns_uri, aname)] = value
 | |
|             self._xmlns_attrs = []
 | |
|         uri, localname = name
 | |
|         if uri:
 | |
|             # When using namespaces, the reader may or may not
 | |
|             # provide us with the original name. If not, create
 | |
|             # *a* valid tagName from the current context.
 | |
|             if tagName is None:
 | |
|                 prefix = self._current_context[uri]
 | |
|                 if prefix:
 | |
|                     tagName = prefix + ":" + localname
 | |
|                 else:
 | |
|                     tagName = localname
 | |
|             if self.document:
 | |
|                 node = self.document.createElementNS(uri, tagName)
 | |
|             else:
 | |
|                 node = self.buildDocument(uri, tagName)
 | |
|         else:
 | |
|             # When the tagname is not prefixed, it just appears as
 | |
|             # localname
 | |
|             if self.document:
 | |
|                 node = self.document.createElement(localname)
 | |
|             else:
 | |
|                 node = self.buildDocument(None, localname)
 | |
| 
 | |
|         for aname,value in attrs.items():
 | |
|             a_uri, a_localname = aname
 | |
|             if a_uri == xmlns_uri:
 | |
|                 if a_localname == 'xmlns':
 | |
|                     qname = a_localname
 | |
|                 else:
 | |
|                     qname = 'xmlns:' + a_localname
 | |
|                 attr = self.document.createAttributeNS(a_uri, qname)
 | |
|                 node.setAttributeNodeNS(attr)
 | |
|             elif a_uri:
 | |
|                 prefix = self._current_context[a_uri]
 | |
|                 if prefix:
 | |
|                     qname = prefix + ":" + a_localname
 | |
|                 else:
 | |
|                     qname = a_localname
 | |
|                 attr = self.document.createAttributeNS(a_uri, qname)
 | |
|                 node.setAttributeNodeNS(attr)
 | |
|             else:
 | |
|                 attr = self.document.createAttribute(a_localname)
 | |
|                 node.setAttributeNode(attr)
 | |
|             attr.value = value
 | |
| 
 | |
|         self.lastEvent[1] = [(START_ELEMENT, node), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
|         self.push(node)
 | |
| 
 | |
|     def endElementNS(self, name, tagName):
 | |
|         self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
| 
 | |
|     def startElement(self, name, attrs):
 | |
|         if self.document:
 | |
|             node = self.document.createElement(name)
 | |
|         else:
 | |
|             node = self.buildDocument(None, name)
 | |
| 
 | |
|         for aname,value in attrs.items():
 | |
|             attr = self.document.createAttribute(aname)
 | |
|             attr.value = value
 | |
|             node.setAttributeNode(attr)
 | |
| 
 | |
|         self.lastEvent[1] = [(START_ELEMENT, node), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
|         self.push(node)
 | |
| 
 | |
|     def endElement(self, name):
 | |
|         self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
| 
 | |
|     def comment(self, s):
 | |
|         if self.document:
 | |
|             node = self.document.createComment(s)
 | |
|             self.lastEvent[1] = [(COMMENT, node), None]
 | |
|             self.lastEvent = self.lastEvent[1]
 | |
|         else:
 | |
|             event = [(COMMENT, s), None]
 | |
|             self.pending_events.append(event)
 | |
| 
 | |
|     def processingInstruction(self, target, data):
 | |
|         if self.document:
 | |
|             node = self.document.createProcessingInstruction(target, data)
 | |
|             self.lastEvent[1] = [(PROCESSING_INSTRUCTION, node), None]
 | |
|             self.lastEvent = self.lastEvent[1]
 | |
|         else:
 | |
|             event = [(PROCESSING_INSTRUCTION, target, data), None]
 | |
|             self.pending_events.append(event)
 | |
| 
 | |
|     def ignorableWhitespace(self, chars):
 | |
|         node = self.document.createTextNode(chars)
 | |
|         self.lastEvent[1] = [(IGNORABLE_WHITESPACE, node), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
| 
 | |
|     def characters(self, chars):
 | |
|         node = self.document.createTextNode(chars)
 | |
|         self.lastEvent[1] = [(CHARACTERS, node), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
| 
 | |
|     def startDocument(self):
 | |
|         if self.documentFactory is None:
 | |
|             import xml.dom.minidom
 | |
|             self.documentFactory = xml.dom.minidom.Document.implementation
 | |
| 
 | |
|     def buildDocument(self, uri, tagname):
 | |
|         # Can't do that in startDocument, since we need the tagname
 | |
|         # XXX: obtain DocumentType
 | |
|         node = self.documentFactory.createDocument(uri, tagname, None)
 | |
|         self.document = node
 | |
|         self.lastEvent[1] = [(START_DOCUMENT, node), None]
 | |
|         self.lastEvent = self.lastEvent[1]
 | |
|         self.push(node)
 | |
|         # Put everything we have seen so far into the document
 | |
|         for e in self.pending_events:
 | |
|             if e[0][0] == PROCESSING_INSTRUCTION:
 | |
|                 _,target,data = e[0]
 | |
|                 n = self.document.createProcessingInstruction(target, data)
 | |
|                 e[0] = (PROCESSING_INSTRUCTION, n)
 | |
|             elif e[0][0] == COMMENT:
 | |
|                 n = self.document.createComment(e[0][1])
 | |
|                 e[0] = (COMMENT, n)
 | |
|             else:
 | |
|                 raise AssertionError("Unknown pending event ",e[0][0])
 | |
|             self.lastEvent[1] = e
 | |
|             self.lastEvent = e
 | |
|         self.pending_events = None
 | |
|         return node.firstChild
 | |
| 
 | |
|     def endDocument(self):
 | |
|         self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
 | |
|         self.pop()
 | |
| 
 | |
|     def clear(self):
 | |
|         "clear(): Explicitly release parsing structures"
 | |
|         self.document = None
 | |
| 
 | |
| class ErrorHandler:
 | |
|     def warning(self, exception):
 | |
|         print(exception)
 | |
|     def error(self, exception):
 | |
|         raise exception
 | |
|     def fatalError(self, exception):
 | |
|         raise exception
 | |
| 
 | |
| class DOMEventStream:
 | |
|     def __init__(self, stream, parser, bufsize):
 | |
|         self.stream = stream
 | |
|         self.parser = parser
 | |
|         self.bufsize = bufsize
 | |
|         if not hasattr(self.parser, 'feed'):
 | |
|             self.getEvent = self._slurp
 | |
|         self.reset()
 | |
| 
 | |
|     def reset(self):
 | |
|         self.pulldom = PullDOM()
 | |
|         # This content handler relies on namespace support
 | |
|         self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
 | |
|         self.parser.setContentHandler(self.pulldom)
 | |
| 
 | |
|     def __getitem__(self, pos):
 | |
|         import warnings
 | |
|         warnings.warn(
 | |
|             "DOMEventStream's __getitem__ method ignores 'pos' parameter. "
 | |
|             "Use iterator protocol instead.",
 | |
|             DeprecationWarning,
 | |
|             stacklevel=2
 | |
|         )
 | |
|         rc = self.getEvent()
 | |
|         if rc:
 | |
|             return rc
 | |
|         raise IndexError
 | |
| 
 | |
|     def __next__(self):
 | |
|         rc = self.getEvent()
 | |
|         if rc:
 | |
|             return rc
 | |
|         raise StopIteration
 | |
| 
 | |
|     def __iter__(self):
 | |
|         return self
 | |
| 
 | |
|     def expandNode(self, node):
 | |
|         event = self.getEvent()
 | |
|         parents = [node]
 | |
|         while event:
 | |
|             token, cur_node = event
 | |
|             if cur_node is node:
 | |
|                 return
 | |
|             if token != END_ELEMENT:
 | |
|                 parents[-1].appendChild(cur_node)
 | |
|             if token == START_ELEMENT:
 | |
|                 parents.append(cur_node)
 | |
|             elif token == END_ELEMENT:
 | |
|                 del parents[-1]
 | |
|             event = self.getEvent()
 | |
| 
 | |
|     def getEvent(self):
 | |
|         # use IncrementalParser interface, so we get the desired
 | |
|         # pull effect
 | |
|         if not self.pulldom.firstEvent[1]:
 | |
|             self.pulldom.lastEvent = self.pulldom.firstEvent
 | |
|         while not self.pulldom.firstEvent[1]:
 | |
|             buf = self.stream.read(self.bufsize)
 | |
|             if not buf:
 | |
|                 self.parser.close()
 | |
|                 return None
 | |
|             self.parser.feed(buf)
 | |
|         rc = self.pulldom.firstEvent[1][0]
 | |
|         self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
 | |
|         return rc
 | |
| 
 | |
|     def _slurp(self):
 | |
|         """ Fallback replacement for getEvent() using the
 | |
|             standard SAX2 interface, which means we slurp the
 | |
|             SAX events into memory (no performance gain, but
 | |
|             we are compatible to all SAX parsers).
 | |
|         """
 | |
|         self.parser.parse(self.stream)
 | |
|         self.getEvent = self._emit
 | |
|         return self._emit()
 | |
| 
 | |
|     def _emit(self):
 | |
|         """ Fallback replacement for getEvent() that emits
 | |
|             the events that _slurp() read previously.
 | |
|         """
 | |
|         rc = self.pulldom.firstEvent[1][0]
 | |
|         self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
 | |
|         return rc
 | |
| 
 | |
|     def clear(self):
 | |
|         """clear(): Explicitly release parsing objects"""
 | |
|         self.pulldom.clear()
 | |
|         del self.pulldom
 | |
|         self.parser = None
 | |
|         self.stream = None
 | |
| 
 | |
| class SAX2DOM(PullDOM):
 | |
| 
 | |
|     def startElementNS(self, name, tagName , attrs):
 | |
|         PullDOM.startElementNS(self, name, tagName, attrs)
 | |
|         curNode = self.elementStack[-1]
 | |
|         parentNode = self.elementStack[-2]
 | |
|         parentNode.appendChild(curNode)
 | |
| 
 | |
|     def startElement(self, name, attrs):
 | |
|         PullDOM.startElement(self, name, attrs)
 | |
|         curNode = self.elementStack[-1]
 | |
|         parentNode = self.elementStack[-2]
 | |
|         parentNode.appendChild(curNode)
 | |
| 
 | |
|     def processingInstruction(self, target, data):
 | |
|         PullDOM.processingInstruction(self, target, data)
 | |
|         node = self.lastEvent[0][1]
 | |
|         parentNode = self.elementStack[-1]
 | |
|         parentNode.appendChild(node)
 | |
| 
 | |
|     def ignorableWhitespace(self, chars):
 | |
|         PullDOM.ignorableWhitespace(self, chars)
 | |
|         node = self.lastEvent[0][1]
 | |
|         parentNode = self.elementStack[-1]
 | |
|         parentNode.appendChild(node)
 | |
| 
 | |
|     def characters(self, chars):
 | |
|         PullDOM.characters(self, chars)
 | |
|         node = self.lastEvent[0][1]
 | |
|         parentNode = self.elementStack[-1]
 | |
|         parentNode.appendChild(node)
 | |
| 
 | |
| 
 | |
| default_bufsize = (2 ** 14) - 20
 | |
| 
 | |
| def parse(stream_or_string, parser=None, bufsize=None):
 | |
|     if bufsize is None:
 | |
|         bufsize = default_bufsize
 | |
|     if isinstance(stream_or_string, str):
 | |
|         stream = open(stream_or_string, 'rb')
 | |
|     else:
 | |
|         stream = stream_or_string
 | |
|     if not parser:
 | |
|         parser = xml.sax.make_parser()
 | |
|     return DOMEventStream(stream, parser, bufsize)
 | |
| 
 | |
| def parseString(string, parser=None):
 | |
|     from io import StringIO
 | |
| 
 | |
|     bufsize = len(string)
 | |
|     buf = StringIO(string)
 | |
|     if not parser:
 | |
|         parser = xml.sax.make_parser()
 | |
|     return DOMEventStream(buf, parser, bufsize)
 |