mirror of
https://github.com/python/cpython.git
synced 2025-10-27 16:57:08 +00:00
asyncio: New error handling API. Issue #20681.
This commit is contained in:
parent
6acc5e1330
commit
569efa2e4b
15 changed files with 491 additions and 99 deletions
|
|
@ -122,6 +122,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
self._internal_fds = 0
|
||||
self._running = False
|
||||
self._clock_resolution = time.get_clock_info('monotonic').resolution
|
||||
self._exception_handler = None
|
||||
|
||||
def _make_socket_transport(self, sock, protocol, waiter=None, *,
|
||||
extra=None, server=None):
|
||||
|
|
@ -254,7 +255,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
"""Like call_later(), but uses an absolute time."""
|
||||
if tasks.iscoroutinefunction(callback):
|
||||
raise TypeError("coroutines cannot be used with call_at()")
|
||||
timer = events.TimerHandle(when, callback, args)
|
||||
timer = events.TimerHandle(when, callback, args, self)
|
||||
heapq.heappush(self._scheduled, timer)
|
||||
return timer
|
||||
|
||||
|
|
@ -270,7 +271,7 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
"""
|
||||
if tasks.iscoroutinefunction(callback):
|
||||
raise TypeError("coroutines cannot be used with call_soon()")
|
||||
handle = events.Handle(callback, args)
|
||||
handle = events.Handle(callback, args, self)
|
||||
self._ready.append(handle)
|
||||
return handle
|
||||
|
||||
|
|
@ -625,6 +626,97 @@ class BaseEventLoop(events.AbstractEventLoop):
|
|||
protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
|
||||
return transport, protocol
|
||||
|
||||
def set_exception_handler(self, handler):
|
||||
"""Set handler as the new event loop exception handler.
|
||||
|
||||
If handler is None, the default exception handler will
|
||||
be set.
|
||||
|
||||
If handler is a callable object, it should have a
|
||||
matching signature to '(loop, context)', where 'loop'
|
||||
will be a reference to the active event loop, 'context'
|
||||
will be a dict object (see `call_exception_handler()`
|
||||
documentation for details about context).
|
||||
"""
|
||||
if handler is not None and not callable(handler):
|
||||
raise TypeError('A callable object or None is expected, '
|
||||
'got {!r}'.format(handler))
|
||||
self._exception_handler = handler
|
||||
|
||||
def default_exception_handler(self, context):
|
||||
"""Default exception handler.
|
||||
|
||||
This is called when an exception occurs and no exception
|
||||
handler is set, and can be called by a custom exception
|
||||
handler that wants to defer to the default behavior.
|
||||
|
||||
context parameter has the same meaning as in
|
||||
`call_exception_handler()`.
|
||||
"""
|
||||
message = context.get('message')
|
||||
if not message:
|
||||
message = 'Unhandled exception in event loop'
|
||||
|
||||
exception = context.get('exception')
|
||||
if exception is not None:
|
||||
exc_info = (type(exception), exception, exception.__traceback__)
|
||||
else:
|
||||
exc_info = False
|
||||
|
||||
log_lines = [message]
|
||||
for key in sorted(context):
|
||||
if key in {'message', 'exception'}:
|
||||
continue
|
||||
log_lines.append('{}: {!r}'.format(key, context[key]))
|
||||
|
||||
logger.error('\n'.join(log_lines), exc_info=exc_info)
|
||||
|
||||
def call_exception_handler(self, context):
|
||||
"""Call the current event loop exception handler.
|
||||
|
||||
context is a dict object containing the following keys
|
||||
(new keys maybe introduced later):
|
||||
- 'message': Error message;
|
||||
- 'exception' (optional): Exception object;
|
||||
- 'future' (optional): Future instance;
|
||||
- 'handle' (optional): Handle instance;
|
||||
- 'protocol' (optional): Protocol instance;
|
||||
- 'transport' (optional): Transport instance;
|
||||
- 'socket' (optional): Socket instance.
|
||||
|
||||
Note: this method should not be overloaded in subclassed
|
||||
event loops. For any custom exception handling, use
|
||||
`set_exception_handler()` method.
|
||||
"""
|
||||
if self._exception_handler is None:
|
||||
try:
|
||||
self.default_exception_handler(context)
|
||||
except Exception:
|
||||
# Second protection layer for unexpected errors
|
||||
# in the default implementation, as well as for subclassed
|
||||
# event loops with overloaded "default_exception_handler".
|
||||
logger.error('Exception in default exception handler',
|
||||
exc_info=True)
|
||||
else:
|
||||
try:
|
||||
self._exception_handler(self, context)
|
||||
except Exception as exc:
|
||||
# Exception in the user set custom exception handler.
|
||||
try:
|
||||
# Let's try default handler.
|
||||
self.default_exception_handler({
|
||||
'message': 'Unhandled error in exception handler',
|
||||
'exception': exc,
|
||||
'context': context,
|
||||
})
|
||||
except Exception:
|
||||
# Guard 'default_exception_handler' in case it's
|
||||
# overloaded.
|
||||
logger.error('Exception in default exception handler '
|
||||
'while handling an unexpected error '
|
||||
'in custom exception handler',
|
||||
exc_info=True)
|
||||
|
||||
def _add_callback(self, handle):
|
||||
"""Add a Handle to ready or scheduled."""
|
||||
assert isinstance(handle, events.Handle), 'A Handle is required here'
|
||||
|
|
|
|||
|
|
@ -19,10 +19,11 @@ from .log import logger
|
|||
class Handle:
|
||||
"""Object returned by callback registration methods."""
|
||||
|
||||
__slots__ = ['_callback', '_args', '_cancelled']
|
||||
__slots__ = ['_callback', '_args', '_cancelled', '_loop']
|
||||
|
||||
def __init__(self, callback, args):
|
||||
def __init__(self, callback, args, loop):
|
||||
assert not isinstance(callback, Handle), 'A Handle is not a callback'
|
||||
self._loop = loop
|
||||
self._callback = callback
|
||||
self._args = args
|
||||
self._cancelled = False
|
||||
|
|
@ -39,9 +40,14 @@ class Handle:
|
|||
def _run(self):
|
||||
try:
|
||||
self._callback(*self._args)
|
||||
except Exception:
|
||||
logger.exception('Exception in callback %s %r',
|
||||
self._callback, self._args)
|
||||
except Exception as exc:
|
||||
msg = 'Exception in callback {}{!r}'.format(self._callback,
|
||||
self._args)
|
||||
self._loop.call_exception_handler({
|
||||
'message': msg,
|
||||
'exception': exc,
|
||||
'handle': self,
|
||||
})
|
||||
self = None # Needed to break cycles when an exception occurs.
|
||||
|
||||
|
||||
|
|
@ -50,9 +56,9 @@ class TimerHandle(Handle):
|
|||
|
||||
__slots__ = ['_when']
|
||||
|
||||
def __init__(self, when, callback, args):
|
||||
def __init__(self, when, callback, args, loop):
|
||||
assert when is not None
|
||||
super().__init__(callback, args)
|
||||
super().__init__(callback, args, loop)
|
||||
|
||||
self._when = when
|
||||
|
||||
|
|
@ -328,6 +334,17 @@ class AbstractEventLoop:
|
|||
def remove_signal_handler(self, sig):
|
||||
raise NotImplementedError
|
||||
|
||||
# Error handlers.
|
||||
|
||||
def set_exception_handler(self, handler):
|
||||
raise NotImplementedError
|
||||
|
||||
def default_exception_handler(self, context):
|
||||
raise NotImplementedError
|
||||
|
||||
def call_exception_handler(self, context):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class AbstractEventLoopPolicy:
|
||||
"""Abstract policy for accessing the event loop."""
|
||||
|
|
|
|||
|
|
@ -83,9 +83,10 @@ class _TracebackLogger:
|
|||
in a discussion about closing files when they are collected.
|
||||
"""
|
||||
|
||||
__slots__ = ['exc', 'tb']
|
||||
__slots__ = ['exc', 'tb', 'loop']
|
||||
|
||||
def __init__(self, exc):
|
||||
def __init__(self, exc, loop):
|
||||
self.loop = loop
|
||||
self.exc = exc
|
||||
self.tb = None
|
||||
|
||||
|
|
@ -102,8 +103,11 @@ class _TracebackLogger:
|
|||
|
||||
def __del__(self):
|
||||
if self.tb:
|
||||
logger.error('Future/Task exception was never retrieved:\n%s',
|
||||
''.join(self.tb))
|
||||
msg = 'Future/Task exception was never retrieved:\n{tb}'
|
||||
context = {
|
||||
'message': msg.format(tb=''.join(self.tb)),
|
||||
}
|
||||
self.loop.call_exception_handler(context)
|
||||
|
||||
|
||||
class Future:
|
||||
|
|
@ -173,8 +177,12 @@ class Future:
|
|||
# has consumed the exception
|
||||
return
|
||||
exc = self._exception
|
||||
logger.error('Future/Task exception was never retrieved:',
|
||||
exc_info=(exc.__class__, exc, exc.__traceback__))
|
||||
context = {
|
||||
'message': 'Future/Task exception was never retrieved',
|
||||
'exception': exc,
|
||||
'future': self,
|
||||
}
|
||||
self._loop.call_exception_handler(context)
|
||||
|
||||
def cancel(self):
|
||||
"""Cancel the future and schedule callbacks.
|
||||
|
|
@ -309,7 +317,7 @@ class Future:
|
|||
if _PY34:
|
||||
self._log_traceback = True
|
||||
else:
|
||||
self._tb_logger = _TracebackLogger(exception)
|
||||
self._tb_logger = _TracebackLogger(exception, self._loop)
|
||||
# Arrange for the logger to be activated after all callbacks
|
||||
# have had a chance to call result() or exception().
|
||||
self._loop.call_soon(self._tb_logger.activate)
|
||||
|
|
|
|||
|
|
@ -56,7 +56,12 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
|
|||
|
||||
def _fatal_error(self, exc):
|
||||
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
|
||||
logger.exception('Fatal error for %s', self)
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'Fatal transport error',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
self._force_close(exc)
|
||||
|
||||
def _force_close(self, exc):
|
||||
|
|
@ -103,8 +108,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
|
|||
self._protocol_paused = True
|
||||
try:
|
||||
self._protocol.pause_writing()
|
||||
except Exception:
|
||||
logger.exception('pause_writing() failed')
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.pause_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def _maybe_resume_protocol(self):
|
||||
if (self._protocol_paused and
|
||||
|
|
@ -112,8 +122,13 @@ class _ProactorBasePipeTransport(transports.BaseTransport):
|
|||
self._protocol_paused = False
|
||||
try:
|
||||
self._protocol.resume_writing()
|
||||
except Exception:
|
||||
logger.exception('resume_writing() failed')
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.resume_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def set_write_buffer_limits(self, high=None, low=None):
|
||||
if high is None:
|
||||
|
|
@ -465,9 +480,13 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
|
|||
conn, protocol,
|
||||
extra={'peername': addr}, server=server)
|
||||
f = self._proactor.accept(sock)
|
||||
except OSError:
|
||||
except OSError as exc:
|
||||
if sock.fileno() != -1:
|
||||
logger.exception('Accept failed')
|
||||
self.call_exception_handler({
|
||||
'message': 'Accept failed',
|
||||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
sock.close()
|
||||
except futures.CancelledError:
|
||||
sock.close()
|
||||
|
|
|
|||
|
|
@ -112,7 +112,11 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
# Some platforms (e.g. Linux keep reporting the FD as
|
||||
# ready, so we remove the read handler temporarily.
|
||||
# We'll try again in a while.
|
||||
logger.exception('Accept out of system resource (%s)', exc)
|
||||
self.call_exception_handler({
|
||||
'message': 'socket.accept() out of system resource',
|
||||
'exception': exc,
|
||||
'socket': sock,
|
||||
})
|
||||
self.remove_reader(sock.fileno())
|
||||
self.call_later(constants.ACCEPT_RETRY_DELAY,
|
||||
self._start_serving,
|
||||
|
|
@ -132,7 +136,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
"""Add a reader callback."""
|
||||
handle = events.Handle(callback, args)
|
||||
handle = events.Handle(callback, args, self)
|
||||
try:
|
||||
key = self._selector.get_key(fd)
|
||||
except KeyError:
|
||||
|
|
@ -167,7 +171,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
|
|||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
"""Add a writer callback.."""
|
||||
handle = events.Handle(callback, args)
|
||||
handle = events.Handle(callback, args, self)
|
||||
try:
|
||||
key = self._selector.get_key(fd)
|
||||
except KeyError:
|
||||
|
|
@ -364,8 +368,13 @@ class _FlowControlMixin(transports.Transport):
|
|||
self._protocol_paused = True
|
||||
try:
|
||||
self._protocol.pause_writing()
|
||||
except Exception:
|
||||
logger.exception('pause_writing() failed')
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.pause_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def _maybe_resume_protocol(self):
|
||||
if (self._protocol_paused and
|
||||
|
|
@ -373,8 +382,13 @@ class _FlowControlMixin(transports.Transport):
|
|||
self._protocol_paused = False
|
||||
try:
|
||||
self._protocol.resume_writing()
|
||||
except Exception:
|
||||
logger.exception('resume_writing() failed')
|
||||
except Exception as exc:
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'protocol.resume_writing() failed',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
|
||||
def set_write_buffer_limits(self, high=None, low=None):
|
||||
if high is None:
|
||||
|
|
@ -435,7 +449,12 @@ class _SelectorTransport(_FlowControlMixin, transports.Transport):
|
|||
def _fatal_error(self, exc):
|
||||
# Should be called from exception handler only.
|
||||
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
|
||||
logger.exception('Fatal error for %s', self)
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'Fatal transport error',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
self._force_close(exc)
|
||||
|
||||
def _force_close(self, exc):
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import collections
|
|||
import contextlib
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
import socketserver
|
||||
import sys
|
||||
|
|
@ -301,7 +302,7 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
raise AssertionError("Time generator is not finished")
|
||||
|
||||
def add_reader(self, fd, callback, *args):
|
||||
self.readers[fd] = events.Handle(callback, args)
|
||||
self.readers[fd] = events.Handle(callback, args, self)
|
||||
|
||||
def remove_reader(self, fd):
|
||||
self.remove_reader_count[fd] += 1
|
||||
|
|
@ -320,7 +321,7 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
handle._args, args)
|
||||
|
||||
def add_writer(self, fd, callback, *args):
|
||||
self.writers[fd] = events.Handle(callback, args)
|
||||
self.writers[fd] = events.Handle(callback, args, self)
|
||||
|
||||
def remove_writer(self, fd):
|
||||
self.remove_writer_count[fd] += 1
|
||||
|
|
@ -362,3 +363,16 @@ class TestLoop(base_events.BaseEventLoop):
|
|||
|
||||
def MockCallback(**kwargs):
|
||||
return unittest.mock.Mock(spec=['__call__'], **kwargs)
|
||||
|
||||
|
||||
class MockPattern(str):
|
||||
"""A regex based str with a fuzzy __eq__.
|
||||
|
||||
Use this helper with 'mock.assert_called_with', or anywhere
|
||||
where a regexp comparison between strings is needed.
|
||||
|
||||
For instance:
|
||||
mock_call.assert_called_with(MockPattern('spam.*ham'))
|
||||
"""
|
||||
def __eq__(self, other):
|
||||
return bool(re.search(str(self), other, re.S))
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
|
|||
except ValueError as exc:
|
||||
raise RuntimeError(str(exc))
|
||||
|
||||
handle = events.Handle(callback, args)
|
||||
handle = events.Handle(callback, args, self)
|
||||
self._signal_handlers[sig] = handle
|
||||
|
||||
try:
|
||||
|
|
@ -294,7 +294,12 @@ class _UnixReadPipeTransport(transports.ReadTransport):
|
|||
def _fatal_error(self, exc):
|
||||
# should be called by exception handler only
|
||||
if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
|
||||
logger.exception('Fatal error for %s', self)
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'Fatal transport error',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
self._close(exc)
|
||||
|
||||
def _close(self, exc):
|
||||
|
|
@ -441,7 +446,12 @@ class _UnixWritePipeTransport(selector_events._FlowControlMixin,
|
|||
def _fatal_error(self, exc):
|
||||
# should be called by exception handler only
|
||||
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
|
||||
logger.exception('Fatal error for %s', self)
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'Fatal transport error',
|
||||
'exception': exc,
|
||||
'transport': self,
|
||||
'protocol': self._protocol,
|
||||
})
|
||||
self._close(exc)
|
||||
|
||||
def _close(self, exc=None):
|
||||
|
|
@ -582,8 +592,14 @@ class BaseChildWatcher(AbstractChildWatcher):
|
|||
def _sig_chld(self):
|
||||
try:
|
||||
self._do_waitpid_all()
|
||||
except Exception:
|
||||
logger.exception('Unknown exception in SIGCHLD handler')
|
||||
except Exception as exc:
|
||||
# self._loop should always be available here
|
||||
# as '_sig_chld' is added as a signal handler
|
||||
# in 'attach_loop'
|
||||
self._loop.call_exception_handler({
|
||||
'message': 'Unknown exception in SIGCHLD handler',
|
||||
'exception': exc,
|
||||
})
|
||||
|
||||
def _compute_returncode(self, status):
|
||||
if os.WIFSIGNALED(status):
|
||||
|
|
|
|||
|
|
@ -156,9 +156,13 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
|
|||
if pipe is None:
|
||||
return
|
||||
f = self._proactor.accept_pipe(pipe)
|
||||
except OSError:
|
||||
except OSError as exc:
|
||||
if pipe and pipe.fileno() != -1:
|
||||
logger.exception('Pipe accept failed')
|
||||
self.call_exception_handler({
|
||||
'message': 'Pipe accept failed',
|
||||
'exception': exc,
|
||||
'pipe': pipe,
|
||||
})
|
||||
pipe.close()
|
||||
except futures.CancelledError:
|
||||
if pipe:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue