[3.13] gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-121807)

Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str).  Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object.

When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter.

Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed.  (See my comment on that PR.)
 It took me a little while to realize what was going on.  I expect that users, which much less context than I have, would experience the same pain.

My approach, here, to improving the situation is to give users three options:

1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item
2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item
3. existing behavior: silently remove each item (i.e. Queue.get() skips each one)

The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior.

The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created.  (This is the same as I did for "synconly".)

(cherry picked from commit 6b98b274b6, AKA gh-116431)

Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
This commit is contained in:
Miss Islington (bot) 2024-07-15 21:13:51 +02:00 committed by GitHub
parent ff65d1e054
commit f19ccfdae0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 513 additions and 90 deletions

View file

@ -13,13 +13,16 @@ from test.support.interpreters import queues
from .utils import _run_output, TestBase as _TestBase
REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
def get_num_queues():
return len(_queues.list_all())
class TestBase(_TestBase):
def tearDown(self):
for qid, _ in _queues.list_all():
for qid, _, _ in _queues.list_all():
try:
_queues.destroy(qid)
except Exception:
@ -40,7 +43,7 @@ class LowLevelTests(TestBase):
importlib.reload(queues)
def test_create_destroy(self):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
@ -54,7 +57,7 @@ class LowLevelTests(TestBase):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
_queues.create(2, 0)
_queues.create(2, 0, {REPLACE})
"""),
)
self.assertEqual(stdout, '')
@ -65,13 +68,13 @@ class LowLevelTests(TestBase):
def test_bind_release(self):
with self.subTest('typical'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -79,7 +82,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
@ -87,7 +90,7 @@ class LowLevelTests(TestBase):
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
qid = _queues.create(2, 0)
qid = _queues.create(2, 0, REPLACE)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
@ -427,26 +430,206 @@ class TestQueueOps(TestBase):
self.assertNotEqual(id(obj2), int(out))
def test_put_cleared_with_subinterpreter(self):
interp = interpreters.create()
queue = queues.create()
def common(queue, unbound=None, presize=0):
if not unbound:
extraargs = ''
elif unbound is queues.UNBOUND:
extraargs = ', unbound=queues.UNBOUND'
elif unbound is queues.UNBOUND_ERROR:
extraargs = ', unbound=queues.UNBOUND_ERROR'
elif unbound is queues.UNBOUND_REMOVE:
extraargs = ', unbound=queues.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
out = _run_output(
interp,
dedent(f"""
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
queue.put(obj1, syncobj=True)
queue.put(obj2, syncobj=True)
queue.put(obj1, syncobj=True{extraargs})
queue.put(obj2, syncobj=True{extraargs})
"""))
self.assertEqual(queue.qsize(), 2)
self.assertEqual(queue.qsize(), presize + 2)
obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), 1)
if presize == 0:
obj1 = queue.get()
self.assertEqual(obj1, b'spam')
self.assertEqual(queue.qsize(), presize + 1)
return interp
with self.subTest('default'): # UNBOUND
queue = queues.create()
interp = common(queue)
del interp
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND'):
queue = queues.create()
interp = common(queue, queues.UNBOUND)
del interp
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND_ERROR'):
queue = queues.create()
interp = common(queue, queues.UNBOUND_ERROR)
del interp
self.assertEqual(queue.qsize(), 1)
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.subTest('UNBOUND_REMOVE'):
queue = queues.create()
interp = common(queue, queues.UNBOUND_REMOVE)
del interp
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 1)
interp = common(queue, queues.UNBOUND_REMOVE, 1)
self.assertEqual(queue.qsize(), 3)
queue.put(42, unbound=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 4)
del interp
self.assertEqual(queue.qsize(), 2)
obj1 = queue.get()
obj2 = queue.get()
self.assertEqual(obj1, b'ham')
self.assertEqual(obj2, 42)
self.assertEqual(queue.qsize(), 0)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
def test_put_cleared_with_subinterpreter_mixed(self):
queue = queues.create()
interp = interpreters.create()
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
queue.put(1, syncobj=True, unbound=queues.UNBOUND)
queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(3, syncobj=True)
queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(5, syncobj=True, unbound=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 5)
del interp
self.assertEqual(queue.qsize(), 4)
obj1 = queue.get()
self.assertIs(obj1, queues.UNBOUND)
self.assertEqual(queue.qsize(), 3)
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 2)
obj2 = queue.get()
self.assertIs(obj2, queues.UNBOUND)
self.assertEqual(queue.qsize(), 1)
obj3 = queue.get()
self.assertIs(obj3, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
def test_put_cleared_with_subinterpreter_multiple(self):
queue = queues.create()
interp1 = interpreters.create()
interp2 = interpreters.create()
queue.put(1, syncobj=True)
_run_output(interp1, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = queue.get()
queue.put(2, syncobj=True, unbound=queues.UNBOUND)
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj2 = queue.get()
obj1 = queue.get()
"""))
self.assertEqual(queue.qsize(), 0)
queue.put(3)
_run_output(interp1, dedent("""
queue.put(4, syncobj=True, unbound=queues.UNBOUND)
# interp closed here
queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(6, syncobj=True, unbound=queues.UNBOUND)
"""))
_run_output(interp2, dedent("""
queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
# interp closed here
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(8, syncobj=True, unbound=queues.UNBOUND)
"""))
_run_output(interp1, dedent("""
queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
queue.put(10, syncobj=True, unbound=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 10)
obj3 = queue.get()
self.assertEqual(obj3, 3)
self.assertEqual(queue.qsize(), 9)
obj4 = queue.get()
self.assertEqual(obj4, 4)
self.assertEqual(queue.qsize(), 8)
del interp1
self.assertEqual(queue.qsize(), 6)
# obj5 was removed
obj6 = queue.get()
self.assertIs(obj6, queues.UNBOUND)
self.assertEqual(queue.qsize(), 5)
obj7 = queue.get()
self.assertEqual(obj7, 7)
self.assertEqual(queue.qsize(), 4)
del interp2
self.assertEqual(queue.qsize(), 3)
# obj1
with self.assertRaises(queues.ItemInterpreterDestroyed):
queue.get()
self.assertEqual(queue.qsize(), 2)
# obj2 was removed
obj8 = queue.get()
self.assertIs(obj8, queues.UNBOUND)
self.assertEqual(queue.qsize(), 1)
# obj9 was removed
obj10 = queue.get()
self.assertIs(obj10, queues.UNBOUND)
self.assertEqual(queue.qsize(), 0)
def test_put_get_different_threads(self):