mirror of
https://github.com/python/cpython.git
synced 2025-08-03 00:23:06 +00:00

This module implements a SAX XMLReader interface instead of the old Builder interface used with PyDOM (now obsolete). It depends only on the standard library, not on PyXML.
310 lines
8.9 KiB
Python
310 lines
8.9 KiB
Python
"""Miscellaneous utility functions useful for dealing with ESIS streams."""
|
|
__version__ = '$Revision$'
|
|
|
|
import re
|
|
import string
|
|
|
|
import xml.dom.pulldom
|
|
|
|
import xml.sax
|
|
import xml.sax.handler
|
|
import xml.sax.xmlreader
|
|
|
|
|
|
# Matches a run of one or more characters that are not a backslash,
# i.e. a stretch of data that needs no unescaping.
_data_match = re.compile(r"[^\\][^\\]*").match

try:
    _unichr = unichr
except NameError:
    # Python 3 has no unichr(); chr() covers the full code point range.
    _unichr = chr


def decode(s):
    """Return *s* with its backslash escapes expanded.

    Three escapes are recognised:

    * a doubled backslash stands for one literal backslash;
    * a backslash followed by ``n`` stands for a newline;
    * a backslash followed by ``%``, a decimal number and ``;`` stands
      for the character with that code point.

    Raises ValueError for any other escape, for a backslash that is
    the last character of *s*, and for a ``%`` escape that has no
    terminating ``;`` or whose number is not a valid integer.
    """
    parts = []
    pos = 0
    end = len(s)
    while pos < end:
        m = _data_match(s, pos)
        if m:
            parts.append(m.group())
            pos = m.end()
            continue
        # No match means s[pos] is a backslash.  Slicing (rather than
        # indexing) yields "" when the backslash is the final character,
        # so that case falls through to the ValueError below instead of
        # raising IndexError.
        escape = s[pos + 1:pos + 2]
        if escape == "\\":
            parts.append("\\")
            pos = pos + 2
        elif escape == "n":
            parts.append("\n")
            pos = pos + 2
        elif escape == "%":
            semi = s.find(";", pos + 2)
            if semi < 0:
                raise ValueError("can't handle " + repr(s[pos:]))
            parts.append(_unichr(int(s[pos + 2:semi])))
            pos = semi + 1
        else:
            raise ValueError("can't handle " + repr(s[pos:]))
    return "".join(parts)
|
|
|
|
|
|
# Escape table used by encode(): each of the 256 characters chr(0) ..
# chr(255) maps to itself, except newline and backslash, which map to
# their two-character escaped forms.
_charmap = {}
for _code in range(256):
    _charmap[chr(_code)] = chr(_code)
_charmap["\n"] = r"\n"
_charmap["\\"] = r"\\"
del _code

_null_join = ''.join


def encode(s):
    """Return *s* with newlines and backslashes escaped.

    A newline becomes a backslash followed by ``n`` and a backslash is
    doubled.  Every other character is copied unchanged; this includes
    characters that are not in the 256-entry table, which previously
    made the join fail with TypeError because the lookup returned None.
    """
    lookup = _charmap.get
    return _null_join([lookup(ch, ch) for ch in s])
|
|
|
|
|
|
class ESISReader(xml.sax.xmlreader.XMLReader):
    """SAX Reader which reads from an ESIS stream.

    No verification of the document structure is performed by the
    reader; a general verifier could be used as the target
    ContentHandler instance.

    Input is line oriented: the first character of each line is a token
    type mark and the rest of the line is that token's data.  Input can
    be supplied either all at once through parse() or incrementally
    through feed() followed by close().
    """
    _decl_handler = None
    _lexical_handler = None

    # Identifiers collected from 'p' / 's' tokens, consumed by the next
    # 'N' (notation declaration) token.
    _public_id = None
    _system_id = None

    _buffer = ""        # trailing partial line kept between feed() calls
    _is_empty = 0       # set by an 'e' token, consumed by the next '('
    _lineno = 0         # number of complete lines consumed by feed()
    _started = 0        # true once feed() has sent startDocument()

    def __init__(self, contentHandler=None, errorHandler=None):
        """Create a reader, optionally installing the given handlers."""
        xml.sax.xmlreader.XMLReader.__init__(self)
        # self._attrs is shared with self._attributes: 'A' tokens fill
        # the dict, and it is cleared after each startElement() call.
        self._attrs = {}
        self._attributes = Attributes(self._attrs)
        self._locator = Locator()
        self._empties = {}
        if contentHandler:
            self.setContentHandler(contentHandler)
        if errorHandler:
            self.setErrorHandler(errorHandler)

    def get_empties(self):
        """Return a list of the element names seen marked as empty."""
        return list(self._empties)

    #
    #  XMLReader interface
    #

    def parse(self, source):
        """Read a whole ESIS stream from *source* and fire SAX events.

        *source* must provide getPublicId(), getSystemId() and
        getByteStream(); the returned stream is read line by line with
        readline().  startDocument() is sent before the first line and
        endDocument() after the last one.

        This method used to raise RuntimeError unconditionally, leaving
        its body unreachable; that body also finished by calling
        startDocument() a second time instead of endDocument().
        """
        self._locator._public_id = source.getPublicId()
        self._locator._system_id = source.getSystemId()
        fp = source.getByteStream()
        handler = self.getContentHandler()
        if handler:
            handler.startDocument()
        # _get_token() advances the locator's line number itself so that
        # errors it reports refer to the line just read.
        self._locator._lineno = 0
        while 1:
            token, data = self._get_token(fp)
            if token is None:
                break
            self._handle_token(token, data)
        # Fetch the handler again: it may have been replaced by a
        # callback while the stream was being processed.
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def feed(self, data):
        """Process a chunk of ESIS text.

        Every complete (newline-terminated) line in the buffered text
        plus *data* is dispatched; an unterminated tail is kept for the
        next feed() or close() call.  The first call sends
        startDocument() to the content handler.
        """
        if not self._started:
            handler = self.getContentHandler()
            if handler:
                handler.startDocument()
            self._started = 1
        lines = (self._buffer + data).split("\n")
        # split() always returns at least one item; the last one is the
        # unterminated tail.  Store it before dispatching so the buffer
        # stays a valid string even if a handler raises part-way through.
        self._buffer = lines.pop()
        for line in lines:
            self._handle_line(line)

    def close(self):
        """Finish incremental input and send endDocument().

        A final line that was fed without a terminating newline is
        processed first instead of being silently dropped.
        """
        leftover = self._buffer
        self._buffer = ""
        if leftover:
            self._handle_line(leftover)
        handler = self.getContentHandler()
        if handler:
            handler.endDocument()

    def _handle_line(self, line):
        """Count one input line (newline already removed) and dispatch it."""
        self._lineno = self._lineno + 1
        self._locator._lineno = self._lineno
        if line:
            self._handle_token(line[0], line[1:])
        else:
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)

    def _get_token(self, fp):
        """Read the next token line from *fp*.

        Returns a ``(token, data)`` pair, or ``(None, None)`` at end of
        input or after an I/O error has been passed to the error
        handler's fatalError().  Lines without a token type mark are
        passed to the error handler's error() and then skipped.  A pair
        is always returned so that callers can unpack the result.
        """
        while 1:
            try:
                line = fp.readline()
            except IOError as err:
                e = xml.sax.SAXException("I/O error reading input stream",
                                         err)
                self.getErrorHandler().fatalError(e)
                return None, None
            if not line:
                return None, None
            if bytes is not str and isinstance(line, bytes):
                # NOTE(review): on Python 3 a byte stream yields bytes,
                # whose items are ints.  Latin-1 is assumed here because
                # it maps every byte to one character -- confirm the
                # encoding actually used by the ESIS producer.
                line = line.decode("latin-1")
            self._locator._lineno = self._locator._lineno + 1
            if line[-1] == "\n":
                line = line[:-1]
            if line:
                return line[0], line[1:]
            e = xml.sax.SAXParseException(
                "ESIS input line contains no token type mark",
                None, self._locator)
            self.getErrorHandler().error(e)

    def _handle_token(self, token, data):
        """Turn one ESIS token into the corresponding handler call.

        token  action
        -----  ------------------------------------------------------
        '-'    characters() with the decoded data (if non-empty)
        ')'    endElement() with the decoded name
        '('    startElement() with the decoded name and the
               attributes collected so far; attributes are then reset
        'A'    record an attribute for the next '(' token
        '?'    processingInstruction()
        'N'    notationDecl() on the DTD handler, using the pending
               public and system identifiers
        'p'    remember a public identifier
        's'    remember a system identifier
        'e'    mark the next started element as empty
        '&'    ignored
        'C'    ignored

        Any other token is reported through the error handler.
        """
        handler = self.getContentHandler()
        if token == '-':
            if data and handler:
                handler.characters(decode(data))
        elif token == ')':
            if handler:
                handler.endElement(decode(data))
        elif token == '(':
            # Decode the name the same way the ')' branch does so that
            # start and end events always report matching names.
            name = decode(data)
            if self._is_empty:
                self._empties[name] = 1
            if handler:
                handler.startElement(name, self._attributes)
            self._attrs.clear()
            self._is_empty = 0
        elif token == 'A':
            name, value = data.split(' ', 1)
            if value != "IMPLIED":
                atype, value = value.split(' ', 1)
                self._attrs[name] = (decode(value), atype)
        elif token == '&':
            # entity reference in SAX?
            pass
        elif token == '?':
            if handler:
                target, rest = data, ""
                if ' ' in data:
                    # split() may return fewer than two fields when the
                    # only whitespace is leading or trailing.
                    fields = data.split(None, 1)
                    if len(fields) == 2:
                        target, rest = fields
                    elif fields:
                        target = fields[0]
                handler.processingInstruction(target, decode(rest))
        elif token == 'N':
            handler = self.getDTDHandler()
            if handler:
                handler.notationDecl(data, self._public_id, self._system_id)
            self._public_id = None
            self._system_id = None
        elif token == 'p':
            self._public_id = decode(data)
        elif token == 's':
            self._system_id = decode(data)
        elif token == 'e':
            self._is_empty = 1
        elif token == 'C':
            pass
        else:
            e = xml.sax.SAXParseException(
                "unknown ESIS token in event stream",
                None, self._locator)
            self.getErrorHandler().error(e)

    def setContentHandler(self, handler):
        """Install *handler*, moving the document locator over to it."""
        old = self.getContentHandler()
        if old:
            old.setDocumentLocator(None)
        if handler:
            handler.setDocumentLocator(self._locator)
        xml.sax.xmlreader.XMLReader.setContentHandler(self, handler)

    def getProperty(self, property):
        """Return the lexical or declaration handler.

        Raises SAXNotRecognizedException for any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            return self._lexical_handler

        elif property == xml.sax.handler.property_declaration_handler:
            return self._decl_handler

        else:
            raise xml.sax.SAXNotRecognizedException("unknown property %s"
                                                    % repr(property))

    def setProperty(self, property, value):
        """Set the lexical or declaration handler.

        The document locator is removed from the handler being replaced
        and given to the new one.  Raises SAXNotRecognizedException for
        any other property.
        """
        if property == xml.sax.handler.property_lexical_handler:
            if self._lexical_handler:
                self._lexical_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._lexical_handler = value

        elif property == xml.sax.handler.property_declaration_handler:
            if self._decl_handler:
                self._decl_handler.setDocumentLocator(None)
            if value:
                value.setDocumentLocator(self._locator)
            self._decl_handler = value

        else:
            # The exception requires a message; constructing it without
            # one raised TypeError instead of the intended exception.
            raise xml.sax.SAXNotRecognizedException("unknown property %s"
                                                    % repr(property))

    def getFeature(self, feature):
        """Report the namespaces feature as on; defer others to the base."""
        if feature == xml.sax.handler.feature_namespaces:
            return 1
        else:
            return xml.sax.xmlreader.XMLReader.getFeature(self, feature)

    def setFeature(self, feature, enabled):
        """Accept and ignore the namespaces feature; defer others to the base."""
        if feature == xml.sax.handler.feature_namespaces:
            pass
        else:
            xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled)
|
|
|
|
|
|
class Attributes(xml.sax.xmlreader.AttributesImpl):
    """Attribute collection whose entries carry a type as well as a value.

    Unlike the base class, which stores ``{name: value}``, self._attrs
    here has the form ``{name: (value, type)}``.  The accessors below
    are overridden to unpack that pair.
    """

    def getType(self, name):
        """Return the type stored for *name*; KeyError if absent."""
        return self._attrs[name][1]

    def getValue(self, name):
        """Return the value stored for *name*; KeyError if absent."""
        return self._attrs[name][0]

    def getValueByQName(self, name):
        """Return the value stored for *name*; KeyError if absent."""
        return self._attrs[name][0]

    def __getitem__(self, name):
        """Return the value stored for *name*; KeyError if absent."""
        return self._attrs[name][0]

    def get(self, name, default=None):
        """Return the value stored for *name*, or *default* if absent."""
        # "in" instead of dict.has_key(), which Python 3 removed.
        if name in self._attrs:
            return self._attrs[name][0]
        return default

    def items(self):
        """Return a list of ``(name, value)`` pairs, leaving out the types."""
        return [(name, entry[0]) for name, entry in self._attrs.items()]

    def values(self):
        """Return a list of the attribute values, leaving out the types."""
        return [entry[0] for entry in self._attrs.values()]
|
|
|
|
|
|
class Locator(xml.sax.xmlreader.Locator):
    """Locator that reports whatever is stored in its private attributes.

    The three attributes below are plain data slots; each accessor just
    returns the current value of the matching slot.
    """

    # Defaults used until something assigns to the instance attributes.
    _system_id = None
    _public_id = None
    _lineno = -1

    def getSystemId(self):
        """Return the stored system identifier (None by default)."""
        return self._system_id

    def getPublicId(self):
        """Return the stored public identifier (None by default)."""
        return self._public_id

    def getLineNumber(self):
        """Return the stored line number (-1 by default)."""
        return self._lineno
|
|
|
|
|
|
def parse(stream_or_string, parser=None):
    """Return a pulldom event stream for an ESIS document.

    stream_or_string -- a file name (byte or text string), which is
                        opened for reading, or an already open stream
                        object, which is used as is.
    parser           -- the reader to drive; a new ESISReader is
                        created when this is omitted or false.

    A file opened here is handed to the returned DOMEventStream and is
    not closed by this function.
    """
    # isinstance() rather than an exact type() comparison, so that
    # instances of string subclasses are also treated as file names
    # instead of being mistaken for streams.
    if isinstance(stream_or_string, (type(""), type(u""))):
        stream = open(stream_or_string)
    else:
        stream = stream_or_string
    if not parser:
        parser = ESISReader()
    return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)
|