mirror of
https://github.com/python/cpython.git
synced 2025-10-09 16:34:44 +00:00
#21800: Add RFC 6855 support to imaplib.
Original patch by Milan Oberkirch, updated by myself and Maciej Szulik.
This commit is contained in:
parent
18c30a29f8
commit
a6429db4b8
5 changed files with 179 additions and 21 deletions
|
@ -66,6 +66,7 @@ Commands = {
|
|||
'CREATE': ('AUTH', 'SELECTED'),
|
||||
'DELETE': ('AUTH', 'SELECTED'),
|
||||
'DELETEACL': ('AUTH', 'SELECTED'),
|
||||
'ENABLE': ('AUTH', ),
|
||||
'EXAMINE': ('AUTH', 'SELECTED'),
|
||||
'EXPUNGE': ('SELECTED',),
|
||||
'FETCH': ('SELECTED',),
|
||||
|
@ -107,12 +108,17 @@ InternalDate = re.compile(br'.*INTERNALDATE "'
|
|||
br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
|
||||
br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
|
||||
br'"')
|
||||
# Literal is no longer used; kept for backward compatibility.
|
||||
Literal = re.compile(br'.*{(?P<size>\d+)}$', re.ASCII)
|
||||
MapCRLF = re.compile(br'\r\n|\r|\n')
|
||||
Response_code = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')
|
||||
Untagged_response = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
|
||||
# Untagged_status is no longer used; kept for backward compatibility
|
||||
Untagged_status = re.compile(
|
||||
br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?', re.ASCII)
|
||||
# We compile these in _mode_xxx.
|
||||
_Literal = br'.*{(?P<size>\d+)}$'
|
||||
_Untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
|
||||
|
||||
|
||||
|
||||
|
@ -166,7 +172,7 @@ class IMAP4:
|
|||
class abort(error): pass # Service errors - close and retry
|
||||
class readonly(abort): pass # Mailbox status changed to READ-ONLY
|
||||
|
||||
def __init__(self, host = '', port = IMAP4_PORT):
|
||||
def __init__(self, host='', port=IMAP4_PORT):
|
||||
self.debug = Debug
|
||||
self.state = 'LOGOUT'
|
||||
self.literal = None # A literal argument to a command
|
||||
|
@ -176,6 +182,7 @@ class IMAP4:
|
|||
self.is_readonly = False # READ-ONLY desired state
|
||||
self.tagnum = 0
|
||||
self._tls_established = False
|
||||
self._mode_ascii()
|
||||
|
||||
# Open socket to server.
|
||||
|
||||
|
@ -190,6 +197,19 @@ class IMAP4:
|
|||
pass
|
||||
raise
|
||||
|
||||
def _mode_ascii(self):
|
||||
self.utf8_enabled = False
|
||||
self._encoding = 'ascii'
|
||||
self.Literal = re.compile(_Literal, re.ASCII)
|
||||
self.Untagged_status = re.compile(_Untagged_status, re.ASCII)
|
||||
|
||||
|
||||
def _mode_utf8(self):
|
||||
self.utf8_enabled = True
|
||||
self._encoding = 'utf-8'
|
||||
self.Literal = re.compile(_Literal)
|
||||
self.Untagged_status = re.compile(_Untagged_status)
|
||||
|
||||
|
||||
def _connect(self):
|
||||
# Create unique tag for this session,
|
||||
|
@ -360,7 +380,10 @@ class IMAP4:
|
|||
date_time = Time2Internaldate(date_time)
|
||||
else:
|
||||
date_time = None
|
||||
self.literal = MapCRLF.sub(CRLF, message)
|
||||
literal = MapCRLF.sub(CRLF, message)
|
||||
if self.utf8_enabled:
|
||||
literal = b'UTF8 (' + literal + b')'
|
||||
self.literal = literal
|
||||
return self._simple_command(name, mailbox, flags, date_time)
|
||||
|
||||
|
||||
|
@ -455,6 +478,18 @@ class IMAP4:
|
|||
"""
|
||||
return self._simple_command('DELETEACL', mailbox, who)
|
||||
|
||||
def enable(self, capability):
|
||||
"""Send an RFC5161 enable string to the server.
|
||||
|
||||
(typ, [data]) = <intance>.enable(capability)
|
||||
"""
|
||||
if 'ENABLE' not in self.capabilities:
|
||||
raise IMAP4.error("Server does not support ENABLE")
|
||||
typ, data = self._simple_command('ENABLE', capability)
|
||||
if typ == 'OK' and 'UTF8=ACCEPT' in capability.upper():
|
||||
self._mode_utf8()
|
||||
return typ, data
|
||||
|
||||
def expunge(self):
|
||||
"""Permanently remove deleted items from selected mailbox.
|
||||
|
||||
|
@ -561,7 +596,7 @@ class IMAP4:
|
|||
def _CRAM_MD5_AUTH(self, challenge):
|
||||
""" Authobject to use with CRAM-MD5 authentication. """
|
||||
import hmac
|
||||
pwd = (self.password.encode('ASCII') if isinstance(self.password, str)
|
||||
pwd = (self.password.encode('utf-8') if isinstance(self.password, str)
|
||||
else self.password)
|
||||
return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest()
|
||||
|
||||
|
@ -661,9 +696,12 @@ class IMAP4:
|
|||
(typ, [data]) = <instance>.search(charset, criterion, ...)
|
||||
|
||||
'data' is space separated list of matching message numbers.
|
||||
If UTF8 is enabled, charset MUST be None.
|
||||
"""
|
||||
name = 'SEARCH'
|
||||
if charset:
|
||||
if self.utf8_enabled:
|
||||
raise IMAP4.error("Non-None charset not valid in UTF8 mode")
|
||||
typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
|
||||
else:
|
||||
typ, dat = self._simple_command(name, *criteria)
|
||||
|
@ -877,7 +915,7 @@ class IMAP4:
|
|||
def _check_bye(self):
|
||||
bye = self.untagged_responses.get('BYE')
|
||||
if bye:
|
||||
raise self.abort(bye[-1].decode('ascii', 'replace'))
|
||||
raise self.abort(bye[-1].decode(self._encoding, 'replace'))
|
||||
|
||||
|
||||
def _command(self, name, *args):
|
||||
|
@ -898,12 +936,12 @@ class IMAP4:
|
|||
raise self.readonly('mailbox status changed to READ-ONLY')
|
||||
|
||||
tag = self._new_tag()
|
||||
name = bytes(name, 'ASCII')
|
||||
name = bytes(name, self._encoding)
|
||||
data = tag + b' ' + name
|
||||
for arg in args:
|
||||
if arg is None: continue
|
||||
if isinstance(arg, str):
|
||||
arg = bytes(arg, "ASCII")
|
||||
arg = bytes(arg, self._encoding)
|
||||
data = data + b' ' + arg
|
||||
|
||||
literal = self.literal
|
||||
|
@ -913,7 +951,7 @@ class IMAP4:
|
|||
literator = literal
|
||||
else:
|
||||
literator = None
|
||||
data = data + bytes(' {%s}' % len(literal), 'ASCII')
|
||||
data = data + bytes(' {%s}' % len(literal), self._encoding)
|
||||
|
||||
if __debug__:
|
||||
if self.debug >= 4:
|
||||
|
@ -978,7 +1016,7 @@ class IMAP4:
|
|||
typ, dat = self.capability()
|
||||
if dat == [None]:
|
||||
raise self.error('no CAPABILITY response from server')
|
||||
dat = str(dat[-1], "ASCII")
|
||||
dat = str(dat[-1], self._encoding)
|
||||
dat = dat.upper()
|
||||
self.capabilities = tuple(dat.split())
|
||||
|
||||
|
@ -997,10 +1035,10 @@ class IMAP4:
|
|||
if self._match(self.tagre, resp):
|
||||
tag = self.mo.group('tag')
|
||||
if not tag in self.tagged_commands:
|
||||
raise self.abort('unexpected tagged response: %s' % resp)
|
||||
raise self.abort('unexpected tagged response: %r' % resp)
|
||||
|
||||
typ = self.mo.group('type')
|
||||
typ = str(typ, 'ASCII')
|
||||
typ = str(typ, self._encoding)
|
||||
dat = self.mo.group('data')
|
||||
self.tagged_commands[tag] = (typ, [dat])
|
||||
else:
|
||||
|
@ -1009,7 +1047,7 @@ class IMAP4:
|
|||
# '*' (untagged) responses?
|
||||
|
||||
if not self._match(Untagged_response, resp):
|
||||
if self._match(Untagged_status, resp):
|
||||
if self._match(self.Untagged_status, resp):
|
||||
dat2 = self.mo.group('data2')
|
||||
|
||||
if self.mo is None:
|
||||
|
@ -1019,17 +1057,17 @@ class IMAP4:
|
|||
self.continuation_response = self.mo.group('data')
|
||||
return None # NB: indicates continuation
|
||||
|
||||
raise self.abort("unexpected response: '%s'" % resp)
|
||||
raise self.abort("unexpected response: %r" % resp)
|
||||
|
||||
typ = self.mo.group('type')
|
||||
typ = str(typ, 'ascii')
|
||||
typ = str(typ, self._encoding)
|
||||
dat = self.mo.group('data')
|
||||
if dat is None: dat = b'' # Null untagged response
|
||||
if dat2: dat = dat + b' ' + dat2
|
||||
|
||||
# Is there a literal to come?
|
||||
|
||||
while self._match(Literal, dat):
|
||||
while self._match(self.Literal, dat):
|
||||
|
||||
# Read literal direct from connection.
|
||||
|
||||
|
@ -1053,7 +1091,7 @@ class IMAP4:
|
|||
|
||||
if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
|
||||
typ = self.mo.group('type')
|
||||
typ = str(typ, "ASCII")
|
||||
typ = str(typ, self._encoding)
|
||||
self._append_untagged(typ, self.mo.group('data'))
|
||||
|
||||
if __debug__:
|
||||
|
@ -1123,7 +1161,7 @@ class IMAP4:
|
|||
|
||||
def _new_tag(self):
|
||||
|
||||
tag = self.tagpre + bytes(str(self.tagnum), 'ASCII')
|
||||
tag = self.tagpre + bytes(str(self.tagnum), self._encoding)
|
||||
self.tagnum = self.tagnum + 1
|
||||
self.tagged_commands[tag] = None
|
||||
return tag
|
||||
|
@ -1213,7 +1251,8 @@ if HAVE_SSL:
|
|||
"""
|
||||
|
||||
|
||||
def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None, certfile=None, ssl_context=None):
|
||||
def __init__(self, host='', port=IMAP4_SSL_PORT, keyfile=None,
|
||||
certfile=None, ssl_context=None):
|
||||
if ssl_context is not None and keyfile is not None:
|
||||
raise ValueError("ssl_context and keyfile arguments are mutually "
|
||||
"exclusive")
|
||||
|
@ -1251,7 +1290,7 @@ class IMAP4_stream(IMAP4):
|
|||
|
||||
Instantiate with: IMAP4_stream(command)
|
||||
|
||||
where "command" is a string that can be passed to subprocess.Popen()
|
||||
"command" - a string that can be passed to subprocess.Popen()
|
||||
|
||||
for more documentation see the docstring of the parent class IMAP4.
|
||||
"""
|
||||
|
@ -1328,7 +1367,7 @@ class _Authenticator:
|
|||
#
|
||||
oup = b''
|
||||
if isinstance(inp, str):
|
||||
inp = inp.encode('ASCII')
|
||||
inp = inp.encode('utf-8')
|
||||
while inp:
|
||||
if len(inp) > 48:
|
||||
t = inp[:48]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue