This commit is contained in:
Jessica James 2025-12-23 17:54:23 +09:00 committed by GitHub
commit 402fd0250c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 436 additions and 82 deletions

View file

@ -22,6 +22,7 @@ object's .defects attribute.
__all__ = ['FeedParser', 'BytesFeedParser']
import re
import sys
from email import errors
from email._policybase import compat32
@ -52,15 +53,16 @@ class BufferedSubFile(object):
simple abstraction -- it parses until EOF closes the current message.
"""
def __init__(self):
# Text stream of the last partial line pushed into this object.
# See issue 22233 for why this is a text stream and not a list.
self._partial = StringIO(newline='')
self._partial = []
self._dangling_partial = False
# A deque of full, pushed lines
self._lines = deque()
# The stack of false-EOF checking predicates.
self._eofstack = []
# A flag indicating whether the file has been closed or not.
self._closed = False
self._dump_destination = None
self._dump_result = None
def push_eof_matcher(self, pred):
    # Register a predicate that recognizes a "false EOF" line (e.g. a
    # MIME boundary matcher).  Predicates are called as
    # pred(data, start, end) and are consulted innermost-first
    # (i.e. in reverse push order) by _check_eofstack().
    self._eofstack.append(pred)
@ -70,10 +72,8 @@ class BufferedSubFile(object):
def close(self):
# Don't forget any trailing partial line.
self._partial.seek(0)
self.pushlines(self._partial.readlines())
self._partial.seek(0)
self._partial.truncate()
if self._partial:
self._flush_partial()
self._closed = True
def readline(self):
@ -87,40 +87,253 @@ class BufferedSubFile(object):
# RFC 2046, section 5.1.2 requires us to recognize outer level
# boundaries at any level of inner nesting. Do this, but be sure it's
# in the order of most to least nested.
for ateof in reversed(self._eofstack):
if ateof(line):
# We're at the false EOF. But push the last line back first.
self._lines.appendleft(line)
return ''
if self._check_eofstack(line):
# We're at the false EOF. But push the last line back first.
self._lines.appendleft(line)
return ''
return line
def _check_eofstack(self, data, start=0, end=sys.maxsize):
# check if we can find a dummy EOF
return any(
ateof(data, start, end)
for ateof in reversed(self._eofstack)
)
def unreadline(self, line):
    # Let the consumer push a line back into the buffer; the next
    # readline() call will return it first.  NeedMoreData is a control
    # sentinel, not data, so it must never be unread.
    assert line is not NeedMoreData
    self._lines.appendleft(line)
def _flush_partial(self):
    # Join the accumulated fragments into one complete line, route it
    # to the right destination, and reset all partial-line state.
    line = EMPTYSTRING.join(self._partial)
    self._partial.clear()
    self._dangling_partial = False
    if not line:
        return
    if self._dump_destination is None:
        # Not dumping: queue the completed line for readline().
        self._lines.append(line)
    elif self._check_eofstack(line):
        # The line marks the end of the dump; it is queued as an
        # ordinary line rather than appended to the dump.
        self._dump_destination = None
        self._lines.append(line)
    else:
        # Still dumping: the line belongs to the dumped body.
        self._dump_destination.append(line)
def push(self, data):
"""Push some new data into this object."""
self._partial.write(data)
if '\n' not in data and '\r' not in data:
if not data:
pass
elif self._can_dump_data(data):
self._dump_destination.append(data)
else:
self._push_data(data)
def _can_dump_data(self, data):
if self._dump_destination is None:
return False
# We're dumping; check for easy optimizations.
if not self._eofstack:
# There's nothing that will ever tell us to stop dumping.
# This does absolute wonders for large non-multipart emails.
assert not self._lines
assert not self._dangling_partial
assert not self._partial
return True
# We can't dump this blob if we have pending partial data
if self._partial:
return False
for pred in self._eofstack:
if not hasattr(pred, 'is_boundary_match'):
# We can't blindly dump entire chunks, if we're interested in
# more than just boundaries
return False
# We only care about boundaries; we can dump as long as there's no
# potential boundaries.
return '-' not in data
def _can_dump_partial(self, line, start=0, end=sys.maxsize):
# Very similar to _can_dump_data above, except we can make some
# additional assumptions for partials/lines.
assert not self._partial or line is self._partial[0]
if self._dump_destination is None:
return False
# We're dumping. There should be absolutely no other pending lines,
# because those should've been dumped.
assert not self._lines
if not self._eofstack:
# There's nothing that will ever tell us to stop dumping. Dump away
return True
for pred in self._eofstack:
if not hasattr(pred, 'is_boundary_match'):
return False
# We only care about boundaries; we can dump as long as there's no
# potential boundaries.
return not line.startswith("-", start, end)
def _is_dump_midline(self):
if not self._dump_destination:
return False
return self._dump_destination[-1][-1] not in ('\n', '\r')
def _push_data(self, data):
# Find first newline character in the data
unl_start_index = BufferedSubFile._find_unl(data)
if unl_start_index < 0:
# No new complete lines, wait for more.
# Check to see if we had a previous dangling partial newline
if self._dangling_partial:
# We previously pushed a dangling line expecting \n to follow,
# however we received other data instead. Therefore, that \r
# does actually terminate a line. Go ahead and push it.
self._flush_partial()
# No lines in data to push; wait for more data
if self._is_dump_midline():
assert not self._partial
self._dump_destination.append(data)
else:
self._partial.append(data)
return
# Crack into lines, preserving the linesep characters.
self._partial.seek(0)
parts = self._partial.readlines()
self._partial.seek(0)
self._partial.truncate()
data_start_index = 0
# Complete our previous/partial line.
if self._partial:
if self._dangling_partial:
if data[0] != NL:
# "\r<whatever>" -- push what we had, it's been terminated
self._flush_partial()
else:
# "\r\n" -- append \n to complete it and push
self._partial.append(NL)
self._flush_partial()
data_start_index = 1
# If the last element of the list does not end in a newline, then treat
# it as a partial line. We only check for '\n' here because a line
# ending with '\r' might be a line that was split in the middle of a
# '\r\n' sequence (see bugs 1555570 and 1721862).
if not parts[-1].endswith('\n'):
self._partial.write(parts.pop())
self.pushlines(parts)
unl_start_index = BufferedSubFile._find_unl(
data, data_start_index)
else:
# Complete our partial with the new line and push it
unl_end_index = BufferedSubFile._find_unl_end(
data, unl_start_index)
if unl_end_index < 0:
# The newline is incomplete; append data and return
self._partial.append(data)
self._dangling_partial = True
return
# We have a complete line; append it and flush _partial
self._partial.append(data[data_start_index:unl_end_index])
self._flush_partial()
data_start_index = unl_end_index
# Find the next newline
unl_start_index = BufferedSubFile._find_unl(
data, data_start_index)
# _partial is now guaranteed to point to be empty
# data_start_index is an index which points to the start of next line
# unl_start_index is the start of the next newline character, or -1
self._push_data_no_partial(data, data_start_index, unl_start_index)
def _push_data_no_partial(self, data, data_start_index, unl_start_index):
    """Consume the rest of *data*, given that _partial is empty.

    data_start_index is the index of the first unconsumed character
    (the start of the next line); unl_start_index is the index of the
    next newline character in data, or -1 if there is none.
    """
    # Process any remaining whole lines in data
    if unl_start_index < 0:
        # Push right to the partial if there's no lines
        if data_start_index < len(data):
            assert data_start_index >= 0
            partial_line = data[data_start_index:]
            if self._is_dump_midline() \
                    or self._can_dump_partial(partial_line):
                # Continuing a mid-line dump fragment, or the fragment
                # provably can't be a boundary: dump it directly.
                self._dump_destination.append(partial_line)
            else:
                self._partial = [partial_line]
                if data[-1] == '\r':
                    # Might be the first half of a split '\r\n'.
                    self._dangling_partial = True
    elif self._dump_destination is None \
            and unl_start_index < len(data) // 2:
        # If it looks like we're going to be doing a lot of splits/joins,
        # just go ahead and use StringIO, for speed
        # If we had some sort of "StringViewIO" to avoid the copy, this
        # would be significantly more efficient
        # This code block, and the "else" code block below, functionally do
        # the exact same thing, except this path makes no attempt to handle
        # dumping data
        sio = StringIO(data, '')
        sio.seek(data_start_index)
        lines = sio.readlines()
        if lines:
            if data[-1] != '\n':
                # Last line is incomplete; keep it as the new partial.
                self._partial.append(lines.pop())
                if data[-1] == '\r':
                    # Possible split '\r\n' (see bugs 1555570/1721862).
                    self._dangling_partial = True
            self.pushlines(lines)
    else:
        # dump_data_start marks where un-dumped (pending) data begins,
        # or None when we're not dumping at all.
        dump_data_start = None if self._dump_destination is None \
            else data_start_index
        while unl_start_index >= 0:
            unl_end_index = BufferedSubFile._find_unl_end(
                data, unl_start_index)
            if unl_end_index < 0:
                # Incomplete line ending; break to just update our partial
                self._dangling_partial = True
                break
            # We have an easy line; push it
            if self._dump_destination is not None:
                # We have a window into a line. Make sure it's not EOF
                if self._check_eofstack(
                        data, data_start_index, unl_end_index):
                    # This line is "EOF". This is the end of our dump data
                    self._dump_destination.append(
                        data[dump_data_start:data_start_index])
                    # Also push our line, since we already have it
                    self._lines.append(
                        data[data_start_index:unl_end_index])
                    self._dump_destination = None
                #else: # This line didn't mark the end. Keep going.
            else:
                # We're not dumping. Just go ahead and push the line
                self._lines.append(data[data_start_index:unl_end_index])
            # Update our iterators
            data_start_index = unl_end_index
            unl_start_index = BufferedSubFile._find_unl(
                data, data_start_index)
        if self._dump_destination is not None:
            # Push everything that isn't going into the partial to the dump
            # If we're able to safely flush the partial, do that too
            # We don't care about self._is_dump_midline() here, because
            # data_start_index always represents the start of a new line
            if self._can_dump_partial(data, data_start_index):
                self._dump_destination.append(data[dump_data_start:])
                # Flush any partial-related state we may have set
                self._dangling_partial = False
                return  # skip the _partial.append below
            else:
                self._dump_destination.append(
                    data[dump_data_start:data_start_index])
        # If we have any partial data leftover, go ahead and set it
        if data_start_index < len(data):
            self._partial.append(data[data_start_index:])
def pushlines(self, lines):
    # Append an iterable of already-complete lines to the line queue.
    # This method is not documented on docs.python.org
    self._lines.extend(lines)
def __iter__(self):
@ -132,6 +345,71 @@ class BufferedSubFile(object):
raise StopIteration
return line
def _get_dump(self, start_value=None):
    """Generator: accumulate all remaining input as one big string.

    Yields NeedMoreData until the input is exhausted or an EOF
    predicate ends the dump; the joined result is then stored in
    _dump_result for retrieval via _pop_dump().  *start_value*, if
    truthy, seeds the accumulated output.
    """
    _dump_destination = deque()
    self._dump_destination = _dump_destination
    if start_value:
        _dump_destination.append(start_value)
    # Flush our current _lines to _dump_destination
    needs_more_data = False
    for line in self:
        if line is NeedMoreData:
            needs_more_data = True
            break
        _dump_destination.append(line)
    # Pull in more data, if we need more
    if needs_more_data:
        # Flush our partial, if we can
        if self._partial and self._can_dump_partial(self._partial[0]):
            _dump_destination.extend(self._partial)
            self._partial.clear()
            self._dangling_partial = False
        # Pull in more data until we're told to stop; while we yield,
        # push() routes incoming data into _dump_destination directly.
        while not self._closed and self._dump_destination is not None:
            yield NeedMoreData
    # Flush our final dump string to _dump_result
    self._dump_destination = None
    self._dump_result = EMPTYSTRING.join(_dump_destination)
def _pop_dump(self):
result = self._dump_result
self._dump_result = None
return result
@staticmethod
def _find_unl(data, start=0):
    # Like str.find(), but for universal newlines ('\n', '\r' or
    # '\r\n').  Two C-speed find() calls beat a Python-level scan;
    # a C helper could do it in a single pass over the string.
    cr_pos = data.find('\r', start)
    if cr_pos < 0:
        # No carriage return at all: only a bare NL can match.
        return data.find(NL, start)
    # An NL occurring strictly before the CR takes precedence.
    nl_pos = data.find(NL, start, cr_pos)
    return cr_pos if nl_pos < 0 else nl_pos
@staticmethod
def _find_unl_end(data, start):
    # Returns the 1-past-the-end index of a universal newline that
    # begins at *start* (which must index a '\r' or '\n'), or -1 if
    # the ending is still ambiguous.
    # This could be sped up by replacing with a similar function in C.
    # \n is always end of line
    if data.startswith(NL, start):
        return start + 1
    # \r\n is always end of line
    if data.startswith(NL, start + 1):
        return start + 2
    # End of data; we can't know if a \n follows, so no universal line end
    if start + 1 >= len(data):
        return -1
    # This is a \r followed by some other non-newline character
    return start + 1
class FeedParser:
"""A feed-style parser of email."""
@ -242,16 +520,8 @@ class FeedParser:
# necessary in the older parser, which could raise errors. All
# remaining lines in the input are thrown into the message body.
if self._headersonly:
lines = []
while True:
line = self._input.readline()
if line is NeedMoreData:
yield NeedMoreData
continue
if line == '':
break
lines.append(line)
self._cur.set_payload(EMPTYSTRING.join(lines))
yield from self._input._get_dump()
self._cur.set_payload(self._input._pop_dump())
return
if self._cur.get_content_type() == 'message/delivery-status':
# message/delivery-status contains blocks of headers separated by
@ -311,13 +581,8 @@ class FeedParser:
# defective.
defect = errors.NoBoundaryInMultipartDefect()
self.policy.handle_defect(self._cur, defect)
lines = []
for line in self._input:
if line is NeedMoreData:
yield NeedMoreData
continue
lines.append(line)
self._cur.set_payload(EMPTYSTRING.join(lines))
yield from self._input._get_dump()
self._cur.set_payload(self._input._pop_dump())
return
# Make sure a valid content type was specified per RFC 2045:6.4.
if (str(self._cur.get('content-transfer-encoding', '8bit')).lower()
@ -329,10 +594,11 @@ class FeedParser:
# this onto the input stream until we've scanned past the
# preamble.
separator = '--' + boundary
def boundarymatch(line):
if not line.startswith(separator):
def boundarymatch(line, pos = 0, endpos = sys.maxsize):
if not line.startswith(separator, pos, endpos):
return None
return boundaryendRE.match(line, len(separator))
return boundaryendRE.match(line, pos + len(separator), endpos)
boundarymatch.is_boundary_match = True
capturing_preamble = True
preamble = []
linesep = False
@ -424,12 +690,11 @@ class FeedParser:
defect = errors.StartBoundaryNotFoundDefect()
self.policy.handle_defect(self._cur, defect)
self._cur.set_payload(EMPTYSTRING.join(preamble))
epilogue = []
for line in self._input:
if line is NeedMoreData:
yield NeedMoreData
continue
self._cur.epilogue = EMPTYSTRING.join(epilogue)
self._cur.epilogue = ''
return
# If we're not processing the preamble, then we might have seen
# EOF without seeing that end boundary...that is also a defect.
@ -440,34 +705,27 @@ class FeedParser:
# Everything from here to the EOF is epilogue. If the end boundary
# ended in a newline, we'll need to make sure the epilogue isn't
# None
if linesep:
epilogue = ['']
else:
epilogue = []
for line in self._input:
if line is NeedMoreData:
yield NeedMoreData
continue
epilogue.append(line)
# Any CRLF at the front of the epilogue is not technically part of
# the epilogue. Also, watch out for an empty string epilogue,
# which means a single newline.
if epilogue:
firstline = epilogue[0]
bolmo = NLCRE_bol.match(firstline)
if bolmo:
epilogue[0] = firstline[len(bolmo.group(0)):]
self._cur.epilogue = EMPTYSTRING.join(epilogue)
first_line = ''
if not linesep:
for line in self._input:
if line is NeedMoreData:
yield NeedMoreData
continue
first_line = line
if first_line:
bolmo = NLCRE_bol.match(first_line)
if bolmo:
first_line = first_line[len(bolmo.group(0)):]
break
yield from self._input._get_dump(first_line)
self._cur.epilogue = self._input._pop_dump()
return
# Otherwise, it's some non-multipart type, so the entire rest of the
# file contents becomes the payload.
lines = []
for line in self._input:
if line is NeedMoreData:
yield NeedMoreData
continue
lines.append(line)
self._cur.set_payload(EMPTYSTRING.join(lines))
yield from self._input._get_dump()
self._cur.set_payload(self._input._pop_dump())
def _parse_headers(self, lines):
# Passed a list of lines that make up the headers for the current msg

View file

@ -12,6 +12,8 @@ from io import StringIO, TextIOWrapper
from email.feedparser import FeedParser, BytesFeedParser
from email._policybase import compat32
_FEED_CHUNK_SIZE = 8192
class Parser:
def __init__(self, _class=None, *, policy=compat32):
@ -38,6 +40,18 @@ class Parser:
self._class = _class
self.policy = policy
def _parse_chunks(self, chunk_generator, headersonly=False):
"""Internal method / implementation detail
Parses chunks from a chunk generator into a FeedParser
"""
feedparser = FeedParser(self._class, policy=self.policy)
if headersonly:
feedparser._set_headersonly()
for data in chunk_generator:
feedparser.feed(data)
return feedparser.close()
def parse(self, fp, headersonly=False):
"""Create a message structure from the data in a file.
@ -46,12 +60,12 @@ class Parser:
parsing after reading the headers or not. The default is False,
meaning it parses the entire contents of the file.
"""
feedparser = FeedParser(self._class, policy=self.policy)
if headersonly:
feedparser._set_headersonly()
while data := fp.read(8192):
feedparser.feed(data)
return feedparser.close()
def _fp_get_chunks():
while data := fp.read(_FEED_CHUNK_SIZE):
yield data
_chunk_generator = _fp_get_chunks()
return self._parse_chunks(_chunk_generator, headersonly)
def parsestr(self, text, headersonly=False):
"""Create a message structure from a string.
@ -61,7 +75,12 @@ class Parser:
not. The default is False, meaning it parses the entire contents of
the file.
"""
return self.parse(StringIO(text), headersonly=headersonly)
_chunk_generator = (
text[offset:offset + _FEED_CHUNK_SIZE]
for offset in range(0, len(text), _FEED_CHUNK_SIZE)
)
return self._parse_chunks(_chunk_generator, headersonly)
class HeaderParser(Parser):
@ -115,8 +134,13 @@ class BytesParser:
not. The default is False, meaning it parses the entire contents of
the file.
"""
text = text.decode('ASCII', errors='surrogateescape')
return self.parser.parsestr(text, headersonly)
_chunk_generator = (
text[offset:offset + _FEED_CHUNK_SIZE].decode(
'ASCII', errors='surrogateescape')
for offset in range(0, len(text), _FEED_CHUNK_SIZE)
)
return self.parser._parse_chunks(_chunk_generator, headersonly)
class BytesHeaderParser(BytesParser):

View file

@ -4564,6 +4564,72 @@ class BaseTestBytesGeneratorIdempotent:
g.flatten(msg, unixfrom=unixfrom, linesep=self.linesep)
self.assertEqual(data, b.getvalue())
class TestFeedParserTrickle(TestEmailBase):
    """Feed the parser tiny slices of input to exercise partial-line
    reassembly across every line-ending convention."""

    @staticmethod
    def _msgobj_trickle(filename, trickle_size=2, force_linetype="\r\n"):
        # Load the fixture, rewrite every line ending as force_linetype,
        # then feed the text trickle_size characters at a time.
        with openfile(filename, encoding="utf-8") as fp:
            text = fp.read()
        unified = text.replace("\r\n", "\n").replace("\r", "\n")
        text = unified.replace("\n", force_linetype)
        parser = FeedParser()
        for offset in range(0, len(text), trickle_size):
            parser.feed(text[offset:offset + trickle_size])
        return parser.close()

    def _validate_msg10_msgobj(self, msg, line_end):
        # Check each subpart of msg_10.txt decodes correctly; the
        # multipart container itself has no decodable payload.
        if isinstance(line_end, str):
            line_end = line_end.encode()
        self.assertEqual(msg.get_payload(decode=True), None)
        self.assertEqual(msg.get_payload(0).get_payload(decode=True),
                         b'This is a 7bit encoded message.' + line_end)
        self.assertEqual(msg.get_payload(1).get_payload(decode=True),
                         b'\xa1This is a Quoted Printable encoded message!'
                         + line_end)
        self.assertEqual(msg.get_payload(2).get_payload(decode=True),
                         b'This is a Base64 encoded message.')
        self.assertEqual(msg.get_payload(3).get_payload(decode=True),
                         b'This is a Base64 encoded message.\n')
        self.assertEqual(msg.get_payload(4).get_payload(decode=True),
                         b'This has no Content-Transfer-Encoding: header.'
                         + line_end)

    def test_trickle_1chr_crlf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 1, '\r\n'), '\r\n')

    def test_trickle_1chr_cr(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 1, '\r'), '\r')

    def test_trickle_1chr_lf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 1, '\n'), '\n')

    def test_trickle_2chr_crlf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 2, '\r\n'), '\r\n')

    def test_trickle_2chr_cr(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 2, '\r'), '\r')

    def test_trickle_2chr_lf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 2, '\n'), '\n')

    def test_trickle_3chr_crlf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 3, '\r\n'), '\r\n')

    def test_trickle_3chr_cr(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 3, '\r'), '\r')

    def test_trickle_3chr_lf(self):
        self._validate_msg10_msgobj(
            self._msgobj_trickle('msg_10.txt', 3, '\n'), '\n')
class TestBytesGeneratorIdempotentNL(BaseTestBytesGeneratorIdempotent,
TestIdempotent):

View file

@ -883,6 +883,7 @@ Jeffrey C. Jacobs
Kevin Jacobs
Kjetil Jacobsen
Shantanu Jain
Jessica A. James
Bertrand Janin
Geert Jansen
Jack Jansen

View file

@ -0,0 +1,5 @@
Substantially improved memory usage and performance when parsing email text
in :mod:`email`. Primarily reduces memory usage in
:func:`email.message_from_bytes`, :func:`email.message_from_string`,
:class:`email.parser.Parser`, :class:`email.parser.BytesParser`,
:class:`email.parser.FeedParser`, and :class:`email.parser.BytesFeedParser`.