mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 07:48:51 +00:00 
			
		
		
		
	 8b209fd4f8
			
		
	
	
		8b209fd4f8
		
			
		
	
	
	
	
		
			
			See 6b98b274b6 for an explanation of the problem and solution.  Here I've applied the solution to channels.
		
	
			
		
			
				
	
	
		
			313 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			313 lines
		
	
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Cross-interpreter Queues High Level Module."""
 | |
| 
 | |
| import pickle
 | |
| import queue
 | |
| import time
 | |
| import weakref
 | |
| import _interpqueues as _queues
 | |
| from . import _crossinterp
 | |
| 
 | |
| # aliases:
 | |
| from _interpqueues import (
 | |
|     QueueError, QueueNotFoundError,
 | |
| )
 | |
| from ._crossinterp import (
 | |
|     UNBOUND_ERROR, UNBOUND_REMOVE,
 | |
| )
 | |
| 
 | |
| __all__ = [
 | |
|     'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
 | |
|     'create', 'list_all',
 | |
|     'Queue',
 | |
|     'QueueError', 'QueueNotFoundError', 'QueueEmpty', 'QueueFull',
 | |
|     'ItemInterpreterDestroyed',
 | |
| ]
 | |
| 
 | |
| 
 | |
| class QueueEmpty(QueueError, queue.Empty):
 | |
|     """Raised from get_nowait() when the queue is empty.
 | |
| 
 | |
|     It is also raised from get() if it times out.
 | |
|     """
 | |
| 
 | |
| 
 | |
| class QueueFull(QueueError, queue.Full):
 | |
|     """Raised from put_nowait() when the queue is full.
 | |
| 
 | |
|     It is also raised from put() if it times out.
 | |
|     """
 | |
| 
 | |
| 
 | |
| class ItemInterpreterDestroyed(QueueError,
 | |
|                                _crossinterp.ItemInterpreterDestroyed):
 | |
|     """Raised from get() and get_nowait()."""
 | |
| 
 | |
| 
 | |
| _SHARED_ONLY = 0
 | |
| _PICKLED = 1
 | |
| 
 | |
| 
 | |
| UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
 | |
| 
 | |
| 
 | |
| def _serialize_unbound(unbound):
 | |
|     if unbound is UNBOUND:
 | |
|         unbound = _crossinterp.UNBOUND
 | |
|     return _crossinterp.serialize_unbound(unbound)
 | |
| 
 | |
| 
 | |
| def _resolve_unbound(flag):
 | |
|     resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
 | |
|     if resolved is _crossinterp.UNBOUND:
 | |
|         resolved = UNBOUND
 | |
|     return resolved
 | |
| 
 | |
| 
 | |
| def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
 | |
|     """Return a new cross-interpreter queue.
 | |
| 
 | |
|     The queue may be used to pass data safely between interpreters.
 | |
| 
 | |
|     "syncobj" sets the default for Queue.put()
 | |
|     and Queue.put_nowait().
 | |
| 
 | |
|     "unbounditems" likewise sets the default.  See Queue.put() for
 | |
|     supported values.  The default value is UNBOUND, which replaces
 | |
|     the unbound item.
 | |
|     """
 | |
|     fmt = _SHARED_ONLY if syncobj else _PICKLED
 | |
|     unbound = _serialize_unbound(unbounditems)
 | |
|     unboundop, = unbound
 | |
|     qid = _queues.create(maxsize, fmt, unboundop)
 | |
|     return Queue(qid, _fmt=fmt, _unbound=unbound)
 | |
| 
 | |
| 
 | |
| def list_all():
 | |
|     """Return a list of all open queues."""
 | |
|     return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
 | |
|             for qid, fmt, unboundop in _queues.list_all()]
 | |
| 
 | |
| 
 | |
| _known_queues = weakref.WeakValueDictionary()
 | |
| 
 | |
| class Queue:
 | |
|     """A cross-interpreter queue."""
 | |
| 
 | |
|     def __new__(cls, id, /, *, _fmt=None, _unbound=None):
 | |
|         # There is only one instance for any given ID.
 | |
|         if isinstance(id, int):
 | |
|             id = int(id)
 | |
|         else:
 | |
|             raise TypeError(f'id must be an int, got {id!r}')
 | |
|         if _fmt is None:
 | |
|             if _unbound is None:
 | |
|                 _fmt, op = _queues.get_queue_defaults(id)
 | |
|                 _unbound = (op,)
 | |
|             else:
 | |
|                 _fmt, _ = _queues.get_queue_defaults(id)
 | |
|         elif _unbound is None:
 | |
|             _, op = _queues.get_queue_defaults(id)
 | |
|             _unbound = (op,)
 | |
|         try:
 | |
|             self = _known_queues[id]
 | |
|         except KeyError:
 | |
|             self = super().__new__(cls)
 | |
|             self._id = id
 | |
|             self._fmt = _fmt
 | |
|             self._unbound = _unbound
 | |
|             _known_queues[id] = self
 | |
|             _queues.bind(id)
 | |
|         return self
 | |
| 
 | |
|     def __del__(self):
 | |
|         try:
 | |
|             _queues.release(self._id)
 | |
|         except QueueNotFoundError:
 | |
|             pass
 | |
|         try:
 | |
|             del _known_queues[self._id]
 | |
|         except KeyError:
 | |
|             pass
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return f'{type(self).__name__}({self.id})'
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash(self._id)
 | |
| 
 | |
|     # for pickling:
 | |
|     def __getnewargs__(self):
 | |
|         return (self._id,)
 | |
| 
 | |
|     # for pickling:
 | |
|     def __getstate__(self):
 | |
|         return None
 | |
| 
 | |
|     @property
 | |
|     def id(self):
 | |
|         return self._id
 | |
| 
 | |
|     @property
 | |
|     def maxsize(self):
 | |
|         try:
 | |
|             return self._maxsize
 | |
|         except AttributeError:
 | |
|             self._maxsize = _queues.get_maxsize(self._id)
 | |
|             return self._maxsize
 | |
| 
 | |
|     def empty(self):
 | |
|         return self.qsize() == 0
 | |
| 
 | |
|     def full(self):
 | |
|         return _queues.is_full(self._id)
 | |
| 
 | |
|     def qsize(self):
 | |
|         return _queues.get_count(self._id)
 | |
| 
 | |
|     def put(self, obj, timeout=None, *,
 | |
|             syncobj=None,
 | |
|             unbound=None,
 | |
|             _delay=10 / 1000,  # 10 milliseconds
 | |
|             ):
 | |
|         """Add the object to the queue.
 | |
| 
 | |
|         This blocks while the queue is full.
 | |
| 
 | |
|         If "syncobj" is None (the default) then it uses the
 | |
|         queue's default, set with create_queue().
 | |
| 
 | |
|         If "syncobj" is false then all objects are supported,
 | |
|         at the expense of worse performance.
 | |
| 
 | |
|         If "syncobj" is true then the object must be "shareable".
 | |
|         Examples of "shareable" objects include the builtin singletons,
 | |
|         str, and memoryview.  One benefit is that such objects are
 | |
|         passed through the queue efficiently.
 | |
| 
 | |
|         The key difference, though, is conceptual: the corresponding
 | |
|         object returned from Queue.get() will be strictly equivalent
 | |
|         to the given obj.  In other words, the two objects will be
 | |
|         effectively indistinguishable from each other, even if the
 | |
|         object is mutable.  The received object may actually be the
 | |
|         same object, or a copy (immutable values only), or a proxy.
 | |
|         Regardless, the received object should be treated as though
 | |
|         the original has been shared directly, whether or not it
 | |
|         actually is.  That's a slightly different and stronger promise
 | |
|         than just (initial) equality, which is all "syncobj=False"
 | |
|         can promise.
 | |
| 
 | |
|         "unbound" controls the behavior of Queue.get() for the given
 | |
|         object if the current interpreter (calling put()) is later
 | |
|         destroyed.
 | |
| 
 | |
|         If "unbound" is None (the default) then it uses the
 | |
|         queue's default, set with create_queue(),
 | |
|         which is usually UNBOUND.
 | |
| 
 | |
|         If "unbound" is UNBOUND_ERROR then get() will raise an
 | |
|         ItemInterpreterDestroyed exception if the original interpreter
 | |
|         has been destroyed.  This does not otherwise affect the queue;
 | |
|         the next call to put() will work like normal, returning the next
 | |
|         item in the queue.
 | |
| 
 | |
|         If "unbound" is UNBOUND_REMOVE then the item will be removed
 | |
|         from the queue as soon as the original interpreter is destroyed.
 | |
|         Be aware that this will introduce an imbalance between put()
 | |
|         and get() calls.
 | |
| 
 | |
|         If "unbound" is UNBOUND then it is returned by get() in place
 | |
|         of the unbound item.
 | |
|         """
 | |
|         if syncobj is None:
 | |
|             fmt = self._fmt
 | |
|         else:
 | |
|             fmt = _SHARED_ONLY if syncobj else _PICKLED
 | |
|         if unbound is None:
 | |
|             unboundop, = self._unbound
 | |
|         else:
 | |
|             unboundop, = _serialize_unbound(unbound)
 | |
|         if timeout is not None:
 | |
|             timeout = int(timeout)
 | |
|             if timeout < 0:
 | |
|                 raise ValueError(f'timeout value must be non-negative')
 | |
|             end = time.time() + timeout
 | |
|         if fmt is _PICKLED:
 | |
|             obj = pickle.dumps(obj)
 | |
|         while True:
 | |
|             try:
 | |
|                 _queues.put(self._id, obj, fmt, unboundop)
 | |
|             except QueueFull as exc:
 | |
|                 if timeout is not None and time.time() >= end:
 | |
|                     raise  # re-raise
 | |
|                 time.sleep(_delay)
 | |
|             else:
 | |
|                 break
 | |
| 
 | |
|     def put_nowait(self, obj, *, syncobj=None, unbound=None):
 | |
|         if syncobj is None:
 | |
|             fmt = self._fmt
 | |
|         else:
 | |
|             fmt = _SHARED_ONLY if syncobj else _PICKLED
 | |
|         if unbound is None:
 | |
|             unboundop, = self._unbound
 | |
|         else:
 | |
|             unboundop, = _serialize_unbound(unbound)
 | |
|         if fmt is _PICKLED:
 | |
|             obj = pickle.dumps(obj)
 | |
|         _queues.put(self._id, obj, fmt, unboundop)
 | |
| 
 | |
|     def get(self, timeout=None, *,
 | |
|             _delay=10 / 1000,  # 10 milliseconds
 | |
|             ):
 | |
|         """Return the next object from the queue.
 | |
| 
 | |
|         This blocks while the queue is empty.
 | |
| 
 | |
|         If the next item's original interpreter has been destroyed
 | |
|         then the "next object" is determined by the value of the
 | |
|         "unbound" argument to put().
 | |
|         """
 | |
|         if timeout is not None:
 | |
|             timeout = int(timeout)
 | |
|             if timeout < 0:
 | |
|                 raise ValueError(f'timeout value must be non-negative')
 | |
|             end = time.time() + timeout
 | |
|         while True:
 | |
|             try:
 | |
|                 obj, fmt, unboundop = _queues.get(self._id)
 | |
|             except QueueEmpty as exc:
 | |
|                 if timeout is not None and time.time() >= end:
 | |
|                     raise  # re-raise
 | |
|                 time.sleep(_delay)
 | |
|             else:
 | |
|                 break
 | |
|         if unboundop is not None:
 | |
|             assert obj is None, repr(obj)
 | |
|             return _resolve_unbound(unboundop)
 | |
|         if fmt == _PICKLED:
 | |
|             obj = pickle.loads(obj)
 | |
|         else:
 | |
|             assert fmt == _SHARED_ONLY
 | |
|         return obj
 | |
| 
 | |
|     def get_nowait(self):
 | |
|         """Return the next object from the channel.
 | |
| 
 | |
|         If the queue is empty then raise QueueEmpty.  Otherwise this
 | |
|         is the same as get().
 | |
|         """
 | |
|         try:
 | |
|             obj, fmt, unboundop = _queues.get(self._id)
 | |
|         except QueueEmpty as exc:
 | |
|             raise  # re-raise
 | |
|         if unboundop is not None:
 | |
|             assert obj is None, repr(obj)
 | |
|             return _resolve_unbound(unboundop)
 | |
|         if fmt == _PICKLED:
 | |
|             obj = pickle.loads(obj)
 | |
|         else:
 | |
|             assert fmt == _SHARED_ONLY
 | |
|         return obj
 | |
| 
 | |
| 
 | |
| _queues._register_heap_types(Queue, QueueEmpty, QueueFull)
 |