mirror of
https://github.com/python/cpython.git
synced 2025-07-08 03:45:36 +00:00
288 lines
8.3 KiB
Python
288 lines
8.3 KiB
Python
"""Cross-interpreter Queues High Level Module."""
|
|
|
|
import queue
|
|
import time
|
|
import weakref
|
|
import _interpqueues as _queues
|
|
from . import _crossinterp
|
|
|
|
# aliases:
|
|
from _interpqueues import (
|
|
QueueError, QueueNotFoundError,
|
|
)
|
|
from ._crossinterp import (
|
|
UNBOUND_ERROR, UNBOUND_REMOVE,
|
|
)
|
|
|
|
# Names exported by "from <this module> import *", grouped by kind.
__all__ = [
    # "unbounditems" policies
    'UNBOUND',
    'UNBOUND_ERROR',
    'UNBOUND_REMOVE',
    # module-level functions
    'create',
    'list_all',
    # the queue type
    'Queue',
    # exceptions
    'QueueError',
    'QueueNotFoundError',
    'QueueEmpty',
    'QueueFull',
    'ItemInterpreterDestroyed',
]
|
|
|
|
|
|
class QueueEmpty(QueueError, queue.Empty):
    """The queue had no item to hand back.

    get_nowait() raises this when the queue is empty, and get() raises
    it when its timeout expires.  Because it also derives from
    queue.Empty, handlers written for the stdlib queue module catch it.
    """
|
|
|
|
|
|
class QueueFull(QueueError, queue.Full):
    """The queue had no room for another item.

    put_nowait() raises this when the queue is full, and put() raises
    it when its timeout expires.  Because it also derives from
    queue.Full, handlers written for the stdlib queue module catch it.
    """
|
|
|
|
|
|
class ItemInterpreterDestroyed(
    QueueError,
    _crossinterp.ItemInterpreterDestroyed,
):
    """Raised from get() and get_nowait().

    This is the exception class handed to _crossinterp.resolve_unbound()
    by this module, so it is both a QueueError and the generic
    _crossinterp.ItemInterpreterDestroyed.
    """
|
|
|
|
|
|
# NOTE(review): presumably item-format codes (the names suggest
# "shareable objects only" vs. "pickled"); nothing else in the visible
# module refers to them -- confirm before relying on or removing them.
_SHARED_ONLY, _PICKLED = 0, 1
|
|
|
|
|
|
# This module's own "unbound item" singleton.  It is the default
# "unbounditems" policy of create(), and _resolve_unbound() returns it
# in place of the generic _crossinterp.UNBOUND.
UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
|
|
|
|
|
|
def _serialize_unbound(unbound):
    """Serialize an "unbounditems" value via _crossinterp.

    This module's UNBOUND singleton is swapped for the generic
    _crossinterp.UNBOUND first; any other value is passed through as is.
    The result is whatever _crossinterp.serialize_unbound() returns.
    """
    generic = _crossinterp.UNBOUND if unbound is UNBOUND else unbound
    return _crossinterp.serialize_unbound(generic)
|
|
|
|
|
|
def _resolve_unbound(flag):
    """Map a serialized unbound flag back to its public value.

    The work is delegated to _crossinterp.resolve_unbound(), which is
    given this module's ItemInterpreterDestroyed class.  When the
    generic _crossinterp.UNBOUND comes back, this module's UNBOUND is
    returned in its place.
    """
    value = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
    return UNBOUND if value is _crossinterp.UNBOUND else value
|
|
|
|
|
|
def create(maxsize=0, *, unbounditems=UNBOUND):
    """Create a cross-interpreter queue and return its Queue object.

    Such a queue may be used to pass data safely between interpreters.

    "maxsize" is forwarded unchanged to the low-level queue creation
    call.

    "unbounditems" becomes the queue-wide default that Queue.put() uses
    when it is not given its own "unbounditems"; Queue.put() documents
    the supported values.  It defaults to UNBOUND, meaning an unbound
    item is replaced by UNBOUND.
    """
    (op,) = _serialize_unbound(unbounditems)
    new_queue = Queue(_queues.create(maxsize, op, -1))
    # Remember the default on the instance so the "unbounditems"
    # property does not have to look it up later.
    new_queue._set_unbound(op, unbounditems)
    return new_queue
|
|
|
|
|
|
def list_all():
    """Return a list with a Queue object for every open queue."""
    found = []
    for qid, unboundop, _ in _queues.list_all():
        q = Queue(qid)
        if hasattr(q, '_unbound'):
            # An already-known instance must agree with the low-level data.
            assert q._unbound[0] == unboundop
        else:
            q._set_unbound(unboundop)
        found.append(q)
    return found
|
|
|
|
|
|
# Cache of Queue objects keyed by queue ID.  Queue.__new__ consults it so
# that each ID maps to a single instance; the values are held weakly.
_known_queues = weakref.WeakValueDictionary()
|
|
|
|
class Queue:
    """A cross-interpreter queue.

    An instance is a handle for a queue ID managed by the low-level
    _interpqueues module.  __new__ guarantees a single instance per ID
    within the module, and pickling round-trips through that ID.
    """

    def __new__(cls, id, /):
        """Return the Queue for the given integer ID, creating it if needed.

        Raise TypeError if "id" is not an int.
        """
        # There is only one instance for any given ID.
        if isinstance(id, int):
            # Normalize int subclasses to a plain int.
            id = int(id)
        else:
            raise TypeError(f'id must be an int, got {id!r}')
        try:
            self = _known_queues[id]
        except KeyError:
            self = super().__new__(cls)
            self._id = id
            _known_queues[id] = self
            _queues.bind(id)
        return self

    def __del__(self):
        # Undo the bind() done in __new__; the queue may already be gone.
        try:
            _queues.release(self._id)
        except QueueNotFoundError:
            pass
        # The weak-value cache may have dropped the entry already.
        try:
            del _known_queues[self._id]
        except KeyError:
            pass

    def __repr__(self):
        return f'{type(self).__name__}({self.id})'

    def __hash__(self):
        return hash(self._id)

    # for pickling:
    def __getnewargs__(self):
        return (self._id,)

    # for pickling:
    def __getstate__(self):
        # No instance state is pickled; only the ID from __getnewargs__().
        return None

    def _set_unbound(self, op, items=None):
        """Record the queue's default unbound op and its public value.

        If "items" is None it is derived from "op" with
        _resolve_unbound().  Return the stored (op, items) pair.  This
        may only be called once per instance.
        """
        assert not hasattr(self, '_unbound')
        if items is None:
            items = _resolve_unbound(op)
        unbound = (op, items)
        self._unbound = unbound
        return unbound

    @staticmethod
    def _deadline(timeout):
        """Return the time.monotonic() value at which "timeout" expires.

        None (block indefinitely) is passed through as None.  Fractional
        timeouts are honored.  Raise ValueError if the timeout is
        negative or NaN.
        """
        if timeout is None:
            return None
        timeout = float(timeout)
        # "not >=" rather than "<" so that NaN is rejected as well.
        if not timeout >= 0:
            raise ValueError('timeout value must be non-negative')
        return time.monotonic() + timeout

    @staticmethod
    def _resolve_item(obj, unboundop):
        """Return the value get()/get_nowait() should hand to the caller.

        A non-None "unboundop" marks an unbound item: the low-level
        object must then be None and the result comes from
        _resolve_unbound().  Otherwise "obj" is returned unchanged.
        """
        if unboundop is not None:
            assert obj is None, repr(obj)
            return _resolve_unbound(unboundop)
        return obj

    @property
    def id(self):
        """The queue's integer ID."""
        return self._id

    @property
    def unbounditems(self):
        """The queue's default "unbounditems" value.

        If it has not been recorded on this instance yet, the default is
        fetched from the low-level module and cached.
        """
        try:
            _, items = self._unbound
        except AttributeError:
            op, _ = _queues.get_queue_defaults(self._id)
            _, items = self._set_unbound(op)
        return items

    @property
    def maxsize(self):
        """The queue's maximum size, looked up once and then cached."""
        try:
            return self._maxsize
        except AttributeError:
            self._maxsize = _queues.get_maxsize(self._id)
            return self._maxsize

    def empty(self):
        """Return True if qsize() is currently 0."""
        return self.qsize() == 0

    def full(self):
        """Return whether the low-level module reports the queue as full."""
        return _queues.is_full(self._id)

    def qsize(self):
        """Return the item count reported by the low-level module."""
        return _queues.get_count(self._id)

    def put(self, obj, timeout=None, *,
            unbounditems=None,
            _delay=10 / 1000,  # 10 milliseconds
            ):
        """Add the object to the queue.

        This blocks while the queue is full.  If "timeout" is not None
        it is a non-negative number of seconds (fractions are honored);
        once that much time has passed and the queue is still full,
        QueueFull is raised.  ValueError is raised for a negative
        timeout.

        For most objects, the object received through Queue.get() will
        be a new one, equivalent to the original and not sharing any
        actual underlying data.  The notable exceptions include
        cross-interpreter types (like Queue) and memoryview, where the
        underlying data is actually shared.  Furthermore, some types
        can be sent through a queue more efficiently than others.  This
        group includes various immutable types like int, str, bytes, and
        tuple (if the items are likewise efficiently shareable).  See
        interpreters.is_shareable().

        "unbounditems" controls the behavior of Queue.get() for the given
        object if the current interpreter (calling put()) is later
        destroyed.

        If "unbounditems" is None (the default) then it uses the
        queue's default, set with create(), which is usually UNBOUND.

        If "unbounditems" is UNBOUND_ERROR then get() will raise an
        ItemInterpreterDestroyed exception if the original interpreter
        has been destroyed.  This does not otherwise affect the queue;
        the next call to get() will work like normal, returning the next
        item in the queue.

        If "unbounditems" is UNBOUND_REMOVE then the item will be removed
        from the queue as soon as the original interpreter is destroyed.
        Be aware that this will introduce an imbalance between put()
        and get() calls.

        If "unbounditems" is UNBOUND then it is returned by get() in place
        of the unbound item.
        """
        if unbounditems is None:
            # -1 asks the low-level module to use the queue's default.
            unboundop = -1
        else:
            unboundop, = _serialize_unbound(unbounditems)
        end = self._deadline(timeout)
        # Poll: retry every "_delay" seconds until there is room or the
        # deadline has passed.
        while True:
            try:
                _queues.put(self._id, obj, unboundop)
            except QueueFull:
                if end is not None and time.monotonic() >= end:
                    raise  # re-raise
                time.sleep(_delay)
            else:
                break

    def put_nowait(self, obj, *, unbounditems=None):
        """Add the object to the queue without waiting.

        A single attempt is made; if the queue is full then QueueFull
        is raised.  "unbounditems" has the same meaning as for put().
        """
        if unbounditems is None:
            # -1 asks the low-level module to use the queue's default.
            unboundop = -1
        else:
            unboundop, = _serialize_unbound(unbounditems)
        _queues.put(self._id, obj, unboundop)

    def get(self, timeout=None, *,
            _delay=10 / 1000,  # 10 milliseconds
            ):
        """Return the next object from the queue.

        This blocks while the queue is empty.  If "timeout" is not None
        it is a non-negative number of seconds (fractions are honored);
        once that much time has passed and the queue is still empty,
        QueueEmpty is raised.  ValueError is raised for a negative
        timeout.

        If the next item's original interpreter has been destroyed
        then the "next object" is determined by the value of the
        "unbounditems" argument to put().
        """
        end = self._deadline(timeout)
        # Poll: retry every "_delay" seconds until an item arrives or
        # the deadline has passed.
        while True:
            try:
                obj, unboundop = _queues.get(self._id)
            except QueueEmpty:
                if end is not None and time.monotonic() >= end:
                    raise  # re-raise
                time.sleep(_delay)
            else:
                break
        return self._resolve_item(obj, unboundop)

    def get_nowait(self):
        """Return the next object from the queue.

        If the queue is empty then raise QueueEmpty.  Otherwise this
        is the same as get().
        """
        # QueueEmpty from the low-level call simply propagates.
        obj, unboundop = _queues.get(self._id)
        return self._resolve_item(obj, unboundop)
|
|
|
|
|
|
# Hand the Python-level Queue, QueueEmpty and QueueFull classes to the
# low-level _interpqueues module.
# NOTE(review): presumably so the extension can raise these exceptions
# and recognize Queue objects -- confirm against the C implementation.
_queues._register_heap_types(Queue, QueueEmpty, QueueFull)
|