mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
Fixes #27930: improved QueueListener behaviour.
This commit is contained in:
parent
0f0eac431f
commit
d61910c598
3 changed files with 93 additions and 21 deletions
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2001-2015 by Vinay Sajip. All Rights Reserved.
|
# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
|
||||||
#
|
#
|
||||||
# Permission to use, copy, modify, and distribute this software and its
|
# Permission to use, copy, modify, and distribute this software and its
|
||||||
# documentation for any purpose and without fee is hereby granted,
|
# documentation for any purpose and without fee is hereby granted,
|
||||||
|
@ -18,7 +18,7 @@
|
||||||
Additional handlers for the logging package for Python. The core package is
|
Additional handlers for the logging package for Python. The core package is
|
||||||
based on PEP 282 and comments thereto in comp.lang.python.
|
based on PEP 282 and comments thereto in comp.lang.python.
|
||||||
|
|
||||||
Copyright (C) 2001-2015 Vinay Sajip. All Rights Reserved.
|
Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
|
||||||
|
|
||||||
To use, simply 'import logging.handlers' and log away!
|
To use, simply 'import logging.handlers' and log away!
|
||||||
"""
|
"""
|
||||||
|
@ -1366,7 +1366,6 @@ if threading:
|
||||||
"""
|
"""
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.handlers = handlers
|
self.handlers = handlers
|
||||||
self._stop = threading.Event()
|
|
||||||
self._thread = None
|
self._thread = None
|
||||||
self.respect_handler_level = respect_handler_level
|
self.respect_handler_level = respect_handler_level
|
||||||
|
|
||||||
|
@ -1387,7 +1386,7 @@ if threading:
|
||||||
LogRecords to process.
|
LogRecords to process.
|
||||||
"""
|
"""
|
||||||
self._thread = t = threading.Thread(target=self._monitor)
|
self._thread = t = threading.Thread(target=self._monitor)
|
||||||
t.setDaemon(True)
|
t.daemon = True
|
||||||
t.start()
|
t.start()
|
||||||
|
|
||||||
def prepare(self , record):
|
def prepare(self , record):
|
||||||
|
@ -1426,20 +1425,9 @@ if threading:
|
||||||
"""
|
"""
|
||||||
q = self.queue
|
q = self.queue
|
||||||
has_task_done = hasattr(q, 'task_done')
|
has_task_done = hasattr(q, 'task_done')
|
||||||
while not self._stop.isSet():
|
|
||||||
try:
|
|
||||||
record = self.dequeue(True)
|
|
||||||
if record is self._sentinel:
|
|
||||||
break
|
|
||||||
self.handle(record)
|
|
||||||
if has_task_done:
|
|
||||||
q.task_done()
|
|
||||||
except queue.Empty:
|
|
||||||
pass
|
|
||||||
# There might still be records in the queue.
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
record = self.dequeue(False)
|
record = self.dequeue(True)
|
||||||
if record is self._sentinel:
|
if record is self._sentinel:
|
||||||
break
|
break
|
||||||
self.handle(record)
|
self.handle(record)
|
||||||
|
@ -1466,7 +1454,6 @@ if threading:
|
||||||
Note that if you don't call this before your application exits, there
|
Note that if you don't call this before your application exits, there
|
||||||
may be some records still left on the queue, which won't be processed.
|
may be some records still left on the queue, which won't be processed.
|
||||||
"""
|
"""
|
||||||
self._stop.set()
|
|
||||||
self.enqueue_sentinel()
|
self.enqueue_sentinel()
|
||||||
self._thread.join()
|
self._thread.join()
|
||||||
self._thread = None
|
self._thread = None
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
# Copyright 2001-2014 by Vinay Sajip. All Rights Reserved.
|
# Copyright 2001-2016 by Vinay Sajip. All Rights Reserved.
|
||||||
#
|
#
|
||||||
# Permission to use, copy, modify, and distribute this software and its
|
# Permission to use, copy, modify, and distribute this software and its
|
||||||
# documentation for any purpose and without fee is hereby granted,
|
# documentation for any purpose and without fee is hereby granted,
|
||||||
|
@ -16,7 +16,7 @@
|
||||||
|
|
||||||
"""Test harness for the logging module. Run all tests.
|
"""Test harness for the logging module. Run all tests.
|
||||||
|
|
||||||
Copyright (C) 2001-2014 Vinay Sajip. All Rights Reserved.
|
Copyright (C) 2001-2016 Vinay Sajip. All Rights Reserved.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -3022,6 +3022,84 @@ class QueueHandlerTest(BaseTest):
|
||||||
self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
|
self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
|
||||||
self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
|
self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
|
||||||
|
|
||||||
|
if hasattr(logging.handlers, 'QueueListener'):
|
||||||
|
import multiprocessing
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
class QueueListenerTest(BaseTest):
|
||||||
|
"""
|
||||||
|
Tests based on patch submitted for issue #27930. Ensure that
|
||||||
|
QueueListener handles all log messages.
|
||||||
|
"""
|
||||||
|
|
||||||
|
repeat = 20
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def setup_and_log(log_queue, ident):
|
||||||
|
"""
|
||||||
|
Creates a logger with a QueueHandler that logs to a queue read by a
|
||||||
|
QueueListener. Starts the listener, logs five messages, and stops
|
||||||
|
the listener.
|
||||||
|
"""
|
||||||
|
logger = logging.getLogger('test_logger_with_id_%s' % ident)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
handler = logging.handlers.QueueHandler(log_queue)
|
||||||
|
logger.addHandler(handler)
|
||||||
|
listener = logging.handlers.QueueListener(log_queue)
|
||||||
|
listener.start()
|
||||||
|
|
||||||
|
logger.info('one')
|
||||||
|
logger.info('two')
|
||||||
|
logger.info('three')
|
||||||
|
logger.info('four')
|
||||||
|
logger.info('five')
|
||||||
|
|
||||||
|
listener.stop()
|
||||||
|
logger.removeHandler(handler)
|
||||||
|
handler.close()
|
||||||
|
|
||||||
|
@patch.object(logging.handlers.QueueListener, 'handle')
|
||||||
|
def test_handle_called_with_queue_queue(self, mock_handle):
|
||||||
|
for i in range(self.repeat):
|
||||||
|
log_queue = queue.Queue()
|
||||||
|
self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
|
||||||
|
self.assertEqual(mock_handle.call_count, 5 * self.repeat,
|
||||||
|
'correct number of handled log messages')
|
||||||
|
|
||||||
|
@patch.object(logging.handlers.QueueListener, 'handle')
|
||||||
|
def test_handle_called_with_mp_queue(self, mock_handle):
|
||||||
|
for i in range(self.repeat):
|
||||||
|
log_queue = multiprocessing.Queue()
|
||||||
|
self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
|
||||||
|
self.assertEqual(mock_handle.call_count, 5 * self.repeat,
|
||||||
|
'correct number of handled log messages')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_all_from_queue(log_queue):
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
yield log_queue.get_nowait()
|
||||||
|
except queue.Empty:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def test_no_messages_in_queue_after_stop(self):
|
||||||
|
"""
|
||||||
|
Five messages are logged then the QueueListener is stopped. This
|
||||||
|
test then gets everything off the queue. Failure of this test
|
||||||
|
indicates that messages were not registered on the queue until
|
||||||
|
_after_ the QueueListener stopped.
|
||||||
|
"""
|
||||||
|
for i in range(self.repeat):
|
||||||
|
queue = multiprocessing.Queue()
|
||||||
|
self.setup_and_log(queue, '%s_%s' %(self.id(), i))
|
||||||
|
# time.sleep(1)
|
||||||
|
items = list(self.get_all_from_queue(queue))
|
||||||
|
expected = [[], [logging.handlers.QueueListener._sentinel]]
|
||||||
|
self.assertIn(items, expected,
|
||||||
|
'Found unexpected messages in queue: %s' % (
|
||||||
|
[m.msg if isinstance(m, logging.LogRecord)
|
||||||
|
else m for m in items]))
|
||||||
|
|
||||||
|
|
||||||
ZERO = datetime.timedelta(0)
|
ZERO = datetime.timedelta(0)
|
||||||
|
|
||||||
|
@ -4166,7 +4244,7 @@ class NTEventLogHandlerTest(BaseTest):
|
||||||
# first and restore it at the end.
|
# first and restore it at the end.
|
||||||
@support.run_with_locale('LC_ALL', '')
|
@support.run_with_locale('LC_ALL', '')
|
||||||
def test_main():
|
def test_main():
|
||||||
support.run_unittest(
|
tests = [
|
||||||
BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
|
BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest,
|
||||||
HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
|
HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest,
|
||||||
DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
|
DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest,
|
||||||
|
@ -4177,7 +4255,11 @@ def test_main():
|
||||||
RotatingFileHandlerTest, LastResortTest, LogRecordTest,
|
RotatingFileHandlerTest, LastResortTest, LogRecordTest,
|
||||||
ExceptionTest, SysLogHandlerTest, HTTPHandlerTest,
|
ExceptionTest, SysLogHandlerTest, HTTPHandlerTest,
|
||||||
NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
|
NTEventLogHandlerTest, TimedRotatingFileHandlerTest,
|
||||||
UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest)
|
UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest,
|
||||||
|
]
|
||||||
|
if hasattr(logging.handlers, 'QueueListener'):
|
||||||
|
tests.append(QueueListenerTest)
|
||||||
|
support.run_unittest(*tests)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
test_main()
|
test_main()
|
||||||
|
|
|
@ -218,6 +218,9 @@ Library
|
||||||
- Issue #27392: Add loop.connect_accepted_socket().
|
- Issue #27392: Add loop.connect_accepted_socket().
|
||||||
Patch by Jim Fulton.
|
Patch by Jim Fulton.
|
||||||
|
|
||||||
|
- Issue #27930: Improved behaviour of logging.handlers.QueueListener.
|
||||||
|
Thanks to Paulo Andrade and Petr Viktorin for the analysis and patch.
|
||||||
|
|
||||||
IDLE
|
IDLE
|
||||||
----
|
----
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue