mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 11:49:12 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			823 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			823 lines
		
	
	
	
		
			26 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
#
 | 
						|
# Copyright 2001-2004 by Vinay Sajip. All Rights Reserved.
 | 
						|
#
 | 
						|
# Permission to use, copy, modify, and distribute this software and its
 | 
						|
# documentation for any purpose and without fee is hereby granted,
 | 
						|
# provided that the above copyright notice appear in all copies and that
 | 
						|
# both that copyright notice and this permission notice appear in
 | 
						|
# supporting documentation, and that the name of Vinay Sajip
 | 
						|
# not be used in advertising or publicity pertaining to distribution
 | 
						|
# of the software without specific, written prior permission.
 | 
						|
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
 | 
						|
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
 | 
						|
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
 | 
						|
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
 | 
						|
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
 | 
						|
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 | 
						|
 | 
						|
"""Test harness for the logging module. Run all tests.
 | 
						|
 | 
						|
Copyright (C) 2001-2002 Vinay Sajip. All Rights Reserved.
 | 
						|
"""
 | 
						|
 | 
						|
import logging
 | 
						|
import logging.handlers
 | 
						|
import logging.config
 | 
						|
 | 
						|
import copy
 | 
						|
import pickle
 | 
						|
import io
 | 
						|
import gc
 | 
						|
import os
 | 
						|
import re
 | 
						|
import select
 | 
						|
import socket
 | 
						|
from socketserver import ThreadingTCPServer, StreamRequestHandler
 | 
						|
import string
 | 
						|
import struct
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
from test.support import captured_stdout, run_with_locale, run_unittest
 | 
						|
import textwrap
 | 
						|
import threading
 | 
						|
import time
 | 
						|
import types
 | 
						|
import unittest
 | 
						|
import weakref
 | 
						|
 | 
						|
 | 
						|
class BaseTest(unittest.TestCase):
 | 
						|
 | 
						|
    """Base class for logging tests."""
 | 
						|
 | 
						|
    log_format = "%(name)s -> %(levelname)s: %(message)s"
 | 
						|
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
 | 
						|
    message_num = 0
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        """Setup the default logging stream to an internal StringIO instance,
 | 
						|
        so that we can examine log output as we want."""
 | 
						|
        logger_dict = logging.getLogger().manager.loggerDict
 | 
						|
        logging._acquireLock()
 | 
						|
        try:
 | 
						|
            self.saved_handlers = logging._handlers.copy()
 | 
						|
            self.saved_handler_list = logging._handlerList[:]
 | 
						|
            self.saved_loggers = logger_dict.copy()
 | 
						|
            self.saved_level_names = logging._levelNames.copy()
 | 
						|
        finally:
 | 
						|
            logging._releaseLock()
 | 
						|
 | 
						|
        self.root_logger = logging.getLogger("")
 | 
						|
        self.original_logging_level = self.root_logger.getEffectiveLevel()
 | 
						|
 | 
						|
        self.stream = io.StringIO()
 | 
						|
        self.root_logger.setLevel(logging.DEBUG)
 | 
						|
        self.root_hdlr = logging.StreamHandler(self.stream)
 | 
						|
        self.root_formatter = logging.Formatter(self.log_format)
 | 
						|
        self.root_hdlr.setFormatter(self.root_formatter)
 | 
						|
        self.root_logger.addHandler(self.root_hdlr)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        """Remove our logging stream, and restore the original logging
 | 
						|
        level."""
 | 
						|
        self.stream.close()
 | 
						|
        self.root_logger.removeHandler(self.root_hdlr)
 | 
						|
        self.root_logger.setLevel(self.original_logging_level)
 | 
						|
        logging._acquireLock()
 | 
						|
        try:
 | 
						|
            logging._levelNames.clear()
 | 
						|
            logging._levelNames.update(self.saved_level_names)
 | 
						|
            logging._handlers.clear()
 | 
						|
            logging._handlers.update(self.saved_handlers)
 | 
						|
            logging._handlerList[:] = self.saved_handler_list
 | 
						|
            loggerDict = logging.getLogger().manager.loggerDict
 | 
						|
            loggerDict.clear()
 | 
						|
            loggerDict.update(self.saved_loggers)
 | 
						|
        finally:
 | 
						|
            logging._releaseLock()
 | 
						|
 | 
						|
    def assert_log_lines(self, expected_values, stream=None):
 | 
						|
        """Match the collected log lines against the regular expression
 | 
						|
        self.expected_log_pat, and compare the extracted group values to
 | 
						|
        the expected_values list of tuples."""
 | 
						|
        stream = stream or self.stream
 | 
						|
        pat = re.compile(self.expected_log_pat)
 | 
						|
        try:
 | 
						|
            stream.reset()
 | 
						|
            actual_lines = stream.readlines()
 | 
						|
        except AttributeError:
 | 
						|
            # StringIO.StringIO lacks a reset() method.
 | 
						|
            actual_lines = stream.getvalue().splitlines()
 | 
						|
        self.assertEquals(len(actual_lines), len(expected_values))
 | 
						|
        for actual, expected in zip(actual_lines, expected_values):
 | 
						|
            match = pat.search(actual)
 | 
						|
            if not match:
 | 
						|
                self.fail("Log line does not match expected pattern:\n" +
 | 
						|
                            actual)
 | 
						|
            self.assertEquals(tuple(match.groups()), expected)
 | 
						|
        s = stream.read()
 | 
						|
        if s:
 | 
						|
            self.fail("Remaining output at end of log stream:\n" + s)
 | 
						|
 | 
						|
    def next_message(self):
 | 
						|
        """Generate a message consisting solely of an auto-incrementing
 | 
						|
        integer."""
 | 
						|
        self.message_num += 1
 | 
						|
        return "%d" % self.message_num
 | 
						|
 | 
						|
 | 
						|
class BuiltinLevelsTest(BaseTest):
 | 
						|
    """Test builtin levels and their inheritance."""
 | 
						|
 | 
						|
    def test_flat(self):
 | 
						|
        #Logging levels in a flat logger namespace.
 | 
						|
        m = self.next_message
 | 
						|
 | 
						|
        ERR = logging.getLogger("ERR")
 | 
						|
        ERR.setLevel(logging.ERROR)
 | 
						|
        INF = logging.getLogger("INF")
 | 
						|
        INF.setLevel(logging.INFO)
 | 
						|
        DEB = logging.getLogger("DEB")
 | 
						|
        DEB.setLevel(logging.DEBUG)
 | 
						|
 | 
						|
        # These should log.
 | 
						|
        ERR.log(logging.CRITICAL, m())
 | 
						|
        ERR.error(m())
 | 
						|
 | 
						|
        INF.log(logging.CRITICAL, m())
 | 
						|
        INF.error(m())
 | 
						|
        INF.warn(m())
 | 
						|
        INF.info(m())
 | 
						|
 | 
						|
        DEB.log(logging.CRITICAL, m())
 | 
						|
        DEB.error(m())
 | 
						|
        DEB.warn (m())
 | 
						|
        DEB.info (m())
 | 
						|
        DEB.debug(m())
 | 
						|
 | 
						|
        # These should not log.
 | 
						|
        ERR.warn(m())
 | 
						|
        ERR.info(m())
 | 
						|
        ERR.debug(m())
 | 
						|
 | 
						|
        INF.debug(m())
 | 
						|
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('ERR', 'CRITICAL', '1'),
 | 
						|
            ('ERR', 'ERROR', '2'),
 | 
						|
            ('INF', 'CRITICAL', '3'),
 | 
						|
            ('INF', 'ERROR', '4'),
 | 
						|
            ('INF', 'WARNING', '5'),
 | 
						|
            ('INF', 'INFO', '6'),
 | 
						|
            ('DEB', 'CRITICAL', '7'),
 | 
						|
            ('DEB', 'ERROR', '8'),
 | 
						|
            ('DEB', 'WARNING', '9'),
 | 
						|
            ('DEB', 'INFO', '10'),
 | 
						|
            ('DEB', 'DEBUG', '11'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_nested_explicit(self):
 | 
						|
        # Logging levels in a nested namespace, all explicitly set.
 | 
						|
        m = self.next_message
 | 
						|
 | 
						|
        INF = logging.getLogger("INF")
 | 
						|
        INF.setLevel(logging.INFO)
 | 
						|
        INF_ERR  = logging.getLogger("INF.ERR")
 | 
						|
        INF_ERR.setLevel(logging.ERROR)
 | 
						|
 | 
						|
        # These should log.
 | 
						|
        INF_ERR.log(logging.CRITICAL, m())
 | 
						|
        INF_ERR.error(m())
 | 
						|
 | 
						|
        # These should not log.
 | 
						|
        INF_ERR.warn(m())
 | 
						|
        INF_ERR.info(m())
 | 
						|
        INF_ERR.debug(m())
 | 
						|
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('INF.ERR', 'CRITICAL', '1'),
 | 
						|
            ('INF.ERR', 'ERROR', '2'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_nested_inherited(self):
 | 
						|
        #Logging levels in a nested namespace, inherited from parent loggers.
 | 
						|
        m = self.next_message
 | 
						|
 | 
						|
        INF = logging.getLogger("INF")
 | 
						|
        INF.setLevel(logging.INFO)
 | 
						|
        INF_ERR  = logging.getLogger("INF.ERR")
 | 
						|
        INF_ERR.setLevel(logging.ERROR)
 | 
						|
        INF_UNDEF = logging.getLogger("INF.UNDEF")
 | 
						|
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
 | 
						|
        UNDEF = logging.getLogger("UNDEF")
 | 
						|
 | 
						|
        # These should log.
 | 
						|
        INF_UNDEF.log(logging.CRITICAL, m())
 | 
						|
        INF_UNDEF.error(m())
 | 
						|
        INF_UNDEF.warn(m())
 | 
						|
        INF_UNDEF.info(m())
 | 
						|
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
 | 
						|
        INF_ERR_UNDEF.error(m())
 | 
						|
 | 
						|
        # These should not log.
 | 
						|
        INF_UNDEF.debug(m())
 | 
						|
        INF_ERR_UNDEF.warn(m())
 | 
						|
        INF_ERR_UNDEF.info(m())
 | 
						|
        INF_ERR_UNDEF.debug(m())
 | 
						|
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('INF.UNDEF', 'CRITICAL', '1'),
 | 
						|
            ('INF.UNDEF', 'ERROR', '2'),
 | 
						|
            ('INF.UNDEF', 'WARNING', '3'),
 | 
						|
            ('INF.UNDEF', 'INFO', '4'),
 | 
						|
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
 | 
						|
            ('INF.ERR.UNDEF', 'ERROR', '6'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_nested_with_virtual_parent(self):
 | 
						|
        # Logging levels when some parent does not exist yet.
 | 
						|
        m = self.next_message
 | 
						|
 | 
						|
        INF = logging.getLogger("INF")
 | 
						|
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
 | 
						|
        CHILD = logging.getLogger("INF.BADPARENT")
 | 
						|
        INF.setLevel(logging.INFO)
 | 
						|
 | 
						|
        # These should log.
 | 
						|
        GRANDCHILD.log(logging.FATAL, m())
 | 
						|
        GRANDCHILD.info(m())
 | 
						|
        CHILD.log(logging.FATAL, m())
 | 
						|
        CHILD.info(m())
 | 
						|
 | 
						|
        # These should not log.
 | 
						|
        GRANDCHILD.debug(m())
 | 
						|
        CHILD.debug(m())
 | 
						|
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
 | 
						|
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
 | 
						|
            ('INF.BADPARENT', 'CRITICAL', '3'),
 | 
						|
            ('INF.BADPARENT', 'INFO', '4'),
 | 
						|
        ])
 | 
						|
 | 
						|
 | 
						|
class BasicFilterTest(BaseTest):
 | 
						|
 | 
						|
    """Test the bundled Filter class."""
 | 
						|
 | 
						|
    def test_filter(self):
 | 
						|
        # Only messages satisfying the specified criteria pass through the
 | 
						|
        #  filter.
 | 
						|
        filter_ = logging.Filter("spam.eggs")
 | 
						|
        handler = self.root_logger.handlers[0]
 | 
						|
        try:
 | 
						|
            handler.addFilter(filter_)
 | 
						|
            spam = logging.getLogger("spam")
 | 
						|
            spam_eggs = logging.getLogger("spam.eggs")
 | 
						|
            spam_eggs_fish = logging.getLogger("spam.eggs.fish")
 | 
						|
            spam_bakedbeans = logging.getLogger("spam.bakedbeans")
 | 
						|
 | 
						|
            spam.info(self.next_message())
 | 
						|
            spam_eggs.info(self.next_message())  # Good.
 | 
						|
            spam_eggs_fish.info(self.next_message())  # Good.
 | 
						|
            spam_bakedbeans.info(self.next_message())
 | 
						|
 | 
						|
            self.assert_log_lines([
 | 
						|
                ('spam.eggs', 'INFO', '2'),
 | 
						|
                ('spam.eggs.fish', 'INFO', '3'),
 | 
						|
            ])
 | 
						|
        finally:
 | 
						|
            handler.removeFilter(filter_)
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
#   First, we define our levels. There can be as many as you want - the only
 | 
						|
#     limitations are that they should be integers, the lowest should be > 0 and
 | 
						|
#   larger values mean less information being logged. If you need specific
 | 
						|
#   level values which do not fit into these limitations, you can use a
 | 
						|
#   mapping dictionary to convert between your application levels and the
 | 
						|
#   logging system.
 | 
						|
#
 | 
						|
SILENT      = 120
 | 
						|
TACITURN    = 119
 | 
						|
TERSE       = 118
 | 
						|
EFFUSIVE    = 117
 | 
						|
SOCIABLE    = 116
 | 
						|
VERBOSE     = 115
 | 
						|
TALKATIVE   = 114
 | 
						|
GARRULOUS   = 113
 | 
						|
CHATTERBOX  = 112
 | 
						|
BORING      = 111
 | 
						|
 | 
						|
LEVEL_RANGE = range(BORING, SILENT + 1)
 | 
						|
 | 
						|
#
 | 
						|
#   Next, we define names for our levels. You don't need to do this - in which
 | 
						|
#   case the system will use "Level n" to denote the text for the level.
 | 
						|
#
 | 
						|
my_logging_levels = {
 | 
						|
    SILENT      : 'Silent',
 | 
						|
    TACITURN    : 'Taciturn',
 | 
						|
    TERSE       : 'Terse',
 | 
						|
    EFFUSIVE    : 'Effusive',
 | 
						|
    SOCIABLE    : 'Sociable',
 | 
						|
    VERBOSE     : 'Verbose',
 | 
						|
    TALKATIVE   : 'Talkative',
 | 
						|
    GARRULOUS   : 'Garrulous',
 | 
						|
    CHATTERBOX  : 'Chatterbox',
 | 
						|
    BORING      : 'Boring',
 | 
						|
}
 | 
						|
 | 
						|
class GarrulousFilter(logging.Filter):
 | 
						|
 | 
						|
    """A filter which blocks garrulous messages."""
 | 
						|
 | 
						|
    def filter(self, record):
 | 
						|
        return record.levelno != GARRULOUS
 | 
						|
 | 
						|
class VerySpecificFilter(logging.Filter):
 | 
						|
 | 
						|
    """A filter which blocks sociable and taciturn messages."""
 | 
						|
 | 
						|
    def filter(self, record):
 | 
						|
        return record.levelno not in [SOCIABLE, TACITURN]
 | 
						|
 | 
						|
 | 
						|
class CustomLevelsAndFiltersTest(BaseTest):
 | 
						|
 | 
						|
    """Test various filtering possibilities with custom logging levels."""
 | 
						|
 | 
						|
    # Skip the logger name group.
 | 
						|
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        BaseTest.setUp(self)
 | 
						|
        for k, v in list(my_logging_levels.items()):
 | 
						|
            logging.addLevelName(k, v)
 | 
						|
 | 
						|
    def log_at_all_levels(self, logger):
 | 
						|
        for lvl in LEVEL_RANGE:
 | 
						|
            logger.log(lvl, self.next_message())
 | 
						|
 | 
						|
    def test_logger_filter(self):
 | 
						|
        # Filter at logger level.
 | 
						|
        self.root_logger.setLevel(VERBOSE)
 | 
						|
        # Levels >= 'Verbose' are good.
 | 
						|
        self.log_at_all_levels(self.root_logger)
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('Verbose', '5'),
 | 
						|
            ('Sociable', '6'),
 | 
						|
            ('Effusive', '7'),
 | 
						|
            ('Terse', '8'),
 | 
						|
            ('Taciturn', '9'),
 | 
						|
            ('Silent', '10'),
 | 
						|
        ])
 | 
						|
 | 
						|
    def test_handler_filter(self):
 | 
						|
        # Filter at handler level.
 | 
						|
        self.root_logger.handlers[0].setLevel(SOCIABLE)
 | 
						|
        try:
 | 
						|
            # Levels >= 'Sociable' are good.
 | 
						|
            self.log_at_all_levels(self.root_logger)
 | 
						|
            self.assert_log_lines([
 | 
						|
                ('Sociable', '6'),
 | 
						|
                ('Effusive', '7'),
 | 
						|
                ('Terse', '8'),
 | 
						|
                ('Taciturn', '9'),
 | 
						|
                ('Silent', '10'),
 | 
						|
            ])
 | 
						|
        finally:
 | 
						|
            self.root_logger.handlers[0].setLevel(logging.NOTSET)
 | 
						|
 | 
						|
    def test_specific_filters(self):
 | 
						|
        # Set a specific filter object on the handler, and then add another
 | 
						|
        #  filter object on the logger itself.
 | 
						|
        handler = self.root_logger.handlers[0]
 | 
						|
        specific_filter = None
 | 
						|
        garr = GarrulousFilter()
 | 
						|
        handler.addFilter(garr)
 | 
						|
        try:
 | 
						|
            self.log_at_all_levels(self.root_logger)
 | 
						|
            first_lines = [
 | 
						|
                # Notice how 'Garrulous' is missing
 | 
						|
                ('Boring', '1'),
 | 
						|
                ('Chatterbox', '2'),
 | 
						|
                ('Talkative', '4'),
 | 
						|
                ('Verbose', '5'),
 | 
						|
                ('Sociable', '6'),
 | 
						|
                ('Effusive', '7'),
 | 
						|
                ('Terse', '8'),
 | 
						|
                ('Taciturn', '9'),
 | 
						|
                ('Silent', '10'),
 | 
						|
            ]
 | 
						|
            self.assert_log_lines(first_lines)
 | 
						|
 | 
						|
            specific_filter = VerySpecificFilter()
 | 
						|
            self.root_logger.addFilter(specific_filter)
 | 
						|
            self.log_at_all_levels(self.root_logger)
 | 
						|
            self.assert_log_lines(first_lines + [
 | 
						|
                # Not only 'Garrulous' is still missing, but also 'Sociable'
 | 
						|
                # and 'Taciturn'
 | 
						|
                ('Boring', '11'),
 | 
						|
                ('Chatterbox', '12'),
 | 
						|
                ('Talkative', '14'),
 | 
						|
                ('Verbose', '15'),
 | 
						|
                ('Effusive', '17'),
 | 
						|
                ('Terse', '18'),
 | 
						|
                ('Silent', '20'),
 | 
						|
        ])
 | 
						|
        finally:
 | 
						|
            if specific_filter:
 | 
						|
                self.root_logger.removeFilter(specific_filter)
 | 
						|
            handler.removeFilter(garr)
 | 
						|
 | 
						|
 | 
						|
class MemoryHandlerTest(BaseTest):
 | 
						|
 | 
						|
    """Tests for the MemoryHandler."""
 | 
						|
 | 
						|
    # Do not bother with a logger name group.
 | 
						|
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        BaseTest.setUp(self)
 | 
						|
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
 | 
						|
                                                        self.root_hdlr)
 | 
						|
        self.mem_logger = logging.getLogger('mem')
 | 
						|
        self.mem_logger.propagate = 0
 | 
						|
        self.mem_logger.addHandler(self.mem_hdlr)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.mem_hdlr.close()
 | 
						|
        BaseTest.tearDown(self)
 | 
						|
 | 
						|
    def test_flush(self):
 | 
						|
        # The memory handler flushes to its target handler based on specific
 | 
						|
        #  criteria (message count and message level).
 | 
						|
        self.mem_logger.debug(self.next_message())
 | 
						|
        self.assert_log_lines([])
 | 
						|
        self.mem_logger.info(self.next_message())
 | 
						|
        self.assert_log_lines([])
 | 
						|
        # This will flush because the level is >= logging.WARNING
 | 
						|
        self.mem_logger.warn(self.next_message())
 | 
						|
        lines = [
 | 
						|
            ('DEBUG', '1'),
 | 
						|
            ('INFO', '2'),
 | 
						|
            ('WARNING', '3'),
 | 
						|
        ]
 | 
						|
        self.assert_log_lines(lines)
 | 
						|
        for n in (4, 14):
 | 
						|
            for i in range(9):
 | 
						|
                self.mem_logger.debug(self.next_message())
 | 
						|
            self.assert_log_lines(lines)
 | 
						|
            # This will flush because it's the 10th message since the last
 | 
						|
            #  flush.
 | 
						|
            self.mem_logger.debug(self.next_message())
 | 
						|
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
 | 
						|
            self.assert_log_lines(lines)
 | 
						|
 | 
						|
        self.mem_logger.debug(self.next_message())
 | 
						|
        self.assert_log_lines(lines)
 | 
						|
 | 
						|
 | 
						|
class ExceptionFormatter(logging.Formatter):
 | 
						|
    """A special exception formatter."""
 | 
						|
    def formatException(self, ei):
 | 
						|
        return "Got a [%s]" % ei[0].__name__
 | 
						|
 | 
						|
 | 
						|
class ConfigFileTest(BaseTest):
 | 
						|
 | 
						|
    """Reading logging config from a .ini-style config file."""
 | 
						|
 | 
						|
    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
 | 
						|
 | 
						|
    # config0 is a standard configuration.
 | 
						|
    config0 = """
 | 
						|
    [loggers]
 | 
						|
    keys=root
 | 
						|
 | 
						|
    [handlers]
 | 
						|
    keys=hand1
 | 
						|
 | 
						|
    [formatters]
 | 
						|
    keys=form1
 | 
						|
 | 
						|
    [logger_root]
 | 
						|
    level=WARNING
 | 
						|
    handlers=hand1
 | 
						|
 | 
						|
    [handler_hand1]
 | 
						|
    class=StreamHandler
 | 
						|
    level=NOTSET
 | 
						|
    formatter=form1
 | 
						|
    args=(sys.stdout,)
 | 
						|
 | 
						|
    [formatter_form1]
 | 
						|
    format=%(levelname)s ++ %(message)s
 | 
						|
    datefmt=
 | 
						|
    """
 | 
						|
 | 
						|
    # config1 adds a little to the standard configuration.
 | 
						|
    config1 = """
 | 
						|
    [loggers]
 | 
						|
    keys=root,parser
 | 
						|
 | 
						|
    [handlers]
 | 
						|
    keys=hand1
 | 
						|
 | 
						|
    [formatters]
 | 
						|
    keys=form1
 | 
						|
 | 
						|
    [logger_root]
 | 
						|
    level=WARNING
 | 
						|
    handlers=
 | 
						|
 | 
						|
    [logger_parser]
 | 
						|
    level=DEBUG
 | 
						|
    handlers=hand1
 | 
						|
    propagate=1
 | 
						|
    qualname=compiler.parser
 | 
						|
 | 
						|
    [handler_hand1]
 | 
						|
    class=StreamHandler
 | 
						|
    level=NOTSET
 | 
						|
    formatter=form1
 | 
						|
    args=(sys.stdout,)
 | 
						|
 | 
						|
    [formatter_form1]
 | 
						|
    format=%(levelname)s ++ %(message)s
 | 
						|
    datefmt=
 | 
						|
    """
 | 
						|
 | 
						|
    # config2 has a subtle configuration error that should be reported
 | 
						|
    config2 = config1.replace("sys.stdout", "sys.stbout")
 | 
						|
 | 
						|
    # config3 has a less subtle configuration error
 | 
						|
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
 | 
						|
 | 
						|
    # config4 specifies a custom formatter class to be loaded
 | 
						|
    config4 = """
 | 
						|
    [loggers]
 | 
						|
    keys=root
 | 
						|
 | 
						|
    [handlers]
 | 
						|
    keys=hand1
 | 
						|
 | 
						|
    [formatters]
 | 
						|
    keys=form1
 | 
						|
 | 
						|
    [logger_root]
 | 
						|
    level=NOTSET
 | 
						|
    handlers=hand1
 | 
						|
 | 
						|
    [handler_hand1]
 | 
						|
    class=StreamHandler
 | 
						|
    level=NOTSET
 | 
						|
    formatter=form1
 | 
						|
    args=(sys.stdout,)
 | 
						|
 | 
						|
    [formatter_form1]
 | 
						|
    class=""" + __name__ + """.ExceptionFormatter
 | 
						|
    format=%(levelname)s:%(name)s:%(message)s
 | 
						|
    datefmt=
 | 
						|
    """
 | 
						|
 | 
						|
    def apply_config(self, conf):
 | 
						|
        try:
 | 
						|
            fn = tempfile.mktemp(".ini")
 | 
						|
            f = open(fn, "w")
 | 
						|
            f.write(textwrap.dedent(conf))
 | 
						|
            f.close()
 | 
						|
            logging.config.fileConfig(fn)
 | 
						|
        finally:
 | 
						|
            os.remove(fn)
 | 
						|
 | 
						|
    def test_config0_ok(self):
 | 
						|
        # A simple config file which overrides the default settings.
 | 
						|
        with captured_stdout() as output:
 | 
						|
            self.apply_config(self.config0)
 | 
						|
            logger = logging.getLogger()
 | 
						|
            # Won't output anything
 | 
						|
            logger.info(self.next_message())
 | 
						|
            # Outputs a message
 | 
						|
            logger.error(self.next_message())
 | 
						|
            self.assert_log_lines([
 | 
						|
                ('ERROR', '2'),
 | 
						|
            ], stream=output)
 | 
						|
            # Original logger output is empty.
 | 
						|
            self.assert_log_lines([])
 | 
						|
 | 
						|
    def test_config1_ok(self):
 | 
						|
        # A config file defining a sub-parser as well.
 | 
						|
        with captured_stdout() as output:
 | 
						|
            self.apply_config(self.config1)
 | 
						|
            logger = logging.getLogger("compiler.parser")
 | 
						|
            # Both will output a message
 | 
						|
            logger.info(self.next_message())
 | 
						|
            logger.error(self.next_message())
 | 
						|
            self.assert_log_lines([
 | 
						|
                ('INFO', '1'),
 | 
						|
                ('ERROR', '2'),
 | 
						|
            ], stream=output)
 | 
						|
            # Original logger output is empty.
 | 
						|
            self.assert_log_lines([])
 | 
						|
 | 
						|
    def test_config2_failure(self):
 | 
						|
        # A simple config file which overrides the default settings.
 | 
						|
        self.assertRaises(Exception, self.apply_config, self.config2)
 | 
						|
 | 
						|
    def test_config3_failure(self):
 | 
						|
        # A simple config file which overrides the default settings.
 | 
						|
        self.assertRaises(Exception, self.apply_config, self.config3)
 | 
						|
 | 
						|
    def test_config4_ok(self):
 | 
						|
        # A config file specifying a custom formatter class.
 | 
						|
        with captured_stdout() as output:
 | 
						|
            self.apply_config(self.config4)
 | 
						|
            logger = logging.getLogger()
 | 
						|
            try:
 | 
						|
                raise RuntimeError()
 | 
						|
            except RuntimeError:
 | 
						|
                logging.exception("just testing")
 | 
						|
            sys.stdout.seek(0)
 | 
						|
            self.assertEquals(output.getvalue(),
 | 
						|
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
 | 
						|
            # Original logger output is empty
 | 
						|
            self.assert_log_lines([])
 | 
						|
 | 
						|
 | 
						|
class LogRecordStreamHandler(StreamRequestHandler):
 | 
						|
 | 
						|
    """Handler for a streaming logging request. It saves the log message in the
 | 
						|
    TCP server's 'log_output' attribute."""
 | 
						|
 | 
						|
    TCP_LOG_END = "!!!END!!!"
 | 
						|
 | 
						|
    def handle(self):
 | 
						|
        """Handle multiple requests - each expected to be of 4-byte length,
 | 
						|
        followed by the LogRecord in pickle format. Logs the record
 | 
						|
        according to whatever policy is configured locally."""
 | 
						|
        while True:
 | 
						|
            chunk = self.connection.recv(4)
 | 
						|
            if len(chunk) < 4:
 | 
						|
                break
 | 
						|
            slen = struct.unpack(">L", chunk)[0]
 | 
						|
            chunk = self.connection.recv(slen)
 | 
						|
            while len(chunk) < slen:
 | 
						|
                chunk = chunk + self.connection.recv(slen - len(chunk))
 | 
						|
            obj = self.unpickle(chunk)
 | 
						|
            record = logging.makeLogRecord(obj)
 | 
						|
            self.handle_log_record(record)
 | 
						|
 | 
						|
    def unpickle(self, data):
 | 
						|
        return pickle.loads(data)
 | 
						|
 | 
						|
    def handle_log_record(self, record):
 | 
						|
        # If the end-of-messages sentinel is seen, tell the server to
 | 
						|
        #  terminate.
 | 
						|
        if self.TCP_LOG_END in record.msg:
 | 
						|
            self.server.abort = 1
 | 
						|
            return
 | 
						|
        self.server.log_output += record.msg + "\n"
 | 
						|
 | 
						|
 | 
						|
class LogRecordSocketReceiver(ThreadingTCPServer):
 | 
						|
 | 
						|
    """A simple-minded TCP socket-based logging receiver suitable for test
 | 
						|
    purposes."""
 | 
						|
 | 
						|
    allow_reuse_address = 1
 | 
						|
    log_output = ""
 | 
						|
 | 
						|
    def __init__(self, host='localhost',
 | 
						|
                             port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
 | 
						|
                     handler=LogRecordStreamHandler):
 | 
						|
        ThreadingTCPServer.__init__(self, (host, port), handler)
 | 
						|
        self.abort = False
 | 
						|
        self.timeout = 0.1
 | 
						|
        self.finished = threading.Event()
 | 
						|
 | 
						|
    def serve_until_stopped(self):
 | 
						|
        while not self.abort:
 | 
						|
            rd, wr, ex = select.select([self.socket.fileno()], [], [],
 | 
						|
                                       self.timeout)
 | 
						|
            if rd:
 | 
						|
                self.handle_request()
 | 
						|
        # Notify the main thread that we're about to exit
 | 
						|
        self.finished.set()
 | 
						|
        # close the listen socket
 | 
						|
        self.server_close()
 | 
						|
 | 
						|
 | 
						|
class SocketHandlerTest(BaseTest):
 | 
						|
 | 
						|
    """Test for SocketHandler objects."""
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        """Set up a TCP server to receive log messages, and a SocketHandler
 | 
						|
        pointing to that server's address and port."""
 | 
						|
        BaseTest.setUp(self)
 | 
						|
        self.tcpserver = LogRecordSocketReceiver(port=0)
 | 
						|
        self.port = self.tcpserver.socket.getsockname()[1]
 | 
						|
        self.threads = [
 | 
						|
                threading.Thread(target=self.tcpserver.serve_until_stopped)]
 | 
						|
        for thread in self.threads:
 | 
						|
            thread.start()
 | 
						|
 | 
						|
        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
 | 
						|
        self.sock_hdlr.setFormatter(self.root_formatter)
 | 
						|
        self.root_logger.removeHandler(self.root_logger.handlers[0])
 | 
						|
        self.root_logger.addHandler(self.sock_hdlr)
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        """Shutdown the TCP server."""
 | 
						|
        try:
 | 
						|
            self.tcpserver.abort = True
 | 
						|
            del self.tcpserver
 | 
						|
            self.root_logger.removeHandler(self.sock_hdlr)
 | 
						|
            self.sock_hdlr.close()
 | 
						|
            for thread in self.threads:
 | 
						|
                thread.join(2.0)
 | 
						|
        finally:
 | 
						|
            BaseTest.tearDown(self)
 | 
						|
 | 
						|
    def get_output(self):
 | 
						|
        """Get the log output as received by the TCP server."""
 | 
						|
        # Signal the TCP receiver and wait for it to terminate.
 | 
						|
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
 | 
						|
        self.tcpserver.finished.wait(2.0)
 | 
						|
        return self.tcpserver.log_output
 | 
						|
 | 
						|
    def test_output(self):
 | 
						|
        # The log message sent to the SocketHandler is properly received.
 | 
						|
        logger = logging.getLogger("tcp")
 | 
						|
        logger.error("spam")
 | 
						|
        logger.debug("eggs")
 | 
						|
        self.assertEquals(self.get_output(), "spam\neggs\n")
 | 
						|
 | 
						|
 | 
						|
class MemoryTest(BaseTest):
 | 
						|
 | 
						|
    """Test memory persistence of logger objects."""
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        """Create a dict to remember potentially destroyed objects."""
 | 
						|
        BaseTest.setUp(self)
 | 
						|
        self._survivors = {}
 | 
						|
 | 
						|
    def _watch_for_survival(self, *args):
 | 
						|
        """Watch the given objects for survival, by creating weakrefs to
 | 
						|
        them."""
 | 
						|
        for obj in args:
 | 
						|
            key = id(obj), repr(obj)
 | 
						|
            self._survivors[key] = weakref.ref(obj)
 | 
						|
 | 
						|
    def _assert_survival(self):
 | 
						|
        """Assert that all objects watched for survival have survived."""
 | 
						|
        # Trigger cycle breaking.
 | 
						|
        gc.collect()
 | 
						|
        dead = []
 | 
						|
        for (id_, repr_), ref in list(self._survivors.items()):
 | 
						|
            if ref() is None:
 | 
						|
                dead.append(repr_)
 | 
						|
        if dead:
 | 
						|
            self.fail("%d objects should have survived "
 | 
						|
                "but have been destroyed: %s" % (len(dead), ", ".join(dead)))
 | 
						|
 | 
						|
    def test_persistent_loggers(self):
 | 
						|
        # Logger objects are persistent and retain their configuration, even
 | 
						|
        #  if visible references are destroyed.
 | 
						|
        self.root_logger.setLevel(logging.INFO)
 | 
						|
        foo = logging.getLogger("foo")
 | 
						|
        self._watch_for_survival(foo)
 | 
						|
        foo.setLevel(logging.DEBUG)
 | 
						|
        self.root_logger.debug(self.next_message())
 | 
						|
        foo.debug(self.next_message())
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('foo', 'DEBUG', '2'),
 | 
						|
        ])
 | 
						|
        del foo
 | 
						|
        # foo has survived.
 | 
						|
        self._assert_survival()
 | 
						|
        # foo has retained its settings.
 | 
						|
        bar = logging.getLogger("foo")
 | 
						|
        bar.debug(self.next_message())
 | 
						|
        self.assert_log_lines([
 | 
						|
            ('foo', 'DEBUG', '2'),
 | 
						|
            ('foo', 'DEBUG', '3'),
 | 
						|
        ])
 | 
						|
 | 
						|
 | 
						|
# Set the locale to the platform-dependent default.  I have no idea
 | 
						|
# why the test does this, but in any case we save the current locale
 | 
						|
# first and restore it at the end.
 | 
						|
@run_with_locale('LC_ALL', '')
 | 
						|
def test_main():
 | 
						|
    run_unittest(BuiltinLevelsTest, BasicFilterTest,
 | 
						|
                    CustomLevelsAndFiltersTest, MemoryHandlerTest,
 | 
						|
                    ConfigFileTest, SocketHandlerTest, MemoryTest)
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    test_main()
 |