[3.13] gh-76785: Expand How Interpreter Channels Handle Interpreter Finalization (gh-121811)

See 6b98b274b6 for an explanation of the problem and solution.  Here I've applied the solution to channels.

(cherry picked from commit 8b209fd4f8, AKA gh-121805)

Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
This commit is contained in:
Miss Islington (bot) 2024-07-15 22:15:47 +02:00 committed by GitHub
parent c4daec4319
commit 835f4add60
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 898 additions and 306 deletions

View file

@ -372,6 +372,228 @@ class TestSendRecv(TestBase):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
def test_send_cleared_with_subinterpreter(self):
def common(rch, sch, unbound=None, presize=0):
if not unbound:
extraargs = ''
elif unbound is channels.UNBOUND:
extraargs = ', unbound=channels.UNBOUND'
elif unbound is channels.UNBOUND_ERROR:
extraargs = ', unbound=channels.UNBOUND_ERROR'
elif unbound is channels.UNBOUND_REMOVE:
extraargs = ', unbound=channels.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
obj1 = b'spam'
obj2 = b'eggs'
sch.send_nowait(obj1{extraargs})
sch.send_nowait(obj2{extraargs})
"""))
self.assertEqual(
_channels.get_count(rch.id),
presize + 2,
)
if presize == 0:
obj1 = rch.recv()
self.assertEqual(obj1, b'spam')
self.assertEqual(
_channels.get_count(rch.id),
presize + 1,
)
return interp
with self.subTest('default'): # UNBOUND
rch, sch = channels.create()
interp = common(rch, sch)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
obj1 = rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
obj1 = rch.recv()
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND_ERROR'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND_ERROR)
del interp
self.assertEqual(_channels.get_count(rch.id), 1)
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
with self.subTest('UNBOUND_REMOVE'):
rch, sch = channels.create()
interp = common(rch, sch, channels.UNBOUND_REMOVE)
del interp
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 1)
interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
self.assertEqual(_channels.get_count(rch.id), 3)
sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp
self.assertEqual(_channels.get_count(rch.id), 2)
obj1 = rch.recv()
obj2 = rch.recv()
self.assertEqual(obj1, b'ham')
self.assertEqual(obj2, 42)
self.assertEqual(_channels.get_count(rch.id), 0)
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
def test_send_cleared_with_subinterpreter_mixed(self):
rch, sch = channels.create()
interp = interpreters.create()
# If we don't associate the main interpreter with the channel
# then the channel will be automatically closed when interp
# is destroyed.
sch.send_nowait(None)
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 0)
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
sch.send_nowait(1, unbound=channels.UNBOUND)
sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(3)
sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(5, unbound=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 5)
del interp
self.assertEqual(_channels.get_count(rch.id), 4)
obj1 = rch.recv()
self.assertIs(obj1, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 3)
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 2)
obj2 = rch.recv()
self.assertIs(obj2, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 1)
obj3 = rch.recv()
self.assertIs(obj3, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
def test_send_cleared_with_subinterpreter_multiple(self):
rch, sch = channels.create()
interp1 = interpreters.create()
interp2 = interpreters.create()
sch.send_nowait(1)
_run_output(interp1, dedent(f"""
from test.support.interpreters import channels
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj1 = rch.recv()
sch.send_nowait(2, unbound=channels.UNBOUND)
sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import channels
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj2 = rch.recv()
obj1 = rch.recv()
"""))
self.assertEqual(_channels.get_count(rch.id), 0)
sch.send_nowait(3)
_run_output(interp1, dedent("""
sch.send_nowait(4, unbound=channels.UNBOUND)
# interp closed here
sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(6, unbound=channels.UNBOUND)
"""))
_run_output(interp2, dedent("""
sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
# interp closed here
sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(8, unbound=channels.UNBOUND)
"""))
_run_output(interp1, dedent("""
sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
sch.send_nowait(10, unbound=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 10)
obj3 = rch.recv()
self.assertEqual(obj3, 3)
self.assertEqual(_channels.get_count(rch.id), 9)
obj4 = rch.recv()
self.assertEqual(obj4, 4)
self.assertEqual(_channels.get_count(rch.id), 8)
del interp1
self.assertEqual(_channels.get_count(rch.id), 6)
# obj5 was removed
obj6 = rch.recv()
self.assertIs(obj6, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 5)
obj7 = rch.recv()
self.assertEqual(obj7, 7)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp2
self.assertEqual(_channels.get_count(rch.id), 3)
# obj1
with self.assertRaises(channels.ItemInterpreterDestroyed):
rch.recv()
self.assertEqual(_channels.get_count(rch.id), 2)
# obj2 was removed
obj8 = rch.recv()
self.assertIs(obj8, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 1)
# obj9 was removed
obj10 = rch.recv()
self.assertIs(obj10, channels.UNBOUND)
self.assertEqual(_channels.get_count(rch.id), 0)
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.