cpython/Lib/test/test_interpreters/test_queues.py
Eric Snow eb22e2b251
gh-115490: Make the interpreter.channels and interpreter.queues Modules Handle Reloading Properly (gh-115493)
The problem manifested when the .py module got reloaded and the corresponding extension module didn't. The .py module registers types with the extension and the extension was not allowing that to happen more than once. The solution: let it happen more than once.
2024-03-04 20:59:30 +00:00

415 lines
13 KiB
Python

import importlib
import threading
from textwrap import dedent
import unittest
import time
from test.support import import_helper
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_xxinterpqueues')
from test.support import interpreters
from test.support.interpreters import queues
from .utils import _run_output, TestBase
class TestBase(TestBase):
    """Base class for the queue tests in this module.

    It extends (and shadows the name of) the ``TestBase`` imported from
    ``.utils`` and adds queue cleanup after every test.
    """

    def tearDown(self):
        # Destroy every queue the low-level module still lists, so that
        # one test's queues do not leak into the next test.
        try:
            for qid in _queues.list_all():
                try:
                    _queues.destroy(qid)
                except Exception:
                    # Best-effort cleanup: a queue that cannot be
                    # destroyed must not keep the remaining queues
                    # from being destroyed.
                    pass
        finally:
            # Always chain to the parent class so that whatever cleanup
            # it defines still runs, even if listing the queues failed.
            super().tearDown()
class LowLevelTests(TestBase):
    """Corner-case tests that target the low-level queues module."""

    # The behavior of the low-level module is important only in as much
    # as it is exercised by the high-level module.  Therefore the most
    # important testing happens in the high-level tests.  These
    # low-level tests cover corner cases that are not encountered by
    # the high-level module, thus they mostly shouldn't matter as much.

    def test_highlevel_reloaded(self):
        # Reloading the high-level module must complete without raising.
        # See gh-115490 (https://github.com/python/cpython/issues/115490).
        highlevel = queues
        importlib.reload(highlevel)
class QueueTests(TestBase):
    """Tests for creating queues and for the basic attributes of a queue."""

    def test_create(self):
        # Each case is (subtest label, positional args, expected maxsize).
        # The maxsize is expected to be reported back exactly as given,
        # including a negative one.
        cases = [
            ('vanilla', (), 0),
            ('small maxsize', (3,), 3),
            ('big maxsize', (100,), 100),
            ('no maxsize', (0,), 0),
            ('negative maxsize', (-10,), -10),
        ]
        for label, args, maxsize in cases:
            with self.subTest(label):
                created = queues.create(*args)
                self.assertEqual(created.maxsize, maxsize)

        # A str is not accepted as the maxsize.
        with self.subTest('bad maxsize'):
            with self.assertRaises(TypeError):
                queues.create('1')

    def test_shareable(self):
        # Queue objects themselves can be sent through a queue, both
        # within one interpreter and between interpreters.
        queue1 = queues.create()

        interp = interpreters.create()
        setup = dedent(f"""
            from test.support.interpreters import queues
            queue1 = queues.Queue({queue1.id})
            """)
        interp.exec(setup)

        with self.subTest('same interpreter'):
            sent_here = queues.create()
            queue1.put(sent_here, syncobj=True)
            received_here = queue1.get()
            self.assertIs(received_here, sent_here)

        with self.subTest('from current interpreter'):
            sent_out = queues.create()
            queue1.put(sent_out, syncobj=True)
            script = dedent("""
                queue4 = queue1.get()
                print(queue4.id)
                """)
            out = _run_output(interp, script)
            qid = int(out)
            self.assertEqual(qid, sent_out.id)

        with self.subTest('from subinterpreter'):
            script = dedent("""
                queue5 = queues.create()
                queue1.put(queue5, syncobj=True)
                print(queue5.id)
                """)
            out = _run_output(interp, script)
            qid = int(out)
            received_back = queue1.get()
            self.assertEqual(received_back.id, qid)

    def test_id_type(self):
        # The queue ID is exposed as a plain int.
        created = queues.create()
        self.assertIsInstance(created.id, int)

    def test_custom_id(self):
        # Wrapping an ID that no queue has must fail.
        unknown_id = 1_000_000
        with self.assertRaises(queues.QueueNotFoundError):
            queues.Queue(unknown_id)

    def test_id_readonly(self):
        # The "id" attribute cannot be assigned to.
        created = queues.create()
        with self.assertRaises(AttributeError):
            created.id = 1_000_000

    def test_maxsize_readonly(self):
        # The "maxsize" attribute cannot be assigned to.
        created = queues.create(10)
        with self.assertRaises(AttributeError):
            created.maxsize = 1_000_000

    def test_hashable(self):
        # A queue hashes the same as its ID.
        created = queues.create()
        id_hash = hash(created.id)
        queue_hash = hash(created)
        self.assertEqual(queue_hash, id_hash)

    def test_equality(self):
        # A queue equals itself and differs from any other queue.
        first = queues.create()
        second = queues.create()
        self.assertEqual(first, first)
        self.assertNotEqual(first, second)
class TestQueueOps(TestBase):
    """Tests for queue operations: empty()/full()/qsize(), put and get."""

    def test_empty(self):
        # Sample empty() before a put, after the put, and after the get.
        q = queues.create()
        samples = [q.empty()]
        q.put(None, syncobj=True)
        samples.append(q.empty())
        q.get()
        samples.append(q.empty())

        before, during, after = samples
        self.assertIs(before, True)
        self.assertIs(during, False)
        self.assertIs(after, True)

    def test_full(self):
        # Sample full() once up front and again after every operation
        # while filling a maxsize-3 queue and then draining it.
        expected = [False, False, False, True, False, False, False]
        q = queues.create(3)
        actual = [q.full()]
        for op in ('put', 'put', 'put', 'get', 'get', 'get'):
            if op == 'put':
                q.put(None, syncobj=True)
            else:
                q.get()
            actual.append(q.full())

        self.assertEqual(actual, expected)

    def test_qsize(self):
        # Sample qsize() once up front and again after every operation
        # of a mixed put/get sequence.
        expected = [0, 1, 2, 3, 2, 3, 2, 1, 0, 1, 0]
        ops = (
            'put', 'put', 'put',
            'get',
            'put',
            'get', 'get', 'get',
            'put',
            'get',
        )
        q = queues.create()
        actual = [q.qsize()]
        for op in ops:
            if op == 'put':
                q.put(None, syncobj=True)
            else:
                q.get()
            actual.append(q.qsize())

        self.assertEqual(actual, expected)

    def test_put_get_main(self):
        # Items come back out in the order they were put in, for both
        # values of syncobj.
        expected = list(range(20))
        for syncobj in (True, False):
            with self.subTest(f'syncobj={syncobj}'):
                q = queues.create()
                for value in range(20):
                    q.put(value, syncobj=syncobj)
                actual = []
                for _ in range(20):
                    actual.append(q.get())

                self.assertEqual(actual, expected)

    def test_put_timeout(self):
        # put() with a timeout on a full queue raises QueueFull; once
        # an item has been taken out there is room again.
        for syncobj in (True, False):
            with self.subTest(f'syncobj={syncobj}'):
                q = queues.create(2)
                for _ in range(2):
                    q.put(None, syncobj=syncobj)
                with self.assertRaises(queues.QueueFull):
                    q.put(None, timeout=0.1, syncobj=syncobj)
                q.get()
                q.put(None, syncobj=syncobj)

    def test_put_nowait(self):
        # put_nowait() on a full queue raises QueueFull; once an item
        # has been taken out there is room again.
        for syncobj in (True, False):
            with self.subTest(f'syncobj={syncobj}'):
                q = queues.create(2)
                for _ in range(2):
                    q.put_nowait(None, syncobj=syncobj)
                with self.assertRaises(queues.QueueFull):
                    q.put_nowait(None, syncobj=syncobj)
                q.get()
                q.put_nowait(None, syncobj=syncobj)

    def test_put_syncobj(self):
        # With syncobj=True these objects round-trip through the queue...
        accepted = (
            None,
            True,
            10,
            'spam',
            b'spam',
            (0, 'a'),
        )
        for obj in accepted:
            with self.subTest(repr(obj)):
                q = queues.create()
                for getter in ('get', 'get_nowait'):
                    q.put(obj, syncobj=True)
                    obj2 = getattr(q, getter)()
                    self.assertEqual(obj2, obj)

        # ...whereas these are rejected.
        rejected = (
            [1, 2, 3],
            {'a': 13, 'b': 17},
        )
        for obj in rejected:
            with self.subTest(repr(obj)):
                q = queues.create()
                with self.assertRaises(interpreters.NotShareableError):
                    q.put(obj, syncobj=True)

    def test_put_not_syncobj(self):
        # With syncobj=False even the not-shareable list and dict
        # round-trip through the queue.
        candidates = (
            None,
            True,
            10,
            'spam',
            b'spam',
            (0, 'a'),
            # not shareable
            [1, 2, 3],
            {'a': 13, 'b': 17},
        )
        for obj in candidates:
            with self.subTest(repr(obj)):
                q = queues.create()
                for getter in ('get', 'get_nowait'):
                    q.put(obj, syncobj=False)
                    obj2 = getattr(q, getter)()
                    self.assertEqual(obj2, obj)

    def test_get_timeout(self):
        # get() with a timeout on an empty queue raises QueueEmpty.
        q = queues.create()
        with self.assertRaises(queues.QueueEmpty):
            q.get(timeout=0.1)

    def test_get_nowait(self):
        # get_nowait() on an empty queue raises QueueEmpty.
        q = queues.create()
        with self.assertRaises(queues.QueueEmpty):
            q.get_nowait()

    def test_put_get_default_syncobj(self):
        # The queue is created with syncobj=True and every put() below
        # relies on that default.
        expected = list(range(20))
        q = queues.create(syncobj=True)
        for methname in ('get', 'get_nowait'):
            with self.subTest(f'{methname}()'):
                fetch = getattr(q, methname)
                for value in range(20):
                    q.put(value)
                actual = []
                for _ in range(20):
                    actual.append(fetch())
                self.assertEqual(actual, expected)

        unshareable = [1, 2, 3]  # lists are not shareable
        with self.assertRaises(interpreters.NotShareableError):
            q.put(unshareable)

    def test_put_get_default_not_syncobj(self):
        # The queue is created with syncobj=False and every put() below
        # relies on that default.
        expected = list(range(20))
        q = queues.create(syncobj=False)
        for methname in ('get', 'get_nowait'):
            with self.subTest(f'{methname}()'):
                fetch = getattr(q, methname)
                for value in range(20):
                    q.put(value)
                actual = []
                for _ in range(20):
                    actual.append(fetch())
                self.assertEqual(actual, expected)

        obj = [1, 2, 3]  # lists are not shareable
        q.put(obj)
        # The last method exercised by the loop above is get_nowait().
        obj2 = q.get_nowait()
        self.assertEqual(obj, obj2)
        self.assertIsNot(obj, obj2)

    def test_put_get_same_interpreter(self):
        # Everything happens inside a single subinterpreter; the checks
        # are the assert statements in the script itself.
        interp = interpreters.create()
        setup = dedent("""
            from test.support.interpreters import queues
            queue = queues.create()
            """)
        interp.exec(setup)
        for methname in ('get', 'get_nowait'):
            with self.subTest(f'{methname}()'):
                script = dedent(f"""
                    orig = b'spam'
                    queue.put(orig, syncobj=True)
                    obj = queue.{methname}()
                    assert obj == orig, 'expected: obj == orig'
                    assert obj is not orig, 'expected: obj is not orig'
                    """)
                interp.exec(script)

    def test_put_get_different_interpreters(self):
        # queue1 carries an object from here to the subinterpreter and
        # queue2 carries another one back.
        interp = interpreters.create()
        queue1 = queues.create()
        queue2 = queues.create()
        self.assertEqual(len(queues.list_all()), 2)

        for methname in ('get', 'get_nowait'):
            with self.subTest(f'{methname}()'):
                obj1 = b'spam'
                queue1.put(obj1, syncobj=True)

                script = dedent(f"""
                    from test.support.interpreters import queues
                    queue1 = queues.Queue({queue1.id})
                    queue2 = queues.Queue({queue2.id})
                    assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1'
                    obj = queue1.{methname}()
                    assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0'
                    assert obj == b'spam', 'expected: obj == obj1'
                    # When going to another interpreter we get a copy.
                    assert id(obj) != {id(obj1)}, 'expected: obj is not obj1'
                    obj2 = b'eggs'
                    print(id(obj2))
                    assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
                    queue2.put(obj2, syncobj=True)
                    assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
                    """)
                out = _run_output(interp, script)

                self.assertEqual(len(queues.list_all()), 2)
                self.assertEqual(queue1.qsize(), 0)
                self.assertEqual(queue2.qsize(), 1)

                obj2 = getattr(queue2, methname)()
                self.assertEqual(obj2, b'eggs')
                # The script printed the id() of its own object.
                self.assertNotEqual(id(obj2), int(out))

    def test_put_cleared_with_subinterpreter(self):
        # The subinterpreter puts two items; one is taken out here and
        # the other is expected to be gone once "interp" is deleted.
        interp = interpreters.create()
        queue = queues.create()

        script = dedent(f"""
            from test.support.interpreters import queues
            queue = queues.Queue({queue.id})
            obj1 = b'spam'
            obj2 = b'eggs'
            queue.put(obj1, syncobj=True)
            queue.put(obj2, syncobj=True)
            """)
        _run_output(interp, script)
        self.assertEqual(queue.qsize(), 2)

        first = queue.get()
        self.assertEqual(first, b'spam')
        self.assertEqual(queue.qsize(), 1)

        del interp
        self.assertEqual(queue.qsize(), 0)

    def test_put_get_different_threads(self):
        # A helper thread forwards one item from queue1 to queue2.
        queue1 = queues.create()
        queue2 = queues.create()

        def forward():
            # Keep polling until an item shows up in queue1.
            while True:
                try:
                    item = queue1.get(timeout=0.1)
                except queues.QueueEmpty:
                    continue
                else:
                    break
            queue2.put(item, syncobj=True)

        worker = threading.Thread(target=forward)
        worker.start()

        orig = b'spam'
        queue1.put(orig, syncobj=True)
        obj = queue2.get()
        worker.join()

        self.assertEqual(obj, orig)
        self.assertIsNot(obj, orig)
if __name__ == '__main__':
    # This module uses relative imports, so it needs to be run as part
    # of its package (e.g. through "python -m") rather than as a
    # standalone script file.
    unittest.main()