mirror of
https://github.com/python/cpython.git
synced 2025-07-09 20:35:26 +00:00
gh-76785: Expand How Interpreter Queues Handle Interpreter Finalization (gh-116431)
Any cross-interpreter mechanism for passing objects between interpreters must be very careful to respect isolation, even when the object is effectively immutable (e.g. int, str). Here this especially relates to when an interpreter sends one of its objects, and then is destroyed while the inter-interpreter machinery (e.g. queue) still holds a reference to the object. When I added interpreters.Queue, I dealt with that case (using an atexit hook) by silently removing all items from the queue that were added by the finalizing interpreter. Later, while working on concurrent.futures.InterpreterPoolExecutor (gh-116430), I noticed it was somewhat surprising when items were silently removed from the queue when the originating interpreter was destroyed. (See my comment on that PR.) It took me a little while to realize what was going on. I expect that users, which much less context than I have, would experience the same pain. My approach, here, to improving the situation is to give users three options: 1. return a singleton (interpreters.queues.UNBOUND) from Queue.get() in place of each removed item 2. raise an exception (interpreters.queues.ItemInterpreterDestroyed) from Queue.get() in place of each removed item 3. existing behavior: silently remove each item (i.e. Queue.get() skips each one) The default will now be (1), but users can still explicitly opt in any of them, including to the silent removal behavior. The behavior for each item may be set with the corresponding Queue.put() call. and a queue-wide default may be set when the queue is created. (This is the same as I did for "synconly".)
This commit is contained in:
parent
985dd8e17b
commit
6b98b274b6
3 changed files with 513 additions and 90 deletions
|
@ -12,9 +12,11 @@ from _interpqueues import (
|
||||||
)
|
)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
|
||||||
'create', 'list_all',
|
'create', 'list_all',
|
||||||
'Queue',
|
'Queue',
|
||||||
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
|
'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
|
||||||
|
'ItemInterpreterDestroyed',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,26 +34,90 @@ class QueueFull(QueueError, queue.Full):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class ItemInterpreterDestroyed(QueueError):
|
||||||
|
"""Raised from get() and get_nowait()."""
|
||||||
|
|
||||||
|
|
||||||
_SHARED_ONLY = 0
|
_SHARED_ONLY = 0
|
||||||
_PICKLED = 1
|
_PICKLED = 1
|
||||||
|
|
||||||
def create(maxsize=0, *, syncobj=False):
|
|
||||||
|
class UnboundItem:
|
||||||
|
"""Represents a Queue item no longer bound to an interpreter.
|
||||||
|
|
||||||
|
An item is unbound when the interpreter that added it to the queue
|
||||||
|
is destroyed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__slots__ = ()
|
||||||
|
|
||||||
|
def __new__(cls):
|
||||||
|
return UNBOUND
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'interpreters.queues.UNBOUND'
|
||||||
|
|
||||||
|
|
||||||
|
UNBOUND = object.__new__(UnboundItem)
|
||||||
|
UNBOUND_ERROR = object()
|
||||||
|
UNBOUND_REMOVE = object()
|
||||||
|
|
||||||
|
_UNBOUND_CONSTANT_TO_FLAG = {
|
||||||
|
UNBOUND_REMOVE: 1,
|
||||||
|
UNBOUND_ERROR: 2,
|
||||||
|
UNBOUND: 3,
|
||||||
|
}
|
||||||
|
_UNBOUND_FLAG_TO_CONSTANT = {v: k
|
||||||
|
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
|
||||||
|
|
||||||
|
def _serialize_unbound(unbound):
|
||||||
|
op = unbound
|
||||||
|
try:
|
||||||
|
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
|
||||||
|
except KeyError:
|
||||||
|
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
|
||||||
|
return flag,
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_unbound(flag):
|
||||||
|
try:
|
||||||
|
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
|
||||||
|
except KeyError:
|
||||||
|
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
|
||||||
|
if op is UNBOUND_REMOVE:
|
||||||
|
# "remove" not possible here
|
||||||
|
raise NotImplementedError
|
||||||
|
elif op is UNBOUND_ERROR:
|
||||||
|
raise ItemInterpreterDestroyed("item's original interpreter destroyed")
|
||||||
|
elif op is UNBOUND:
|
||||||
|
return UNBOUND
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(repr(op))
|
||||||
|
|
||||||
|
|
||||||
|
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
|
||||||
"""Return a new cross-interpreter queue.
|
"""Return a new cross-interpreter queue.
|
||||||
|
|
||||||
The queue may be used to pass data safely between interpreters.
|
The queue may be used to pass data safely between interpreters.
|
||||||
|
|
||||||
"syncobj" sets the default for Queue.put()
|
"syncobj" sets the default for Queue.put()
|
||||||
and Queue.put_nowait().
|
and Queue.put_nowait().
|
||||||
|
|
||||||
|
"unbounditems" likewise sets the default. See Queue.put() for
|
||||||
|
supported values. The default value is UNBOUND, which replaces
|
||||||
|
the unbound item.
|
||||||
"""
|
"""
|
||||||
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
||||||
qid = _queues.create(maxsize, fmt)
|
unbound = _serialize_unbound(unbounditems)
|
||||||
return Queue(qid, _fmt=fmt)
|
unboundop, = unbound
|
||||||
|
qid = _queues.create(maxsize, fmt, unboundop)
|
||||||
|
return Queue(qid, _fmt=fmt, _unbound=unbound)
|
||||||
|
|
||||||
|
|
||||||
def list_all():
|
def list_all():
|
||||||
"""Return a list of all open queues."""
|
"""Return a list of all open queues."""
|
||||||
return [Queue(qid, _fmt=fmt)
|
return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
|
||||||
for qid, fmt in _queues.list_all()]
|
for qid, fmt, unboundop in _queues.list_all()]
|
||||||
|
|
||||||
|
|
||||||
_known_queues = weakref.WeakValueDictionary()
|
_known_queues = weakref.WeakValueDictionary()
|
||||||
|
@ -59,20 +125,28 @@ _known_queues = weakref.WeakValueDictionary()
|
||||||
class Queue:
|
class Queue:
|
||||||
"""A cross-interpreter queue."""
|
"""A cross-interpreter queue."""
|
||||||
|
|
||||||
def __new__(cls, id, /, *, _fmt=None):
|
def __new__(cls, id, /, *, _fmt=None, _unbound=None):
|
||||||
# There is only one instance for any given ID.
|
# There is only one instance for any given ID.
|
||||||
if isinstance(id, int):
|
if isinstance(id, int):
|
||||||
id = int(id)
|
id = int(id)
|
||||||
else:
|
else:
|
||||||
raise TypeError(f'id must be an int, got {id!r}')
|
raise TypeError(f'id must be an int, got {id!r}')
|
||||||
if _fmt is None:
|
if _fmt is None:
|
||||||
_fmt, = _queues.get_queue_defaults(id)
|
if _unbound is None:
|
||||||
|
_fmt, op = _queues.get_queue_defaults(id)
|
||||||
|
_unbound = (op,)
|
||||||
|
else:
|
||||||
|
_fmt, _ = _queues.get_queue_defaults(id)
|
||||||
|
elif _unbound is None:
|
||||||
|
_, op = _queues.get_queue_defaults(id)
|
||||||
|
_unbound = (op,)
|
||||||
try:
|
try:
|
||||||
self = _known_queues[id]
|
self = _known_queues[id]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
self = super().__new__(cls)
|
self = super().__new__(cls)
|
||||||
self._id = id
|
self._id = id
|
||||||
self._fmt = _fmt
|
self._fmt = _fmt
|
||||||
|
self._unbound = _unbound
|
||||||
_known_queues[id] = self
|
_known_queues[id] = self
|
||||||
_queues.bind(id)
|
_queues.bind(id)
|
||||||
return self
|
return self
|
||||||
|
@ -124,6 +198,7 @@ class Queue:
|
||||||
|
|
||||||
def put(self, obj, timeout=None, *,
|
def put(self, obj, timeout=None, *,
|
||||||
syncobj=None,
|
syncobj=None,
|
||||||
|
unbound=None,
|
||||||
_delay=10 / 1000, # 10 milliseconds
|
_delay=10 / 1000, # 10 milliseconds
|
||||||
):
|
):
|
||||||
"""Add the object to the queue.
|
"""Add the object to the queue.
|
||||||
|
@ -131,7 +206,7 @@ class Queue:
|
||||||
This blocks while the queue is full.
|
This blocks while the queue is full.
|
||||||
|
|
||||||
If "syncobj" is None (the default) then it uses the
|
If "syncobj" is None (the default) then it uses the
|
||||||
queue's default, set with create_queue()..
|
queue's default, set with create_queue().
|
||||||
|
|
||||||
If "syncobj" is false then all objects are supported,
|
If "syncobj" is false then all objects are supported,
|
||||||
at the expense of worse performance.
|
at the expense of worse performance.
|
||||||
|
@ -152,11 +227,37 @@ class Queue:
|
||||||
actually is. That's a slightly different and stronger promise
|
actually is. That's a slightly different and stronger promise
|
||||||
than just (initial) equality, which is all "syncobj=False"
|
than just (initial) equality, which is all "syncobj=False"
|
||||||
can promise.
|
can promise.
|
||||||
|
|
||||||
|
"unbound" controls the behavior of Queue.get() for the given
|
||||||
|
object if the current interpreter (calling put()) is later
|
||||||
|
destroyed.
|
||||||
|
|
||||||
|
If "unbound" is None (the default) then it uses the
|
||||||
|
queue's default, set with create_queue(),
|
||||||
|
which is usually UNBOUND.
|
||||||
|
|
||||||
|
If "unbound" is UNBOUND_ERROR then get() will raise an
|
||||||
|
ItemInterpreterDestroyed exception if the original interpreter
|
||||||
|
has been destroyed. This does not otherwise affect the queue;
|
||||||
|
the next call to put() will work like normal, returning the next
|
||||||
|
item in the queue.
|
||||||
|
|
||||||
|
If "unbound" is UNBOUND_REMOVE then the item will be removed
|
||||||
|
from the queue as soon as the original interpreter is destroyed.
|
||||||
|
Be aware that this will introduce an imbalance between put()
|
||||||
|
and get() calls.
|
||||||
|
|
||||||
|
If "unbound" is UNBOUND then it is returned by get() in place
|
||||||
|
of the unbound item.
|
||||||
"""
|
"""
|
||||||
if syncobj is None:
|
if syncobj is None:
|
||||||
fmt = self._fmt
|
fmt = self._fmt
|
||||||
else:
|
else:
|
||||||
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
||||||
|
if unbound is None:
|
||||||
|
unboundop, = self._unbound
|
||||||
|
else:
|
||||||
|
unboundop, = _serialize_unbound(unbound)
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
timeout = int(timeout)
|
timeout = int(timeout)
|
||||||
if timeout < 0:
|
if timeout < 0:
|
||||||
|
@ -166,7 +267,7 @@ class Queue:
|
||||||
obj = pickle.dumps(obj)
|
obj = pickle.dumps(obj)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
_queues.put(self._id, obj, fmt)
|
_queues.put(self._id, obj, fmt, unboundop)
|
||||||
except QueueFull as exc:
|
except QueueFull as exc:
|
||||||
if timeout is not None and time.time() >= end:
|
if timeout is not None and time.time() >= end:
|
||||||
raise # re-raise
|
raise # re-raise
|
||||||
|
@ -174,14 +275,18 @@ class Queue:
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
def put_nowait(self, obj, *, syncobj=None):
|
def put_nowait(self, obj, *, syncobj=None, unbound=None):
|
||||||
if syncobj is None:
|
if syncobj is None:
|
||||||
fmt = self._fmt
|
fmt = self._fmt
|
||||||
else:
|
else:
|
||||||
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
fmt = _SHARED_ONLY if syncobj else _PICKLED
|
||||||
|
if unbound is None:
|
||||||
|
unboundop, = self._unbound
|
||||||
|
else:
|
||||||
|
unboundop, = _serialize_unbound(unbound)
|
||||||
if fmt is _PICKLED:
|
if fmt is _PICKLED:
|
||||||
obj = pickle.dumps(obj)
|
obj = pickle.dumps(obj)
|
||||||
_queues.put(self._id, obj, fmt)
|
_queues.put(self._id, obj, fmt, unboundop)
|
||||||
|
|
||||||
def get(self, timeout=None, *,
|
def get(self, timeout=None, *,
|
||||||
_delay=10 / 1000, # 10 milliseconds
|
_delay=10 / 1000, # 10 milliseconds
|
||||||
|
@ -189,6 +294,10 @@ class Queue:
|
||||||
"""Return the next object from the queue.
|
"""Return the next object from the queue.
|
||||||
|
|
||||||
This blocks while the queue is empty.
|
This blocks while the queue is empty.
|
||||||
|
|
||||||
|
If the next item's original interpreter has been destroyed
|
||||||
|
then the "next object" is determined by the value of the
|
||||||
|
"unbound" argument to put().
|
||||||
"""
|
"""
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
timeout = int(timeout)
|
timeout = int(timeout)
|
||||||
|
@ -197,13 +306,16 @@ class Queue:
|
||||||
end = time.time() + timeout
|
end = time.time() + timeout
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
obj, fmt = _queues.get(self._id)
|
obj, fmt, unboundop = _queues.get(self._id)
|
||||||
except QueueEmpty as exc:
|
except QueueEmpty as exc:
|
||||||
if timeout is not None and time.time() >= end:
|
if timeout is not None and time.time() >= end:
|
||||||
raise # re-raise
|
raise # re-raise
|
||||||
time.sleep(_delay)
|
time.sleep(_delay)
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
if unboundop is not None:
|
||||||
|
assert obj is None, repr(obj)
|
||||||
|
return _resolve_unbound(unboundop)
|
||||||
if fmt == _PICKLED:
|
if fmt == _PICKLED:
|
||||||
obj = pickle.loads(obj)
|
obj = pickle.loads(obj)
|
||||||
else:
|
else:
|
||||||
|
@ -217,9 +329,12 @@ class Queue:
|
||||||
is the same as get().
|
is the same as get().
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
obj, fmt = _queues.get(self._id)
|
obj, fmt, unboundop = _queues.get(self._id)
|
||||||
except QueueEmpty as exc:
|
except QueueEmpty as exc:
|
||||||
raise # re-raise
|
raise # re-raise
|
||||||
|
if unboundop is not None:
|
||||||
|
assert obj is None, repr(obj)
|
||||||
|
return _resolve_unbound(unboundop)
|
||||||
if fmt == _PICKLED:
|
if fmt == _PICKLED:
|
||||||
obj = pickle.loads(obj)
|
obj = pickle.loads(obj)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -12,13 +12,16 @@ from test.support.interpreters import queues
|
||||||
from .utils import _run_output, TestBase as _TestBase
|
from .utils import _run_output, TestBase as _TestBase
|
||||||
|
|
||||||
|
|
||||||
|
REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
|
||||||
|
|
||||||
|
|
||||||
def get_num_queues():
|
def get_num_queues():
|
||||||
return len(_queues.list_all())
|
return len(_queues.list_all())
|
||||||
|
|
||||||
|
|
||||||
class TestBase(_TestBase):
|
class TestBase(_TestBase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
for qid, _ in _queues.list_all():
|
for qid, _, _ in _queues.list_all():
|
||||||
try:
|
try:
|
||||||
_queues.destroy(qid)
|
_queues.destroy(qid)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -39,7 +42,7 @@ class LowLevelTests(TestBase):
|
||||||
importlib.reload(queues)
|
importlib.reload(queues)
|
||||||
|
|
||||||
def test_create_destroy(self):
|
def test_create_destroy(self):
|
||||||
qid = _queues.create(2, 0)
|
qid = _queues.create(2, 0, REPLACE)
|
||||||
_queues.destroy(qid)
|
_queues.destroy(qid)
|
||||||
self.assertEqual(get_num_queues(), 0)
|
self.assertEqual(get_num_queues(), 0)
|
||||||
with self.assertRaises(queues.QueueNotFoundError):
|
with self.assertRaises(queues.QueueNotFoundError):
|
||||||
|
@ -53,7 +56,7 @@ class LowLevelTests(TestBase):
|
||||||
'-c',
|
'-c',
|
||||||
dedent(f"""
|
dedent(f"""
|
||||||
import {_queues.__name__} as _queues
|
import {_queues.__name__} as _queues
|
||||||
_queues.create(2, 0)
|
_queues.create(2, 0, {REPLACE})
|
||||||
"""),
|
"""),
|
||||||
)
|
)
|
||||||
self.assertEqual(stdout, '')
|
self.assertEqual(stdout, '')
|
||||||
|
@ -64,13 +67,13 @@ class LowLevelTests(TestBase):
|
||||||
|
|
||||||
def test_bind_release(self):
|
def test_bind_release(self):
|
||||||
with self.subTest('typical'):
|
with self.subTest('typical'):
|
||||||
qid = _queues.create(2, 0)
|
qid = _queues.create(2, 0, REPLACE)
|
||||||
_queues.bind(qid)
|
_queues.bind(qid)
|
||||||
_queues.release(qid)
|
_queues.release(qid)
|
||||||
self.assertEqual(get_num_queues(), 0)
|
self.assertEqual(get_num_queues(), 0)
|
||||||
|
|
||||||
with self.subTest('bind too much'):
|
with self.subTest('bind too much'):
|
||||||
qid = _queues.create(2, 0)
|
qid = _queues.create(2, 0, REPLACE)
|
||||||
_queues.bind(qid)
|
_queues.bind(qid)
|
||||||
_queues.bind(qid)
|
_queues.bind(qid)
|
||||||
_queues.release(qid)
|
_queues.release(qid)
|
||||||
|
@ -78,7 +81,7 @@ class LowLevelTests(TestBase):
|
||||||
self.assertEqual(get_num_queues(), 0)
|
self.assertEqual(get_num_queues(), 0)
|
||||||
|
|
||||||
with self.subTest('nested'):
|
with self.subTest('nested'):
|
||||||
qid = _queues.create(2, 0)
|
qid = _queues.create(2, 0, REPLACE)
|
||||||
_queues.bind(qid)
|
_queues.bind(qid)
|
||||||
_queues.bind(qid)
|
_queues.bind(qid)
|
||||||
_queues.release(qid)
|
_queues.release(qid)
|
||||||
|
@ -86,7 +89,7 @@ class LowLevelTests(TestBase):
|
||||||
self.assertEqual(get_num_queues(), 0)
|
self.assertEqual(get_num_queues(), 0)
|
||||||
|
|
||||||
with self.subTest('release without binding'):
|
with self.subTest('release without binding'):
|
||||||
qid = _queues.create(2, 0)
|
qid = _queues.create(2, 0, REPLACE)
|
||||||
with self.assertRaises(queues.QueueError):
|
with self.assertRaises(queues.QueueError):
|
||||||
_queues.release(qid)
|
_queues.release(qid)
|
||||||
|
|
||||||
|
@ -426,26 +429,206 @@ class TestQueueOps(TestBase):
|
||||||
self.assertNotEqual(id(obj2), int(out))
|
self.assertNotEqual(id(obj2), int(out))
|
||||||
|
|
||||||
def test_put_cleared_with_subinterpreter(self):
|
def test_put_cleared_with_subinterpreter(self):
|
||||||
interp = interpreters.create()
|
def common(queue, unbound=None, presize=0):
|
||||||
queue = queues.create()
|
if not unbound:
|
||||||
|
extraargs = ''
|
||||||
|
elif unbound is queues.UNBOUND:
|
||||||
|
extraargs = ', unbound=queues.UNBOUND'
|
||||||
|
elif unbound is queues.UNBOUND_ERROR:
|
||||||
|
extraargs = ', unbound=queues.UNBOUND_ERROR'
|
||||||
|
elif unbound is queues.UNBOUND_REMOVE:
|
||||||
|
extraargs = ', unbound=queues.UNBOUND_REMOVE'
|
||||||
|
else:
|
||||||
|
raise NotImplementedError(repr(unbound))
|
||||||
|
interp = interpreters.create()
|
||||||
|
|
||||||
out = _run_output(
|
_run_output(interp, dedent(f"""
|
||||||
interp,
|
|
||||||
dedent(f"""
|
|
||||||
from test.support.interpreters import queues
|
from test.support.interpreters import queues
|
||||||
queue = queues.Queue({queue.id})
|
queue = queues.Queue({queue.id})
|
||||||
obj1 = b'spam'
|
obj1 = b'spam'
|
||||||
obj2 = b'eggs'
|
obj2 = b'eggs'
|
||||||
queue.put(obj1, syncobj=True)
|
queue.put(obj1, syncobj=True{extraargs})
|
||||||
queue.put(obj2, syncobj=True)
|
queue.put(obj2, syncobj=True{extraargs})
|
||||||
"""))
|
"""))
|
||||||
self.assertEqual(queue.qsize(), 2)
|
self.assertEqual(queue.qsize(), presize + 2)
|
||||||
|
|
||||||
obj1 = queue.get()
|
if presize == 0:
|
||||||
self.assertEqual(obj1, b'spam')
|
obj1 = queue.get()
|
||||||
self.assertEqual(queue.qsize(), 1)
|
self.assertEqual(obj1, b'spam')
|
||||||
|
self.assertEqual(queue.qsize(), presize + 1)
|
||||||
|
|
||||||
|
return interp
|
||||||
|
|
||||||
|
with self.subTest('default'): # UNBOUND
|
||||||
|
queue = queues.create()
|
||||||
|
interp = common(queue)
|
||||||
|
del interp
|
||||||
|
obj1 = queue.get()
|
||||||
|
self.assertIs(obj1, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
with self.assertRaises(queues.QueueEmpty):
|
||||||
|
queue.get_nowait()
|
||||||
|
|
||||||
|
with self.subTest('UNBOUND'):
|
||||||
|
queue = queues.create()
|
||||||
|
interp = common(queue, queues.UNBOUND)
|
||||||
|
del interp
|
||||||
|
obj1 = queue.get()
|
||||||
|
self.assertIs(obj1, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
with self.assertRaises(queues.QueueEmpty):
|
||||||
|
queue.get_nowait()
|
||||||
|
|
||||||
|
with self.subTest('UNBOUND_ERROR'):
|
||||||
|
queue = queues.create()
|
||||||
|
interp = common(queue, queues.UNBOUND_ERROR)
|
||||||
|
|
||||||
|
del interp
|
||||||
|
self.assertEqual(queue.qsize(), 1)
|
||||||
|
with self.assertRaises(queues.ItemInterpreterDestroyed):
|
||||||
|
queue.get()
|
||||||
|
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
with self.assertRaises(queues.QueueEmpty):
|
||||||
|
queue.get_nowait()
|
||||||
|
|
||||||
|
with self.subTest('UNBOUND_REMOVE'):
|
||||||
|
queue = queues.create()
|
||||||
|
|
||||||
|
interp = common(queue, queues.UNBOUND_REMOVE)
|
||||||
|
del interp
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
with self.assertRaises(queues.QueueEmpty):
|
||||||
|
queue.get_nowait()
|
||||||
|
|
||||||
|
queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
|
||||||
|
self.assertEqual(queue.qsize(), 1)
|
||||||
|
interp = common(queue, queues.UNBOUND_REMOVE, 1)
|
||||||
|
self.assertEqual(queue.qsize(), 3)
|
||||||
|
queue.put(42, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
self.assertEqual(queue.qsize(), 4)
|
||||||
|
del interp
|
||||||
|
self.assertEqual(queue.qsize(), 2)
|
||||||
|
obj1 = queue.get()
|
||||||
|
obj2 = queue.get()
|
||||||
|
self.assertEqual(obj1, b'ham')
|
||||||
|
self.assertEqual(obj2, 42)
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
with self.assertRaises(queues.QueueEmpty):
|
||||||
|
queue.get_nowait()
|
||||||
|
|
||||||
|
def test_put_cleared_with_subinterpreter_mixed(self):
|
||||||
|
queue = queues.create()
|
||||||
|
interp = interpreters.create()
|
||||||
|
_run_output(interp, dedent(f"""
|
||||||
|
from test.support.interpreters import queues
|
||||||
|
queue = queues.Queue({queue.id})
|
||||||
|
queue.put(1, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
|
||||||
|
queue.put(3, syncobj=True)
|
||||||
|
queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
queue.put(5, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
"""))
|
||||||
|
self.assertEqual(queue.qsize(), 5)
|
||||||
|
|
||||||
del interp
|
del interp
|
||||||
|
self.assertEqual(queue.qsize(), 4)
|
||||||
|
|
||||||
|
obj1 = queue.get()
|
||||||
|
self.assertIs(obj1, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 3)
|
||||||
|
|
||||||
|
with self.assertRaises(queues.ItemInterpreterDestroyed):
|
||||||
|
queue.get()
|
||||||
|
self.assertEqual(queue.qsize(), 2)
|
||||||
|
|
||||||
|
obj2 = queue.get()
|
||||||
|
self.assertIs(obj2, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 1)
|
||||||
|
|
||||||
|
obj3 = queue.get()
|
||||||
|
self.assertIs(obj3, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
|
||||||
|
def test_put_cleared_with_subinterpreter_multiple(self):
|
||||||
|
queue = queues.create()
|
||||||
|
interp1 = interpreters.create()
|
||||||
|
interp2 = interpreters.create()
|
||||||
|
|
||||||
|
queue.put(1, syncobj=True)
|
||||||
|
_run_output(interp1, dedent(f"""
|
||||||
|
from test.support.interpreters import queues
|
||||||
|
queue = queues.Queue({queue.id})
|
||||||
|
obj1 = queue.get()
|
||||||
|
queue.put(2, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
"""))
|
||||||
|
_run_output(interp2, dedent(f"""
|
||||||
|
from test.support.interpreters import queues
|
||||||
|
queue = queues.Queue({queue.id})
|
||||||
|
obj2 = queue.get()
|
||||||
|
obj1 = queue.get()
|
||||||
|
"""))
|
||||||
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
queue.put(3)
|
||||||
|
_run_output(interp1, dedent("""
|
||||||
|
queue.put(4, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
# interp closed here
|
||||||
|
queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
queue.put(6, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
"""))
|
||||||
|
_run_output(interp2, dedent("""
|
||||||
|
queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
|
||||||
|
# interp closed here
|
||||||
|
queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
|
||||||
|
queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
queue.put(8, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
"""))
|
||||||
|
_run_output(interp1, dedent("""
|
||||||
|
queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
|
||||||
|
queue.put(10, syncobj=True, unbound=queues.UNBOUND)
|
||||||
|
"""))
|
||||||
|
self.assertEqual(queue.qsize(), 10)
|
||||||
|
|
||||||
|
obj3 = queue.get()
|
||||||
|
self.assertEqual(obj3, 3)
|
||||||
|
self.assertEqual(queue.qsize(), 9)
|
||||||
|
|
||||||
|
obj4 = queue.get()
|
||||||
|
self.assertEqual(obj4, 4)
|
||||||
|
self.assertEqual(queue.qsize(), 8)
|
||||||
|
|
||||||
|
del interp1
|
||||||
|
self.assertEqual(queue.qsize(), 6)
|
||||||
|
|
||||||
|
# obj5 was removed
|
||||||
|
|
||||||
|
obj6 = queue.get()
|
||||||
|
self.assertIs(obj6, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 5)
|
||||||
|
|
||||||
|
obj7 = queue.get()
|
||||||
|
self.assertEqual(obj7, 7)
|
||||||
|
self.assertEqual(queue.qsize(), 4)
|
||||||
|
|
||||||
|
del interp2
|
||||||
|
self.assertEqual(queue.qsize(), 3)
|
||||||
|
|
||||||
|
# obj1
|
||||||
|
with self.assertRaises(queues.ItemInterpreterDestroyed):
|
||||||
|
queue.get()
|
||||||
|
self.assertEqual(queue.qsize(), 2)
|
||||||
|
|
||||||
|
# obj2 was removed
|
||||||
|
|
||||||
|
obj8 = queue.get()
|
||||||
|
self.assertIs(obj8, queues.UNBOUND)
|
||||||
|
self.assertEqual(queue.qsize(), 1)
|
||||||
|
|
||||||
|
# obj9 was removed
|
||||||
|
|
||||||
|
obj10 = queue.get()
|
||||||
|
self.assertIs(obj10, queues.UNBOUND)
|
||||||
self.assertEqual(queue.qsize(), 0)
|
self.assertEqual(queue.qsize(), 0)
|
||||||
|
|
||||||
def test_put_get_different_threads(self):
|
def test_put_get_different_threads(self):
|
||||||
|
|
|
@ -58,6 +58,19 @@ _release_xid_data(_PyCrossInterpreterData *data, int flags)
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int64_t
|
||||||
|
_get_interpid(_PyCrossInterpreterData *data)
|
||||||
|
{
|
||||||
|
int64_t interpid;
|
||||||
|
if (data != NULL) {
|
||||||
|
interpid = _PyCrossInterpreterData_INTERPID(data);
|
||||||
|
assert(!PyErr_Occurred());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
|
||||||
|
}
|
||||||
|
return interpid;
|
||||||
|
}
|
||||||
|
|
||||||
static PyInterpreterState *
|
static PyInterpreterState *
|
||||||
_get_current_interp(void)
|
_get_current_interp(void)
|
||||||
|
@ -389,47 +402,98 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* unbound items ************************************************************/
|
||||||
|
|
||||||
|
#define UNBOUND_REMOVE 1
|
||||||
|
#define UNBOUND_ERROR 2
|
||||||
|
#define UNBOUND_REPLACE 3
|
||||||
|
|
||||||
|
// It would also be possible to add UNBOUND_REPLACE where the replacement
|
||||||
|
// value is user-provided. There would be some limitations there, though.
|
||||||
|
// Another possibility would be something like UNBOUND_COPY, where the
|
||||||
|
// object is released but the underlying data is copied (with the "raw"
|
||||||
|
// allocator) and used when the item is popped off the queue.
|
||||||
|
|
||||||
|
static int
|
||||||
|
check_unbound(int unboundop)
|
||||||
|
{
|
||||||
|
switch (unboundop) {
|
||||||
|
case UNBOUND_REMOVE:
|
||||||
|
case UNBOUND_ERROR:
|
||||||
|
case UNBOUND_REPLACE:
|
||||||
|
return 1;
|
||||||
|
default:
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* the basic queue **********************************************************/
|
/* the basic queue **********************************************************/
|
||||||
|
|
||||||
struct _queueitem;
|
struct _queueitem;
|
||||||
|
|
||||||
typedef struct _queueitem {
|
typedef struct _queueitem {
|
||||||
|
/* The interpreter that added the item to the queue.
|
||||||
|
The actual bound interpid is found in item->data.
|
||||||
|
This is necessary because item->data might be NULL,
|
||||||
|
meaning the interpreter has been destroyed. */
|
||||||
|
int64_t interpid;
|
||||||
_PyCrossInterpreterData *data;
|
_PyCrossInterpreterData *data;
|
||||||
int fmt;
|
int fmt;
|
||||||
|
int unboundop;
|
||||||
struct _queueitem *next;
|
struct _queueitem *next;
|
||||||
} _queueitem;
|
} _queueitem;
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_queueitem_init(_queueitem *item,
|
_queueitem_init(_queueitem *item,
|
||||||
_PyCrossInterpreterData *data, int fmt)
|
int64_t interpid, _PyCrossInterpreterData *data,
|
||||||
|
int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
|
if (interpid < 0) {
|
||||||
|
interpid = _get_interpid(data);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
assert(data == NULL
|
||||||
|
|| _PyCrossInterpreterData_INTERPID(data) < 0
|
||||||
|
|| interpid == _PyCrossInterpreterData_INTERPID(data));
|
||||||
|
}
|
||||||
|
assert(check_unbound(unboundop));
|
||||||
*item = (_queueitem){
|
*item = (_queueitem){
|
||||||
|
.interpid = interpid,
|
||||||
.data = data,
|
.data = data,
|
||||||
.fmt = fmt,
|
.fmt = fmt,
|
||||||
|
.unboundop = unboundop,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
_queueitem_clear_data(_queueitem *item)
|
||||||
|
{
|
||||||
|
if (item->data == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// It was allocated in queue_put().
|
||||||
|
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
|
||||||
|
item->data = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_queueitem_clear(_queueitem *item)
|
_queueitem_clear(_queueitem *item)
|
||||||
{
|
{
|
||||||
item->next = NULL;
|
item->next = NULL;
|
||||||
|
_queueitem_clear_data(item);
|
||||||
if (item->data != NULL) {
|
|
||||||
// It was allocated in queue_put().
|
|
||||||
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
|
|
||||||
item->data = NULL;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static _queueitem *
|
static _queueitem *
|
||||||
_queueitem_new(_PyCrossInterpreterData *data, int fmt)
|
_queueitem_new(int64_t interpid, _PyCrossInterpreterData *data,
|
||||||
|
int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
_queueitem *item = GLOBAL_MALLOC(_queueitem);
|
_queueitem *item = GLOBAL_MALLOC(_queueitem);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
PyErr_NoMemory();
|
PyErr_NoMemory();
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
_queueitem_init(item, data, fmt);
|
_queueitem_init(item, interpid, data, fmt, unboundop);
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -452,15 +516,44 @@ _queueitem_free_all(_queueitem *item)
|
||||||
|
|
||||||
static void
|
static void
|
||||||
_queueitem_popped(_queueitem *item,
|
_queueitem_popped(_queueitem *item,
|
||||||
_PyCrossInterpreterData **p_data, int *p_fmt)
|
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
|
||||||
{
|
{
|
||||||
*p_data = item->data;
|
*p_data = item->data;
|
||||||
*p_fmt = item->fmt;
|
*p_fmt = item->fmt;
|
||||||
|
*p_unboundop = item->unboundop;
|
||||||
// We clear them here, so they won't be released in _queueitem_clear().
|
// We clear them here, so they won't be released in _queueitem_clear().
|
||||||
item->data = NULL;
|
item->data = NULL;
|
||||||
_queueitem_free(item);
|
_queueitem_free(item);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_queueitem_clear_interpreter(_queueitem *item)
|
||||||
|
{
|
||||||
|
assert(item->interpid >= 0);
|
||||||
|
if (item->data == NULL) {
|
||||||
|
// Its interpreter was already cleared (or it was never bound).
|
||||||
|
// For UNBOUND_REMOVE it should have been freed at that time.
|
||||||
|
assert(item->unboundop != UNBOUND_REMOVE);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
|
||||||
|
|
||||||
|
switch (item->unboundop) {
|
||||||
|
case UNBOUND_REMOVE:
|
||||||
|
// The caller must free/clear it.
|
||||||
|
return 1;
|
||||||
|
case UNBOUND_ERROR:
|
||||||
|
case UNBOUND_REPLACE:
|
||||||
|
// We won't need the cross-interpreter data later
|
||||||
|
// so we completely throw it away.
|
||||||
|
_queueitem_clear_data(item);
|
||||||
|
return 0;
|
||||||
|
default:
|
||||||
|
Py_FatalError("not reachable");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* the queue */
|
/* the queue */
|
||||||
|
|
||||||
|
@ -474,12 +567,16 @@ typedef struct _queue {
|
||||||
_queueitem *first;
|
_queueitem *first;
|
||||||
_queueitem *last;
|
_queueitem *last;
|
||||||
} items;
|
} items;
|
||||||
int fmt;
|
struct {
|
||||||
|
int fmt;
|
||||||
|
int unboundop;
|
||||||
|
} defaults;
|
||||||
} _queue;
|
} _queue;
|
||||||
|
|
||||||
static int
|
static int
|
||||||
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
|
_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
|
assert(check_unbound(unboundop));
|
||||||
PyThread_type_lock mutex = PyThread_allocate_lock();
|
PyThread_type_lock mutex = PyThread_allocate_lock();
|
||||||
if (mutex == NULL) {
|
if (mutex == NULL) {
|
||||||
return ERR_QUEUE_ALLOC;
|
return ERR_QUEUE_ALLOC;
|
||||||
|
@ -490,7 +587,10 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
|
||||||
.items = {
|
.items = {
|
||||||
.maxsize = maxsize,
|
.maxsize = maxsize,
|
||||||
},
|
},
|
||||||
.fmt = fmt,
|
.defaults = {
|
||||||
|
.fmt = fmt,
|
||||||
|
.unboundop = unboundop,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -571,7 +671,8 @@ _queue_unlock(_queue *queue)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
|
_queue_add(_queue *queue, int64_t interpid, _PyCrossInterpreterData *data,
|
||||||
|
int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
int err = _queue_lock(queue);
|
int err = _queue_lock(queue);
|
||||||
if (err < 0) {
|
if (err < 0) {
|
||||||
|
@ -587,7 +688,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
|
||||||
return ERR_QUEUE_FULL;
|
return ERR_QUEUE_FULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
_queueitem *item = _queueitem_new(data, fmt);
|
_queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
_queue_unlock(queue);
|
_queue_unlock(queue);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -608,7 +709,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
|
||||||
|
|
||||||
static int
|
static int
|
||||||
_queue_next(_queue *queue,
|
_queue_next(_queue *queue,
|
||||||
_PyCrossInterpreterData **p_data, int *p_fmt)
|
_PyCrossInterpreterData **p_data, int *p_fmt, int *p_unboundop)
|
||||||
{
|
{
|
||||||
int err = _queue_lock(queue);
|
int err = _queue_lock(queue);
|
||||||
if (err < 0) {
|
if (err < 0) {
|
||||||
|
@ -627,7 +728,7 @@ _queue_next(_queue *queue,
|
||||||
}
|
}
|
||||||
queue->items.count -= 1;
|
queue->items.count -= 1;
|
||||||
|
|
||||||
_queueitem_popped(item, p_data, p_fmt);
|
_queueitem_popped(item, p_data, p_fmt, p_unboundop);
|
||||||
|
|
||||||
_queue_unlock(queue);
|
_queue_unlock(queue);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -692,14 +793,17 @@ _queue_clear_interpreter(_queue *queue, int64_t interpid)
|
||||||
while (next != NULL) {
|
while (next != NULL) {
|
||||||
_queueitem *item = next;
|
_queueitem *item = next;
|
||||||
next = item->next;
|
next = item->next;
|
||||||
if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
|
int remove = (item->interpid == interpid)
|
||||||
|
? _queueitem_clear_interpreter(item)
|
||||||
|
: 0;
|
||||||
|
if (remove) {
|
||||||
|
_queueitem_free(item);
|
||||||
if (prev == NULL) {
|
if (prev == NULL) {
|
||||||
queue->items.first = item->next;
|
queue->items.first = next;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
prev->next = item->next;
|
prev->next = next;
|
||||||
}
|
}
|
||||||
_queueitem_free(item);
|
|
||||||
queue->items.count -= 1;
|
queue->items.count -= 1;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -966,18 +1070,19 @@ finally:
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct queue_id_and_fmt {
|
struct queue_id_and_info {
|
||||||
int64_t id;
|
int64_t id;
|
||||||
int fmt;
|
int fmt;
|
||||||
|
int unboundop;
|
||||||
};
|
};
|
||||||
|
|
||||||
static struct queue_id_and_fmt *
|
static struct queue_id_and_info *
|
||||||
_queues_list_all(_queues *queues, int64_t *count)
|
_queues_list_all(_queues *queues, int64_t *p_count)
|
||||||
{
|
{
|
||||||
struct queue_id_and_fmt *qids = NULL;
|
struct queue_id_and_info *qids = NULL;
|
||||||
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
|
||||||
struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
|
struct queue_id_and_info *ids = PyMem_NEW(struct queue_id_and_info,
|
||||||
(Py_ssize_t)(queues->count));
|
(Py_ssize_t)(queues->count));
|
||||||
if (ids == NULL) {
|
if (ids == NULL) {
|
||||||
goto done;
|
goto done;
|
||||||
}
|
}
|
||||||
|
@ -985,9 +1090,10 @@ _queues_list_all(_queues *queues, int64_t *count)
|
||||||
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
|
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
|
||||||
ids[i].id = ref->qid;
|
ids[i].id = ref->qid;
|
||||||
assert(ref->queue != NULL);
|
assert(ref->queue != NULL);
|
||||||
ids[i].fmt = ref->queue->fmt;
|
ids[i].fmt = ref->queue->defaults.fmt;
|
||||||
|
ids[i].unboundop = ref->queue->defaults.unboundop;
|
||||||
}
|
}
|
||||||
*count = queues->count;
|
*p_count = queues->count;
|
||||||
|
|
||||||
qids = ids;
|
qids = ids;
|
||||||
done:
|
done:
|
||||||
|
@ -1021,13 +1127,13 @@ _queue_free(_queue *queue)
|
||||||
|
|
||||||
// Create a new queue.
|
// Create a new queue.
|
||||||
static int64_t
|
static int64_t
|
||||||
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
|
queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
_queue *queue = GLOBAL_MALLOC(_queue);
|
_queue *queue = GLOBAL_MALLOC(_queue);
|
||||||
if (queue == NULL) {
|
if (queue == NULL) {
|
||||||
return ERR_QUEUE_ALLOC;
|
return ERR_QUEUE_ALLOC;
|
||||||
}
|
}
|
||||||
int err = _queue_init(queue, maxsize, fmt);
|
int err = _queue_init(queue, maxsize, fmt, unboundop);
|
||||||
if (err < 0) {
|
if (err < 0) {
|
||||||
GLOBAL_FREE(queue);
|
GLOBAL_FREE(queue);
|
||||||
return (int64_t)err;
|
return (int64_t)err;
|
||||||
|
@ -1056,7 +1162,7 @@ queue_destroy(_queues *queues, int64_t qid)
|
||||||
|
|
||||||
// Push an object onto the queue.
|
// Push an object onto the queue.
|
||||||
static int
|
static int
|
||||||
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
|
queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
|
||||||
{
|
{
|
||||||
// Look up the queue.
|
// Look up the queue.
|
||||||
_queue *queue = NULL;
|
_queue *queue = NULL;
|
||||||
|
@ -1077,9 +1183,12 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
|
||||||
GLOBAL_FREE(data);
|
GLOBAL_FREE(data);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
assert(_PyCrossInterpreterData_INTERPID(data) == \
|
||||||
|
PyInterpreterState_GetID(PyInterpreterState_Get()));
|
||||||
|
|
||||||
// Add the data to the queue.
|
// Add the data to the queue.
|
||||||
int res = _queue_add(queue, data, fmt);
|
int64_t interpid = -1; // _queueitem_init() will set it.
|
||||||
|
int res = _queue_add(queue, interpid, data, fmt, unboundop);
|
||||||
_queue_unmark_waiter(queue, queues->mutex);
|
_queue_unmark_waiter(queue, queues->mutex);
|
||||||
if (res != 0) {
|
if (res != 0) {
|
||||||
// We may chain an exception here:
|
// We may chain an exception here:
|
||||||
|
@ -1094,7 +1203,8 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
|
||||||
// Pop the next object off the queue. Fail if empty.
|
// Pop the next object off the queue. Fail if empty.
|
||||||
// XXX Support a "wait" mutex?
|
// XXX Support a "wait" mutex?
|
||||||
static int
|
static int
|
||||||
queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
|
queue_get(_queues *queues, int64_t qid,
|
||||||
|
PyObject **res, int *p_fmt, int *p_unboundop)
|
||||||
{
|
{
|
||||||
int err;
|
int err;
|
||||||
*res = NULL;
|
*res = NULL;
|
||||||
|
@ -1110,7 +1220,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
|
||||||
|
|
||||||
// Pop off the next item from the queue.
|
// Pop off the next item from the queue.
|
||||||
_PyCrossInterpreterData *data = NULL;
|
_PyCrossInterpreterData *data = NULL;
|
||||||
err = _queue_next(queue, &data, p_fmt);
|
err = _queue_next(queue, &data, p_fmt, p_unboundop);
|
||||||
_queue_unmark_waiter(queue, queues->mutex);
|
_queue_unmark_waiter(queue, queues->mutex);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
return err;
|
return err;
|
||||||
|
@ -1397,15 +1507,22 @@ qidarg_converter(PyObject *arg, void *ptr)
|
||||||
static PyObject *
|
static PyObject *
|
||||||
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
|
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
|
||||||
{
|
{
|
||||||
static char *kwlist[] = {"maxsize", "fmt", NULL};
|
static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
|
||||||
Py_ssize_t maxsize;
|
Py_ssize_t maxsize;
|
||||||
int fmt;
|
int fmt;
|
||||||
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
|
int unboundop;
|
||||||
&maxsize, &fmt)) {
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
|
||||||
|
&maxsize, &fmt, &unboundop))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (!check_unbound(unboundop)) {
|
||||||
|
PyErr_Format(PyExc_ValueError,
|
||||||
|
"unsupported unboundop %d", unboundop);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
|
int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
|
||||||
if (qid < 0) {
|
if (qid < 0) {
|
||||||
(void)handle_queue_error((int)qid, self, qid);
|
(void)handle_queue_error((int)qid, self, qid);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1427,7 +1544,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
|
||||||
}
|
}
|
||||||
|
|
||||||
PyDoc_STRVAR(queuesmod_create_doc,
|
PyDoc_STRVAR(queuesmod_create_doc,
|
||||||
"create(maxsize, fmt) -> qid\n\
|
"create(maxsize, fmt, unboundop) -> qid\n\
|
||||||
\n\
|
\n\
|
||||||
Create a new cross-interpreter queue and return its unique generated ID.\n\
|
Create a new cross-interpreter queue and return its unique generated ID.\n\
|
||||||
It is a new reference as though bind() had been called on the queue.\n\
|
It is a new reference as though bind() had been called on the queue.\n\
|
||||||
|
@ -1463,9 +1580,9 @@ static PyObject *
|
||||||
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
|
queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
|
||||||
{
|
{
|
||||||
int64_t count = 0;
|
int64_t count = 0;
|
||||||
struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
|
struct queue_id_and_info *qids = _queues_list_all(&_globals.queues, &count);
|
||||||
if (qids == NULL) {
|
if (qids == NULL) {
|
||||||
if (count == 0) {
|
if (!PyErr_Occurred() && count == 0) {
|
||||||
return PyList_New(0);
|
return PyList_New(0);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1474,9 +1591,10 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
|
||||||
if (ids == NULL) {
|
if (ids == NULL) {
|
||||||
goto finally;
|
goto finally;
|
||||||
}
|
}
|
||||||
struct queue_id_and_fmt *cur = qids;
|
struct queue_id_and_info *cur = qids;
|
||||||
for (int64_t i=0; i < count; cur++, i++) {
|
for (int64_t i=0; i < count; cur++, i++) {
|
||||||
PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
|
PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
|
||||||
|
cur->unboundop);
|
||||||
if (item == NULL) {
|
if (item == NULL) {
|
||||||
Py_SETREF(ids, NULL);
|
Py_SETREF(ids, NULL);
|
||||||
break;
|
break;
|
||||||
|
@ -1498,18 +1616,26 @@ Each corresponding default format is also included.");
|
||||||
static PyObject *
|
static PyObject *
|
||||||
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
|
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
|
||||||
{
|
{
|
||||||
static char *kwlist[] = {"qid", "obj", "fmt", NULL};
|
static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
|
||||||
qidarg_converter_data qidarg;
|
qidarg_converter_data qidarg;
|
||||||
PyObject *obj;
|
PyObject *obj;
|
||||||
int fmt;
|
int fmt;
|
||||||
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
|
int unboundop;
|
||||||
qidarg_converter, &qidarg, &obj, &fmt)) {
|
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
|
||||||
|
qidarg_converter, &qidarg, &obj, &fmt,
|
||||||
|
&unboundop))
|
||||||
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
int64_t qid = qidarg.id;
|
int64_t qid = qidarg.id;
|
||||||
|
if (!check_unbound(unboundop)) {
|
||||||
|
PyErr_Format(PyExc_ValueError,
|
||||||
|
"unsupported unboundop %d", unboundop);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Queue up the object. */
|
/* Queue up the object. */
|
||||||
int err = queue_put(&_globals.queues, qid, obj, fmt);
|
int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
|
||||||
// This is the only place that raises QueueFull.
|
// This is the only place that raises QueueFull.
|
||||||
if (handle_queue_error(err, self, qid)) {
|
if (handle_queue_error(err, self, qid)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1536,13 +1662,17 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
|
||||||
|
|
||||||
PyObject *obj = NULL;
|
PyObject *obj = NULL;
|
||||||
int fmt = 0;
|
int fmt = 0;
|
||||||
int err = queue_get(&_globals.queues, qid, &obj, &fmt);
|
int unboundop = 0;
|
||||||
|
int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
|
||||||
// This is the only place that raises QueueEmpty.
|
// This is the only place that raises QueueEmpty.
|
||||||
if (handle_queue_error(err, self, qid)) {
|
if (handle_queue_error(err, self, qid)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
PyObject *res = Py_BuildValue("Oi", obj, fmt);
|
if (obj == NULL) {
|
||||||
|
return Py_BuildValue("Oii", Py_None, fmt, unboundop);
|
||||||
|
}
|
||||||
|
PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
|
||||||
Py_DECREF(obj);
|
Py_DECREF(obj);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
@ -1656,17 +1786,12 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
|
||||||
if (handle_queue_error(err, self, qid)) {
|
if (handle_queue_error(err, self, qid)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
int fmt = queue->fmt;
|
int fmt = queue->defaults.fmt;
|
||||||
|
int unboundop = queue->defaults.unboundop;
|
||||||
_queue_unmark_waiter(queue, _globals.queues.mutex);
|
_queue_unmark_waiter(queue, _globals.queues.mutex);
|
||||||
|
|
||||||
PyObject *fmt_obj = PyLong_FromLong(fmt);
|
PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
|
||||||
if (fmt_obj == NULL) {
|
return defaults;
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
// For now queues only have one default.
|
|
||||||
PyObject *res = PyTuple_Pack(1, fmt_obj);
|
|
||||||
Py_DECREF(fmt_obj);
|
|
||||||
return res;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
|
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue