mirror of
https://github.com/python/cpython.git
synced 2025-08-01 07:33:08 +00:00
[3.13] gh-121723: Relax constraints on queue objects for logging.handlers.QueueHandler
. (GH-122154) (GH-122603)
(cherry picked from commit fb864c76cd
)
This commit is contained in:
parent
9f488f9358
commit
56435a88c4
4 changed files with 123 additions and 49 deletions
|
@ -753,9 +753,12 @@ The ``queue`` and ``listener`` keys are optional.
|
||||||
|
|
||||||
If the ``queue`` key is present, the corresponding value can be one of the following:
|
If the ``queue`` key is present, the corresponding value can be one of the following:
|
||||||
|
|
||||||
* An actual instance of :class:`queue.Queue` or a subclass thereof. This is of course
|
* An object implementing the :class:`queue.Queue` public API. For instance,
|
||||||
only possible if you are constructing or modifying the configuration dictionary in
|
this may be an actual instance of :class:`queue.Queue` or a subclass thereof,
|
||||||
code.
|
or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.
|
||||||
|
|
||||||
|
This is of course only possible if you are constructing or modifying
|
||||||
|
the configuration dictionary in code.
|
||||||
|
|
||||||
* A string that resolves to a callable which, when called with no arguments, returns
|
* A string that resolves to a callable which, when called with no arguments, returns
|
||||||
the :class:`queue.Queue` instance to use. That callable could be a
|
the :class:`queue.Queue` instance to use. That callable could be a
|
||||||
|
|
|
@ -497,6 +497,33 @@ class BaseConfigurator(object):
|
||||||
value = tuple(value)
|
value = tuple(value)
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
def _is_queue_like_object(obj):
|
||||||
|
"""Check that *obj* implements the Queue API."""
|
||||||
|
if isinstance(obj, queue.Queue):
|
||||||
|
return True
|
||||||
|
# defer importing multiprocessing as much as possible
|
||||||
|
from multiprocessing.queues import Queue as MPQueue
|
||||||
|
if isinstance(obj, MPQueue):
|
||||||
|
return True
|
||||||
|
# Depending on the multiprocessing start context, we cannot create
|
||||||
|
# a multiprocessing.managers.BaseManager instance 'mm' to get the
|
||||||
|
# runtime type of mm.Queue() or mm.JoinableQueue() (see gh-119819).
|
||||||
|
#
|
||||||
|
# Since we only need an object implementing the Queue API, we only
|
||||||
|
# do a protocol check, but we do not use typing.runtime_checkable()
|
||||||
|
# and typing.Protocol to reduce import time (see gh-121723).
|
||||||
|
#
|
||||||
|
# Ideally, we would have wanted to simply use strict type checking
|
||||||
|
# instead of a protocol-based type checking since the latter does
|
||||||
|
# not check the method signatures.
|
||||||
|
queue_interface = [
|
||||||
|
'empty', 'full', 'get', 'get_nowait',
|
||||||
|
'put', 'put_nowait', 'join', 'qsize',
|
||||||
|
'task_done',
|
||||||
|
]
|
||||||
|
return all(callable(getattr(obj, method, None))
|
||||||
|
for method in queue_interface)
|
||||||
|
|
||||||
class DictConfigurator(BaseConfigurator):
|
class DictConfigurator(BaseConfigurator):
|
||||||
"""
|
"""
|
||||||
Configure logging using a dictionary-like object to describe the
|
Configure logging using a dictionary-like object to describe the
|
||||||
|
@ -791,32 +818,8 @@ class DictConfigurator(BaseConfigurator):
|
||||||
if '()' not in qspec:
|
if '()' not in qspec:
|
||||||
raise TypeError('Invalid queue specifier %r' % qspec)
|
raise TypeError('Invalid queue specifier %r' % qspec)
|
||||||
config['queue'] = self.configure_custom(dict(qspec))
|
config['queue'] = self.configure_custom(dict(qspec))
|
||||||
else:
|
elif not _is_queue_like_object(qspec):
|
||||||
from multiprocessing.queues import Queue as MPQueue
|
raise TypeError('Invalid queue specifier %r' % qspec)
|
||||||
|
|
||||||
if not isinstance(qspec, (queue.Queue, MPQueue)):
|
|
||||||
# Safely check if 'qspec' is an instance of Manager.Queue
|
|
||||||
# / Manager.JoinableQueue
|
|
||||||
|
|
||||||
from multiprocessing import Manager as MM
|
|
||||||
from multiprocessing.managers import BaseProxy
|
|
||||||
|
|
||||||
# if it's not an instance of BaseProxy, it also can't be
|
|
||||||
# an instance of Manager.Queue / Manager.JoinableQueue
|
|
||||||
if isinstance(qspec, BaseProxy):
|
|
||||||
# Sometimes manager or queue creation might fail
|
|
||||||
# (e.g. see issue gh-120868). In that case, any
|
|
||||||
# exception during the creation of these queues will
|
|
||||||
# propagate up to the caller and be wrapped in a
|
|
||||||
# `ValueError`, whose cause will indicate the details of
|
|
||||||
# the failure.
|
|
||||||
mm = MM()
|
|
||||||
proxy_queue = mm.Queue()
|
|
||||||
proxy_joinable_queue = mm.JoinableQueue()
|
|
||||||
if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))):
|
|
||||||
raise TypeError('Invalid queue specifier %r' % qspec)
|
|
||||||
else:
|
|
||||||
raise TypeError('Invalid queue specifier %r' % qspec)
|
|
||||||
|
|
||||||
if 'listener' in config:
|
if 'listener' in config:
|
||||||
lspec = config['listener']
|
lspec = config['listener']
|
||||||
|
|
|
@ -2367,6 +2367,26 @@ class CustomListener(logging.handlers.QueueListener):
|
||||||
class CustomQueue(queue.Queue):
|
class CustomQueue(queue.Queue):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class CustomQueueProtocol:
|
||||||
|
def __init__(self, maxsize=0):
|
||||||
|
self.queue = queue.Queue(maxsize)
|
||||||
|
|
||||||
|
def __getattr__(self, attribute):
|
||||||
|
queue = object.__getattribute__(self, 'queue')
|
||||||
|
return getattr(queue, attribute)
|
||||||
|
|
||||||
|
class CustomQueueFakeProtocol(CustomQueueProtocol):
|
||||||
|
# An object implementing the Queue API (incorrect signatures).
|
||||||
|
# The object will be considered a valid queue class since we
|
||||||
|
# do not check the signatures (only callability of methods)
|
||||||
|
# but will NOT be usable in production since a TypeError will
|
||||||
|
# be raised due to a missing argument.
|
||||||
|
def empty(self, x):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class CustomQueueWrongProtocol(CustomQueueProtocol):
|
||||||
|
empty = None
|
||||||
|
|
||||||
def queueMaker():
|
def queueMaker():
|
||||||
return queue.Queue()
|
return queue.Queue()
|
||||||
|
|
||||||
|
@ -3900,18 +3920,16 @@ class ConfigDictTest(BaseTest):
|
||||||
@threading_helper.requires_working_threading()
|
@threading_helper.requires_working_threading()
|
||||||
@support.requires_subprocess()
|
@support.requires_subprocess()
|
||||||
def test_config_queue_handler(self):
|
def test_config_queue_handler(self):
|
||||||
q = CustomQueue()
|
qs = [CustomQueue(), CustomQueueProtocol()]
|
||||||
dq = {
|
dqs = [{'()': f'{__name__}.{cls}', 'maxsize': 10}
|
||||||
'()': __name__ + '.CustomQueue',
|
for cls in ['CustomQueue', 'CustomQueueProtocol']]
|
||||||
'maxsize': 10
|
|
||||||
}
|
|
||||||
dl = {
|
dl = {
|
||||||
'()': __name__ + '.listenerMaker',
|
'()': __name__ + '.listenerMaker',
|
||||||
'arg1': None,
|
'arg1': None,
|
||||||
'arg2': None,
|
'arg2': None,
|
||||||
'respect_handler_level': True
|
'respect_handler_level': True
|
||||||
}
|
}
|
||||||
qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', dq, q)
|
qvalues = (None, __name__ + '.queueMaker', __name__ + '.CustomQueue', *dqs, *qs)
|
||||||
lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
|
lvalues = (None, __name__ + '.CustomListener', dl, CustomListener)
|
||||||
for qspec, lspec in itertools.product(qvalues, lvalues):
|
for qspec, lspec in itertools.product(qvalues, lvalues):
|
||||||
self.do_queuehandler_configuration(qspec, lspec)
|
self.do_queuehandler_configuration(qspec, lspec)
|
||||||
|
@ -3931,15 +3949,21 @@ class ConfigDictTest(BaseTest):
|
||||||
@support.requires_subprocess()
|
@support.requires_subprocess()
|
||||||
@patch("multiprocessing.Manager")
|
@patch("multiprocessing.Manager")
|
||||||
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
|
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
|
||||||
# gh-120868
|
# gh-120868, gh-121723
|
||||||
|
|
||||||
from multiprocessing import Queue as MQ
|
from multiprocessing import Queue as MQ
|
||||||
|
|
||||||
q1 = {"()": "queue.Queue", "maxsize": -1}
|
q1 = {"()": "queue.Queue", "maxsize": -1}
|
||||||
q2 = MQ()
|
q2 = MQ()
|
||||||
q3 = queue.Queue()
|
q3 = queue.Queue()
|
||||||
|
# CustomQueueFakeProtocol passes the checks but will not be usable
|
||||||
|
# since the signatures are incompatible. Checking the Queue API
|
||||||
|
# without testing the type of the actual queue is a trade-off
|
||||||
|
# between usability and the work we need to do in order to safely
|
||||||
|
# check that the queue object correctly implements the API.
|
||||||
|
q4 = CustomQueueFakeProtocol()
|
||||||
|
|
||||||
for qspec in (q1, q2, q3):
|
for qspec in (q1, q2, q3, q4):
|
||||||
self.apply_config(
|
self.apply_config(
|
||||||
{
|
{
|
||||||
"version": 1,
|
"version": 1,
|
||||||
|
@ -3955,21 +3979,62 @@ class ConfigDictTest(BaseTest):
|
||||||
|
|
||||||
@patch("multiprocessing.Manager")
|
@patch("multiprocessing.Manager")
|
||||||
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
|
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
|
||||||
# gh-120868
|
# gh-120868, gh-121723
|
||||||
|
|
||||||
with self.assertRaises(ValueError):
|
for qspec in [object(), CustomQueueWrongProtocol()]:
|
||||||
self.apply_config(
|
with self.assertRaises(ValueError):
|
||||||
{
|
self.apply_config(
|
||||||
"version": 1,
|
{
|
||||||
"handlers": {
|
"version": 1,
|
||||||
"queue_listener": {
|
"handlers": {
|
||||||
"class": "logging.handlers.QueueHandler",
|
"queue_listener": {
|
||||||
"queue": object(),
|
"class": "logging.handlers.QueueHandler",
|
||||||
|
"queue": qspec,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
}
|
||||||
|
)
|
||||||
|
manager.assert_not_called()
|
||||||
|
|
||||||
|
@skip_if_tsan_fork
|
||||||
|
@support.requires_subprocess()
|
||||||
|
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
|
||||||
|
"assertions in multiprocessing")
|
||||||
|
def test_config_queue_handler_multiprocessing_context(self):
|
||||||
|
# regression test for gh-121723
|
||||||
|
if support.MS_WINDOWS:
|
||||||
|
start_methods = ['spawn']
|
||||||
|
else:
|
||||||
|
start_methods = ['spawn', 'fork', 'forkserver']
|
||||||
|
for start_method in start_methods:
|
||||||
|
with self.subTest(start_method=start_method):
|
||||||
|
ctx = multiprocessing.get_context(start_method)
|
||||||
|
with ctx.Manager() as manager:
|
||||||
|
q = manager.Queue()
|
||||||
|
records = []
|
||||||
|
# use 1 process and 1 task per child to put 1 record
|
||||||
|
with ctx.Pool(1, initializer=self._mpinit_issue121723,
|
||||||
|
initargs=(q, "text"), maxtasksperchild=1):
|
||||||
|
records.append(q.get(timeout=60))
|
||||||
|
self.assertTrue(q.empty())
|
||||||
|
self.assertEqual(len(records), 1)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _mpinit_issue121723(qspec, message_to_log):
|
||||||
|
# static method for pickling support
|
||||||
|
logging.config.dictConfig({
|
||||||
|
'version': 1,
|
||||||
|
'disable_existing_loggers': True,
|
||||||
|
'handlers': {
|
||||||
|
'log_to_parent': {
|
||||||
|
'class': 'logging.handlers.QueueHandler',
|
||||||
|
'queue': qspec
|
||||||
}
|
}
|
||||||
)
|
},
|
||||||
manager.assert_not_called()
|
'root': {'handlers': ['log_to_parent'], 'level': 'DEBUG'}
|
||||||
|
})
|
||||||
|
# log a message (this creates a record put in the queue)
|
||||||
|
logging.getLogger().info(message_to_log)
|
||||||
|
|
||||||
@skip_if_tsan_fork
|
@skip_if_tsan_fork
|
||||||
@support.requires_subprocess()
|
@support.requires_subprocess()
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
Make :func:`logging.config.dictConfig` accept any object implementing the
|
||||||
|
Queue public API. See the :ref:`queue configuration <configure-queue>`
|
||||||
|
section for details. Patch by Bénédikt Tran.
|
Loading…
Add table
Add a link
Reference in a new issue