#11731: simplify/enhance parser/generator API by introducing policy objects.

This new interface will also allow for future planned enhancements
in control over the parser/generator without requiring any additional
complexity in the parser/generator API.

Patch reviewed by Éric Araujo and Barry Warsaw.
This commit is contained in:
R David Murray 2011-04-18 13:59:37 -04:00
parent ce16be91dc
commit 3edd22ac95
13 changed files with 912 additions and 81 deletions

View file

@ -1776,7 +1776,12 @@ YXNkZg==
# Test some badly formatted messages
class TestNonConformant(TestEmailBase):
class TestNonConformantBase:
def _msgobj(self, filename):
with openfile(filename) as fp:
return email.message_from_file(fp, policy=self.policy)
def test_parse_missing_minor_type(self):
eq = self.assertEqual
msg = self._msgobj('msg_14.txt')
@ -1790,17 +1795,18 @@ class TestNonConformant(TestEmailBase):
# XXX We can probably eventually do better
inner = msg.get_payload(0)
unless(hasattr(inner, 'defects'))
self.assertEqual(len(inner.defects), 1)
unless(isinstance(inner.defects[0],
self.assertEqual(len(self.get_defects(inner)), 1)
unless(isinstance(self.get_defects(inner)[0],
errors.StartBoundaryNotFoundDefect))
def test_multipart_no_boundary(self):
unless = self.assertTrue
msg = self._msgobj('msg_25.txt')
unless(isinstance(msg.get_payload(), str))
self.assertEqual(len(msg.defects), 2)
unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
unless(isinstance(msg.defects[1],
self.assertEqual(len(self.get_defects(msg)), 2)
unless(isinstance(self.get_defects(msg)[0],
errors.NoBoundaryInMultipartDefect))
unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_invalid_content_type(self):
@ -1856,9 +1862,10 @@ counter to RFC 2822, there's no separating newline here
unless = self.assertTrue
msg = self._msgobj('msg_41.txt')
unless(hasattr(msg, 'defects'))
self.assertEqual(len(msg.defects), 2)
unless(isinstance(msg.defects[0], errors.NoBoundaryInMultipartDefect))
unless(isinstance(msg.defects[1],
self.assertEqual(len(self.get_defects(msg)), 2)
unless(isinstance(self.get_defects(msg)[0],
errors.NoBoundaryInMultipartDefect))
unless(isinstance(self.get_defects(msg)[1],
errors.MultipartInvariantViolationDefect))
def test_missing_start_boundary(self):
@ -1872,22 +1879,72 @@ counter to RFC 2822, there's no separating newline here
#
# [*] This message is missing its start boundary
bad = outer.get_payload(1).get_payload(0)
self.assertEqual(len(bad.defects), 1)
self.assertTrue(isinstance(bad.defects[0],
self.assertEqual(len(self.get_defects(bad)), 1)
self.assertTrue(isinstance(self.get_defects(bad)[0],
errors.StartBoundaryNotFoundDefect))
def test_first_line_is_continuation_header(self):
eq = self.assertEqual
m = ' Line 1\nLine 2\nLine 3'
msg = email.message_from_string(m)
msg = email.message_from_string(m, policy=self.policy)
eq(msg.keys(), [])
eq(msg.get_payload(), 'Line 2\nLine 3')
eq(len(msg.defects), 1)
self.assertTrue(isinstance(msg.defects[0],
eq(len(self.get_defects(msg)), 1)
self.assertTrue(isinstance(self.get_defects(msg)[0],
errors.FirstHeaderLineIsContinuationDefect))
eq(msg.defects[0].line, ' Line 1\n')
eq(self.get_defects(msg)[0].line, ' Line 1\n')
class TestNonConformant(TestNonConformantBase, TestEmailBase):
policy=email.policy.default
def get_defects(self, obj):
return obj.defects
class TestNonConformantCapture(TestNonConformantBase, TestEmailBase):
class CapturePolicy(email.policy.Policy):
captured = None
def register_defect(self, obj, defect):
self.captured.append(defect)
def setUp(self):
self.policy = self.CapturePolicy(captured=list())
def get_defects(self, obj):
return self.policy.captured
class TestRaisingDefects(TestEmailBase):
def _msgobj(self, filename):
with openfile(filename) as fp:
return email.message_from_file(fp, policy=email.policy.strict)
def test_same_boundary_inner_outer(self):
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
self._msgobj('msg_15.txt')
def test_multipart_no_boundary(self):
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
self._msgobj('msg_25.txt')
def test_lying_multipart(self):
with self.assertRaises(errors.NoBoundaryInMultipartDefect):
self._msgobj('msg_41.txt')
def test_missing_start_boundary(self):
with self.assertRaises(errors.StartBoundaryNotFoundDefect):
self._msgobj('msg_42.txt')
def test_first_line_is_continuation_header(self):
m = ' Line 1\nLine 2\nLine 3'
with self.assertRaises(errors.FirstHeaderLineIsContinuationDefect):
msg = email.message_from_string(m, policy=email.policy.strict)
# Test RFC 2047 header encoding and decoding
class TestRFC2047(TestEmailBase):
@ -2997,6 +3054,25 @@ Here's the message body
g.flatten(msg, linesep='\r\n')
self.assertEqual(s.getvalue(), text)
def test_crlf_control_via_policy(self):
with openfile('msg_26.txt', newline='\n') as fp:
text = fp.read()
msg = email.message_from_string(text)
s = StringIO()
g = email.generator.Generator(s, policy=email.policy.SMTP)
g.flatten(msg)
self.assertEqual(s.getvalue(), text)
def test_flatten_linesep_overrides_policy(self):
# msg_27 is lf separated
with openfile('msg_27.txt', newline='\n') as fp:
text = fp.read()
msg = email.message_from_string(text)
s = StringIO()
g = email.generator.Generator(s, policy=email.policy.SMTP)
g.flatten(msg, linesep='\n')
self.assertEqual(s.getvalue(), text)
maxDiff = None
def test_multipart_digest_with_extra_mime_headers(self):
@ -3463,6 +3539,44 @@ class Test8BitBytesHandling(unittest.TestCase):
g.flatten(msg)
self.assertEqual(s.getvalue(), source)
def test_crlf_control_via_policy(self):
# msg_26 is crlf terminated
with openfile('msg_26.txt', 'rb') as fp:
text = fp.read()
msg = email.message_from_bytes(text)
s = BytesIO()
g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
g.flatten(msg)
self.assertEqual(s.getvalue(), text)
def test_flatten_linesep_overrides_policy(self):
# msg_27 is lf separated
with openfile('msg_27.txt', 'rb') as fp:
text = fp.read()
msg = email.message_from_bytes(text)
s = BytesIO()
g = email.generator.BytesGenerator(s, policy=email.policy.SMTP)
g.flatten(msg, linesep='\n')
self.assertEqual(s.getvalue(), text)
def test_must_be_7bit_handles_unknown_8bit(self):
msg = email.message_from_bytes(self.non_latin_bin_msg)
out = BytesIO()
g = email.generator.BytesGenerator(out,
policy=email.policy.default.clone(must_be_7bit=True))
g.flatten(msg)
self.assertEqual(out.getvalue(),
self.non_latin_bin_msg_as7bit_wrapped.encode('ascii'))
def test_must_be_7bit_transforms_8bit_cte(self):
msg = email.message_from_bytes(self.latin_bin_msg)
out = BytesIO()
g = email.generator.BytesGenerator(out,
policy=email.policy.default.clone(must_be_7bit=True))
g.flatten(msg)
self.assertEqual(out.getvalue(),
self.latin_bin_msg_as7bit.encode('ascii'))
maxDiff = None