mirror of
https://github.com/python/cpython.git
synced 2025-07-25 12:14:38 +00:00

svn+ssh://pythondev@svn.python.org/python/trunk ........ r70768 | andrew.kuchling | 2009-03-30 17:29:15 -0500 (Mon, 30 Mar 2009) | 1 line Typo fixes ........ r71657 | vinay.sajip | 2009-04-16 14:07:37 -0500 (Thu, 16 Apr 2009) | 1 line Issue #5768: Change to Unicode output logic and test case for same. ........ r71721 | benjamin.peterson | 2009-04-18 14:26:19 -0500 (Sat, 18 Apr 2009) | 1 line fix a few nits in unittest.py #5771 ........ r71729 | benjamin.peterson | 2009-04-18 16:03:10 -0500 (Sat, 18 Apr 2009) | 1 line move test to a more appropiate one ........ r71794 | vinay.sajip | 2009-04-22 07:10:47 -0500 (Wed, 22 Apr 2009) | 2 lines Issue #5170: Fixed regression caused when fixing #5768. ........ r71976 | mark.dickinson | 2009-04-26 14:54:55 -0500 (Sun, 26 Apr 2009) | 2 lines Fix typo in function name ........ r72036 | georg.brandl | 2009-04-27 12:04:23 -0500 (Mon, 27 Apr 2009) | 1 line #5848: small unittest doc patch. ........ r72037 | georg.brandl | 2009-04-27 12:09:53 -0500 (Mon, 27 Apr 2009) | 1 line #5840: dont claim we dont support TLS. ........ r72079 | r.david.murray | 2009-04-28 14:02:55 -0500 (Tue, 28 Apr 2009) | 2 lines Remove spurious 'u'. ........ r72085 | georg.brandl | 2009-04-28 16:48:35 -0500 (Tue, 28 Apr 2009) | 1 line Make the doctests in the docs pass, except for those in the turtle module. ........ r72131 | benjamin.peterson | 2009-04-29 17:43:35 -0500 (Wed, 29 Apr 2009) | 1 line fix test_shutil on ZFS #5676 ........ r72132 | georg.brandl | 2009-04-29 17:44:07 -0500 (Wed, 29 Apr 2009) | 1 line #5878: fix repr of re object. ........ r72133 | benjamin.peterson | 2009-04-29 17:44:15 -0500 (Wed, 29 Apr 2009) | 1 line make sure mode is removable while cleaning up test droppings ........ r72134 | benjamin.peterson | 2009-04-29 19:06:33 -0500 (Wed, 29 Apr 2009) | 1 line make sure to close file ........ r72191 | michael.foord | 2009-05-02 06:43:06 -0500 (Sat, 02 May 2009) | 9 lines Adds an exit parameter to unittest.main(). 
If False main no longer calls sys.exit. Closes issue 3379. Michael Foord ........ r72197 | benjamin.peterson | 2009-05-02 11:24:37 -0500 (Sat, 02 May 2009) | 1 line don't let sys.argv be used in the tests ........ r72198 | andrew.kuchling | 2009-05-02 12:12:15 -0500 (Sat, 02 May 2009) | 1 line Add items ........ r72219 | michael.foord | 2009-05-02 15:15:05 -0500 (Sat, 02 May 2009) | 8 lines Add addCleanup and doCleanups to unittest.TestCase. Closes issue 5679. Michael Foord ........ r72221 | benjamin.peterson | 2009-05-02 15:26:53 -0500 (Sat, 02 May 2009) | 1 line add myself ........ r72225 | michael.foord | 2009-05-02 17:43:34 -0500 (Sat, 02 May 2009) | 1 line ........ r72303 | benjamin.peterson | 2009-05-04 19:55:24 -0500 (Mon, 04 May 2009) | 1 line using sys._getframe(x), where x > 0 doesnt' work on IronPython ........ r72434 | r.david.murray | 2009-05-07 13:09:58 -0500 (Thu, 07 May 2009) | 2 lines Pre-opened test file needs to be opened in binary mode. ........ r72467 | georg.brandl | 2009-05-08 07:17:34 -0500 (Fri, 08 May 2009) | 1 line Fix name. ........ r72476 | thomas.heller | 2009-05-08 15:09:40 -0500 (Fri, 08 May 2009) | 4 lines Add a file that contains diffs between offical libffi files and the files in this repository. Should make it easier to merge new libffi versions. ........
952 lines
30 KiB
Python
952 lines
30 KiB
Python
#!/usr/bin/env python
|
|
#
|
|
# Copyright 2001-2009 by Vinay Sajip. All Rights Reserved.
|
|
#
|
|
# Permission to use, copy, modify, and distribute this software and its
|
|
# documentation for any purpose and without fee is hereby granted,
|
|
# provided that the above copyright notice appear in all copies and that
|
|
# both that copyright notice and this permission notice appear in
|
|
# supporting documentation, and that the name of Vinay Sajip
|
|
# not be used in advertising or publicity pertaining to distribution
|
|
# of the software without specific, written prior permission.
|
|
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
|
|
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
|
|
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
|
|
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
|
|
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
|
|
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
|
|
|
"""Test harness for the logging module. Run all tests.
|
|
|
|
Copyright (C) 2001-2009 Vinay Sajip. All Rights Reserved.
|
|
"""
|
|
|
|
import logging
|
|
import logging.handlers
|
|
import logging.config
|
|
|
|
import codecs
|
|
import copy
|
|
import pickle
|
|
import io
|
|
import gc
|
|
import os
|
|
import re
|
|
import select
|
|
import socket
|
|
from socketserver import ThreadingTCPServer, StreamRequestHandler
|
|
import string
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
from test.support import captured_stdout, run_with_locale, run_unittest
|
|
import textwrap
|
|
import threading
|
|
import time
|
|
import types
|
|
import unittest
|
|
import warnings
|
|
import weakref
|
|
|
|
|
|
class BaseTest(unittest.TestCase):

    """Base class for logging tests.

    setUp() snapshots the logging module's global registries and routes the
    root logger to an in-memory stream; tearDown() restores the snapshot.
    Captured lines have the shape given by ``log_format`` and are checked
    with assert_log_lines().
    """

    # Format applied to the root handler installed by setUp().
    log_format = "%(name)s -> %(levelname)s: %(message)s"
    # Pattern every captured line must match; its groups are compared with
    # the expected tuples passed to assert_log_lines().
    expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
    # Counter backing next_message().
    message_num = 0

    def setUp(self):
        """Setup the default logging stream to an internal StringIO instance,
        so that we can examine log output as we want."""
        logger_dict = logging.getLogger().manager.loggerDict
        # Copy the module-level registries under the logging lock so that
        # tearDown() can put them back exactly as they were.
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = logger_dict.copy()
            self.saved_level_names = logging._levelNames.copy()
        finally:
            logging._releaseLock()

        self.root_logger = logging.getLogger("")
        self.original_logging_level = self.root_logger.getEffectiveLevel()

        self.stream = io.StringIO()
        self.root_logger.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        self.root_logger.addHandler(self.root_hdlr)

    def tearDown(self):
        """Remove our logging stream, and restore the original logging
        level."""
        self.stream.close()
        self.root_logger.removeHandler(self.root_hdlr)
        self.root_logger.setLevel(self.original_logging_level)
        # Restore the registries saved by setUp(), again under the lock.
        logging._acquireLock()
        try:
            logging._levelNames.clear()
            logging._levelNames.update(self.saved_level_names)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            loggerDict = logging.getLogger().manager.loggerDict
            loggerDict.clear()
            loggerDict.update(self.saved_loggers)
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None):
        """Match the collected log lines against the regular expression
        self.expected_log_pat, and compare the extracted group values to
        the expected_values list of tuples.

        ``stream`` defaults to self.stream.  The test fails if the number of
        lines differs from len(expected_values), if a line does not match
        the pattern, or if unread output remains on the stream.
        """
        if stream is None:
            stream = self.stream
        pat = re.compile(self.expected_log_pat)
        try:
            stream.reset()
            actual_lines = stream.readlines()
        except AttributeError:
            # The stream has no reset() method (io.StringIO does not):
            # take the whole buffer instead.
            actual_lines = stream.getvalue().splitlines()
        self.assertEqual(len(actual_lines), len(expected_values))
        for actual, expected in zip(actual_lines, expected_values):
            match = pat.search(actual)
            if not match:
                self.fail("Log line does not match expected pattern:\n" +
                          actual)
            self.assertEqual(tuple(match.groups()), expected)
        s = stream.read()
        if s:
            self.fail("Remaining output at end of log stream:\n" + s)

    def next_message(self):
        """Generate a message consisting solely of an auto-incrementing
        integer."""
        self.message_num += 1
        return "%d" % self.message_num
|
|
|
|
|
|
class BuiltinLevelsTest(BaseTest):
    """Test builtin levels and their inheritance."""

    def test_flat(self):
        # Logging levels in a flat logger namespace.
        m = self.next_message

        ERR = logging.getLogger("ERR")
        ERR.setLevel(logging.ERROR)
        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        DEB = logging.getLogger("DEB")
        DEB.setLevel(logging.DEBUG)

        # These should log.
        ERR.log(logging.CRITICAL, m())
        ERR.error(m())

        INF.log(logging.CRITICAL, m())
        INF.error(m())
        INF.warning(m())
        INF.info(m())

        DEB.log(logging.CRITICAL, m())
        DEB.error(m())
        DEB.warning(m())
        DEB.info(m())
        DEB.debug(m())

        # These should not log.  Each call still consumes a message number,
        # but nothing is asserted after them so the numbering below is
        # unaffected.
        ERR.warning(m())
        ERR.info(m())
        ERR.debug(m())

        INF.debug(m())

        self.assert_log_lines([
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ])

    def test_nested_explicit(self):
        # Logging levels in a nested namespace, all explicitly set.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)

        # These should log.
        INF_ERR.log(logging.CRITICAL, m())
        INF_ERR.error(m())

        # These should not log.
        INF_ERR.warning(m())
        INF_ERR.info(m())
        INF_ERR.debug(m())

        self.assert_log_lines([
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ])

    def test_nested_inherited(self):
        # Logging levels in a nested namespace, inherited from parent loggers.
        m = self.next_message

        INF = logging.getLogger("INF")
        INF.setLevel(logging.INFO)
        INF_ERR = logging.getLogger("INF.ERR")
        INF_ERR.setLevel(logging.ERROR)
        INF_UNDEF = logging.getLogger("INF.UNDEF")
        INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF")
        # Created but never logged to.
        UNDEF = logging.getLogger("UNDEF")

        # These should log.
        INF_UNDEF.log(logging.CRITICAL, m())
        INF_UNDEF.error(m())
        INF_UNDEF.warning(m())
        INF_UNDEF.info(m())
        INF_ERR_UNDEF.log(logging.CRITICAL, m())
        INF_ERR_UNDEF.error(m())

        # These should not log.
        INF_UNDEF.debug(m())
        INF_ERR_UNDEF.warning(m())
        INF_ERR_UNDEF.info(m())
        INF_ERR_UNDEF.debug(m())

        self.assert_log_lines([
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ])

    def test_nested_with_virtual_parent(self):
        # Logging levels when some parent does not exist yet.
        m = self.next_message

        INF = logging.getLogger("INF")
        # The grandchild is requested before its direct parent exists.
        GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF")
        CHILD = logging.getLogger("INF.BADPARENT")
        INF.setLevel(logging.INFO)

        # These should log.
        GRANDCHILD.log(logging.FATAL, m())
        GRANDCHILD.info(m())
        CHILD.log(logging.FATAL, m())
        CHILD.info(m())

        # These should not log.
        GRANDCHILD.debug(m())
        CHILD.debug(m())

        self.assert_log_lines([
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ])
|
|
|
|
|
|
class BasicFilterTest(BaseTest):

    """Exercise logging.Filter, the name-based filter shipped with logging."""

    def test_filter(self):
        # With a "spam.eggs" filter on the root handler, only records from
        # that logger and the loggers below it reach the stream.
        name_filter = logging.Filter("spam.eggs")
        root_handler = self.root_logger.handlers[0]
        logger_names = (
            "spam",
            "spam.eggs",          # Good.
            "spam.eggs.fish",     # Good.
            "spam.bakedbeans",
        )
        try:
            root_handler.addFilter(name_filter)
            # Create every logger first, then log once through each, in order.
            loggers = [logging.getLogger(name) for name in logger_names]
            for logger in loggers:
                logger.info(self.next_message())

            expected = [
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ]
            self.assert_log_lines(expected)
        finally:
            root_handler.removeFilter(name_filter)
|
|
|
|
|
|
#
# Custom levels used by the filter tests below.  Any number of levels may be
# defined; they should be integers, the lowest should be > 0, and a larger
# value means less information is logged.  Level values that do not fit these
# constraints can be translated through a mapping dictionary between the
# application's levels and the logging system.
#
BORING = 111
CHATTERBOX = 112
GARRULOUS = 113
TALKATIVE = 114
VERBOSE = 115
SOCIABLE = 116
EFFUSIVE = 117
TERSE = 118
TACITURN = 119
SILENT = 120

# Every custom level, from the lowest (BORING) to the highest (SILENT).
LEVEL_RANGE = range(BORING, SILENT + 1)

#
# Display names for the custom levels.  Naming them is optional: an unnamed
# level is rendered by the system as "Level n".
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
|
|
|
|
class GarrulousFilter(logging.Filter):

    """Filter that rejects records logged at the GARRULOUS level."""

    def filter(self, record):
        """Return False for a GARRULOUS record, True for any other level."""
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
|
|
|
|
class VerySpecificFilter(logging.Filter):

    """Filter that rejects records logged at SOCIABLE or TACITURN level."""

    def filter(self, record):
        """Return False for SOCIABLE and TACITURN records, True otherwise."""
        blocked_levels = (SOCIABLE, TACITURN)
        return record.levelno not in blocked_levels
|
|
|
|
|
|
class CustomLevelsAndFiltersTest(BaseTest):

    """Filtering by level and by Filter object, using the custom levels."""

    # Only the level name and the message number are captured; the logger
    # name is matched but not grouped.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Run the base setup, then register a name for each custom level."""
        BaseTest.setUp(self)
        for level, level_name in my_logging_levels.items():
            logging.addLevelName(level, level_name)

    def log_at_all_levels(self, logger):
        """Log one numbered message on ``logger`` at every custom level,
        lowest level first."""
        for level in LEVEL_RANGE:
            message = self.next_message()
            logger.log(level, message)

    def test_logger_filter(self):
        # Threshold set on the logger: only levels >= 'Verbose' get through.
        self.root_logger.setLevel(VERBOSE)
        self.log_at_all_levels(self.root_logger)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Threshold set on the handler: only levels >= 'Sociable' get through.
        root_handler = self.root_logger.handlers[0]
        root_handler.setLevel(SOCIABLE)
        try:
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            root_handler.setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # A filter object attached to the handler is later combined with a
        # second filter object attached to the logger itself.
        root_handler = self.root_logger.handlers[0]
        logger_filter = None
        handler_filter = GarrulousFilter()
        root_handler.addFilter(handler_filter)
        try:
            self.log_at_all_levels(self.root_logger)
            # 'Garrulous' (message 3) is absent.
            first_pass = [
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_pass)

            logger_filter = VerySpecificFilter()
            self.root_logger.addFilter(logger_filter)
            self.log_at_all_levels(self.root_logger)
            # 'Garrulous' is still absent, and now so are 'Sociable' and
            # 'Taciturn'.
            second_pass = [
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ]
            self.assert_log_lines(first_pass + second_pass)
        finally:
            if logger_filter is not None:
                self.root_logger.removeFilter(logger_filter)
            root_handler.removeFilter(handler_filter)
|
|
|
|
|
|
class MemoryHandlerTest(BaseTest):

    """Tests for the MemoryHandler."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$"

    def setUp(self):
        """Attach a MemoryHandler (capacity 10, flush level WARNING) that
        targets the root handler to a non-propagating 'mem' logger."""
        BaseTest.setUp(self)
        self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING,
                                                       self.root_hdlr)
        self.mem_logger = logging.getLogger('mem')
        # Records must reach the stream only through the memory handler.
        self.mem_logger.propagate = 0
        self.mem_logger.addHandler(self.mem_hdlr)

    def tearDown(self):
        """Close the memory handler, then run the base teardown."""
        self.mem_hdlr.close()
        BaseTest.tearDown(self)

    def test_flush(self):
        # The memory handler flushes to its target handler based on specific
        # criteria (message count and message level).
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines([])
        self.mem_logger.info(self.next_message())
        self.assert_log_lines([])
        # This will flush because the level is >= logging.WARNING
        self.mem_logger.warning(self.next_message())
        lines = [
            ('DEBUG', '1'),
            ('INFO', '2'),
            ('WARNING', '3'),
        ]
        self.assert_log_lines(lines)
        # n is the number of the first message in each batch of ten.
        for n in (4, 14):
            for _ in range(9):
                self.mem_logger.debug(self.next_message())
            # Nine buffered messages: nothing new on the stream yet.
            self.assert_log_lines(lines)
            # This will flush because it's the 10th message since the last
            # flush.
            self.mem_logger.debug(self.next_message())
            lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)]
            self.assert_log_lines(lines)

        # A single further message stays buffered.
        self.mem_logger.debug(self.next_message())
        self.assert_log_lines(lines)
|
|
|
|
|
|
class ExceptionFormatter(logging.Formatter):
    """Formatter that renders exception info as a one-line summary."""

    def formatException(self, ei):
        """Return "Got a [<name>]" where <name> is the name of ei[0], the
        first item of the exception-info tuple."""
        exc_type = ei[0]
        return "Got a [%s]" % exc_type.__name__
|
|
|
|
|
|
class ConfigFileTest(BaseTest):

    """Reading logging config from a .ini-style config file."""

    # Captured lines look like "<LEVEL> ++ <message>" (see form1 below).
    expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"

    # config0 is a standard configuration.
    config0 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config1 adds a little to the standard configuration.
    config1 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=
    """

    # config2 has a subtle configuration error that should be reported
    config2 = config1.replace("sys.stdout", "sys.stbout")

    # config3 has a less subtle configuration error
    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")

    # config4 specifies a custom formatter class to be loaded
    config4 = """
    [loggers]
    keys=root

    [handlers]
    keys=hand1

    [formatters]
    keys=form1

    [logger_root]
    level=NOTSET
    handlers=hand1

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [formatter_form1]
    class=""" + __name__ + """.ExceptionFormatter
    format=%(levelname)s:%(name)s:%(message)s
    datefmt=
    """

    # config5 specifies a custom handler class to be loaded
    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')

    # config6 uses ', ' delimiters in the handlers and formatters sections
    config6 = """
    [loggers]
    keys=root,parser

    [handlers]
    keys=hand1, hand2

    [formatters]
    keys=form1, form2

    [logger_root]
    level=WARNING
    handlers=

    [logger_parser]
    level=DEBUG
    handlers=hand1
    propagate=1
    qualname=compiler.parser

    [handler_hand1]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stdout,)

    [handler_hand2]
    class=StreamHandler
    level=NOTSET
    formatter=form1
    args=(sys.stderr,)

    [formatter_form1]
    format=%(levelname)s ++ %(message)s
    datefmt=

    [formatter_form2]
    format=%(message)s
    datefmt=
    """

    def apply_config(self, conf):
        """Write the dedented text ``conf`` to a temporary .ini file and load
        it with logging.config.fileConfig().

        The temporary file is removed afterwards, whether or not loading
        succeeded; any exception raised by fileConfig() propagates.
        """
        # mkstemp() creates the file itself, so the path is always bound and
        # always exists by the time the finally clause runs.
        fd, fn = tempfile.mkstemp(".ini")
        try:
            with os.fdopen(fd, "w") as f:
                f.write(textwrap.dedent(conf))
            logging.config.fileConfig(fn)
        finally:
            os.remove(fn)

    def test_config0_ok(self):
        # A simple config file which overrides the default settings.
        with captured_stdout() as output:
            self.apply_config(self.config0)
            logger = logging.getLogger()
            # Won't output anything
            logger.info(self.next_message())
            # Outputs a message
            logger.error(self.next_message())
            self.assert_log_lines([
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config1_ok(self, config=config1):
        # A config file defining a sub-parser as well.
        with captured_stdout() as output:
            self.apply_config(config)
            logger = logging.getLogger("compiler.parser")
            # Both will output a message
            logger.info(self.next_message())
            logger.error(self.next_message())
            self.assert_log_lines([
                ('INFO', '1'),
                ('ERROR', '2'),
            ], stream=output)
            # Original logger output is empty.
            self.assert_log_lines([])

    def test_config2_failure(self):
        # config2 names a stream that does not exist (sys.stbout), so
        # applying it must raise.
        self.assertRaises(Exception, self.apply_config, self.config2)

    def test_config3_failure(self):
        # config3 refers to a formatter that is not defined
        # (misspelled_name), so applying it must raise.
        self.assertRaises(Exception, self.apply_config, self.config3)

    def test_config4_ok(self):
        # A config file specifying a custom formatter class.
        with captured_stdout() as output:
            self.apply_config(self.config4)
            try:
                raise RuntimeError()
            except RuntimeError:
                logging.exception("just testing")
            # Rewind the captured stream before inspecting it.
            sys.stdout.seek(0)
            self.assertEqual(output.getvalue(),
                "ERROR:root:just testing\nGot a [RuntimeError]\n")
            # Original logger output is empty
            self.assert_log_lines([])

    def test_config5_ok(self):
        # Same checks as config1, with the handler class given as
        # logging.StreamHandler.
        self.test_config1_ok(config=self.config5)

    def test_config6_ok(self):
        # Same checks as config1, with ', ' delimiters in the key lists.
        self.test_config1_ok(config=self.config6)
|
|
|
|
class LogRecordStreamHandler(StreamRequestHandler):

    """Handler for a streaming logging request. It saves the log message in the
    TCP server's 'log_output' attribute."""

    # A record whose message contains this marker tells the server to stop.
    TCP_LOG_END = "!!!END!!!"

    def handle(self):
        """Handle multiple requests - each expected to be of 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.

        Returns when the peer closes the connection, including when it
        closes part-way through a header or a record.
        """
        while True:
            header = self._recv_exactly(4)
            if header is None:
                break
            slen = struct.unpack(">L", header)[0]
            payload = self._recv_exactly(slen)
            if payload is None:
                # Truncated record: the peer went away, nothing to deliver.
                break
            obj = self.unpickle(payload)
            record = logging.makeLogRecord(obj)
            self.handle_log_record(record)

    def _recv_exactly(self, size):
        """Read exactly ``size`` bytes from the connection.

        Returns the bytes read, or None if recv() reports end of stream
        (an empty result) before ``size`` bytes have arrived.
        """
        parts = []
        remaining = size
        while remaining:
            piece = self.connection.recv(remaining)
            if not piece:
                return None
            parts.append(piece)
            remaining -= len(piece)
        return b"".join(parts)

    def unpickle(self, data):
        """Return the object pickled in ``data``."""
        # NOTE(review): pickle.loads() must never see untrusted bytes; this
        # is presumably fed only by the test's own SocketHandler - confirm
        # before reusing this class elsewhere.
        return pickle.loads(data)

    def handle_log_record(self, record):
        """Append the record's message to the server's log_output, or flag
        the server to abort if the message contains TCP_LOG_END."""
        # If the end-of-messages sentinel is seen, tell the server to
        # terminate.
        if self.TCP_LOG_END in record.msg:
            self.server.abort = 1
            return
        self.server.log_output += record.msg + "\n"
|
|
|
|
|
|
class LogRecordSocketReceiver(ThreadingTCPServer):

    """Minimal TCP server that collects log records for the socket tests."""

    allow_reuse_address = 1
    # Text accumulated by the request handlers, one message per line.
    log_output = ""

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        """Bind to (host, port) and serve requests with ``handler``."""
        address = (host, port)
        ThreadingTCPServer.__init__(self, address, handler)
        # Becomes true when serve_until_stopped() should leave its loop.
        self.abort = False
        # Seconds each select() call waits before ``abort`` is re-checked.
        self.timeout = 0.1
        # Set once the serving loop has ended.
        self.finished = threading.Event()

    def serve_until_stopped(self):
        """Handle incoming requests until ``abort`` becomes true, then set
        ``finished`` and close the listening socket."""
        while not self.abort:
            listen_fd = self.socket.fileno()
            readable = select.select([listen_fd], [], [], self.timeout)[0]
            if readable:
                self.handle_request()
        # Notify the main thread that we're about to exit
        self.finished.set()
        # close the listen socket
        self.server_close()
|
|
|
|
|
|
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Port 0 is requested; the port actually bound is read back from the
        # listening socket.
        self.tcpserver = LogRecordSocketReceiver(port=0)
        self.port = self.tcpserver.socket.getsockname()[1]
        self.threads = [
            threading.Thread(target=self.tcpserver.serve_until_stopped)]
        for thread in self.threads:
            thread.start()

        self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port)
        self.sock_hdlr.setFormatter(self.root_formatter)
        # Replace the first root handler with the socket handler.
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            self.tcpserver.abort = True
            del self.tcpserver
            self.root_logger.removeHandler(self.sock_hdlr)
            self.sock_hdlr.close()
            for thread in self.threads:
                thread.join(2.0)
        finally:
            BaseTest.tearDown(self)

    def get_output(self):
        """Get the log output as received by the TCP server."""
        # Signal the TCP receiver and wait for it to terminate.
        self.root_logger.critical(LogRecordStreamHandler.TCP_LOG_END)
        self.tcpserver.finished.wait(2.0)
        return self.tcpserver.log_output

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        logger = logging.getLogger("tcp")
        logger.error("spam")
        logger.debug("eggs")
        self.assertEqual(self.get_output(), "spam\neggs\n")
|
|
|
|
|
|
class MemoryTest(BaseTest):

    """Check that logger objects outlive the references held by the test."""

    def setUp(self):
        """Run the base setup and start with nothing under watch."""
        BaseTest.setUp(self)
        # Maps (id, repr) of each watched object to a weak reference to it.
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Record a weak reference to every object in ``args`` so that
        _assert_survival() can later tell whether it is still alive."""
        for watched in args:
            self._survivors[(id(watched), repr(watched))] = weakref.ref(watched)

    def _assert_survival(self):
        """Fail the test if any watched object has been destroyed."""
        # Trigger cycle breaking.
        gc.collect()
        dead = [
            repr_
            for (id_, repr_), ref in self._survivors.items()
            if ref() is None
        ]
        if not dead:
            return
        self.fail("%d objects should have survived "
            "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # A logger stays alive and keeps its configuration after the test
        # drops its own reference to it.
        self.root_logger.setLevel(logging.INFO)
        foo = logging.getLogger("foo")
        self._watch_for_survival(foo)
        foo.setLevel(logging.DEBUG)
        # Message 1 goes to the root logger at DEBUG and is not expected in
        # the output; message 2 goes through 'foo'.
        self.root_logger.debug(self.next_message())
        foo.debug(self.next_message())
        first_line = ('foo', 'DEBUG', '2')
        self.assert_log_lines([first_line])
        del foo
        # foo has survived.
        self._assert_survival()
        # foo has retained its settings.
        foo_again = logging.getLogger("foo")
        foo_again.debug(self.next_message())
        self.assert_log_lines([
            first_line,
            ('foo', 'DEBUG', '3'),
        ])
|
|
|
|
|
|
class EncodingTest(BaseTest):
    """Check that handlers encode non-ASCII messages as requested."""

    def test_encoding_plain_file(self):
        # A FileHandler opened with encoding="utf8" must round-trip
        # non-ASCII text.
        log = logging.getLogger("test")
        # mkstemp() creates the file atomically; only the path is needed
        # here, so the descriptor is closed straight away.
        fd, fn = tempfile.mkstemp(".log")
        os.close(fd)
        # the non-ascii data we write to the log.
        data = "foo\x80"
        try:
            handler = logging.FileHandler(fn, encoding="utf8")
            log.addHandler(handler)
            try:
                # write non-ascii data to the log.
                log.warning(data)
            finally:
                log.removeHandler(handler)
                handler.close()
            # check we wrote exactly that text, ignoring trailing \n etc
            with open(fn, encoding="utf8") as f:
                self.assertEqual(f.read().rstrip(), data)
        finally:
            if os.path.isfile(fn):
                os.remove(fn)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye)
        message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Ensure it's written in a Cyrillic encoding
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        stream = io.BytesIO()
        writer = writer_class(stream, 'strict')
        handler = logging.StreamHandler(writer)
        log.addHandler(handler)
        try:
            log.warning(message)
        finally:
            log.removeHandler(handler)
            handler.close()
        # check we wrote exactly those bytes
        s = stream.getvalue()
        # Compare against what the data should be when encoded in CP-1251
        self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n')
|
|
|
|
|
|
class WarningsTest(BaseTest):

    """Check the integration between the warnings and logging modules."""

    def test_warnings(self):
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            try:
                warnings.filterwarnings("always", category=UserWarning)
                file = io.StringIO()
                h = logging.StreamHandler(file)
                logger = logging.getLogger("py.warnings")
                logger.addHandler(h)
                try:
                    warnings.warn("I'm warning you...")
                    s = file.getvalue()
                finally:
                    # Detach the handler even if warn() raised.
                    logger.removeHandler(h)
                    h.close()
                self.assertTrue(s.find("UserWarning: I'm warning you...\n") > 0)

                # See if an explicit file uses the original implementation
                file = io.StringIO()
                warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                     file, "Dummy line")
                s = file.getvalue()
                file.close()
                # The source line is indented by two spaces in the output.
                self.assertEqual(s,
                    "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")
            finally:
                logging.captureWarnings(False)
|
|
|
|
# The suite runs with LC_ALL set to '' (the platform-dependent default).
# The reason for that choice is not recorded; the decorator saves the current
# locale first and restores it when the run ends.
@run_with_locale('LC_ALL', '')
def test_main():
    """Run every test class defined in this module."""
    test_classes = (
        BuiltinLevelsTest,
        BasicFilterTest,
        CustomLevelsAndFiltersTest,
        MemoryHandlerTest,
        ConfigFileTest,
        SocketHandlerTest,
        MemoryTest,
        EncodingTest,
        WarningsTest,
    )
    run_unittest(*test_classes)
|
|
|
|
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
|