"""Cross-interpreter Queues High Level Module.""" import queue import time import weakref import _xxinterpqueues as _queues # aliases: from _xxinterpqueues import ( QueueError, QueueNotFoundError, ) __all__ = [ 'create', 'list_all', 'Queue', 'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull', ] class QueueEmpty(_queues.QueueEmpty, queue.Empty): """Raised from get_nowait() when the queue is empty. It is also raised from get() if it times out. """ class QueueFull(_queues.QueueFull, queue.Full): """Raised from put_nowait() when the queue is full. It is also raised from put() if it times out. """ def create(maxsize=0): """Return a new cross-interpreter queue. The queue may be used to pass data safely between interpreters. """ qid = _queues.create(maxsize) return Queue(qid) def list_all(): """Return a list of all open queues.""" return [Queue(qid) for qid in _queues.list_all()] _known_queues = weakref.WeakValueDictionary() class Queue: """A cross-interpreter queue.""" def __new__(cls, id, /): # There is only one instance for any given ID. if isinstance(id, int): id = int(id) else: raise TypeError(f'id must be an int, got {id!r}') try: self = _known_queues[id] except KeyError: self = super().__new__(cls) self._id = id _known_queues[id] = self _queues.bind(id) return self def __del__(self): try: _queues.release(self._id) except QueueNotFoundError: pass try: del _known_queues[self._id] except KeyError: pass def __repr__(self): return f'{type(self).__name__}({self.id})' def __hash__(self): return hash(self._id) @property def id(self): return self._id @property def maxsize(self): try: return self._maxsize except AttributeError: self._maxsize = _queues.get_maxsize(self._id) return self._maxsize def empty(self): return self.qsize() == 0 def full(self): return _queues.is_full(self._id) def qsize(self): return _queues.get_count(self._id) def put(self, obj, timeout=None, *, _delay=10 / 1000, # 10 milliseconds ): """Add the object to the queue. This blocks while the queue is full. """ if timeout is not None: timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout while True: try: _queues.put(self._id, obj) except _queues.QueueFull as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueFull raise # re-raise time.sleep(_delay) else: break def put_nowait(self, obj): try: return _queues.put(self._id, obj) except _queues.QueueFull as exc: exc.__class__ = QueueFull raise # re-raise def get(self, timeout=None, *, _delay=10 / 1000, # 10 milliseconds ): """Return the next object from the queue. This blocks while the queue is empty. """ if timeout is not None: timeout = int(timeout) if timeout < 0: raise ValueError(f'timeout value must be non-negative') end = time.time() + timeout while True: try: return _queues.get(self._id) except _queues.QueueEmpty as exc: if timeout is not None and time.time() >= end: exc.__class__ = QueueEmpty raise # re-raise time.sleep(_delay) return obj def get_nowait(self): """Return the next object from the channel. If the queue is empty then raise QueueEmpty. Otherwise this is the same as get(). """ try: return _queues.get(self._id) except _queues.QueueEmpty as exc: exc.__class__ = QueueEmpty raise # re-raise _queues._register_queue_type(Queue)