mirror of
https://github.com/python/cpython.git
synced 2025-11-02 03:01:58 +00:00
Issue #21448: Fixed FeedParser feed() to avoid O(N**2) behavior when parsing a long line.
Original patch by Raymond Hettinger.
This commit is contained in:
parent
6f20170762
commit
320a1c0ff7
3 changed files with 80 additions and 12 deletions
|
|
@ -10,6 +10,7 @@ import textwrap
|
|||
|
||||
from io import StringIO, BytesIO
|
||||
from itertools import chain
|
||||
from random import choice
|
||||
|
||||
import email
|
||||
import email.policy
|
||||
|
|
@ -3353,16 +3354,70 @@ Do you like this message?
|
|||
bsf.push(il)
|
||||
nt += n
|
||||
n1 = 0
|
||||
while True:
|
||||
ol = bsf.readline()
|
||||
if ol == NeedMoreData:
|
||||
break
|
||||
for ol in iter(bsf.readline, NeedMoreData):
|
||||
om.append(ol)
|
||||
n1 += 1
|
||||
self.assertEqual(n, n1)
|
||||
self.assertEqual(len(om), nt)
|
||||
self.assertEqual(''.join([il for il, n in imt]), ''.join(om))
|
||||
|
||||
def test_push_random(self):
    """Push random text in small chunks and compare the lines read back.

    A 10000-character string drawn from letters, blanks, tabs, CR and LF
    (plus a final newline) is pushed into a BufferedSubFile five
    characters at a time.  After every push, all currently available
    lines are drained.  The collected lines must equal what
    ``str.splitlines(True)`` yields for the whole string.
    """
    from email.feedparser import BufferedSubFile, NeedMoreData

    alphabet = 'abcd \t\r\n'
    total_chars = 10000
    step = 5

    text = ''.join([choice(alphabet) for _ in range(total_chars)]) + '\n'
    expected = text.splitlines(True)

    subfile = BufferedSubFile()
    collected = []
    offset = 0
    while offset < len(text):
        subfile.push(text[offset:offset + step])
        offset += step
        # Drain every complete line that is available so far; readline()
        # hands back the NeedMoreData sentinel once nothing more is ready.
        while True:
            line = subfile.readline()
            if line is NeedMoreData:
                break
            collected.append(line)
    self.assertEqual(collected, expected)
|
||||
|
||||
|
||||
class TestFeedParsers(TestEmailBase):
    """Tests that drive FeedParser with input split into chunks."""

    def parse(self, chunks):
        """Feed every chunk to a fresh FeedParser and return close()'s result."""
        from email.feedparser import FeedParser

        parser = FeedParser()
        for piece in chunks:
            parser.feed(piece)
        return parser.close()

    def test_newlines(self):
        """Check header splitting for various line endings and chunk splits."""
        # (chunks fed to the parser, header names expected back)
        key_cases = [
            (['a:\nb:\rc:\r\nd:\n'], ['a', 'b', 'c', 'd']),
            (['a:\nb:\rc:\r\nd:'], ['a', 'b', 'c', 'd']),
            (['a:\rb', 'c:\n'], ['a', 'bc']),
            (['a:\r', 'b:\n'], ['a', 'b']),
            (['a:\r', '\nb:\n'], ['a', 'b']),
        ]
        for chunks, expected_keys in key_cases:
            msg = self.parse(chunks)
            self.assertEqual(msg.keys(), expected_keys)

        # (chunks fed to the parser, (name, value) pairs expected back)
        item_cases = [
            (['a:\x85b:\u2028c:\n'],
             [('a', '\x85'), ('b', '\u2028'), ('c', '')]),
            (['a:\r', 'b:\x85', 'c:\n'],
             [('a', ''), ('b', '\x85'), ('c', '')]),
        ]
        for chunks, expected_items in item_cases:
            msg = self.parse(chunks)
            self.assertEqual(msg.items(), expected_items)

    def test_long_lines(self):
        """Feed a very long line in many chunks and check the parsed result."""
        width, count = 1000, 100000

        # (header chunk, repeated body chunk): the body is fed as `count`
        # separate chunks and must come back as one concatenated payload.
        payload_cases = [
            ('a:b\n\n', 'x' * width),
            ('a:b\r\r', 'x' * width),
            ('a:b\r\r', 'x' * width + '\x85'),
        ]
        for header, unit in payload_cases:
            msg = self.parse([header] + [unit] * count)
            self.assertEqual(msg.items(), [('a', 'b')])
            self.assertEqual(msg.get_payload(), unit * count)

        # Same long input, but as the value of header 'b', which arrives
        # in a separate chunk after 'a:\r'.
        msg = self.parse(['a:\r', 'b: '] + ['x' * width] * count)
        self.assertEqual(msg.items(), [('a', ''), ('b', 'x' * width * count)])
|
||||
|
||||
|
||||
class TestParsers(TestEmailBase):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue