mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00
Issue #15528: Add weakref.finalize to support finalization using
weakref callbacks. This is 2e446e87ac5b except that collections/__init__.py has been modified to import proxy from _weakref instead of weakref. This eliminates an import cycle which seems to cause a problem on Unix but not Windows.
This commit is contained in:
parent
8408cea0cd
commit
7a3dae056d
5 changed files with 501 additions and 8 deletions
|
@ -7,11 +7,15 @@ import operator
|
|||
import contextlib
|
||||
import copy
|
||||
|
||||
from test import support
|
||||
from test import support, script_helper
|
||||
|
||||
# Used in ReferencesTestCase.test_ref_created_during_del() .
|
||||
ref_from_del = None
|
||||
|
||||
# Used by FinalizeTestCase as a global that may be replaced by None
|
||||
# when the interpreter shuts down.
|
||||
_global_var = 'foobar'
|
||||
|
||||
class C:
|
||||
def method(self):
|
||||
pass
|
||||
|
@ -1551,6 +1555,151 @@ class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
|
|||
def _reference(self):
|
||||
return self.__ref.copy()
|
||||
|
||||
|
||||
class FinalizeTestCase(unittest.TestCase):
|
||||
|
||||
class A:
|
||||
pass
|
||||
|
||||
def _collect_if_necessary(self):
|
||||
# we create no ref-cycles so in CPython no gc should be needed
|
||||
if sys.implementation.name != 'cpython':
|
||||
support.gc_collect()
|
||||
|
||||
def test_finalize(self):
|
||||
def add(x,y,z):
|
||||
res.append(x + y + z)
|
||||
return x + y + z
|
||||
|
||||
a = self.A()
|
||||
|
||||
res = []
|
||||
f = weakref.finalize(a, add, 67, 43, z=89)
|
||||
self.assertEqual(f.alive, True)
|
||||
self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
|
||||
self.assertEqual(f(), 199)
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f.peek(), None)
|
||||
self.assertEqual(f.detach(), None)
|
||||
self.assertEqual(f.alive, False)
|
||||
self.assertEqual(res, [199])
|
||||
|
||||
res = []
|
||||
f = weakref.finalize(a, add, 67, 43, 89)
|
||||
self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
|
||||
self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f.peek(), None)
|
||||
self.assertEqual(f.detach(), None)
|
||||
self.assertEqual(f.alive, False)
|
||||
self.assertEqual(res, [])
|
||||
|
||||
res = []
|
||||
f = weakref.finalize(a, add, x=67, y=43, z=89)
|
||||
del a
|
||||
self._collect_if_necessary()
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f(), None)
|
||||
self.assertEqual(f.peek(), None)
|
||||
self.assertEqual(f.detach(), None)
|
||||
self.assertEqual(f.alive, False)
|
||||
self.assertEqual(res, [199])
|
||||
|
||||
def test_order(self):
|
||||
a = self.A()
|
||||
res = []
|
||||
|
||||
f1 = weakref.finalize(a, res.append, 'f1')
|
||||
f2 = weakref.finalize(a, res.append, 'f2')
|
||||
f3 = weakref.finalize(a, res.append, 'f3')
|
||||
f4 = weakref.finalize(a, res.append, 'f4')
|
||||
f5 = weakref.finalize(a, res.append, 'f5')
|
||||
|
||||
# make sure finalizers can keep themselves alive
|
||||
del f1, f4
|
||||
|
||||
self.assertTrue(f2.alive)
|
||||
self.assertTrue(f3.alive)
|
||||
self.assertTrue(f5.alive)
|
||||
|
||||
self.assertTrue(f5.detach())
|
||||
self.assertFalse(f5.alive)
|
||||
|
||||
f5() # nothing because previously unregistered
|
||||
res.append('A')
|
||||
f3() # => res.append('f3')
|
||||
self.assertFalse(f3.alive)
|
||||
res.append('B')
|
||||
f3() # nothing because previously called
|
||||
res.append('C')
|
||||
del a
|
||||
self._collect_if_necessary()
|
||||
# => res.append('f4')
|
||||
# => res.append('f2')
|
||||
# => res.append('f1')
|
||||
self.assertFalse(f2.alive)
|
||||
res.append('D')
|
||||
f2() # nothing because previously called by gc
|
||||
|
||||
expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
def test_all_freed(self):
|
||||
# we want a weakrefable subclass of weakref.finalize
|
||||
class MyFinalizer(weakref.finalize):
|
||||
pass
|
||||
|
||||
a = self.A()
|
||||
res = []
|
||||
def callback():
|
||||
res.append(123)
|
||||
f = MyFinalizer(a, callback)
|
||||
|
||||
wr_callback = weakref.ref(callback)
|
||||
wr_f = weakref.ref(f)
|
||||
del callback, f
|
||||
|
||||
self.assertIsNotNone(wr_callback())
|
||||
self.assertIsNotNone(wr_f())
|
||||
|
||||
del a
|
||||
self._collect_if_necessary()
|
||||
|
||||
self.assertIsNone(wr_callback())
|
||||
self.assertIsNone(wr_f())
|
||||
self.assertEqual(res, [123])
|
||||
|
||||
@classmethod
|
||||
def run_in_child(cls):
|
||||
def error():
|
||||
# Create an atexit finalizer from inside a finalizer called
|
||||
# at exit. This should be the next to be run.
|
||||
g1 = weakref.finalize(cls, print, 'g1')
|
||||
print('f3 error')
|
||||
1/0
|
||||
|
||||
# cls should stay alive till atexit callbacks run
|
||||
f1 = weakref.finalize(cls, print, 'f1', _global_var)
|
||||
f2 = weakref.finalize(cls, print, 'f2', _global_var)
|
||||
f3 = weakref.finalize(cls, error)
|
||||
f4 = weakref.finalize(cls, print, 'f4', _global_var)
|
||||
|
||||
assert f1.atexit == True
|
||||
f2.atexit = False
|
||||
assert f3.atexit == True
|
||||
assert f4.atexit == True
|
||||
|
||||
def test_atexit(self):
|
||||
prog = ('from test.test_weakref import FinalizeTestCase;'+
|
||||
'FinalizeTestCase.run_in_child()')
|
||||
rc, out, err = script_helper.assert_python_ok('-c', prog)
|
||||
out = out.decode('ascii').splitlines()
|
||||
self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
|
||||
self.assertTrue(b'ZeroDivisionError' in err)
|
||||
|
||||
|
||||
libreftest = """ Doctest for examples in the library reference: weakref.rst
|
||||
|
||||
>>> import weakref
|
||||
|
@ -1644,6 +1793,7 @@ def test_main():
|
|||
WeakValueDictionaryTestCase,
|
||||
WeakKeyDictionaryTestCase,
|
||||
SubclassableWeakrefTestCase,
|
||||
FinalizeTestCase,
|
||||
)
|
||||
support.run_doctest(sys.modules[__name__])
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue