Improved test_logging coverage.

This commit is contained in:
Vinay Sajip 2011-04-28 12:04:58 +01:00
parent e812bf7bf6
commit 7fe1d51924
3 changed files with 65 additions and 7 deletions

View file

@ -519,8 +519,17 @@ class HandlerTest(BaseTest):
os.unlink(fn)
h = logging.handlers.WatchedFileHandler(fn, delay=True)
if existing:
self.assertNotEqual(h.dev, -1)
self.assertNotEqual(h.ino, -1)
dev, ino = h.dev, h.ino
self.assertNotEqual(dev, -1)
self.assertNotEqual(ino, -1)
r = logging.makeLogRecord({'msg': 'Test'})
h.handle(r)
# Now remove the file.
os.unlink(fn)
self.assertFalse(os.path.exists(fn))
# The next call should recreate the file.
h.handle(r)
self.assertTrue(os.path.exists(fn))
else:
self.assertEqual(h.dev, -1)
self.assertEqual(h.ino, -1)
@ -1045,8 +1054,9 @@ class SocketHandlerTest(BaseTest):
def tearDown(self):
"""Shutdown the TCP server."""
try:
self.tcpserver.abort = True
del self.tcpserver
if hasattr(self, 'tcpserver'):
self.tcpserver.abort = True
del self.tcpserver
self.root_logger.removeHandler(self.sock_hdlr)
self.sock_hdlr.close()
for thread in self.threads:
@ -1068,6 +1078,22 @@ class SocketHandlerTest(BaseTest):
logger.debug("eggs")
self.assertEqual(self.get_output(), "spam\neggs\n")
def test_noserver(self):
# Kill the server
self.tcpserver.abort = True
del self.tcpserver
for thread in self.threads:
thread.join(2.0)
#The logging call should try to connect, which should fail
try:
raise RuntimeError('Deliberate mistake')
except RuntimeError:
self.root_logger.exception('Never sent')
self.root_logger.error('Never sent, either')
now = time.time()
self.assertTrue(self.sock_hdlr.retryTime > now)
time.sleep(self.sock_hdlr.retryTime - now + 0.001)
self.root_logger.error('Nor this')
class MemoryTest(BaseTest):
@ -2613,6 +2639,38 @@ class LogRecordTest(BaseTest):
r.removeHandler(h)
h.close()
def test_multiprocessing(self):
r = logging.makeLogRecord({})
self.assertEqual(r.processName, 'MainProcess')
import multiprocessing as mp
r = logging.makeLogRecord({})
self.assertEqual(r.processName, mp.current_process().name)
def test_optional(self):
r = logging.makeLogRecord({})
NOT_NONE = self.assertIsNotNone
NOT_NONE(r.thread)
NOT_NONE(r.threadName)
NOT_NONE(r.process)
NOT_NONE(r.processName)
log_threads = logging.logThreads
log_processes = logging.logProcesses
log_multiprocessing = logging.logMultiprocessing
try:
logging.logThreads = False
logging.logProcesses = False
logging.logMultiprocessing = False
r = logging.makeLogRecord({})
NONE = self.assertIsNone
NONE(r.thread)
NONE(r.threadName)
NONE(r.process)
NONE(r.processName)
finally:
logging.logThreads = log_threads
logging.logProcesses = log_processes
logging.logMultiprocessing = log_multiprocessing
class BasicConfigTest(unittest.TestCase):
"""Test suite for logging.basicConfig."""