Issue #18112: PEP 442 implementation (safe object finalization).

This commit is contained in:
Antoine Pitrou 2013-07-30 19:59:21 +02:00
parent c5d95b17ac
commit 796564c27b
25 changed files with 1254 additions and 321 deletions

View file

@ -3736,18 +3736,8 @@ order (MRO) for bases """
# bug).
del c
# If that didn't blow up, it's also interesting to see whether clearing
# the last container slot works: that will attempt to delete c again,
# which will cause c to get appended back to the container again
# "during" the del. (On non-CPython implementations, however, __del__
# is typically not called again.)
support.gc_collect()
self.assertEqual(len(C.container), 1)
del C.container[-1]
if support.check_impl_detail():
support.gc_collect()
self.assertEqual(len(C.container), 1)
self.assertEqual(C.container[-1].attr, 42)
# Make c mortal again, so that the test framework with -l doesn't report
# it as a leak.

View file

@ -0,0 +1,513 @@
"""
Tests for object finalization semantics, as outlined in PEP 442.
"""
import contextlib
import gc
import unittest
import weakref
import _testcapi
from test import support
class NonGCSimpleBase:
"""
The base class for all the objects under test, equipped with various
testing features.
"""
survivors = []
del_calls = []
tp_del_calls = []
errors = []
_cleaning = False
__slots__ = ()
@classmethod
def _cleanup(cls):
cls.survivors.clear()
cls.errors.clear()
gc.garbage.clear()
gc.collect()
cls.del_calls.clear()
cls.tp_del_calls.clear()
@classmethod
@contextlib.contextmanager
def test(cls):
"""
A context manager to use around all finalization tests.
"""
with support.disable_gc():
cls.del_calls.clear()
cls.tp_del_calls.clear()
NonGCSimpleBase._cleaning = False
try:
yield
if cls.errors:
raise cls.errors[0]
finally:
NonGCSimpleBase._cleaning = True
cls._cleanup()
def check_sanity(self):
"""
Check the object is sane (non-broken).
"""
def __del__(self):
"""
PEP 442 finalizer. Record that this was called, check the
object is in a sane state, and invoke a side effect.
"""
try:
if not self._cleaning:
self.del_calls.append(id(self))
self.check_sanity()
self.side_effect()
except Exception as e:
self.errors.append(e)
def side_effect(self):
"""
A side effect called on destruction.
"""
class SimpleBase(NonGCSimpleBase):
def __init__(self):
self.id_ = id(self)
def check_sanity(self):
assert self.id_ == id(self)
class NonGC(NonGCSimpleBase):
__slots__ = ()
class NonGCResurrector(NonGCSimpleBase):
__slots__ = ()
def side_effect(self):
"""
Resurrect self by storing self in a class-wide list.
"""
self.survivors.append(self)
class Simple(SimpleBase):
pass
class SimpleResurrector(NonGCResurrector, SimpleBase):
pass
class TestBase:
def setUp(self):
self.old_garbage = gc.garbage[:]
gc.garbage[:] = []
def tearDown(self):
# None of the tests here should put anything in gc.garbage
try:
self.assertEqual(gc.garbage, [])
finally:
del self.old_garbage
gc.collect()
def assert_del_calls(self, ids):
self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
def assert_tp_del_calls(self, ids):
self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
def assert_survivors(self, ids):
self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
def assert_garbage(self, ids):
self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
def clear_survivors(self):
SimpleBase.survivors.clear()
class SimpleFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization without refcycles.
"""
def test_simple(self):
with SimpleBase.test():
s = Simple()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_simple_resurrect(self):
with SimpleBase.test():
s = SimpleResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
self.assertIsNot(wr(), None)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
def test_non_gc(self):
with SimpleBase.test():
s = NonGC()
self.assertFalse(gc.is_tracked(s))
ids = [id(s)]
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_non_gc_resurrect(self):
with SimpleBase.test():
s = NonGCResurrector()
self.assertFalse(gc.is_tracked(s))
ids = [id(s)]
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids * 2)
self.assert_survivors(ids)
class SelfCycleBase:
def __init__(self):
super().__init__()
self.ref = self
def check_sanity(self):
super().check_sanity()
assert self.ref is self
class SimpleSelfCycle(SelfCycleBase, Simple):
pass
class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
pass
class SuicidalSelfCycle(SelfCycleBase, Simple):
def side_effect(self):
"""
Explicitly break the reference cycle.
"""
self.ref = None
class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of an object having a single cyclic reference to
itself.
"""
def test_simple(self):
with SimpleBase.test():
s = SimpleSelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_simple_resurrect(self):
# Test that __del__ can resurrect the object being finalized.
with SimpleBase.test():
s = SelfCycleResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(ids)
# XXX is this desirable?
self.assertIs(wr(), None)
# When trying to destroy the object a second time, __del__
# isn't called anymore (and the object isn't resurrected).
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
def test_simple_suicide(self):
# Test the GC is able to deal with an object that kills its last
# reference during __del__.
with SimpleBase.test():
s = SuicidalSelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
class ChainedBase:
def chain(self, left):
self.suicided = False
self.left = left
left.right = self
def check_sanity(self):
super().check_sanity()
if self.suicided:
assert self.left is None
assert self.right is None
else:
left = self.left
if left.suicided:
assert left.right is None
else:
assert left.right is self
right = self.right
if right.suicided:
assert right.left is None
else:
assert right.left is self
class SimpleChained(ChainedBase, Simple):
pass
class ChainedResurrector(ChainedBase, SimpleResurrector):
pass
class SuicidalChained(ChainedBase, Simple):
def side_effect(self):
"""
Explicitly break the reference cycle.
"""
self.suicided = True
self.left = None
self.right = None
class CycleChainFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of a cyclic chain. These tests are similar in
spirit to the self-cycle tests above, but the collectable object
graph isn't trivial anymore.
"""
def build_chain(self, classes):
nodes = [cls() for cls in classes]
for i in range(len(nodes)):
nodes[i].chain(nodes[i-1])
return nodes
def check_non_resurrecting_chain(self, classes):
N = len(classes)
with SimpleBase.test():
nodes = self.build_chain(classes)
ids = [id(s) for s in nodes]
wrs = [weakref.ref(s) for s in nodes]
del nodes
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
self.assertEqual([wr() for wr in wrs], [None] * N)
gc.collect()
self.assert_del_calls(ids)
def check_resurrecting_chain(self, classes):
N = len(classes)
with SimpleBase.test():
nodes = self.build_chain(classes)
N = len(nodes)
ids = [id(s) for s in nodes]
survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
wrs = [weakref.ref(s) for s in nodes]
del nodes
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors(survivor_ids)
# XXX desirable?
self.assertEqual([wr() for wr in wrs], [None] * N)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_survivors([])
def test_homogenous(self):
self.check_non_resurrecting_chain([SimpleChained] * 3)
def test_homogenous_resurrect(self):
self.check_resurrecting_chain([ChainedResurrector] * 3)
def test_homogenous_suicidal(self):
self.check_non_resurrecting_chain([SuicidalChained] * 3)
def test_heterogenous_suicidal_one(self):
self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
def test_heterogenous_suicidal_two(self):
self.check_non_resurrecting_chain(
[SuicidalChained] * 2 + [SimpleChained] * 2)
def test_heterogenous_resurrect_one(self):
self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
def test_heterogenous_resurrect_two(self):
self.check_resurrecting_chain(
[ChainedResurrector, SimpleChained, SuicidalChained] * 2)
def test_heterogenous_resurrect_three(self):
self.check_resurrecting_chain(
[ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
# NOTE: the tp_del slot isn't automatically inherited, so we have to call
# with_tp_del() for each instantiated class.
class LegacyBase(SimpleBase):
def __del__(self):
try:
# Do not invoke side_effect here, since we are now exercising
# the tp_del slot.
if not self._cleaning:
self.del_calls.append(id(self))
self.check_sanity()
except Exception as e:
self.errors.append(e)
def __tp_del__(self):
"""
Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
"""
try:
if not self._cleaning:
self.tp_del_calls.append(id(self))
self.check_sanity()
self.side_effect()
except Exception as e:
self.errors.append(e)
@_testcapi.with_tp_del
class Legacy(LegacyBase):
pass
@_testcapi.with_tp_del
class LegacyResurrector(LegacyBase):
def side_effect(self):
"""
Resurrect self by storing self in a class-wide list.
"""
self.survivors.append(self)
@_testcapi.with_tp_del
class LegacySelfCycle(SelfCycleBase, LegacyBase):
pass
class LegacyFinalizationTest(TestBase, unittest.TestCase):
"""
Test finalization of objects with a tp_del.
"""
def tearDown(self):
# These tests need to clean up a bit more, since they create
# uncollectable objects.
gc.garbage.clear()
gc.collect()
super().tearDown()
def test_legacy(self):
with SimpleBase.test():
s = Legacy()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
self.assert_survivors([])
self.assertIs(wr(), None)
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
def test_legacy_resurrect(self):
with SimpleBase.test():
s = LegacyResurrector()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids)
self.assert_survivors(ids)
# weakrefs are cleared before tp_del is called.
self.assertIs(wr(), None)
self.clear_survivors()
gc.collect()
self.assert_del_calls(ids)
self.assert_tp_del_calls(ids * 2)
self.assert_survivors(ids)
self.assertIs(wr(), None)
def test_legacy_self_cycle(self):
# Self-cycles with legacy finalizers end up in gc.garbage.
with SimpleBase.test():
s = LegacySelfCycle()
ids = [id(s)]
wr = weakref.ref(s)
del s
gc.collect()
self.assert_del_calls([])
self.assert_tp_del_calls([])
self.assert_survivors([])
self.assert_garbage(ids)
self.assertIsNot(wr(), None)
# Break the cycle to allow collection
gc.garbage[0].ref = None
self.assert_garbage([])
self.assertIs(wr(), None)
def test_main():
support.run_unittest(__name__)
if __name__ == "__main__":
test_main()

View file

@ -1,3 +1,4 @@
import _testcapi
import unittest
from test.support import (verbose, refcount_test, run_unittest,
strip_python_stderr)
@ -40,6 +41,7 @@ class GC_Detector(object):
# gc collects it.
self.wr = weakref.ref(C1055820(666), it_happened)
@_testcapi.with_tp_del
class Uncollectable(object):
"""Create a reference cycle with multiple __del__ methods.
@ -52,7 +54,7 @@ class Uncollectable(object):
self.partner = Uncollectable(partner=self)
else:
self.partner = partner
def __del__(self):
def __tp_del__(self):
pass
### Tests
@ -141,11 +143,12 @@ class GCTests(unittest.TestCase):
del a
self.assertNotEqual(gc.collect(), 0)
def test_finalizer(self):
def test_legacy_finalizer(self):
# A() is uncollectable if it is part of a cycle, make sure it shows up
# in gc.garbage.
@_testcapi.with_tp_del
class A:
def __del__(self): pass
def __tp_del__(self): pass
class B:
pass
a = A()
@ -165,11 +168,12 @@ class GCTests(unittest.TestCase):
self.fail("didn't find obj in garbage (finalizer)")
gc.garbage.remove(obj)
def test_finalizer_newclass(self):
def test_legacy_finalizer_newclass(self):
# A() is uncollectable if it is part of a cycle, make sure it shows up
# in gc.garbage.
@_testcapi.with_tp_del
class A(object):
def __del__(self): pass
def __tp_del__(self): pass
class B(object):
pass
a = A()
@ -570,12 +574,14 @@ class GCTests(unittest.TestCase):
import subprocess
code = """if 1:
import gc
import _testcapi
@_testcapi.with_tp_del
class X:
def __init__(self, name):
self.name = name
def __repr__(self):
return "<X %%r>" %% self.name
def __del__(self):
def __tp_del__(self):
pass
x = X('first')

View file

@ -1,3 +1,55 @@
import gc
import sys
import unittest
import weakref
from test import support
class FinalizationTest(unittest.TestCase):
def test_frame_resurrect(self):
# A generator frame can be resurrected by a generator's finalization.
def gen():
nonlocal frame
try:
yield
finally:
frame = sys._getframe()
g = gen()
wr = weakref.ref(g)
next(g)
del g
support.gc_collect()
self.assertIs(wr(), None)
self.assertTrue(frame)
del frame
support.gc_collect()
def test_refcycle(self):
# A generator caught in a refcycle gets finalized anyway.
old_garbage = gc.garbage[:]
finalized = False
def gen():
nonlocal finalized
try:
g = yield
yield 1
finally:
finalized = True
g = gen()
next(g)
g.send(g)
self.assertGreater(sys.getrefcount(g), 2)
self.assertFalse(finalized)
del g
support.gc_collect()
self.assertTrue(finalized)
self.assertEqual(gc.garbage, old_garbage)
tutorial_tests = """
Let's try a simple generator:
@ -1880,6 +1932,7 @@ __test__ = {"tut": tutorial_tests,
# so this works as expected in both ways of running regrtest.
def test_main(verbose=None):
from test import support, test_generators
support.run_unittest(__name__)
support.run_doctest(test_generators, verbose)
# This part isn't needed for regrtest, but for running the test directly.

View file

@ -1072,12 +1072,13 @@ class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
def test_garbage_collection(self):
# C BufferedReader objects are collected.
# The Python version has __del__, so it ends into gc.garbage instead
rawio = self.FileIO(support.TESTFN, "w+b")
f = self.tp(rawio)
f.f = f
wr = weakref.ref(f)
del f
support.gc_collect()
with support.check_warnings(('', ResourceWarning)):
rawio = self.FileIO(support.TESTFN, "w+b")
f = self.tp(rawio)
f.f = f
wr = weakref.ref(f)
del f
support.gc_collect()
self.assertTrue(wr() is None, wr)
def test_args_error(self):
@ -1366,13 +1367,14 @@ class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
# C BufferedWriter objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends into gc.garbage instead
rawio = self.FileIO(support.TESTFN, "w+b")
f = self.tp(rawio)
f.write(b"123xxx")
f.x = f
wr = weakref.ref(f)
del f
support.gc_collect()
with support.check_warnings(('', ResourceWarning)):
rawio = self.FileIO(support.TESTFN, "w+b")
f = self.tp(rawio)
f.write(b"123xxx")
f.x = f
wr = weakref.ref(f)
del f
support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"123xxx")
@ -2607,14 +2609,15 @@ class CTextIOWrapperTest(TextIOWrapperTest):
# C TextIOWrapper objects are collected, and collecting them flushes
# all data to disk.
# The Python version has __del__, so it ends in gc.garbage instead.
rawio = io.FileIO(support.TESTFN, "wb")
b = self.BufferedWriter(rawio)
t = self.TextIOWrapper(b, encoding="ascii")
t.write("456def")
t.x = t
wr = weakref.ref(t)
del t
support.gc_collect()
with support.check_warnings(('', ResourceWarning)):
rawio = io.FileIO(support.TESTFN, "wb")
b = self.BufferedWriter(rawio)
t = self.TextIOWrapper(b, encoding="ascii")
t.write("456def")
t.x = t
wr = weakref.ref(t)
del t
support.gc_collect()
self.assertTrue(wr() is None, wr)
with self.open(support.TESTFN, "rb") as f:
self.assertEqual(f.read(), b"456def")

View file

@ -864,11 +864,11 @@ class SizeofTest(unittest.TestCase):
check((1,2,3), vsize('') + 3*self.P)
# type
# static type: PyTypeObject
s = vsize('P2n15Pl4Pn9Pn11PI')
s = vsize('P2n15Pl4Pn9Pn11PIP')
check(int, s)
# (PyTypeObject + PyNumberMethods + PyMappingMethods +
# PySequenceMethods + PyBufferProcs + 4P)
s = vsize('P2n15Pl4Pn9Pn11PI') + struct.calcsize('34P 3P 10P 2P 4P')
s = vsize('P2n15Pl4Pn9Pn11PIP') + struct.calcsize('34P 3P 10P 2P 4P')
# Separate block for PyDictKeysObject with 4 entries
s += struct.calcsize("2nPn") + 4*struct.calcsize("n2P")
# class