mirror of
				https://github.com/python/cpython.git
				synced 2025-10-22 14:42:22 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1590 lines
		
	
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1590 lines
		
	
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import unittest.mock
 | |
| from test import support
 | |
| from test.support import (verbose, refcount_test,
 | |
|                           cpython_only, requires_subprocess,
 | |
|                           requires_gil_enabled,
 | |
|                           Py_GIL_DISABLED)
 | |
| from test.support.import_helper import import_module
 | |
| from test.support.os_helper import temp_dir, TESTFN, unlink
 | |
| from test.support.script_helper import assert_python_ok, make_script
 | |
| from test.support import threading_helper, gc_threshold
 | |
| 
 | |
| import gc
 | |
| import sys
 | |
| import sysconfig
 | |
| import textwrap
 | |
| import threading
 | |
| import time
 | |
| import weakref
 | |
| 
 | |
| try:
 | |
|     import _testcapi
 | |
|     from _testcapi import with_tp_del
 | |
|     from _testcapi import ContainerNoGC
 | |
| except ImportError:
 | |
|     _testcapi = None
 | |
|     def with_tp_del(cls):
 | |
|         class C(object):
 | |
|             def __new__(cls, *args, **kwargs):
 | |
|                 raise unittest.SkipTest('requires _testcapi.with_tp_del')
 | |
|         return C
 | |
|     ContainerNoGC = None
 | |
| 
 | |
| ### Support code
 | |
| ###############################################################################
 | |
| 
 | |
| # Bug 1055820 has several tests of longstanding bugs involving weakrefs and
 | |
| # cyclic gc.
 | |
| 
 | |
| # An instance of C1055820 has a self-loop, so becomes cyclic trash when
 | |
| # unreachable.
 | |
| class C1055820(object):
 | |
|     def __init__(self, i):
 | |
|         self.i = i
 | |
|         self.loop = self
 | |
| 
 | |
| class GC_Detector(object):
 | |
|     # Create an instance I.  Then gc hasn't happened again so long as
 | |
|     # I.gc_happened is false.
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.gc_happened = False
 | |
| 
 | |
|         def it_happened(ignored):
 | |
|             self.gc_happened = True
 | |
| 
 | |
|         # Create a piece of cyclic trash that triggers it_happened when
 | |
|         # gc collects it.
 | |
|         self.wr = weakref.ref(C1055820(666), it_happened)
 | |
| 
 | |
| @with_tp_del
 | |
| class Uncollectable(object):
 | |
|     """Create a reference cycle with multiple __del__ methods.
 | |
| 
 | |
|     An object in a reference cycle will never have zero references,
 | |
|     and so must be garbage collected.  If one or more objects in the
 | |
|     cycle have __del__ methods, the gc refuses to guess an order,
 | |
|     and leaves the cycle uncollected."""
 | |
|     def __init__(self, partner=None):
 | |
|         if partner is None:
 | |
|             self.partner = Uncollectable(partner=self)
 | |
|         else:
 | |
|             self.partner = partner
 | |
|     def __tp_del__(self):
 | |
|         pass
 | |
| 
 | |
| if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
 | |
|     BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
 | |
| else:
 | |
|     # Usually, sys.gettotalrefcount() is only present if Python has been
 | |
|     # compiled in debug mode. If it's missing, expect that Python has
 | |
|     # been released in release mode: with NDEBUG defined.
 | |
|     BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
 | |
| 
 | |
| ### Tests
 | |
| ###############################################################################
 | |
| 
 | |
| class GCTests(unittest.TestCase):
 | |
|     def test_list(self):
 | |
|         l = []
 | |
|         l.append(l)
 | |
|         gc.collect()
 | |
|         del l
 | |
|         self.assertEqual(gc.collect(), 1)
 | |
| 
 | |
|     def test_dict(self):
 | |
|         d = {}
 | |
|         d[1] = d
 | |
|         gc.collect()
 | |
|         del d
 | |
|         self.assertEqual(gc.collect(), 1)
 | |
| 
 | |
|     def test_tuple(self):
 | |
|         # since tuples are immutable we close the loop with a list
 | |
|         l = []
 | |
|         t = (l,)
 | |
|         l.append(t)
 | |
|         gc.collect()
 | |
|         del t
 | |
|         del l
 | |
|         self.assertEqual(gc.collect(), 2)
 | |
| 
 | |
|     def test_class(self):
 | |
|         class A:
 | |
|             pass
 | |
|         A.a = A
 | |
|         gc.collect()
 | |
|         del A
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
| 
 | |
|     def test_newstyleclass(self):
 | |
|         class A(object):
 | |
|             pass
 | |
|         gc.collect()
 | |
|         del A
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
| 
 | |
|     def test_instance(self):
 | |
|         class A:
 | |
|             pass
 | |
|         a = A()
 | |
|         a.a = a
 | |
|         gc.collect()
 | |
|         del a
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
| 
 | |
|     def test_newinstance(self):
 | |
|         class A(object):
 | |
|             pass
 | |
|         a = A()
 | |
|         a.a = a
 | |
|         gc.collect()
 | |
|         del a
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         class B(list):
 | |
|             pass
 | |
|         class C(B, A):
 | |
|             pass
 | |
|         a = C()
 | |
|         a.a = a
 | |
|         gc.collect()
 | |
|         del a
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         del B, C
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         A.a = A()
 | |
|         del A
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         self.assertEqual(gc.collect(), 0)
 | |
| 
 | |
|     def test_method(self):
 | |
|         # Tricky: self.__init__ is a bound method, it references the instance.
 | |
|         class A:
 | |
|             def __init__(self):
 | |
|                 self.init = self.__init__
 | |
|         a = A()
 | |
|         gc.collect()
 | |
|         del a
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
| 
 | |
|     @cpython_only
 | |
|     def test_legacy_finalizer(self):
 | |
|         # A() is uncollectable if it is part of a cycle, make sure it shows up
 | |
|         # in gc.garbage.
 | |
|         @with_tp_del
 | |
|         class A:
 | |
|             def __tp_del__(self): pass
 | |
|         class B:
 | |
|             pass
 | |
|         a = A()
 | |
|         a.a = a
 | |
|         id_a = id(a)
 | |
|         b = B()
 | |
|         b.b = b
 | |
|         gc.collect()
 | |
|         del a
 | |
|         del b
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         for obj in gc.garbage:
 | |
|             if id(obj) == id_a:
 | |
|                 del obj.a
 | |
|                 break
 | |
|         else:
 | |
|             self.fail("didn't find obj in garbage (finalizer)")
 | |
|         gc.garbage.remove(obj)
 | |
| 
 | |
|     @cpython_only
 | |
|     def test_legacy_finalizer_newclass(self):
 | |
|         # A() is uncollectable if it is part of a cycle, make sure it shows up
 | |
|         # in gc.garbage.
 | |
|         @with_tp_del
 | |
|         class A(object):
 | |
|             def __tp_del__(self): pass
 | |
|         class B(object):
 | |
|             pass
 | |
|         a = A()
 | |
|         a.a = a
 | |
|         id_a = id(a)
 | |
|         b = B()
 | |
|         b.b = b
 | |
|         gc.collect()
 | |
|         del a
 | |
|         del b
 | |
|         self.assertNotEqual(gc.collect(), 0)
 | |
|         for obj in gc.garbage:
 | |
|             if id(obj) == id_a:
 | |
|                 del obj.a
 | |
|                 break
 | |
|         else:
 | |
|             self.fail("didn't find obj in garbage (finalizer)")
 | |
|         gc.garbage.remove(obj)
 | |
| 
 | |
|     def test_function(self):
 | |
|         # Tricky: f -> d -> f, code should call d.clear() after the exec to
 | |
|         # break the cycle.
 | |
|         d = {}
 | |
|         exec("def f(): pass\n", d)
 | |
|         gc.collect()
 | |
|         del d
 | |
|         # In the free-threaded build, the count returned by `gc.collect()`
 | |
|         # is 3 because it includes f's code object.
 | |
|         self.assertIn(gc.collect(), (2, 3))
 | |
| 
 | |
|     def test_function_tp_clear_leaves_consistent_state(self):
 | |
|         # https://github.com/python/cpython/issues/91636
 | |
|         code = """if 1:
 | |
| 
 | |
|         import gc
 | |
|         import weakref
 | |
| 
 | |
|         class LateFin:
 | |
|             __slots__ = ('ref',)
 | |
| 
 | |
|             def __del__(self):
 | |
| 
 | |
|                 # 8. Now `latefin`'s finalizer is called. Here we
 | |
|                 #    obtain a reference to `func`, which is currently
 | |
|                 #    undergoing `tp_clear`.
 | |
|                 global func
 | |
|                 func = self.ref()
 | |
| 
 | |
|         class Cyclic(tuple):
 | |
|             __slots__ = ()
 | |
| 
 | |
|             # 4. The finalizers of all garbage objects are called. In
 | |
|             #    this case this is only us as `func` doesn't have a
 | |
|             #    finalizer.
 | |
|             def __del__(self):
 | |
| 
 | |
|                 # 5. Create a weakref to `func` now. If we had created
 | |
|                 #    it earlier, it would have been cleared by the
 | |
|                 #    garbage collector before calling the finalizers.
 | |
|                 self[1].ref = weakref.ref(self[0])
 | |
| 
 | |
|                 # 6. Drop the global reference to `latefin`. The only
 | |
|                 #    remaining reference is the one we have.
 | |
|                 global latefin
 | |
|                 del latefin
 | |
| 
 | |
|             # 7. Now `func` is `tp_clear`-ed. This drops the last
 | |
|             #    reference to `Cyclic`, which gets `tp_dealloc`-ed.
 | |
|             #    This drops the last reference to `latefin`.
 | |
| 
 | |
|         latefin = LateFin()
 | |
|         def func():
 | |
|             pass
 | |
|         cyc = tuple.__new__(Cyclic, (func, latefin))
 | |
| 
 | |
|         # 1. Create a reference cycle of `cyc` and `func`.
 | |
|         func.__module__ = cyc
 | |
| 
 | |
|         # 2. Make the cycle unreachable, but keep the global reference
 | |
|         #    to `latefin` so that it isn't detected as garbage. This
 | |
|         #    way its finalizer will not be called immediately.
 | |
|         del func, cyc
 | |
| 
 | |
|         # 3. Invoke garbage collection,
 | |
|         #    which will find `cyc` and `func` as garbage.
 | |
|         gc.collect()
 | |
| 
 | |
|         # 9. Previously, this would crash because `func_qualname`
 | |
|         #    had been NULL-ed out by func_clear().
 | |
|         print(f"{func=}")
 | |
|         """
 | |
|         # We're mostly just checking that this doesn't crash.
 | |
|         rc, stdout, stderr = assert_python_ok("-c", code)
 | |
|         self.assertEqual(rc, 0)
 | |
|         self.assertRegex(stdout, rb"""\A\s*func=<function  at \S+>\s*\Z""")
 | |
|         self.assertFalse(stderr)
 | |
| 
 | |
|     @refcount_test
 | |
|     def test_frame(self):
 | |
|         def f():
 | |
|             frame = sys._getframe()
 | |
|         gc.collect()
 | |
|         f()
 | |
|         self.assertEqual(gc.collect(), 1)
 | |
| 
 | |
|     def test_saveall(self):
 | |
|         # Verify that cyclic garbage like lists show up in gc.garbage if the
 | |
|         # SAVEALL option is enabled.
 | |
| 
 | |
|         # First make sure we don't save away other stuff that just happens to
 | |
|         # be waiting for collection.
 | |
|         gc.collect()
 | |
|         # if this fails, someone else created immortal trash
 | |
|         self.assertEqual(gc.garbage, [])
 | |
| 
 | |
|         L = []
 | |
|         L.append(L)
 | |
|         id_L = id(L)
 | |
| 
 | |
|         debug = gc.get_debug()
 | |
|         gc.set_debug(debug | gc.DEBUG_SAVEALL)
 | |
|         del L
 | |
|         gc.collect()
 | |
|         gc.set_debug(debug)
 | |
| 
 | |
|         self.assertEqual(len(gc.garbage), 1)
 | |
|         obj = gc.garbage.pop()
 | |
|         self.assertEqual(id(obj), id_L)
 | |
| 
 | |
|     def test_del(self):
 | |
|         # __del__ methods can trigger collection, make this to happen
 | |
|         thresholds = gc.get_threshold()
 | |
|         gc.enable()
 | |
|         gc.set_threshold(1)
 | |
| 
 | |
|         class A:
 | |
|             def __del__(self):
 | |
|                 dir(self)
 | |
|         a = A()
 | |
|         del a
 | |
| 
 | |
|         gc.disable()
 | |
|         gc.set_threshold(*thresholds)
 | |
| 
 | |
|     def test_del_newclass(self):
 | |
|         # __del__ methods can trigger collection, make this to happen
 | |
|         thresholds = gc.get_threshold()
 | |
|         gc.enable()
 | |
|         gc.set_threshold(1)
 | |
| 
 | |
|         class A(object):
 | |
|             def __del__(self):
 | |
|                 dir(self)
 | |
|         a = A()
 | |
|         del a
 | |
| 
 | |
|         gc.disable()
 | |
|         gc.set_threshold(*thresholds)
 | |
| 
 | |
|     # The following two tests are fragile:
 | |
|     # They precisely count the number of allocations,
 | |
|     # which is highly implementation-dependent.
 | |
|     # For example, disposed tuples are not freed, but reused.
 | |
|     # To minimize variations, though, we first store the get_count() results
 | |
|     # and check them at the end.
 | |
|     @refcount_test
 | |
|     @requires_gil_enabled('needs precise allocation counts')
 | |
|     def test_get_count(self):
 | |
|         gc.collect()
 | |
|         a, b, c = gc.get_count()
 | |
|         x = []
 | |
|         d, e, f = gc.get_count()
 | |
|         self.assertEqual((b, c), (0, 0))
 | |
|         self.assertEqual((e, f), (0, 0))
 | |
|         # This is less fragile than asserting that a equals 0.
 | |
|         self.assertLess(a, 5)
 | |
|         # Between the two calls to get_count(), at least one object was
 | |
|         # created (the list).
 | |
|         self.assertGreater(d, a)
 | |
| 
 | |
|     @refcount_test
 | |
|     def test_collect_generations(self):
 | |
|         gc.collect()
 | |
|         # This object will "trickle" into generation N + 1 after
 | |
|         # each call to collect(N)
 | |
|         x = []
 | |
|         gc.collect(0)
 | |
|         # x is now in the old gen
 | |
|         a, b, c = gc.get_count()
 | |
|         # We don't check a since its exact values depends on
 | |
|         # internal implementation details of the interpreter.
 | |
|         self.assertEqual((b, c), (1, 0))
 | |
| 
 | |
|     def test_trashcan(self):
 | |
|         class Ouch:
 | |
|             n = 0
 | |
|             def __del__(self):
 | |
|                 Ouch.n = Ouch.n + 1
 | |
|                 if Ouch.n % 17 == 0:
 | |
|                     gc.collect()
 | |
| 
 | |
|         # "trashcan" is a hack to prevent stack overflow when deallocating
 | |
|         # very deeply nested tuples etc.  It works in part by abusing the
 | |
|         # type pointer and refcount fields, and that can yield horrible
 | |
|         # problems when gc tries to traverse the structures.
 | |
|         # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
 | |
|         # most likely die via segfault.
 | |
| 
 | |
|         # Note:  In 2.3 the possibility for compiling without cyclic gc was
 | |
|         # removed, and that in turn allows the trashcan mechanism to work
 | |
|         # via much simpler means (e.g., it never abuses the type pointer or
 | |
|         # refcount fields anymore).  Since it's much less likely to cause a
 | |
|         # problem now, the various constants in this expensive (we force a lot
 | |
|         # of full collections) test are cut back from the 2.2 version.
 | |
|         gc.enable()
 | |
|         N = 150
 | |
|         for count in range(2):
 | |
|             t = []
 | |
|             for i in range(N):
 | |
|                 t = [t, Ouch()]
 | |
|             u = []
 | |
|             for i in range(N):
 | |
|                 u = [u, Ouch()]
 | |
|             v = {}
 | |
|             for i in range(N):
 | |
|                 v = {1: v, 2: Ouch()}
 | |
|         gc.disable()
 | |
| 
 | |
|     @threading_helper.requires_working_threading()
 | |
|     def test_trashcan_threads(self):
 | |
|         # Issue #13992: trashcan mechanism should be thread-safe
 | |
|         NESTING = 60
 | |
|         N_THREADS = 2
 | |
| 
 | |
|         def sleeper_gen():
 | |
|             """A generator that releases the GIL when closed or dealloc'ed."""
 | |
|             try:
 | |
|                 yield
 | |
|             finally:
 | |
|                 time.sleep(0.000001)
 | |
| 
 | |
|         class C(list):
 | |
|             # Appending to a list is atomic, which avoids the use of a lock.
 | |
|             inits = []
 | |
|             dels = []
 | |
|             def __init__(self, alist):
 | |
|                 self[:] = alist
 | |
|                 C.inits.append(None)
 | |
|             def __del__(self):
 | |
|                 # This __del__ is called by subtype_dealloc().
 | |
|                 C.dels.append(None)
 | |
|                 # `g` will release the GIL when garbage-collected.  This
 | |
|                 # helps assert subtype_dealloc's behaviour when threads
 | |
|                 # switch in the middle of it.
 | |
|                 g = sleeper_gen()
 | |
|                 next(g)
 | |
|                 # Now that __del__ is finished, subtype_dealloc will proceed
 | |
|                 # to call list_dealloc, which also uses the trashcan mechanism.
 | |
| 
 | |
|         def make_nested():
 | |
|             """Create a sufficiently nested container object so that the
 | |
|             trashcan mechanism is invoked when deallocating it."""
 | |
|             x = C([])
 | |
|             for i in range(NESTING):
 | |
|                 x = [C([x])]
 | |
|             del x
 | |
| 
 | |
|         def run_thread():
 | |
|             """Exercise make_nested() in a loop."""
 | |
|             while not exit:
 | |
|                 make_nested()
 | |
| 
 | |
|         old_switchinterval = sys.getswitchinterval()
 | |
|         support.setswitchinterval(1e-5)
 | |
|         try:
 | |
|             exit = []
 | |
|             threads = []
 | |
|             for i in range(N_THREADS):
 | |
|                 t = threading.Thread(target=run_thread)
 | |
|                 threads.append(t)
 | |
|             with threading_helper.start_threads(threads, lambda: exit.append(1)):
 | |
|                 time.sleep(1.0)
 | |
|         finally:
 | |
|             sys.setswitchinterval(old_switchinterval)
 | |
|         gc.collect()
 | |
|         self.assertEqual(len(C.inits), len(C.dels))
 | |
| 
 | |
|     def test_boom(self):
 | |
|         class Boom:
 | |
|             def __getattr__(self, someattribute):
 | |
|                 del self.attr
 | |
|                 raise AttributeError
 | |
| 
 | |
|         a = Boom()
 | |
|         b = Boom()
 | |
|         a.attr = b
 | |
|         b.attr = a
 | |
| 
 | |
|         gc.collect()
 | |
|         garbagelen = len(gc.garbage)
 | |
|         del a, b
 | |
|         # a<->b are in a trash cycle now.  Collection will invoke
 | |
|         # Boom.__getattr__ (to see whether a and b have __del__ methods), and
 | |
|         # __getattr__ deletes the internal "attr" attributes as a side effect.
 | |
|         # That causes the trash cycle to get reclaimed via refcounts falling to
 | |
|         # 0, thus mutating the trash graph as a side effect of merely asking
 | |
|         # whether __del__ exists.  This used to (before 2.3b1) crash Python.
 | |
|         # Now __getattr__ isn't called.
 | |
|         self.assertEqual(gc.collect(), 2)
 | |
|         self.assertEqual(len(gc.garbage), garbagelen)
 | |
| 
 | |
|     def test_boom2(self):
 | |
|         class Boom2:
 | |
|             def __init__(self):
 | |
|                 self.x = 0
 | |
| 
 | |
|             def __getattr__(self, someattribute):
 | |
|                 self.x += 1
 | |
|                 if self.x > 1:
 | |
|                     del self.attr
 | |
|                 raise AttributeError
 | |
| 
 | |
|         a = Boom2()
 | |
|         b = Boom2()
 | |
|         a.attr = b
 | |
|         b.attr = a
 | |
| 
 | |
|         gc.collect()
 | |
|         garbagelen = len(gc.garbage)
 | |
|         del a, b
 | |
|         # Much like test_boom(), except that __getattr__ doesn't break the
 | |
|         # cycle until the second time gc checks for __del__.  As of 2.3b1,
 | |
|         # there isn't a second time, so this simply cleans up the trash cycle.
 | |
|         # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
 | |
|         # reclaimed this way.
 | |
|         self.assertEqual(gc.collect(), 2)
 | |
|         self.assertEqual(len(gc.garbage), garbagelen)
 | |
| 
 | |
|     def test_get_referents(self):
 | |
|         alist = [1, 3, 5]
 | |
|         got = gc.get_referents(alist)
 | |
|         got.sort()
 | |
|         self.assertEqual(got, alist)
 | |
| 
 | |
|         atuple = tuple(alist)
 | |
|         got = gc.get_referents(atuple)
 | |
|         got.sort()
 | |
|         self.assertEqual(got, alist)
 | |
| 
 | |
|         adict = {1: 3, 5: 7}
 | |
|         expected = [1, 3, 5, 7]
 | |
|         got = gc.get_referents(adict)
 | |
|         got.sort()
 | |
|         self.assertEqual(got, expected)
 | |
| 
 | |
|         got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
 | |
|         got.sort()
 | |
|         self.assertEqual(got, [0, 0] + list(range(5)))
 | |
| 
 | |
|         self.assertEqual(gc.get_referents(1, 'a', 4j), [])
 | |
| 
 | |
|     def test_is_tracked(self):
 | |
|         # Atomic built-in types are not tracked, user-defined objects and
 | |
|         # mutable containers are.
 | |
|         # NOTE: types with special optimizations (e.g. tuple) have tests
 | |
|         # in their own test files instead.
 | |
|         self.assertFalse(gc.is_tracked(None))
 | |
|         self.assertFalse(gc.is_tracked(1))
 | |
|         self.assertFalse(gc.is_tracked(1.0))
 | |
|         self.assertFalse(gc.is_tracked(1.0 + 5.0j))
 | |
|         self.assertFalse(gc.is_tracked(True))
 | |
|         self.assertFalse(gc.is_tracked(False))
 | |
|         self.assertFalse(gc.is_tracked(b"a"))
 | |
|         self.assertFalse(gc.is_tracked("a"))
 | |
|         self.assertFalse(gc.is_tracked(bytearray(b"a")))
 | |
|         self.assertFalse(gc.is_tracked(type))
 | |
|         self.assertFalse(gc.is_tracked(int))
 | |
|         self.assertFalse(gc.is_tracked(object))
 | |
|         self.assertFalse(gc.is_tracked(object()))
 | |
| 
 | |
|         class UserClass:
 | |
|             pass
 | |
| 
 | |
|         class UserInt(int):
 | |
|             pass
 | |
| 
 | |
|         # Base class is object; no extra fields.
 | |
|         class UserClassSlots:
 | |
|             __slots__ = ()
 | |
| 
 | |
|         # Base class is fixed size larger than object; no extra fields.
 | |
|         class UserFloatSlots(float):
 | |
|             __slots__ = ()
 | |
| 
 | |
|         # Base class is variable size; no extra fields.
 | |
|         class UserIntSlots(int):
 | |
|             __slots__ = ()
 | |
| 
 | |
|         self.assertTrue(gc.is_tracked(gc))
 | |
|         self.assertTrue(gc.is_tracked(UserClass))
 | |
|         self.assertTrue(gc.is_tracked(UserClass()))
 | |
|         self.assertTrue(gc.is_tracked(UserInt()))
 | |
|         self.assertTrue(gc.is_tracked([]))
 | |
|         self.assertTrue(gc.is_tracked(set()))
 | |
|         self.assertTrue(gc.is_tracked(UserClassSlots()))
 | |
|         self.assertTrue(gc.is_tracked(UserFloatSlots()))
 | |
|         self.assertTrue(gc.is_tracked(UserIntSlots()))
 | |
| 
 | |
|     def test_is_finalized(self):
 | |
|         # Objects not tracked by the always gc return false
 | |
|         self.assertFalse(gc.is_finalized(3))
 | |
| 
 | |
|         storage = []
 | |
|         class Lazarus:
 | |
|             def __del__(self):
 | |
|                 storage.append(self)
 | |
| 
 | |
|         lazarus = Lazarus()
 | |
|         self.assertFalse(gc.is_finalized(lazarus))
 | |
| 
 | |
|         del lazarus
 | |
|         gc.collect()
 | |
| 
 | |
|         lazarus = storage.pop()
 | |
|         self.assertTrue(gc.is_finalized(lazarus))
 | |
| 
 | |
|     def test_bug1055820b(self):
 | |
|         # Corresponds to temp2b.py in the bug report.
 | |
| 
 | |
|         ouch = []
 | |
|         def callback(ignored):
 | |
|             ouch[:] = [wr() for wr in WRs]
 | |
| 
 | |
|         Cs = [C1055820(i) for i in range(2)]
 | |
|         WRs = [weakref.ref(c, callback) for c in Cs]
 | |
|         c = None
 | |
| 
 | |
|         gc.collect()
 | |
|         self.assertEqual(len(ouch), 0)
 | |
|         # Make the two instances trash, and collect again.  The bug was that
 | |
|         # the callback materialized a strong reference to an instance, but gc
 | |
|         # cleared the instance's dict anyway.
 | |
|         Cs = None
 | |
|         gc.collect()
 | |
|         self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
 | |
|         for x in ouch:
 | |
|             # If the callback resurrected one of these guys, the instance
 | |
|             # would be damaged, with an empty __dict__.
 | |
|             self.assertEqual(x, None)
 | |
| 
 | |
|     def test_bug21435(self):
 | |
|         # This is a poor test - its only virtue is that it happened to
 | |
|         # segfault on Tim's Windows box before the patch for 21435 was
 | |
|         # applied.  That's a nasty bug relying on specific pieces of cyclic
 | |
|         # trash appearing in exactly the right order in finalize_garbage()'s
 | |
|         # input list.
 | |
|         # But there's no reliable way to force that order from Python code,
 | |
|         # so over time chances are good this test won't really be testing much
 | |
|         # of anything anymore.  Still, if it blows up, there's _some_
 | |
|         # problem ;-)
 | |
|         gc.collect()
 | |
| 
 | |
|         class A:
 | |
|             pass
 | |
| 
 | |
|         class B:
 | |
|             def __init__(self, x):
 | |
|                 self.x = x
 | |
| 
 | |
|             def __del__(self):
 | |
|                 self.attr = None
 | |
| 
 | |
|         def do_work():
 | |
|             a = A()
 | |
|             b = B(A())
 | |
| 
 | |
|             a.attr = b
 | |
|             b.attr = a
 | |
| 
 | |
|         do_work()
 | |
|         gc.collect() # this blows up (bad C pointer) when it fails
 | |
| 
 | |
|     @cpython_only
 | |
|     @requires_subprocess()
 | |
|     @unittest.skipIf(_testcapi is None, "requires _testcapi")
 | |
|     def test_garbage_at_shutdown(self):
 | |
|         import subprocess
 | |
|         code = """if 1:
 | |
|             import gc
 | |
|             import _testcapi
 | |
|             @_testcapi.with_tp_del
 | |
|             class X:
 | |
|                 def __init__(self, name):
 | |
|                     self.name = name
 | |
|                 def __repr__(self):
 | |
|                     return "<X %%r>" %% self.name
 | |
|                 def __tp_del__(self):
 | |
|                     pass
 | |
| 
 | |
|             x = X('first')
 | |
|             x.x = x
 | |
|             x.y = X('second')
 | |
|             del x
 | |
|             gc.set_debug(%s)
 | |
|         """
 | |
|         def run_command(code):
 | |
|             p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
 | |
|                 stdout=subprocess.PIPE,
 | |
|                 stderr=subprocess.PIPE)
 | |
|             stdout, stderr = p.communicate()
 | |
|             p.stdout.close()
 | |
|             p.stderr.close()
 | |
|             self.assertEqual(p.returncode, 0)
 | |
|             self.assertEqual(stdout, b"")
 | |
|             return stderr
 | |
| 
 | |
|         stderr = run_command(code % "0")
 | |
|         self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
 | |
|                       b"shutdown; use", stderr)
 | |
|         self.assertNotIn(b"<X 'first'>", stderr)
 | |
|         # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
 | |
|         stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
 | |
|         self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
 | |
|                       b"shutdown", stderr)
 | |
|         self.assertTrue(
 | |
|             (b"[<X 'first'>, <X 'second'>]" in stderr) or
 | |
|             (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
 | |
|         # With DEBUG_SAVEALL, no additional message should get printed
 | |
|         # (because gc.garbage also contains normally reclaimable cyclic
 | |
|         # references, and its elements get printed at runtime anyway).
 | |
|         stderr = run_command(code % "gc.DEBUG_SAVEALL")
 | |
|         self.assertNotIn(b"uncollectable objects at shutdown", stderr)
 | |
| 
 | |
|     def test_gc_main_module_at_shutdown(self):
 | |
|         # Create a reference cycle through the __main__ module and check
 | |
|         # it gets collected at interpreter shutdown.
 | |
|         code = """if 1:
 | |
|             class C:
 | |
|                 def __del__(self):
 | |
|                     print('__del__ called')
 | |
|             l = [C()]
 | |
|             l.append(l)
 | |
|             """
 | |
|         rc, out, err = assert_python_ok('-c', code)
 | |
|         self.assertEqual(out.strip(), b'__del__ called')
 | |
| 
 | |
|     def test_gc_ordinary_module_at_shutdown(self):
 | |
|         # Same as above, but with a non-__main__ module.
 | |
|         with temp_dir() as script_dir:
 | |
|             module = """if 1:
 | |
|                 class C:
 | |
|                     def __del__(self):
 | |
|                         print('__del__ called')
 | |
|                 l = [C()]
 | |
|                 l.append(l)
 | |
|                 """
 | |
|             code = """if 1:
 | |
|                 import sys
 | |
|                 sys.path.insert(0, %r)
 | |
|                 import gctest
 | |
|                 """ % (script_dir,)
 | |
|             make_script(script_dir, 'gctest', module)
 | |
|             rc, out, err = assert_python_ok('-c', code)
 | |
|             self.assertEqual(out.strip(), b'__del__ called')
 | |
| 
 | |
|     def test_global_del_SystemExit(self):
 | |
|         code = """if 1:
 | |
|             class ClassWithDel:
 | |
|                 def __del__(self):
 | |
|                     print('__del__ called')
 | |
|             a = ClassWithDel()
 | |
|             a.link = a
 | |
|             raise SystemExit(0)"""
 | |
|         self.addCleanup(unlink, TESTFN)
 | |
|         with open(TESTFN, 'w', encoding="utf-8") as script:
 | |
|             script.write(code)
 | |
|         rc, out, err = assert_python_ok(TESTFN)
 | |
|         self.assertEqual(out.strip(), b'__del__ called')
 | |
| 
 | |
|     def test_get_stats(self):
 | |
|         stats = gc.get_stats()
 | |
|         self.assertEqual(len(stats), 3)
 | |
|         for st in stats:
 | |
|             self.assertIsInstance(st, dict)
 | |
|             self.assertEqual(set(st),
 | |
|                              {"collected", "collections", "uncollectable"})
 | |
|             self.assertGreaterEqual(st["collected"], 0)
 | |
|             self.assertGreaterEqual(st["collections"], 0)
 | |
|             self.assertGreaterEqual(st["uncollectable"], 0)
 | |
|         # Check that collection counts are incremented correctly
 | |
|         if gc.isenabled():
 | |
|             self.addCleanup(gc.enable)
 | |
|             gc.disable()
 | |
|         old = gc.get_stats()
 | |
|         gc.collect(0)
 | |
|         new = gc.get_stats()
 | |
|         self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
 | |
|         self.assertEqual(new[1]["collections"], old[1]["collections"])
 | |
|         self.assertEqual(new[2]["collections"], old[2]["collections"])
 | |
|         gc.collect(2)
 | |
|         new = gc.get_stats()
 | |
|         self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
 | |
|         self.assertEqual(new[1]["collections"], old[1]["collections"])
 | |
|         self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
 | |
| 
 | |
|     def test_freeze(self):
 | |
|         gc.freeze()
 | |
|         self.assertGreater(gc.get_freeze_count(), 0)
 | |
|         gc.unfreeze()
 | |
|         self.assertEqual(gc.get_freeze_count(), 0)
 | |
| 
 | |
|     def test_get_objects(self):
 | |
|         gc.collect()
 | |
|         l = []
 | |
|         l.append(l)
 | |
|         self.assertTrue(
 | |
|                 any(l is element for element in gc.get_objects())
 | |
|         )
 | |
| 
 | |
|     @requires_gil_enabled('need generational GC')
 | |
|     def test_get_objects_generations(self):
 | |
|         gc.collect()
 | |
|         l = []
 | |
|         l.append(l)
 | |
|         self.assertTrue(
 | |
|                 any(l is element for element in gc.get_objects(generation=0))
 | |
|         )
 | |
|         gc.collect()
 | |
|         self.assertFalse(
 | |
|                 any(l is element for element in gc.get_objects(generation=0))
 | |
|         )
 | |
|         del l
 | |
|         gc.collect()
 | |
| 
 | |
|     def test_get_objects_arguments(self):
 | |
|         gc.collect()
 | |
|         self.assertEqual(len(gc.get_objects()),
 | |
|                          len(gc.get_objects(generation=None)))
 | |
| 
 | |
|         self.assertRaises(ValueError, gc.get_objects, 1000)
 | |
|         self.assertRaises(ValueError, gc.get_objects, -1000)
 | |
|         self.assertRaises(TypeError, gc.get_objects, "1")
 | |
|         self.assertRaises(TypeError, gc.get_objects, 1.234)
 | |
| 
 | |
|     def test_resurrection_only_happens_once_per_object(self):
 | |
|         class A:  # simple self-loop
 | |
|             def __init__(self):
 | |
|                 self.me = self
 | |
| 
 | |
|         class Lazarus(A):
 | |
|             resurrected = 0
 | |
|             resurrected_instances = []
 | |
| 
 | |
|             def __del__(self):
 | |
|                 Lazarus.resurrected += 1
 | |
|                 Lazarus.resurrected_instances.append(self)
 | |
| 
 | |
|         gc.collect()
 | |
|         gc.disable()
 | |
| 
 | |
|         # We start with 0 resurrections
 | |
|         laz = Lazarus()
 | |
|         self.assertEqual(Lazarus.resurrected, 0)
 | |
| 
 | |
|         # Deleting the instance and triggering a collection
 | |
|         # resurrects the object
 | |
|         del laz
 | |
|         gc.collect()
 | |
|         self.assertEqual(Lazarus.resurrected, 1)
 | |
|         self.assertEqual(len(Lazarus.resurrected_instances), 1)
 | |
| 
 | |
|         # Clearing the references and forcing a collection
 | |
|         # should not resurrect the object again.
 | |
|         Lazarus.resurrected_instances.clear()
 | |
|         self.assertEqual(Lazarus.resurrected, 1)
 | |
|         gc.collect()
 | |
|         self.assertEqual(Lazarus.resurrected, 1)
 | |
| 
 | |
|         gc.enable()
 | |
| 
 | |
|     def test_resurrection_is_transitive(self):
 | |
|         class Cargo:
 | |
|             def __init__(self):
 | |
|                 self.me = self
 | |
| 
 | |
|         class Lazarus:
 | |
|             resurrected_instances = []
 | |
| 
 | |
|             def __del__(self):
 | |
|                 Lazarus.resurrected_instances.append(self)
 | |
| 
 | |
|         gc.collect()
 | |
|         gc.disable()
 | |
| 
 | |
|         laz = Lazarus()
 | |
|         cargo = Cargo()
 | |
|         cargo_id = id(cargo)
 | |
| 
 | |
|         # Create a cycle between cargo and laz
 | |
|         laz.cargo = cargo
 | |
|         cargo.laz = laz
 | |
| 
 | |
|         # Drop the references, force a collection and check that
 | |
|         # everything was resurrected.
 | |
|         del laz, cargo
 | |
|         gc.collect()
 | |
|         self.assertEqual(len(Lazarus.resurrected_instances), 1)
 | |
|         instance = Lazarus.resurrected_instances.pop()
 | |
|         self.assertTrue(hasattr(instance, "cargo"))
 | |
|         self.assertEqual(id(instance.cargo), cargo_id)
 | |
| 
 | |
|         gc.collect()
 | |
|         gc.enable()
 | |
| 
 | |
|     def test_resurrection_does_not_block_cleanup_of_other_objects(self):
 | |
| 
 | |
|         # When a finalizer resurrects objects, stats were reporting them as
 | |
|         # having been collected.  This affected both collect()'s return
 | |
|         # value and the dicts returned by get_stats().
 | |
|         N = 100
 | |
| 
 | |
|         class A:  # simple self-loop
 | |
|             def __init__(self):
 | |
|                 self.me = self
 | |
| 
 | |
|         class Z(A):  # resurrecting __del__
 | |
|             def __del__(self):
 | |
|                 zs.append(self)
 | |
| 
 | |
|         zs = []
 | |
| 
 | |
|         def getstats():
 | |
|             d = gc.get_stats()[-1]
 | |
|             return d['collected'], d['uncollectable']
 | |
| 
 | |
|         gc.collect()
 | |
|         gc.disable()
 | |
| 
 | |
|         # No problems if just collecting A() instances.
 | |
|         oldc, oldnc = getstats()
 | |
|         for i in range(N):
 | |
|             A()
 | |
|         t = gc.collect()
 | |
|         c, nc = getstats()
 | |
|         self.assertEqual(t, N) # instance objects
 | |
|         self.assertEqual(c - oldc, N)
 | |
|         self.assertEqual(nc - oldnc, 0)
 | |
| 
 | |
|         # But Z() is not actually collected.
 | |
|         oldc, oldnc = c, nc
 | |
|         Z()
 | |
|         # Nothing is collected - Z() is merely resurrected.
 | |
|         t = gc.collect()
 | |
|         c, nc = getstats()
 | |
|         self.assertEqual(t, 0)
 | |
|         self.assertEqual(c - oldc, 0)
 | |
|         self.assertEqual(nc - oldnc, 0)
 | |
| 
 | |
|         # Z() should not prevent anything else from being collected.
 | |
|         oldc, oldnc = c, nc
 | |
|         for i in range(N):
 | |
|             A()
 | |
|         Z()
 | |
|         t = gc.collect()
 | |
|         c, nc = getstats()
 | |
|         self.assertEqual(t, N)
 | |
|         self.assertEqual(c - oldc, N)
 | |
|         self.assertEqual(nc - oldnc, 0)
 | |
| 
 | |
|         # The A() trash should have been reclaimed already but the
 | |
|         # 2 copies of Z are still in zs (and the associated dicts).
 | |
|         oldc, oldnc = c, nc
 | |
|         zs.clear()
 | |
|         t = gc.collect()
 | |
|         c, nc = getstats()
 | |
|         self.assertEqual(t, 2)
 | |
|         self.assertEqual(c - oldc, 2)
 | |
|         self.assertEqual(nc - oldnc, 0)
 | |
| 
 | |
|         gc.enable()
 | |
| 
 | |
|     @unittest.skipIf(ContainerNoGC is None,
 | |
|                      'requires ContainerNoGC extension type')
 | |
|     def test_trash_weakref_clear(self):
 | |
|         # Test that trash weakrefs are properly cleared (bpo-38006).
 | |
|         #
 | |
|         # Structure we are creating:
 | |
|         #
 | |
|         #   Z <- Y <- A--+--> WZ -> C
 | |
|         #             ^  |
 | |
|         #             +--+
 | |
|         # where:
 | |
|         #   WZ is a weakref to Z with callback C
 | |
|         #   Y doesn't implement tp_traverse
 | |
|         #   A contains a reference to itself, Y and WZ
 | |
|         #
 | |
|         # A, Y, Z, WZ are all trash.  The GC doesn't know that Z is trash
 | |
|         # because Y does not implement tp_traverse.  To show the bug, WZ needs
 | |
|         # to live long enough so that Z is deallocated before it.  Then, if
 | |
|         # gcmodule is buggy, when Z is being deallocated, C will run.
 | |
|         #
 | |
|         # To ensure WZ lives long enough, we put it in a second reference
 | |
|         # cycle.  That trick only works due to the ordering of the GC prev/next
 | |
|         # linked lists.  So, this test is a bit fragile.
 | |
|         #
 | |
|         # The bug reported in bpo-38006 is caused because the GC did not
 | |
|         # clear WZ before starting the process of calling tp_clear on the
 | |
|         # trash.  Normally, handle_weakrefs() would find the weakref via Z and
 | |
|         # clear it.  However, since the GC cannot find Z, WR is not cleared and
 | |
|         # it can execute during delete_garbage().  That can lead to disaster
 | |
|         # since the callback might tinker with objects that have already had
 | |
|         # tp_clear called on them (leaving them in possibly invalid states).
 | |
| 
 | |
|         callback = unittest.mock.Mock()
 | |
| 
 | |
|         class A:
 | |
|             __slots__ = ['a', 'y', 'wz']
 | |
| 
 | |
|         class Z:
 | |
|             pass
 | |
| 
 | |
|         # setup required object graph, as described above
 | |
|         a = A()
 | |
|         a.a = a
 | |
|         a.y = ContainerNoGC(Z())
 | |
|         a.wz = weakref.ref(a.y.value, callback)
 | |
|         # create second cycle to keep WZ alive longer
 | |
|         wr_cycle = [a.wz]
 | |
|         wr_cycle.append(wr_cycle)
 | |
|         # ensure trash unrelated to this test is gone
 | |
|         gc.collect()
 | |
|         gc.disable()
 | |
|         # release references and create trash
 | |
|         del a, wr_cycle
 | |
|         gc.collect()
 | |
|         # if called, it means there is a bug in the GC.  The weakref should be
 | |
|         # cleared before Z dies.
 | |
|         callback.assert_not_called()
 | |
|         gc.enable()
 | |
| 
 | |
|     @cpython_only
 | |
|     def test_get_referents_on_capsule(self):
 | |
|         # gh-124538: Calling gc.get_referents() on an untracked capsule must not crash.
 | |
|         import _datetime
 | |
|         import _socket
 | |
|         untracked_capsule = _datetime.datetime_CAPI
 | |
|         tracked_capsule = _socket.CAPI
 | |
| 
 | |
|         # For whoever sees this in the future: if this is failing
 | |
|         # after making datetime's capsule tracked, that's fine -- this isn't something
 | |
|         # users are relying on. Just find a different capsule that is untracked.
 | |
|         self.assertFalse(gc.is_tracked(untracked_capsule))
 | |
|         self.assertTrue(gc.is_tracked(tracked_capsule))
 | |
| 
 | |
|         self.assertEqual(len(gc.get_referents(untracked_capsule)), 0)
 | |
|         gc.get_referents(tracked_capsule)
 | |
| 
 | |
|     @cpython_only
 | |
|     def test_get_objects_during_gc(self):
 | |
|         # gh-125859: Calling gc.get_objects() or gc.get_referrers() during a
 | |
|         # collection should not crash.
 | |
|         test = self
 | |
|         collected = False
 | |
| 
 | |
|         class GetObjectsOnDel:
 | |
|             def __del__(self):
 | |
|                 nonlocal collected
 | |
|                 collected = True
 | |
|                 objs = gc.get_objects()
 | |
|                 # NB: can't use "in" here because some objects override __eq__
 | |
|                 for obj in objs:
 | |
|                     test.assertTrue(obj is not self)
 | |
|                 test.assertEqual(gc.get_referrers(self), [])
 | |
| 
 | |
|         obj = GetObjectsOnDel()
 | |
|         obj.cycle = obj
 | |
|         del obj
 | |
| 
 | |
|         gc.collect()
 | |
|         self.assertTrue(collected)
 | |
| 
 | |
|     def test_traverse_frozen_objects(self):
 | |
|         # See GH-126312: Objects that were not frozen could traverse over
 | |
|         # a frozen object on the free-threaded build, which would cause
 | |
|         # a negative reference count.
 | |
|         x = [1, 2, 3]
 | |
|         gc.freeze()
 | |
|         y = [x]
 | |
|         y.append(y)
 | |
|         del y
 | |
|         gc.collect()
 | |
|         gc.unfreeze()
 | |
| 
 | |
|     def test_deferred_refcount_frozen(self):
 | |
|         # Also from GH-126312: objects that use deferred reference counting
 | |
|         # weren't ignored if they were frozen. Unfortunately, it's pretty
 | |
|         # difficult to come up with a case that triggers this.
 | |
|         #
 | |
|         # Calling gc.collect() while the garbage collector is frozen doesn't
 | |
|         # trigger this normally, but it *does* if it's inside unittest for whatever
 | |
|         # reason. We can't call unittest from inside a test, so it has to be
 | |
|         # in a subprocess.
 | |
|         source = textwrap.dedent("""
 | |
|         import gc
 | |
|         import unittest
 | |
| 
 | |
| 
 | |
|         class Test(unittest.TestCase):
 | |
|             def test_something(self):
 | |
|                 gc.freeze()
 | |
|                 gc.collect()
 | |
|                 gc.unfreeze()
 | |
| 
 | |
| 
 | |
|         if __name__ == "__main__":
 | |
|             unittest.main()
 | |
|         """)
 | |
|         assert_python_ok("-c", source)
 | |
| 
 | |
| 
 | |
| class IncrementalGCTests(unittest.TestCase):
 | |
| 
 | |
|     def setUp(self):
 | |
|         # Reenable GC as it is disabled module-wide
 | |
|         gc.enable()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         gc.disable()
 | |
| 
 | |
|     @requires_gil_enabled("Free threading does not support incremental GC")
 | |
|     # Use small increments to emulate longer running process in a shorter time
 | |
|     @gc_threshold(200, 10)
 | |
|     def test_incremental_gc_handles_fast_cycle_creation(self):
 | |
| 
 | |
|         class LinkedList:
 | |
| 
 | |
|             #Use slots to reduce number of implicit objects
 | |
|             __slots__ = "next", "prev", "surprise"
 | |
| 
 | |
|             def __init__(self, next=None, prev=None):
 | |
|                 self.next = next
 | |
|                 if next is not None:
 | |
|                     next.prev = self
 | |
|                 self.prev = prev
 | |
|                 if prev is not None:
 | |
|                     prev.next = self
 | |
| 
 | |
|         def make_ll(depth):
 | |
|             head = LinkedList()
 | |
|             for i in range(depth):
 | |
|                 head = LinkedList(head, head.prev)
 | |
|             return head
 | |
| 
 | |
|         head = make_ll(1000)
 | |
|         count = 1000
 | |
| 
 | |
|         # There will be some objects we aren't counting,
 | |
|         # e.g. the gc stats dicts. This test checks
 | |
|         # that the counts don't grow, so we try to
 | |
|         # correct for the uncounted objects
 | |
|         # This is just an estimate.
 | |
|         CORRECTION = 20
 | |
| 
 | |
|         enabled = gc.isenabled()
 | |
|         gc.enable()
 | |
|         olds = []
 | |
|         for i in range(20_000):
 | |
|             newhead = make_ll(20)
 | |
|             count += 20
 | |
|             newhead.surprise = head
 | |
|             olds.append(newhead)
 | |
|             if len(olds) == 20:
 | |
|                 stats = gc.get_stats()
 | |
|                 young = stats[0]
 | |
|                 incremental = stats[1]
 | |
|                 old = stats[2]
 | |
|                 collected = young['collected'] + incremental['collected'] + old['collected']
 | |
|                 count += CORRECTION
 | |
|                 live = count - collected
 | |
|                 self.assertLess(live, 25000)
 | |
|                 del olds[:]
 | |
|         if not enabled:
 | |
|             gc.disable()
 | |
| 
 | |
| 
 | |
| class GCCallbackTests(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         # Save gc state and disable it.
 | |
|         self.enabled = gc.isenabled()
 | |
|         gc.disable()
 | |
|         self.debug = gc.get_debug()
 | |
|         gc.set_debug(0)
 | |
|         gc.callbacks.append(self.cb1)
 | |
|         gc.callbacks.append(self.cb2)
 | |
|         self.othergarbage = []
 | |
| 
 | |
|     def tearDown(self):
 | |
|         # Restore gc state
 | |
|         del self.visit
 | |
|         gc.callbacks.remove(self.cb1)
 | |
|         gc.callbacks.remove(self.cb2)
 | |
|         gc.set_debug(self.debug)
 | |
|         if self.enabled:
 | |
|             gc.enable()
 | |
|         # destroy any uncollectables
 | |
|         gc.collect()
 | |
|         for obj in gc.garbage:
 | |
|             if isinstance(obj, Uncollectable):
 | |
|                 obj.partner = None
 | |
|         del gc.garbage[:]
 | |
|         del self.othergarbage
 | |
|         gc.collect()
 | |
| 
 | |
|     def preclean(self):
 | |
|         # Remove all fluff from the system.  Invoke this function
 | |
|         # manually rather than through self.setUp() for maximum
 | |
|         # safety.
 | |
|         self.visit = []
 | |
|         gc.collect()
 | |
|         garbage, gc.garbage[:] = gc.garbage[:], []
 | |
|         self.othergarbage.append(garbage)
 | |
|         self.visit = []
 | |
| 
 | |
|     def cb1(self, phase, info):
 | |
|         self.visit.append((1, phase, dict(info)))
 | |
| 
 | |
|     def cb2(self, phase, info):
 | |
|         self.visit.append((2, phase, dict(info)))
 | |
|         if phase == "stop" and hasattr(self, "cleanup"):
 | |
|             # Clean Uncollectable from garbage
 | |
|             uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
 | |
|             gc.garbage[:] = [e for e in gc.garbage
 | |
|                              if not isinstance(e, Uncollectable)]
 | |
|             for e in uc:
 | |
|                 e.partner = None
 | |
| 
 | |
|     def test_collect(self):
 | |
|         self.preclean()
 | |
|         gc.collect()
 | |
|         # Algorithmically verify the contents of self.visit
 | |
|         # because it is long and tortuous.
 | |
| 
 | |
|         # Count the number of visits to each callback
 | |
|         n = [v[0] for v in self.visit]
 | |
|         n1 = [i for i in n if i == 1]
 | |
|         n2 = [i for i in n if i == 2]
 | |
|         self.assertEqual(n1, [1]*2)
 | |
|         self.assertEqual(n2, [2]*2)
 | |
| 
 | |
|         # Count that we got the right number of start and stop callbacks.
 | |
|         n = [v[1] for v in self.visit]
 | |
|         n1 = [i for i in n if i == "start"]
 | |
|         n2 = [i for i in n if i == "stop"]
 | |
|         self.assertEqual(n1, ["start"]*2)
 | |
|         self.assertEqual(n2, ["stop"]*2)
 | |
| 
 | |
|         # Check that we got the right info dict for all callbacks
 | |
|         for v in self.visit:
 | |
|             info = v[2]
 | |
|             self.assertTrue("generation" in info)
 | |
|             self.assertTrue("collected" in info)
 | |
|             self.assertTrue("uncollectable" in info)
 | |
| 
 | |
|     def test_collect_generation(self):
 | |
|         self.preclean()
 | |
|         gc.collect(2)
 | |
|         for v in self.visit:
 | |
|             info = v[2]
 | |
|             self.assertEqual(info["generation"], 2)
 | |
| 
 | |
|     @cpython_only
 | |
|     def test_collect_garbage(self):
 | |
|         self.preclean()
 | |
|         # Each of these cause two objects to be garbage:
 | |
|         Uncollectable()
 | |
|         Uncollectable()
 | |
|         C1055820(666)
 | |
|         gc.collect()
 | |
|         for v in self.visit:
 | |
|             if v[1] != "stop":
 | |
|                 continue
 | |
|             info = v[2]
 | |
|             self.assertEqual(info["collected"], 1)
 | |
|             self.assertEqual(info["uncollectable"], 4)
 | |
| 
 | |
|         # We should now have the Uncollectables in gc.garbage
 | |
|         self.assertEqual(len(gc.garbage), 4)
 | |
|         for e in gc.garbage:
 | |
|             self.assertIsInstance(e, Uncollectable)
 | |
| 
 | |
|         # Now, let our callback handle the Uncollectable instances
 | |
|         self.cleanup=True
 | |
|         self.visit = []
 | |
|         gc.garbage[:] = []
 | |
|         gc.collect()
 | |
|         for v in self.visit:
 | |
|             if v[1] != "stop":
 | |
|                 continue
 | |
|             info = v[2]
 | |
|             self.assertEqual(info["collected"], 0)
 | |
|             self.assertEqual(info["uncollectable"], 2)
 | |
| 
 | |
|         # Uncollectables should be gone
 | |
|         self.assertEqual(len(gc.garbage), 0)
 | |
| 
 | |
| 
 | |
|     @requires_subprocess()
 | |
|     @unittest.skipIf(BUILD_WITH_NDEBUG,
 | |
|                      'built with -NDEBUG')
 | |
|     def test_refcount_errors(self):
 | |
|         self.preclean()
 | |
|         # Verify the "handling" of objects with broken refcounts
 | |
| 
 | |
|         # Skip the test if ctypes is not available
 | |
|         import_module("ctypes")
 | |
| 
 | |
|         import subprocess
 | |
|         code = textwrap.dedent('''
 | |
|             from test.support import gc_collect, SuppressCrashReport
 | |
| 
 | |
|             a = [1, 2, 3]
 | |
|             b = [a]
 | |
| 
 | |
|             # Avoid coredump when Py_FatalError() calls abort()
 | |
|             SuppressCrashReport().__enter__()
 | |
| 
 | |
|             # Simulate the refcount of "a" being too low (compared to the
 | |
|             # references held on it by live data), but keeping it above zero
 | |
|             # (to avoid deallocating it):
 | |
|             import ctypes
 | |
|             ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
 | |
| 
 | |
|             # The garbage collector should now have a fatal error
 | |
|             # when it reaches the broken object
 | |
|             gc_collect()
 | |
|         ''')
 | |
|         p = subprocess.Popen([sys.executable, "-c", code],
 | |
|                              stdout=subprocess.PIPE,
 | |
|                              stderr=subprocess.PIPE)
 | |
|         stdout, stderr = p.communicate()
 | |
|         p.stdout.close()
 | |
|         p.stderr.close()
 | |
|         # Verify that stderr has a useful error message:
 | |
|         self.assertRegex(stderr,
 | |
|             br'gc.*\.c:[0-9]+: .*: Assertion "gc_get_refs\(.+\) .*" failed.')
 | |
|         self.assertRegex(stderr,
 | |
|             br'refcount is too small')
 | |
|         # "address : 0x7fb5062efc18"
 | |
|         # "address : 7FB5062EFC18"
 | |
|         address_regex = br'[0-9a-fA-Fx]+'
 | |
|         self.assertRegex(stderr,
 | |
|             br'object address  : ' + address_regex)
 | |
|         self.assertRegex(stderr,
 | |
|             br'object refcount : 1')
 | |
|         self.assertRegex(stderr,
 | |
|             br'object type     : ' + address_regex)
 | |
|         self.assertRegex(stderr,
 | |
|             br'object type name: list')
 | |
|         self.assertRegex(stderr,
 | |
|             br'object repr     : \[1, 2, 3\]')
 | |
| 
 | |
| 
 | |
| class GCTogglingTests(unittest.TestCase):
 | |
|     def setUp(self):
 | |
|         gc.enable()
 | |
| 
 | |
|     def tearDown(self):
 | |
|         gc.disable()
 | |
| 
 | |
|     def test_bug1055820c(self):
 | |
|         # Corresponds to temp2c.py in the bug report.  This is pretty
 | |
|         # elaborate.
 | |
| 
 | |
|         c0 = C1055820(0)
 | |
|         # Move c0 into generation 2.
 | |
|         gc.collect()
 | |
| 
 | |
|         c1 = C1055820(1)
 | |
|         c1.keep_c0_alive = c0
 | |
|         del c0.loop # now only c1 keeps c0 alive
 | |
| 
 | |
|         c2 = C1055820(2)
 | |
|         c2wr = weakref.ref(c2) # no callback!
 | |
| 
 | |
|         ouch = []
 | |
|         def callback(ignored):
 | |
|             ouch[:] = [c2wr()]
 | |
| 
 | |
|         # The callback gets associated with a wr on an object in generation 2.
 | |
|         c0wr = weakref.ref(c0, callback)
 | |
| 
 | |
|         c0 = c1 = c2 = None
 | |
| 
 | |
|         # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
 | |
|         # generation 2.  The only thing keeping it alive is that c1 points to
 | |
|         # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
 | |
|         # global weakref to c2 (c2wr), but that weakref has no callback.
 | |
|         # There's also a global weakref to c0 (c0wr), and that does have a
 | |
|         # callback, and that callback references c2 via c2wr().
 | |
|         #
 | |
|         #               c0 has a wr with callback, which references c2wr
 | |
|         #               ^
 | |
|         #               |
 | |
|         #               |     Generation 2 above dots
 | |
|         #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
 | |
|         #               |     Generation 0 below dots
 | |
|         #               |
 | |
|         #               |
 | |
|         #            ^->c1   ^->c2 has a wr but no callback
 | |
|         #            |  |    |  |
 | |
|         #            <--v    <--v
 | |
|         #
 | |
|         # So this is the nightmare:  when generation 0 gets collected, we see
 | |
|         # that c2 has a callback-free weakref, and c1 doesn't even have a
 | |
|         # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
 | |
|         # the only object that has a weakref with a callback.  gc clears c1
 | |
|         # and c2.  Clearing c1 has the side effect of dropping the refcount on
 | |
|         # c0 to 0, so c0 goes away (despite that it's in an older generation)
 | |
|         # and c0's wr callback triggers.  That in turn materializes a reference
 | |
|         # to c2 via c2wr(), but c2 gets cleared anyway by gc.
 | |
| 
 | |
|         # We want to let gc happen "naturally", to preserve the distinction
 | |
|         # between generations.
 | |
|         junk = []
 | |
|         i = 0
 | |
|         detector = GC_Detector()
 | |
|         if Py_GIL_DISABLED:
 | |
|             # The free-threaded build doesn't have multiple generations, so
 | |
|             # just trigger a GC manually.
 | |
|             gc.collect()
 | |
|         while not detector.gc_happened:
 | |
|             i += 1
 | |
|             if i > 10000:
 | |
|                 self.fail("gc didn't happen after 10000 iterations")
 | |
|             self.assertEqual(len(ouch), 0)
 | |
|             junk.append([])  # this will eventually trigger gc
 | |
| 
 | |
|         self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
 | |
|         for x in ouch:
 | |
|             # If the callback resurrected c2, the instance would be damaged,
 | |
|             # with an empty __dict__.
 | |
|             self.assertEqual(x, None)
 | |
| 
 | |
|     @gc_threshold(1000, 0, 0)
 | |
|     def test_bug1055820d(self):
 | |
|         # Corresponds to temp2d.py in the bug report.  This is very much like
 | |
|         # test_bug1055820c, but uses a __del__ method instead of a weakref
 | |
|         # callback to sneak in a resurrection of cyclic trash.
 | |
| 
 | |
|         ouch = []
 | |
|         class D(C1055820):
 | |
|             def __del__(self):
 | |
|                 ouch[:] = [c2wr()]
 | |
| 
 | |
|         d0 = D(0)
 | |
|         # Move all the above into generation 2.
 | |
|         gc.collect()
 | |
| 
 | |
|         c1 = C1055820(1)
 | |
|         c1.keep_d0_alive = d0
 | |
|         del d0.loop # now only c1 keeps d0 alive
 | |
| 
 | |
|         c2 = C1055820(2)
 | |
|         c2wr = weakref.ref(c2) # no callback!
 | |
| 
 | |
|         d0 = c1 = c2 = None
 | |
| 
 | |
|         # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
 | |
|         # generation 2.  The only thing keeping it alive is that c1 points to
 | |
|         # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
 | |
|         # a global weakref to c2 (c2wr), but that weakref has no callback.
 | |
|         # There are no other weakrefs.
 | |
|         #
 | |
|         #               d0 has a __del__ method that references c2wr
 | |
|         #               ^
 | |
|         #               |
 | |
|         #               |     Generation 2 above dots
 | |
|         #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
 | |
|         #               |     Generation 0 below dots
 | |
|         #               |
 | |
|         #               |
 | |
|         #            ^->c1   ^->c2 has a wr but no callback
 | |
|         #            |  |    |  |
 | |
|         #            <--v    <--v
 | |
|         #
 | |
|         # So this is the nightmare:  when generation 0 gets collected, we see
 | |
|         # that c2 has a callback-free weakref, and c1 doesn't even have a
 | |
|         # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
 | |
|         # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
 | |
|         # on d0 to 0, so d0 goes away (despite that it's in an older
 | |
|         # generation) and d0's __del__ triggers.  That in turn materializes
 | |
|         # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
 | |
| 
 | |
|         # We want to let gc happen "naturally", to preserve the distinction
 | |
|         # between generations.
 | |
|         detector = GC_Detector()
 | |
|         junk = []
 | |
|         i = 0
 | |
|         if Py_GIL_DISABLED:
 | |
|             # The free-threaded build doesn't have multiple generations, so
 | |
|             # just trigger a GC manually.
 | |
|             gc.collect()
 | |
|         while not detector.gc_happened:
 | |
|             i += 1
 | |
|             if i > 10000:
 | |
|                 self.fail("gc didn't happen after 10000 iterations")
 | |
|             self.assertEqual(len(ouch), 0)
 | |
|             junk.append([])  # this will eventually trigger gc
 | |
| 
 | |
|         self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
 | |
|         for x in ouch:
 | |
|             # If __del__ resurrected c2, the instance would be damaged, with an
 | |
|             # empty __dict__.
 | |
|             self.assertEqual(x, None)
 | |
| 
 | |
|     @gc_threshold(1000, 0, 0)
 | |
|     def test_indirect_calls_with_gc_disabled(self):
 | |
|         junk = []
 | |
|         i = 0
 | |
|         detector = GC_Detector()
 | |
|         while not detector.gc_happened:
 | |
|             i += 1
 | |
|             if i > 10000:
 | |
|                 self.fail("gc didn't happen after 10000 iterations")
 | |
|             junk.append([])  # this will eventually trigger gc
 | |
| 
 | |
|         try:
 | |
|             gc.disable()
 | |
|             junk = []
 | |
|             i = 0
 | |
|             detector = GC_Detector()
 | |
|             while not detector.gc_happened:
 | |
|                 i += 1
 | |
|                 if i > 10000:
 | |
|                     break
 | |
|                 junk.append([])  # this may eventually trigger gc (if it is enabled)
 | |
| 
 | |
|             self.assertEqual(i, 10001)
 | |
|         finally:
 | |
|             gc.enable()
 | |
| 
 | |
| 
 | |
| class PythonFinalizationTests(unittest.TestCase):
 | |
|     def test_ast_fini(self):
 | |
|         # bpo-44184: Regression test for subtype_dealloc() when deallocating
 | |
|         # an AST instance also destroy its AST type: subtype_dealloc() must
 | |
|         # not access the type memory after deallocating the instance, since
 | |
|         # the type memory can be freed as well. The test is also related to
 | |
|         # _PyAST_Fini() which clears references to AST types.
 | |
|         code = textwrap.dedent("""
 | |
|             import ast
 | |
|             import codecs
 | |
|             from test import support
 | |
| 
 | |
|             # Small AST tree to keep their AST types alive
 | |
|             tree = ast.parse("def f(x, y): return 2*x-y")
 | |
| 
 | |
|             # Store the tree somewhere to survive until the last GC collection
 | |
|             support.late_deletion(tree)
 | |
|         """)
 | |
|         assert_python_ok("-c", code)
 | |
| 
 | |
| 
 | |
| def setUpModule():
 | |
|     global enabled, debug
 | |
|     enabled = gc.isenabled()
 | |
|     gc.disable()
 | |
|     assert not gc.isenabled()
 | |
|     debug = gc.get_debug()
 | |
|     gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
 | |
|     gc.collect() # Delete 2nd generation garbage
 | |
| 
 | |
| 
 | |
| def tearDownModule():
 | |
|     gc.set_debug(debug)
 | |
|     # test gc.enable() even if GC is disabled by default
 | |
|     if verbose:
 | |
|         print("restoring automatic collection")
 | |
|     # make sure to always test gc.enable()
 | |
|     gc.enable()
 | |
|     assert gc.isenabled()
 | |
|     if not enabled:
 | |
|         gc.disable()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 | 
