mirror of
https://github.com/python/cpython.git
synced 2025-07-29 06:05:00 +00:00

terminology in the alpha 1 documentation.

- "context manager" reverts to its alpha 1 definition
- the term "context specifier" goes away entirely
- contextlib.GeneratorContextManager is renamed GeneratorContext

There are still a number of changes relative to alpha 1:

- the expression in the with statement is explicitly called the "context expression" in the language reference
- the terms 'with statement context', 'context object' or 'with statement context' are used in several places instead of a bare 'context'. The aim of this is to avoid ambiguity in relation to the runtime context set up when the block is executed, and the context objects that already exist in various application domains (such as decimal.Context)
- contextlib.contextmanager is renamed to contextfactory. This best reflects the nature of the function resulting from the use of that decorator
- decimal.ContextManager is renamed to WithStatementContext. Simply dropping the 'Manager' part wasn't possible due to the fact that decimal.Context already exists and means something different. WithStatementContext is ugly but workable.

A technically unrelated change snuck into this commit: contextlib.closing now avoids the overhead of creating a generator, since it's trivial to implement that particular context manager directly.
368 lines
10 KiB
Python
368 lines
10 KiB
Python
"""Unit tests for contextlib.py, and other context managers."""
|
|
|
|
from __future__ import with_statement
|
|
|
|
import sys
|
|
import os
|
|
import decimal
|
|
import tempfile
|
|
import unittest
|
|
import threading
|
|
from contextlib import * # Tests __all__
|
|
from test.test_support import run_suite
|
|
|
|
class ContextManagerTestCase(unittest.TestCase):
    """Tests for contexts built from generators via @contextfactory."""

    def test_contextfactory_plain(self):
        # Normal exit: code before the yield runs on entry, code after the
        # yield runs on exit, and the yielded value is what "as" binds.
        state = []

        @contextfactory
        def woohoo():
            state.append(1)
            yield 42
            state.append(999)

        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
        self.assertEqual(state, [1, 42, 999])

    def test_contextfactory_finally(self):
        # An exception raised in the with body must still run the
        # generator's finally clause and then propagate to the caller.
        state = []

        @contextfactory
        def woohoo():
            state.append(1)
            try:
                yield 42
            finally:
                state.append(999)

        try:
            with woohoo() as x:
                self.assertEqual(state, [1])
                self.assertEqual(x, 42)
                state.append(x)
                raise ZeroDivisionError()
        except ZeroDivisionError:
            pass
        else:
            self.fail("Expected ZeroDivisionError")
        self.assertEqual(state, [1, 42, 999])

    def test_contextfactory_no_reraise(self):
        @contextfactory
        def whee():
            yield

        ctx = whee().__context__()
        ctx.__enter__()
        # Calling __exit__ should not result in an exception
        # (assertFalse replaces the deprecated failIf alias).
        self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))

    def test_contextfactory_trap_yield_after_throw(self):
        # A generator that yields a second time after the exception has
        # been thrown into it is expected to trigger a RuntimeError.
        @contextfactory
        def whoo():
            try:
                yield
            except:
                yield

        ctx = whoo().__context__()
        ctx.__enter__()
        self.assertRaises(
            RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
        )

    def test_contextfactory_except(self):
        # The generator may catch (and thereby swallow) the exception
        # raised inside the with body.
        state = []

        @contextfactory
        def woohoo():
            state.append(1)
            try:
                yield 42
            except ZeroDivisionError:
                # sys.exc_info() is used instead of the
                # "except ZeroDivisionError, e" spelling so that this
                # source parses on both Python 2 and Python 3.
                e = sys.exc_info()[1]
                state.append(e.args[0])
                self.assertEqual(state, [1, 42, 999])

        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError(999)
        self.assertEqual(state, [1, 42, 999])

    def test_contextfactory_attribs(self):
        # The decorated function should keep the name, docstring and
        # custom attributes of the function it wraps.
        def attribs(**kw):
            def decorate(func):
                for k, v in kw.items():
                    setattr(func, k, v)
                return func
            return decorate

        @contextfactory
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""

        self.assertEqual(baz.__name__, 'baz')
        self.assertEqual(baz.foo, 'bar')
        self.assertEqual(baz.__doc__, "Whee!")
|
|
|
|
class NestedTestCase(unittest.TestCase):
    """Tests for contextlib.nested()."""

    # XXX This needs more work

    def test_nested(self):
        # Each nested context contributes one element to the "as" tuple.
        @contextfactory
        def first():
            yield 1

        @contextfactory
        def second():
            yield 2

        @contextfactory
        def third():
            yield 3

        with nested(first(), second(), third()) as (one, two, three):
            self.assertEqual(one, 1)
            self.assertEqual(two, 2)
            self.assertEqual(three, 3)

    def test_nested_cleanup(self):
        # Contexts are entered left to right and cleaned up in reverse
        # order when the body raises.
        state = []

        @contextfactory
        def outer():
            state.append(1)
            try:
                yield 2
            finally:
                state.append(3)

        @contextfactory
        def inner():
            state.append(4)
            try:
                yield 5
            finally:
                state.append(6)

        try:
            with nested(outer(), inner()) as (from_outer, from_inner):
                state.append(from_outer)
                state.append(from_inner)
                1/0
        except ZeroDivisionError:
            self.assertEqual(state, [1, 4, 2, 5, 6, 3])
        else:
            self.fail("Didn't raise ZeroDivisionError")

    def test_nested_right_exception(self):
        # An exception raised and handled inside one context's __exit__
        # must not replace the exception coming from the with body.
        state = []

        @contextfactory
        def gen_ctx():
            yield 1

        class NoisyExit(object):
            def __context__(self):
                return self

            def __enter__(self):
                return 2

            def __exit__(self, *exc_info):
                try:
                    raise Exception()
                except:
                    pass

        try:
            with nested(gen_ctx(), NoisyExit()) as (x, y):
                1/0
        except ZeroDivisionError:
            self.assertEqual((x, y), (1, 2))
        except Exception:
            self.fail("Reraised wrong exception")
        else:
            self.fail("Didn't raise ZeroDivisionError")

    def test_nested_b_swallows(self):
        # If the inner context swallows the exception, nothing may
        # escape from the with statement.
        @contextfactory
        def passive():
            yield

        @contextfactory
        def swallower():
            try:
                yield
            except:
                # Swallow the exception
                pass

        try:
            with nested(passive(), swallower()):
                1/0
        except ZeroDivisionError:
            self.fail("Didn't swallow ZeroDivisionError")

    def test_nested_break(self):
        # "break" inside the with body leaves the loop immediately.
        @contextfactory
        def ctx():
            yield

        count = 0
        while True:
            count += 1
            with nested(ctx(), ctx()):
                break
            count += 10
        self.assertEqual(count, 1)

    def test_nested_continue(self):
        # "continue" inside the with body skips the rest of the iteration.
        @contextfactory
        def ctx():
            yield

        count = 0
        while count < 3:
            count += 1
            with nested(ctx(), ctx()):
                continue
            count += 10
        self.assertEqual(count, 3)

    def test_nested_return(self):
        # "return" inside the with body yields that value to the caller.
        @contextfactory
        def ctx():
            try:
                yield
            except:
                pass

        def returns_from_with():
            with nested(ctx(), ctx()):
                return 1
            return 10

        self.assertEqual(returns_from_with(), 1)
|
|
|
|
class ClosingTestCase(unittest.TestCase):
    """Tests for contextlib.closing()."""

    # XXX This needs more work

    def test_closing(self):
        # close() is called exactly once when the block exits normally,
        # and "as" binds the wrapped object itself.
        close_calls = []

        class Closable:
            def close(self):
                close_calls.append(1)

        resource = Closable()
        self.assertEqual(close_calls, [])
        with closing(resource) as bound:
            self.assertEqual(resource, bound)
        self.assertEqual(close_calls, [1])

    def test_closing_error(self):
        # close() is still called when the block raises, and the
        # exception propagates to the caller.
        close_calls = []

        class Closable:
            def close(self):
                close_calls.append(1)

        resource = Closable()
        self.assertEqual(close_calls, [])
        try:
            with closing(resource) as bound:
                self.assertEqual(resource, bound)
                1/0
        except ZeroDivisionError:
            self.assertEqual(close_calls, [1])
        else:
            self.fail("Didn't raise ZeroDivisionError")
|
|
|
|
class FileContextTestCase(unittest.TestCase):
    """Tests for using file objects directly in a with statement."""

    def testWithOpen(self):
        # mkstemp() creates the file itself, so the name cannot be claimed
        # by someone else between choosing it and opening it (the race
        # that tempfile.mktemp() is prone to).
        fd, tfn = tempfile.mkstemp()
        try:
            # Only the path is needed; the file is reopened by name below.
            os.close(fd)

            # Normal exit: the file is open inside the block, closed after.
            with open(tfn, "w") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)

            # Exceptional exit: the file is closed and the error propagates.
            try:
                with open(tfn, "r") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1/0
            except ZeroDivisionError:
                self.assertTrue(f.closed)
            else:
                self.fail("Didn't raise ZeroDivisionError")
        finally:
            # Best-effort cleanup of the temporary file.
            try:
                os.remove(tfn)
            except os.error:
                pass
|
|
|
|
class LockContextTestCase(unittest.TestCase):
    """Tests for using threading synchronisation objects in a with statement."""

    def boilerPlate(self, lock, locked):
        """Check that `lock` is held inside a with block and released after.

        `lock` is the object used as the context expression; `locked` is a
        zero-argument callable returning a true value while `lock` is held.
        Both the normal-exit and the exception-exit paths are checked.
        """
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())
        try:
            with lock:
                self.assertTrue(locked())
                1/0
        except ZeroDivisionError:
            self.assertFalse(locked())
        else:
            self.fail("Didn't raise ZeroDivisionError")

    def _semaphoreProbe(self, sem):
        """Return a callable telling whether `sem` is currently exhausted.

        The probe attempts a non-blocking acquire: if that succeeds the
        semaphore was free, so it is released again and False is returned;
        otherwise True is returned.
        """
        def locked():
            if sem.acquire(False):
                sem.release()
                return False
            return True
        return locked

    def testWithLock(self):
        lock = threading.Lock()
        self.boilerPlate(lock, lock.locked)

    def testWithRLock(self):
        lock = threading.RLock()
        self.boilerPlate(lock, lock._is_owned)

    def testWithCondition(self):
        lock = threading.Condition()

        def locked():
            return lock._is_owned()

        self.boilerPlate(lock, locked)

    def testWithSemaphore(self):
        lock = threading.Semaphore()
        self.boilerPlate(lock, self._semaphoreProbe(lock))

    def testWithBoundedSemaphore(self):
        lock = threading.BoundedSemaphore()
        self.boilerPlate(lock, self._semaphoreProbe(lock))
|
|
|
|
class DecimalContextTestCase(unittest.TestCase):
    """Tests for using a decimal context object in a with statement."""

    # XXX Somebody should write more thorough tests for this

    def testBasic(self):
        active = decimal.getcontext()
        # Snapshot the current context so it can be put back at the end.
        saved_context = active.copy()
        extended = decimal.ExtendedContext
        try:
            # Use a precision that differs from ExtendedContext's so a
            # switch of context is observable through .prec.
            outer_prec = extended.prec + 5
            active.prec = outer_prec

            # Normal exit: ExtendedContext applies inside the block only.
            with extended:
                self.assertEqual(decimal.getcontext().prec, extended.prec)
            self.assertEqual(decimal.getcontext().prec, outer_prec)

            # Exceptional exit: the previous precision is back in effect
            # and the error propagates.
            try:
                with extended:
                    self.assertEqual(decimal.getcontext().prec,
                                     extended.prec)
                    1/0
            except ZeroDivisionError:
                self.assertEqual(decimal.getcontext().prec, outer_prec)
            else:
                self.fail("Didn't raise ZeroDivisionError")
        finally:
            decimal.setcontext(saved_context)
|
|
|
|
|
|
# This is needed to make the test actually run under regrtest.py!
def test_main():
    """Load every test case defined in this module and hand it to run_suite()."""
    this_module = sys.modules[__name__]
    suite = unittest.defaultTestLoader.loadTestsFromModule(this_module)
    run_suite(suite)


if __name__ == "__main__":
    test_main()
|