mirror of
https://github.com/python/cpython.git
synced 2025-09-09 18:32:22 +00:00
bpo-31356: Add context manager to temporarily disable GC (GH-4224)
This commit is contained in:
parent
0cd6bca655
commit
72a0d218dc
5 changed files with 208 additions and 1 deletions
|
@ -1,7 +1,7 @@
|
|||
import unittest
|
||||
from test.support import (verbose, refcount_test, run_unittest,
|
||||
strip_python_stderr, cpython_only, start_threads,
|
||||
temp_dir, requires_type_collecting)
|
||||
temp_dir, requires_type_collecting,reap_threads)
|
||||
from test.support.script_helper import assert_python_ok, make_script
|
||||
|
||||
import sys
|
||||
|
@ -9,6 +9,8 @@ import time
|
|||
import gc
|
||||
import weakref
|
||||
import threading
|
||||
import warnings
|
||||
|
||||
|
||||
try:
|
||||
from _testcapi import with_tp_del
|
||||
|
@ -1007,6 +1009,73 @@ class GCTogglingTests(unittest.TestCase):
|
|||
# empty __dict__.
|
||||
self.assertEqual(x, None)
|
||||
|
||||
def test_ensure_disabled(self):
|
||||
original_status = gc.isenabled()
|
||||
|
||||
with gc.ensure_disabled():
|
||||
inside_status = gc.isenabled()
|
||||
|
||||
after_status = gc.isenabled()
|
||||
self.assertEqual(original_status, True)
|
||||
self.assertEqual(inside_status, False)
|
||||
self.assertEqual(after_status, True)
|
||||
|
||||
def test_ensure_disabled_with_gc_disabled(self):
|
||||
gc.disable()
|
||||
|
||||
original_status = gc.isenabled()
|
||||
|
||||
with gc.ensure_disabled():
|
||||
inside_status = gc.isenabled()
|
||||
|
||||
after_status = gc.isenabled()
|
||||
self.assertEqual(original_status, False)
|
||||
self.assertEqual(inside_status, False)
|
||||
self.assertEqual(after_status, False)
|
||||
|
||||
@reap_threads
|
||||
def test_ensure_disabled_thread(self):
|
||||
|
||||
thread_original_status = None
|
||||
thread_inside_status = None
|
||||
thread_after_status = None
|
||||
|
||||
def disabling_thread():
|
||||
nonlocal thread_original_status
|
||||
nonlocal thread_inside_status
|
||||
nonlocal thread_after_status
|
||||
thread_original_status = gc.isenabled()
|
||||
|
||||
with gc.ensure_disabled():
|
||||
time.sleep(0.01)
|
||||
thread_inside_status = gc.isenabled()
|
||||
|
||||
thread_after_status = gc.isenabled()
|
||||
|
||||
original_status = gc.isenabled()
|
||||
|
||||
with warnings.catch_warnings(record=True) as w, gc.ensure_disabled():
|
||||
inside_status_before_thread = gc.isenabled()
|
||||
thread = threading.Thread(target=disabling_thread)
|
||||
thread.start()
|
||||
inside_status_after_thread = gc.isenabled()
|
||||
|
||||
after_status = gc.isenabled()
|
||||
thread.join()
|
||||
|
||||
self.assertEqual(len(w), 1)
|
||||
self.assertTrue(issubclass(w[-1].category, RuntimeWarning))
|
||||
self.assertEqual("Garbage collector enabled while another thread is "
|
||||
"inside gc.ensure_enabled", str(w[-1].message))
|
||||
self.assertEqual(original_status, True)
|
||||
self.assertEqual(inside_status_before_thread, False)
|
||||
self.assertEqual(thread_original_status, False)
|
||||
self.assertEqual(thread_inside_status, True)
|
||||
self.assertEqual(thread_after_status, False)
|
||||
self.assertEqual(inside_status_after_thread, False)
|
||||
self.assertEqual(after_status, True)
|
||||
|
||||
|
||||
def test_main():
|
||||
enabled = gc.isenabled()
|
||||
gc.disable()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue