mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
#4661: add bytes parsing and generation to email (email version bump to 5.1.0)
The work on this is not 100% complete, but everything is present to allow real-world testing of the code. The only remaining major todo item is to (hopefully!) enhance the handling of non-ASCII bytes in headers converted to unicode by RFC2047 encoding them rather than replacing them with '?'s.
This commit is contained in:
parent
59fdd6736b
commit
96fd54eaec
11 changed files with 708 additions and 85 deletions
|
@ -4,7 +4,7 @@
|
|||
|
||||
"""A package for parsing, handling, and generating email messages."""
|
||||
|
||||
__version__ = '5.0.0'
|
||||
__version__ = '5.1.0'
|
||||
|
||||
__all__ = [
|
||||
'base64mime',
|
||||
|
@ -16,7 +16,9 @@ __all__ = [
|
|||
'iterators',
|
||||
'message',
|
||||
'message_from_file',
|
||||
'message_from_binary_file',
|
||||
'message_from_string',
|
||||
'message_from_bytes',
|
||||
'mime',
|
||||
'parser',
|
||||
'quoprimime',
|
||||
|
@ -36,6 +38,13 @@ def message_from_string(s, *args, **kws):
|
|||
from email.parser import Parser
|
||||
return Parser(*args, **kws).parsestr(s)
|
||||
|
||||
def message_from_bytes(s, *args, **kws):
    """Parse a bytes string into a Message object model.

    Any additional positional and keyword arguments are passed unchanged
    to the BytesParser constructor.
    """
    # Imported lazily, inside the function, like the other message_from_*
    # helpers in this module.
    from email.parser import BytesParser
    return BytesParser(*args, **kws).parsebytes(s)
|
||||
|
||||
def message_from_file(fp, *args, **kws):
|
||||
"""Read a file and parse its contents into a Message object model.
|
||||
|
@ -44,3 +53,11 @@ def message_from_file(fp, *args, **kws):
|
|||
"""
|
||||
from email.parser import Parser
|
||||
return Parser(*args, **kws).parse(fp)
|
||||
|
||||
def message_from_binary_file(fp, *args, **kws):
    """Read a binary file and parse its contents into a Message object model.

    fp must be a file object opened in binary mode.  Any additional
    positional and keyword arguments are passed unchanged to the
    BytesParser constructor.
    """
    # The name used below must be the one imported here: importing Parser
    # while calling BytesParser raises NameError on the first call.
    from email.parser import BytesParser
    return BytesParser(*args, **kws).parse(fp)
|
||||
|
|
|
@ -482,3 +482,10 @@ class FeedParser:
|
|||
if lastheader:
|
||||
# XXX reconsider the joining of folded lines
|
||||
self._cur[lastheader] = EMPTYSTRING.join(lastvalue).rstrip('\r\n')
|
||||
|
||||
|
||||
class BytesFeedParser(FeedParser):
    """Like FeedParser, but feed accepts bytes."""

    def feed(self, data):
        """Push a chunk of bytes into the parser.

        The bytes are decoded as ASCII with the 'surrogateescape' error
        handler, so every non-ASCII byte is carried through the string
        based parser as a lone surrogate code point instead of raising.
        """
        super().feed(data.decode('ascii', 'surrogateescape'))
|
||||
|
|
|
@ -12,8 +12,9 @@ import time
|
|||
import random
|
||||
import warnings
|
||||
|
||||
from io import StringIO
|
||||
from io import StringIO, BytesIO
|
||||
from email.header import Header
|
||||
from email.message import _has_surrogates
|
||||
|
||||
UNDERSCORE = '_'
|
||||
NL = '\n'
|
||||
|
@ -72,7 +73,7 @@ class Generator:
|
|||
ufrom = msg.get_unixfrom()
|
||||
if not ufrom:
|
||||
ufrom = 'From nobody ' + time.ctime(time.time())
|
||||
print(ufrom, file=self._fp)
|
||||
self.write(ufrom + NL)
|
||||
self._write(msg)
|
||||
|
||||
def clone(self, fp):
|
||||
|
@ -83,6 +84,29 @@ class Generator:
|
|||
# Protected interface - undocumented ;/
|
||||
#
|
||||
|
||||
# Note that we use 'self.write' when what we are writing is coming from
|
||||
# the source, and self._fp.write when what we are writing is coming from a
|
||||
# buffer (because the Bytes subclass has already had a chance to transform
|
||||
# the data in its write method in that case). This is an entirely
|
||||
# pragmatic split determined by experiment; we could be more general by
|
||||
# always using write and having the Bytes subclass write method detect when
|
||||
# it has already transformed the input; but, since this whole thing is a
|
||||
# hack anyway this seems good enough.
|
||||
|
||||
# We use these class constants when we need to manipulate data that has
|
||||
# already been written to a buffer (ex: constructing a re to check the
|
||||
# boundary), and the module level NL constant when adding new output to a
|
||||
# buffer via self.write, because 'write' always takes strings.
|
||||
# Having write always take strings makes the code simpler, but there are
|
||||
# a few occasions when we need to write previously created data back
|
||||
# to the buffer or to a new buffer; for those cases we use self._fp.write.
|
||||
_NL = NL
|
||||
_EMPTY = ''
|
||||
|
||||
def _new_buffer(self):
    """Return a fresh, empty in-memory buffer for caching output.

    The base Generator works on text, so this is a StringIO.
    BytesGenerator overrides this to return BytesIO.
    """
    return StringIO()
|
||||
|
||||
def _write(self, msg):
|
||||
# We can't write the headers yet because of the following scenario:
|
||||
# say a multipart message includes the boundary string somewhere in
|
||||
|
@ -91,13 +115,13 @@ class Generator:
|
|||
# parameter.
|
||||
#
|
||||
# The way we do this, so as to make the _handle_*() methods simpler,
|
||||
# is to cache any subpart writes into a StringIO. The we write the
|
||||
# headers and the StringIO contents. That way, subpart handlers can
|
||||
# is to cache any subpart writes into a buffer. Then we write the
|
||||
# headers and the buffer contents. That way, subpart handlers can
|
||||
# Do The Right Thing, and can still modify the Content-Type: header if
|
||||
# necessary.
|
||||
oldfp = self._fp
|
||||
try:
|
||||
self._fp = sfp = StringIO()
|
||||
self._fp = sfp = self._new_buffer()
|
||||
self._dispatch(msg)
|
||||
finally:
|
||||
self._fp = oldfp
|
||||
|
@ -132,16 +156,16 @@ class Generator:
|
|||
|
||||
def _write_headers(self, msg):
|
||||
for h, v in msg.items():
|
||||
print('%s:' % h, end=' ', file=self._fp)
|
||||
self.write('%s: ' % h)
|
||||
if isinstance(v, Header):
|
||||
print(v.encode(maxlinelen=self._maxheaderlen), file=self._fp)
|
||||
self.write(v.encode(maxlinelen=self._maxheaderlen)+NL)
|
||||
else:
|
||||
# Header's got lots of smarts, so use it.
|
||||
header = Header(v, maxlinelen=self._maxheaderlen,
|
||||
header_name=h)
|
||||
print(header.encode(), file=self._fp)
|
||||
self.write(header.encode()+NL)
|
||||
# A blank line always separates headers from body
|
||||
print(file=self._fp)
|
||||
self.write(NL)
|
||||
|
||||
#
|
||||
# Handlers for writing types and subtypes
|
||||
|
@ -153,9 +177,15 @@ class Generator:
|
|||
return
|
||||
if not isinstance(payload, str):
|
||||
raise TypeError('string payload expected: %s' % type(payload))
|
||||
if _has_surrogates(msg._payload):
|
||||
charset = msg.get_param('charset')
|
||||
if charset is not None:
|
||||
del msg['content-transfer-encoding']
|
||||
msg.set_payload(payload, charset)
|
||||
payload = msg.get_payload()
|
||||
if self._mangle_from_:
|
||||
payload = fcre.sub('>From ', payload)
|
||||
self._fp.write(payload)
|
||||
self.write(payload)
|
||||
|
||||
# Default body handler
|
||||
_writeBody = _handle_text
|
||||
|
@ -170,21 +200,21 @@ class Generator:
|
|||
subparts = []
|
||||
elif isinstance(subparts, str):
|
||||
# e.g. a non-strict parse of a message with no starting boundary.
|
||||
self._fp.write(subparts)
|
||||
self.write(subparts)
|
||||
return
|
||||
elif not isinstance(subparts, list):
|
||||
# Scalar payload
|
||||
subparts = [subparts]
|
||||
for part in subparts:
|
||||
s = StringIO()
|
||||
s = self._new_buffer()
|
||||
g = self.clone(s)
|
||||
g.flatten(part, unixfrom=False)
|
||||
msgtexts.append(s.getvalue())
|
||||
# Now make sure the boundary we've selected doesn't appear in any of
|
||||
# the message texts.
|
||||
alltext = NL.join(msgtexts)
|
||||
alltext = self._NL.join(msgtexts)
|
||||
# BAW: What about boundaries that are wrapped in double-quotes?
|
||||
boundary = msg.get_boundary(failobj=_make_boundary(alltext))
|
||||
boundary = msg.get_boundary(failobj=self._make_boundary(alltext))
|
||||
# If we had to calculate a new boundary because the body text
|
||||
# contained that string, set the new boundary. We don't do it
|
||||
# unconditionally because, while set_boundary() preserves order, it
|
||||
|
@ -195,9 +225,9 @@ class Generator:
|
|||
msg.set_boundary(boundary)
|
||||
# If there's a preamble, write it out, with a trailing CRLF
|
||||
if msg.preamble is not None:
|
||||
print(msg.preamble, file=self._fp)
|
||||
self.write(msg.preamble + NL)
|
||||
# dash-boundary transport-padding CRLF
|
||||
print('--' + boundary, file=self._fp)
|
||||
self.write('--' + boundary + NL)
|
||||
# body-part
|
||||
if msgtexts:
|
||||
self._fp.write(msgtexts.pop(0))
|
||||
|
@ -206,14 +236,14 @@ class Generator:
|
|||
# --> CRLF body-part
|
||||
for body_part in msgtexts:
|
||||
# delimiter transport-padding CRLF
|
||||
print('\n--' + boundary, file=self._fp)
|
||||
self.write('\n--' + boundary + NL)
|
||||
# body-part
|
||||
self._fp.write(body_part)
|
||||
# close-delimiter transport-padding
|
||||
self._fp.write('\n--' + boundary + '--')
|
||||
self.write('\n--' + boundary + '--')
|
||||
if msg.epilogue is not None:
|
||||
print(file=self._fp)
|
||||
self._fp.write(msg.epilogue)
|
||||
self.write(NL)
|
||||
self.write(msg.epilogue)
|
||||
|
||||
def _handle_multipart_signed(self, msg):
|
||||
# The contents of signed parts has to stay unmodified in order to keep
|
||||
|
@ -232,23 +262,23 @@ class Generator:
|
|||
# block and the boundary. Sigh.
|
||||
blocks = []
|
||||
for part in msg.get_payload():
|
||||
s = StringIO()
|
||||
s = self._new_buffer()
|
||||
g = self.clone(s)
|
||||
g.flatten(part, unixfrom=False)
|
||||
text = s.getvalue()
|
||||
lines = text.split('\n')
|
||||
lines = text.split(self._NL)
|
||||
# Strip off the unnecessary trailing empty line
|
||||
if lines and lines[-1] == '':
|
||||
blocks.append(NL.join(lines[:-1]))
|
||||
if lines and lines[-1] == self._EMPTY:
|
||||
blocks.append(self._NL.join(lines[:-1]))
|
||||
else:
|
||||
blocks.append(text)
|
||||
# Now join all the blocks with an empty line. This has the lovely
|
||||
# effect of separating each block with an empty line, but not adding
|
||||
# an extra one after the last one.
|
||||
self._fp.write(NL.join(blocks))
|
||||
self._fp.write(self._NL.join(blocks))
|
||||
|
||||
def _handle_message(self, msg):
|
||||
s = StringIO()
|
||||
s = self._new_buffer()
|
||||
g = self.clone(s)
|
||||
# The payload of a message/rfc822 part should be a multipart sequence
|
||||
# of length 1. The zeroth element of the list should be the Message
|
||||
|
@ -265,6 +295,90 @@ class Generator:
|
|||
payload = s.getvalue()
|
||||
self._fp.write(payload)
|
||||
|
||||
# This used to be a module level function; we use a classmethod for this
|
||||
# and _compile_re so we can continue to provide the module level function
|
||||
# for backward compatibility by doing
|
||||
# _make_boundary = Generator._make_boundary
|
||||
# at the end of the module. It *is* internal, so we could drop that...
|
||||
@classmethod
def _make_boundary(cls, text=None):
    """Craft a random multipart boundary string.

    If text is given, ensure that the chosen boundary does not appear in
    it as a delimiter line ('--boundary' or '--boundary--' on a line of
    its own); on a collision a '.N' suffix with an increasing counter is
    appended until the candidate is unused.

    The pattern is compiled through cls._compile_re so that a subclass
    can supply a pattern type matching the type of text.
    """
    token = random.randrange(sys.maxsize)
    # _fmt is the module-level zero-padding format for the random token.
    boundary = ('=' * 15) + (_fmt % token) + '=='
    if text is None:
        return boundary
    b = boundary
    counter = 0
    while True:
        cre = cls._compile_re('^--' + re.escape(b) + '(--)?$', re.MULTILINE)
        if not cre.search(text):
            break
        b = boundary + '.' + str(counter)
        counter += 1
    return b
|
||||
|
||||
@classmethod
def _compile_re(cls, s, flags):
    """Compile the string pattern s with the given re flags.

    Kept as an overridable hook: the base implementation compiles a text
    pattern as-is.
    """
    return re.compile(s, flags)
|
||||
|
||||
|
||||
class BytesGenerator(Generator):
    """Generates a bytes version of a Message object tree.

    Functionally identical to the base Generator except that the output is
    bytes and not string.  When surrogates were used in the input to encode
    bytes, these are decoded back to bytes for output.

    The outfp object must accept bytes in its write method.
    """

    # Bytes versions of these constants for use in manipulating data from
    # the BytesIO buffer.
    _NL = NL.encode('ascii')
    _EMPTY = b''

    def write(self, s):
        """Encode the string s to bytes and write it to the output file.

        The 'surrogateescape' handler turns surrogate-escaped code points
        back into the original non-ASCII bytes.
        """
        self._fp.write(s.encode('ascii', 'surrogateescape'))

    def _new_buffer(self):
        """Return a fresh bytes buffer for caching subpart output."""
        return BytesIO()

    def _write_headers(self, msg):
        """Write all headers of msg followed by the blank separator line."""
        # This is almost the same as the string version, except for handling
        # strings with 8bit bytes.
        for h, v in msg._headers:
            self.write('%s: ' % h)
            if isinstance(v, Header):
                self.write(v.encode(maxlinelen=self._maxheaderlen)+NL)
            elif _has_surrogates(v):
                # If we have raw 8bit data in a byte string, we have no idea
                # what the encoding is.  There is no safe way to split this
                # string.  If it's ascii-subset, then we could do a normal
                # ascii split, but if it's multibyte then we could break the
                # string.  There's no way to know so the least harm seems to
                # be to not split the string and risk it being too long.
                self.write(v+NL)
            else:
                # Header's got lots of smarts and this string is safe...
                header = Header(v, maxlinelen=self._maxheaderlen,
                                header_name=h)
                self.write(header.encode()+NL)
        # A blank line always separates headers from body
        self.write(NL)

    def _handle_text(self, msg):
        """Write the text payload of msg, preserving original 8bit bytes."""
        # A message without a payload has nothing to write; checking first
        # also keeps None away from the surrogate test, which needs a str.
        if msg._payload is None:
            return
        # If the string has surrogates the original source was bytes, so
        # just write it back out.
        if _has_surrogates(msg._payload):
            self.write(msg._payload)
        else:
            super(BytesGenerator,self)._handle_text(msg)

    @classmethod
    def _compile_re(cls, s, flags):
        """Compile s as a bytes pattern so it can search bytes buffers."""
        return re.compile(s.encode('ascii'), flags)
|
||||
|
||||
|
||||
|
||||
_FMT = '[Non-text (%(type)s) part of message omitted, filename %(filename)s]'
|
||||
|
@ -325,23 +439,9 @@ class DecodedGenerator(Generator):
|
|||
|
||||
|
||||
|
||||
# Helper
|
||||
# Helper used by Generator._make_boundary
|
||||
_width = len(repr(sys.maxsize-1))
|
||||
_fmt = '%%0%dd' % _width
|
||||
|
||||
def _make_boundary(text=None):
|
||||
# Craft a random boundary. If text is given, ensure that the chosen
|
||||
# boundary doesn't appear in the text.
|
||||
token = random.randrange(sys.maxsize)
|
||||
boundary = ('=' * 15) + (_fmt % token) + '=='
|
||||
if text is None:
|
||||
return boundary
|
||||
b = boundary
|
||||
counter = 0
|
||||
while True:
|
||||
cre = re.compile('^--' + re.escape(b) + '(--)?$', re.MULTILINE)
|
||||
if not cre.search(text):
|
||||
break
|
||||
b = boundary + '.' + str(counter)
|
||||
counter += 1
|
||||
return b
|
||||
# Backward compatibility
|
||||
_make_boundary = Generator._make_boundary
|
||||
|
|
|
@ -24,8 +24,26 @@ SEMISPACE = '; '
|
|||
# existence of which force quoting of the parameter value.
|
||||
tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
|
||||
|
||||
# How to figure out if we are processing strings that come from a byte
# source with undecodable characters.
# The \A and \Z anchors are written with doubled backslashes because this is
# not a raw string: the \uXXXX escapes must still be interpreted by Python,
# while \A and \Z must reach the regular expression engine intact.
_has_surrogates = re.compile(
    '([^\ud800-\udbff]|\\A)[\udc00-\udfff]([^\udc00-\udfff]|\\Z)').search


# Helper functions
def _sanitize_surrogates(value):
    """Return value with surrogate-escaped bytes replaced by '?'.

    If the value contains surrogates, re-decode and replace the original
    non-ascii bytes with '?'s.  Used to sanitize header values before
    letting them escape as strings.  Values that are not str (such as
    Header objects) and strings without surrogates are returned unchanged.
    """
    if not isinstance(value, str):
        # Header object
        return value
    if _has_surrogates(value):
        # Recover the raw bytes, then decode again so that every non-ASCII
        # byte becomes U+FFFD, which is finally turned into a '?'.
        original_bytes = value.encode('ascii', 'surrogateescape')
        return original_bytes.decode('ascii', 'replace').replace('\ufffd', '?')
    else:
        return value
|
||||
|
||||
def _splitparam(param):
|
||||
# Split header parameters. BAW: this may be too simple. It isn't
|
||||
# strictly RFC 2045 (section 5.1) compliant, but it catches most headers
|
||||
|
@ -184,44 +202,72 @@ class Message:
|
|||
If the message is a multipart and the decode flag is True, then None
|
||||
is returned.
|
||||
"""
|
||||
if i is None:
|
||||
payload = self._payload
|
||||
elif not isinstance(self._payload, list):
|
||||
# Here is the logic table for this code, based on the email5.0.0 code:
|
||||
# i decode is_multipart result
|
||||
# ------ ------ ------------ ------------------------------
|
||||
# None True True None
|
||||
# i True True None
|
||||
# None False True _payload (a list)
|
||||
# i False True _payload element i (a Message)
|
||||
# i False False error (not a list)
|
||||
# i True False error (not a list)
|
||||
# None False False _payload
|
||||
# None True False _payload decoded (bytes)
|
||||
# Note that Barry planned to factor out the 'decode' case, but that
|
||||
# isn't so easy now that we handle the 8 bit data, which needs to be
|
||||
# converted in both the decode and non-decode path.
|
||||
if self.is_multipart():
|
||||
if decode:
|
||||
return None
|
||||
if i is None:
|
||||
return self._payload
|
||||
else:
|
||||
return self._payload[i]
|
||||
# For backward compatibility, Use isinstance and this error message
|
||||
# instead of the more logical is_multipart test.
|
||||
if i is not None and not isinstance(self._payload, list):
|
||||
raise TypeError('Expected list, got %s' % type(self._payload))
|
||||
else:
|
||||
payload = self._payload[i]
|
||||
payload = self._payload
|
||||
cte = self.get('content-transfer-encoding', '').lower()
|
||||
# payload can be bytes here, (I wonder if that is actually a bug?)
|
||||
if isinstance(payload, str):
|
||||
if _has_surrogates(payload):
|
||||
bpayload = payload.encode('ascii', 'surrogateescape')
|
||||
if not decode:
|
||||
try:
|
||||
payload = bpayload.decode(self.get_param('charset', 'ascii'), 'replace')
|
||||
except LookupError:
|
||||
payload = bpayload.decode('ascii', 'replace')
|
||||
elif decode:
|
||||
try:
|
||||
bpayload = payload.encode('ascii')
|
||||
except UnicodeError:
|
||||
# This won't happen for RFC compliant messages (messages
|
||||
# containing only ASCII codepoints in the unicode input).
|
||||
# If it does happen, turn the string into bytes in a way
|
||||
# guaranteed not to fail.
|
||||
bpayload = payload.encode('raw-unicode-escape')
|
||||
if not decode:
|
||||
return payload
|
||||
# Decoded payloads always return bytes. XXX split this part out into
|
||||
# a new method called .get_decoded_payload().
|
||||
if self.is_multipart():
|
||||
return None
|
||||
cte = self.get('content-transfer-encoding', '').lower()
|
||||
if cte == 'quoted-printable':
|
||||
if isinstance(payload, str):
|
||||
payload = payload.encode('ascii')
|
||||
return utils._qdecode(payload)
|
||||
return utils._qdecode(bpayload)
|
||||
elif cte == 'base64':
|
||||
try:
|
||||
if isinstance(payload, str):
|
||||
payload = payload.encode('ascii')
|
||||
return base64.b64decode(payload)
|
||||
return base64.b64decode(bpayload)
|
||||
except binascii.Error:
|
||||
# Incorrect padding
|
||||
pass
|
||||
return bpayload
|
||||
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
|
||||
in_file = BytesIO(payload.encode('ascii'))
|
||||
in_file = BytesIO(bpayload)
|
||||
out_file = BytesIO()
|
||||
try:
|
||||
uu.decode(in_file, out_file, quiet=True)
|
||||
return out_file.getvalue()
|
||||
except uu.Error:
|
||||
# Some decoding problem
|
||||
pass
|
||||
# Is there a better way to do this? We can't use the bytes
|
||||
# constructor.
|
||||
return bpayload
|
||||
if isinstance(payload, str):
|
||||
return payload.encode('raw-unicode-escape')
|
||||
return bpayload
|
||||
return payload
|
||||
|
||||
def set_payload(self, payload, charset=None):
|
||||
|
@ -340,7 +386,7 @@ class Message:
|
|||
Any fields deleted and re-inserted are always appended to the header
|
||||
list.
|
||||
"""
|
||||
return [v for k, v in self._headers]
|
||||
return [_sanitize_surrogates(v) for k, v in self._headers]
|
||||
|
||||
def items(self):
|
||||
"""Get all the message's header fields and values.
|
||||
|
@ -350,7 +396,7 @@ class Message:
|
|||
Any fields deleted and re-inserted are always appended to the header
|
||||
list.
|
||||
"""
|
||||
return self._headers[:]
|
||||
return [(k, _sanitize_surrogates(v)) for k, v in self._headers]
|
||||
|
||||
def get(self, name, failobj=None):
|
||||
"""Get a header value.
|
||||
|
@ -361,7 +407,7 @@ class Message:
|
|||
name = name.lower()
|
||||
for k, v in self._headers:
|
||||
if k.lower() == name:
|
||||
return v
|
||||
return _sanitize_surrogates(v)
|
||||
return failobj
|
||||
|
||||
#
|
||||
|
@ -381,7 +427,7 @@ class Message:
|
|||
name = name.lower()
|
||||
for k, v in self._headers:
|
||||
if k.lower() == name:
|
||||
values.append(v)
|
||||
values.append(_sanitize_surrogates(v))
|
||||
if not values:
|
||||
return failobj
|
||||
return values
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
__all__ = ['Parser', 'HeaderParser']
|
||||
|
||||
import warnings
|
||||
from io import StringIO
|
||||
from io import StringIO, TextIOWrapper
|
||||
|
||||
from email.feedparser import FeedParser
|
||||
from email.message import Message
|
||||
|
@ -89,3 +89,47 @@ class HeaderParser(Parser):
|
|||
|
||||
def parsestr(self, text, headersonly=True):
|
||||
return Parser.parsestr(self, text, True)
|
||||
|
||||
|
||||
class BytesParser:
    """Parser front end that accepts bytes input and delegates to Parser."""

    def __init__(self, *args, **kw):
        """Parser of binary RFC 2822 and MIME email messages.

        Creates an in-memory object tree representing the email message, which
        can then be manipulated and turned over to a Generator to return the
        textual representation of the message.

        The input must be formatted as a block of RFC 2822 headers and header
        continuation lines, optionally preceded by a `Unix-from' header.  The
        header block is terminated either by the end of the input or by a
        blank line.

        _class is the class to instantiate for new message objects when they
        must be created.  This class must have a constructor that can take
        zero arguments.  Default is Message.Message.
        """
        # All arguments are forwarded to the string based Parser, which does
        # the real work once the input has been decoded.
        self.parser = Parser(*args, **kw)

    def parse(self, fp, headersonly=False):
        """Create a message structure from the data in a binary file.

        Reads all the data from the file and returns the root of the message
        structure.  Optional headersonly is a flag specifying whether to stop
        parsing after reading the headers or not.  The default is False,
        meaning it parses the entire contents of the file.
        """
        fp = TextIOWrapper(fp, encoding='ascii', errors='surrogateescape')
        try:
            return self.parser.parse(fp, headersonly)
        finally:
            # Hand the binary file back to the caller.  Without this the
            # wrapper closes the underlying file when it is finalized.
            fp.detach()

    def parsebytes(self, text, headersonly=False):
        """Create a message structure from a byte string.

        Returns the root of the message structure.  Optional headersonly is a
        flag specifying whether to stop parsing after reading the headers or
        not.  The default is False, meaning it parses the entire contents of
        the file.
        """
        text = text.decode('ASCII', errors='surrogateescape')
        return self.parser.parsestr(text, headersonly)
|
||||
|
|
|
@ -9,8 +9,9 @@ import base64
|
|||
import difflib
|
||||
import unittest
|
||||
import warnings
|
||||
import textwrap
|
||||
|
||||
from io import StringIO
|
||||
from io import StringIO, BytesIO
|
||||
from itertools import chain
|
||||
|
||||
import email
|
||||
|
@ -34,7 +35,7 @@ from email import iterators
|
|||
from email import base64mime
|
||||
from email import quoprimime
|
||||
|
||||
from test.support import findfile, run_unittest
|
||||
from test.support import findfile, run_unittest, unlink
|
||||
from email.test import __file__ as landmark
|
||||
|
||||
|
||||
|
@ -2070,6 +2071,10 @@ class TestIdempotent(TestEmailBase):
|
|||
msg, text = self._msgobj('msg_36.txt')
|
||||
self._idempotent(msg, text)
|
||||
|
||||
def test_message_signed_idempotent(self):
|
||||
msg, text = self._msgobj('msg_45.txt')
|
||||
self._idempotent(msg, text)
|
||||
|
||||
def test_content_type(self):
|
||||
eq = self.assertEquals
|
||||
unless = self.assertTrue
|
||||
|
@ -2186,7 +2191,8 @@ class TestMiscellaneous(TestEmailBase):
|
|||
all.sort()
|
||||
self.assertEqual(all, [
|
||||
'base64mime', 'charset', 'encoders', 'errors', 'generator',
|
||||
'header', 'iterators', 'message', 'message_from_file',
|
||||
'header', 'iterators', 'message', 'message_from_binary_file',
|
||||
'message_from_bytes', 'message_from_file',
|
||||
'message_from_string', 'mime', 'parser',
|
||||
'quoprimime', 'utils',
|
||||
])
|
||||
|
@ -2686,6 +2692,266 @@ Here's the message body
|
|||
msg = email.message_from_string(m)
|
||||
self.assertTrue(msg.get_payload(0).get_payload().endswith('\r\n'))
|
||||
|
||||
|
||||
class Test8BitBytesHandling(unittest.TestCase):
|
||||
# In Python3 all input is string, but that doesn't work if the actual input
|
||||
# uses an 8bit transfer encoding. To hack around that, in email 5.1 we
|
||||
# decode byte streams using the surrogateescape error handler, and
|
||||
# reconvert to binary at appropriate places if we detect surrogates. This
|
||||
# doesn't allow us to transform headers with 8bit bytes (they get munged),
|
||||
# but it does allow us to parse and preserve them, and to decode body
|
||||
# parts that use an 8bit CTE.
|
||||
|
||||
bodytest_msg = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: baz
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset={charset}
|
||||
Content-Transfer-Encoding: {cte}
|
||||
|
||||
{bodyline}
|
||||
""")
|
||||
|
||||
def test_known_8bit_CTE(self):
|
||||
m = self.bodytest_msg.format(charset='utf-8',
|
||||
cte='8bit',
|
||||
bodyline='pöstal').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(), "pöstal\n")
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
"pöstal\n".encode('utf-8'))
|
||||
|
||||
def test_unknown_8bit_CTE(self):
|
||||
m = self.bodytest_msg.format(charset='notavalidcharset',
|
||||
cte='8bit',
|
||||
bodyline='pöstal').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(), "p\uFFFD\uFFFDstal\n")
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
"pöstal\n".encode('utf-8'))
|
||||
|
||||
def test_8bit_in_quopri_body(self):
|
||||
# This is non-RFC compliant data...without 'decode' the library code
|
||||
# decodes the body using the charset from the headers, and because the
|
||||
# source byte really is utf-8 this works. This is likely to fail
|
||||
# against real dirty data (ie: produce mojibake), but the data is
|
||||
# invalid anyway so it is as good a guess as any. But this means that
|
||||
# this test just confirms the current behavior; that behavior is not
|
||||
# necessarily the best possible behavior. With 'decode' it is
|
||||
# returning the raw bytes, so that test should be of correct behavior,
|
||||
# or at least produce the same result that email4 did.
|
||||
m = self.bodytest_msg.format(charset='utf-8',
|
||||
cte='quoted-printable',
|
||||
bodyline='p=C3=B6stál').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'pöstál\n'.encode('utf-8'))
|
||||
|
||||
def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
|
||||
# This is similar to the previous test, but proves that if the 8bit
|
||||
# byte is undecodable in the specified charset, it gets replaced
|
||||
# by the unicode 'unknown' character. Again, this may or may not
|
||||
# be the ideal behavior. Note that if decode=False none of the
|
||||
# decoders will get involved, so this is the only test we need
|
||||
# for this behavior.
|
||||
m = self.bodytest_msg.format(charset='ascii',
|
||||
cte='quoted-printable',
|
||||
bodyline='p=C3=B6stál').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(), 'p=C3=B6st\uFFFD\uFFFDl\n')
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'pöstál\n'.encode('utf-8'))
|
||||
|
||||
def test_8bit_in_base64_body(self):
|
||||
# Sticking an 8bit byte in a base64 block makes it undecodable by
|
||||
# normal means, so the block is returned undecoded, but as bytes.
|
||||
m = self.bodytest_msg.format(charset='utf-8',
|
||||
cte='base64',
|
||||
bodyline='cMO2c3RhbAá=').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'cMO2c3RhbAá=\n'.encode('utf-8'))
|
||||
|
||||
def test_8bit_in_uuencode_body(self):
|
||||
# Sticking an 8bit byte in a uuencode block makes it undecodable by
|
||||
# normal means, so the block is returned undecoded, but as bytes.
|
||||
m = self.bodytest_msg.format(charset='utf-8',
|
||||
cte='uuencode',
|
||||
bodyline='<,.V<W1A; á ').encode('utf-8')
|
||||
msg = email.message_from_bytes(m)
|
||||
self.assertEqual(msg.get_payload(decode=True),
|
||||
'<,.V<W1A; á \n'.encode('utf-8'))
|
||||
|
||||
|
||||
headertest_msg = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: báz
|
||||
Subject: Maintenant je vous présente mon collègue, le pouf célèbre
|
||||
\tJean de Baddie
|
||||
From: göst
|
||||
|
||||
Yes, they are flying.
|
||||
""").encode('utf-8')
|
||||
|
||||
def test_get_8bit_header(self):
|
||||
msg = email.message_from_bytes(self.headertest_msg)
|
||||
self.assertEqual(msg.get('to'), 'b??z')
|
||||
self.assertEqual(msg['to'], 'b??z')
|
||||
|
||||
def test_print_8bit_headers(self):
|
||||
msg = email.message_from_bytes(self.headertest_msg)
|
||||
self.assertEqual(str(msg),
|
||||
self.headertest_msg.decode(
|
||||
'ascii', 'replace').replace('\uFFFD', '?'))
|
||||
|
||||
def test_values_with_8bit_headers(self):
|
||||
msg = email.message_from_bytes(self.headertest_msg)
|
||||
self.assertListEqual(msg.values(),
|
||||
['foo@bar.com',
|
||||
'b??z',
|
||||
'Maintenant je vous pr??sente mon '
|
||||
'coll??gue, le pouf c??l??bre\n'
|
||||
'\tJean de Baddie',
|
||||
"g??st"])
|
||||
|
||||
def test_items_with_8bit_headers(self):
|
||||
msg = email.message_from_bytes(self.headertest_msg)
|
||||
self.assertListEqual(msg.items(),
|
||||
[('From', 'foo@bar.com'),
|
||||
('To', 'b??z'),
|
||||
('Subject', 'Maintenant je vous pr??sente mon '
|
||||
'coll??gue, le pouf c??l??bre\n'
|
||||
'\tJean de Baddie'),
|
||||
('From', 'g??st')])
|
||||
|
||||
def test_get_all_with_8bit_headers(self):
|
||||
msg = email.message_from_bytes(self.headertest_msg)
|
||||
self.assertListEqual(msg.get_all('from'),
|
||||
['foo@bar.com',
|
||||
'g??st'])
|
||||
|
||||
non_latin_bin_msg = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: báz
|
||||
Subject: Maintenant je vous présente mon collègue, le pouf célèbre
|
||||
\tJean de Baddie
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
Да, они летят.
|
||||
""").encode('utf-8')
|
||||
|
||||
def test_bytes_generator(self):
|
||||
msg = email.message_from_bytes(self.non_latin_bin_msg)
|
||||
out = BytesIO()
|
||||
email.generator.BytesGenerator(out).flatten(msg)
|
||||
self.assertEqual(out.getvalue(), self.non_latin_bin_msg)
|
||||
|
||||
# XXX: ultimately the '?' should turn into CTE encoded bytes
|
||||
# using 'unknown-8bit' charset.
|
||||
non_latin_bin_msg_as7bit = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: b??z
|
||||
Subject: Maintenant je vous pr??sente mon coll??gue, le pouf c??l??bre
|
||||
\tJean de Baddie
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="utf-8"
|
||||
Content-Transfer-Encoding: base64
|
||||
|
||||
0JTQsCwg0L7QvdC4INC70LXRgtGP0YIuCg==
|
||||
""")
|
||||
|
||||
def test_generator_handles_8bit(self):
|
||||
msg = email.message_from_bytes(self.non_latin_bin_msg)
|
||||
out = StringIO()
|
||||
email.generator.Generator(out).flatten(msg)
|
||||
self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit)
|
||||
|
||||
def test_bytes_generator_with_unix_from(self):
|
||||
# The unixfrom contains a current date, so we can't check it
|
||||
# literally. Just make sure the first word is 'From' and the
|
||||
# rest of the message matches the input.
|
||||
msg = email.message_from_bytes(self.non_latin_bin_msg)
|
||||
out = BytesIO()
|
||||
email.generator.BytesGenerator(out).flatten(msg, unixfrom=True)
|
||||
lines = out.getvalue().split(b'\n')
|
||||
self.assertEqual(lines[0].split()[0], b'From')
|
||||
self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg)
|
||||
|
||||
def test_message_from_binary_file(self):
|
||||
fn = 'test.msg'
|
||||
self.addCleanup(unlink, fn)
|
||||
with open(fn, 'wb') as testfile:
|
||||
testfile.write(self.non_latin_bin_msg)
|
||||
m = email.parser.BytesParser().parse(open(fn, 'rb'))
|
||||
self.assertEqual(str(m), self.non_latin_bin_msg_as7bit)
|
||||
|
||||
latin_bin_msg = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: Dinsdale
|
||||
Subject: Nudge nudge, wink, wink
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="latin-1"
|
||||
Content-Transfer-Encoding: 8bit
|
||||
|
||||
oh là là, know what I mean, know what I mean?
|
||||
""").encode('latin-1')
|
||||
|
||||
latin_bin_msg_as7bit = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: Dinsdale
|
||||
Subject: Nudge nudge, wink, wink
|
||||
Mime-Version: 1.0
|
||||
Content-Type: text/plain; charset="iso-8859-1"
|
||||
Content-Transfer-Encoding: quoted-printable
|
||||
|
||||
oh l=E0 l=E0, know what I mean, know what I mean?
|
||||
""")
|
||||
|
||||
def test_string_generator_reencodes_to_quopri_when_appropriate(self):
|
||||
m = email.message_from_bytes(self.latin_bin_msg)
|
||||
self.assertEqual(str(m), self.latin_bin_msg_as7bit)
|
||||
|
||||
def test_decoded_generator_emits_unicode_body(self):
|
||||
m = email.message_from_bytes(self.latin_bin_msg)
|
||||
out = StringIO()
|
||||
email.generator.DecodedGenerator(out).flatten(m)
|
||||
#DecodedHeader output contains an extra blank line compared
|
||||
#to the input message. RDM: not sure if this is a bug or not,
|
||||
#but it is not specific to the 8bit->7bit conversion.
|
||||
self.assertEqual(out.getvalue(),
|
||||
self.latin_bin_msg.decode('latin-1')+'\n')
|
||||
|
||||
def test_bytes_feedparser(self):
|
||||
bfp = email.feedparser.BytesFeedParser()
|
||||
for i in range(0, len(self.latin_bin_msg), 10):
|
||||
bfp.feed(self.latin_bin_msg[i:i+10])
|
||||
m = bfp.close()
|
||||
self.assertEqual(str(m), self.latin_bin_msg_as7bit)
|
||||
|
||||
|
||||
class TestBytesGeneratorIdempotent(TestIdempotent):
|
||||
|
||||
def _msgobj(self, filename):
|
||||
with openfile(filename, 'rb') as fp:
|
||||
data = fp.read()
|
||||
msg = email.message_from_bytes(data)
|
||||
return msg, data
|
||||
|
||||
def _idempotent(self, msg, data):
|
||||
b = BytesIO()
|
||||
g = email.generator.BytesGenerator(b, maxheaderlen=0)
|
||||
g.flatten(msg)
|
||||
self.assertEqual(data, b.getvalue())
|
||||
|
||||
maxDiff = None
|
||||
|
||||
def assertEqual(self, str1, str2):
|
||||
self.assertListEqual(str1.split(b'\n'), str2.split(b'\n'))
|
||||
|
||||
|
||||
|
||||
class TestBase64(unittest.TestCase):
|
||||
def test_len(self):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue