mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 16:27:06 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			261 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
	
		
			9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """A multi-producer, multi-consumer queue."""
 | |
| 
 | |
| from time import time as _time
 | |
| try:
 | |
|     import threading as _threading
 | |
| except ImportError:
 | |
|     import dummy_threading as _threading
 | |
| from collections import deque
 | |
| import heapq
 | |
| 
 | |
| __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
 | |
| 
 | |
| class Empty(Exception):
 | |
|     "Exception raised by Queue.get(block=0)/get_nowait()."
 | |
|     pass
 | |
| 
 | |
| class Full(Exception):
 | |
|     "Exception raised by Queue.put(block=0)/put_nowait()."
 | |
|     pass
 | |
| 
 | |
| class Queue:
 | |
|     """Create a queue object with a given maximum size.
 | |
| 
 | |
|     If maxsize is <= 0, the queue size is infinite.
 | |
|     """
 | |
|     def __init__(self, maxsize=0):
 | |
|         self.maxsize = maxsize
 | |
|         self._init(maxsize)
 | |
|         # mutex must be held whenever the queue is mutating.  All methods
 | |
|         # that acquire mutex must release it before returning.  mutex
 | |
|         # is shared between the three conditions, so acquiring and
 | |
|         # releasing the conditions also acquires and releases mutex.
 | |
|         self.mutex = _threading.Lock()
 | |
|         # Notify not_empty whenever an item is added to the queue; a
 | |
|         # thread waiting to get is notified then.
 | |
|         self.not_empty = _threading.Condition(self.mutex)
 | |
|         # Notify not_full whenever an item is removed from the queue;
 | |
|         # a thread waiting to put is notified then.
 | |
|         self.not_full = _threading.Condition(self.mutex)
 | |
|         # Notify all_tasks_done whenever the number of unfinished tasks
 | |
|         # drops to zero; thread waiting to join() is notified to resume
 | |
|         self.all_tasks_done = _threading.Condition(self.mutex)
 | |
|         self.unfinished_tasks = 0
 | |
| 
 | |
|     def task_done(self):
 | |
|         """Indicate that a formerly enqueued task is complete.
 | |
| 
 | |
|         Used by Queue consumer threads.  For each get() used to fetch a task,
 | |
|         a subsequent call to task_done() tells the queue that the processing
 | |
|         on the task is complete.
 | |
| 
 | |
|         If a join() is currently blocking, it will resume when all items
 | |
|         have been processed (meaning that a task_done() call was received
 | |
|         for every item that had been put() into the queue).
 | |
| 
 | |
|         Raises a ValueError if called more times than there were items
 | |
|         placed in the queue.
 | |
|         """
 | |
|         self.all_tasks_done.acquire()
 | |
|         try:
 | |
|             unfinished = self.unfinished_tasks - 1
 | |
|             if unfinished <= 0:
 | |
|                 if unfinished < 0:
 | |
|                     raise ValueError('task_done() called too many times')
 | |
|                 self.all_tasks_done.notify_all()
 | |
|             self.unfinished_tasks = unfinished
 | |
|         finally:
 | |
|             self.all_tasks_done.release()
 | |
| 
 | |
|     def join(self):
 | |
|         """Blocks until all items in the Queue have been gotten and processed.
 | |
| 
 | |
|         The count of unfinished tasks goes up whenever an item is added to the
 | |
|         queue. The count goes down whenever a consumer thread calls task_done()
 | |
|         to indicate the item was retrieved and all work on it is complete.
 | |
| 
 | |
|         When the count of unfinished tasks drops to zero, join() unblocks.
 | |
|         """
 | |
|         self.all_tasks_done.acquire()
 | |
|         try:
 | |
|             while self.unfinished_tasks:
 | |
|                 self.all_tasks_done.wait()
 | |
|         finally:
 | |
|             self.all_tasks_done.release()
 | |
| 
 | |
|     def qsize(self):
 | |
|         """Return the approximate size of the queue (not reliable!)."""
 | |
|         self.mutex.acquire()
 | |
|         n = self._qsize()
 | |
|         self.mutex.release()
 | |
|         return n
 | |
| 
 | |
|     def empty(self):
 | |
|         """Return True if the queue is empty, False otherwise (not reliable!).
 | |
| 
 | |
|         This method is likely to be removed at some point.  Use qsize() == 0
 | |
|         as a direct substitute, but be aware that either approach risks a race
 | |
|         condition where a queue can grow before the result of empty() or
 | |
|         qsize() can be used.
 | |
| 
 | |
|         To create code that needs to wait for all queued tasks to be
 | |
|         completed, the preferred technique is to use the join() method.
 | |
| 
 | |
|         """
 | |
|         self.mutex.acquire()
 | |
|         n = not self._qsize()
 | |
|         self.mutex.release()
 | |
|         return n
 | |
| 
 | |
|     def full(self):
 | |
|         """Return True if the queue is full, False otherwise (not reliable!).
 | |
| 
 | |
|         This method is likely to be removed at some point.  Use qsize() >= n
 | |
|         as a direct substitute, but be aware that either approach risks a race
 | |
|         condition where a queue can shrink before the result of full() or
 | |
|         qsize() can be used.
 | |
| 
 | |
|         """
 | |
|         self.mutex.acquire()
 | |
|         n = 0 < self.maxsize <= self._qsize()
 | |
|         self.mutex.release()
 | |
|         return n
 | |
| 
 | |
|     def put(self, item, block=True, timeout=None):
 | |
|         """Put an item into the queue.
 | |
| 
 | |
|         If optional args 'block' is true and 'timeout' is None (the default),
 | |
|         block if necessary until a free slot is available. If 'timeout' is
 | |
|         a positive number, it blocks at most 'timeout' seconds and raises
 | |
|         the Full exception if no free slot was available within that time.
 | |
|         Otherwise ('block' is false), put an item on the queue if a free slot
 | |
|         is immediately available, else raise the Full exception ('timeout'
 | |
|         is ignored in that case).
 | |
|         """
 | |
|         self.not_full.acquire()
 | |
|         try:
 | |
|             if self.maxsize > 0:
 | |
|                 if not block:
 | |
|                     if self._qsize() >= self.maxsize:
 | |
|                         raise Full
 | |
|                 elif timeout is None:
 | |
|                     while self._qsize() >= self.maxsize:
 | |
|                         self.not_full.wait()
 | |
|                 elif timeout < 0:
 | |
|                     raise ValueError("'timeout' must be a positive number")
 | |
|                 else:
 | |
|                     endtime = _time() + timeout
 | |
|                     while self._qsize() >= self.maxsize:
 | |
|                         remaining = endtime - _time()
 | |
|                         if remaining <= 0.0:
 | |
|                             raise Full
 | |
|                         self.not_full.wait(remaining)
 | |
|             self._put(item)
 | |
|             self.unfinished_tasks += 1
 | |
|             self.not_empty.notify()
 | |
|         finally:
 | |
|             self.not_full.release()
 | |
| 
 | |
|     def put_nowait(self, item):
 | |
|         """Put an item into the queue without blocking.
 | |
| 
 | |
|         Only enqueue the item if a free slot is immediately available.
 | |
|         Otherwise raise the Full exception.
 | |
|         """
 | |
|         return self.put(item, False)
 | |
| 
 | |
|     def get(self, block=True, timeout=None):
 | |
|         """Remove and return an item from the queue.
 | |
| 
 | |
|         If optional args 'block' is true and 'timeout' is None (the default),
 | |
|         block if necessary until an item is available. If 'timeout' is
 | |
|         a positive number, it blocks at most 'timeout' seconds and raises
 | |
|         the Empty exception if no item was available within that time.
 | |
|         Otherwise ('block' is false), return an item if one is immediately
 | |
|         available, else raise the Empty exception ('timeout' is ignored
 | |
|         in that case).
 | |
|         """
 | |
|         self.not_empty.acquire()
 | |
|         try:
 | |
|             if not block:
 | |
|                 if not self._qsize():
 | |
|                     raise Empty
 | |
|             elif timeout is None:
 | |
|                 while not self._qsize():
 | |
|                     self.not_empty.wait()
 | |
|             elif timeout < 0:
 | |
|                 raise ValueError("'timeout' must be a positive number")
 | |
|             else:
 | |
|                 endtime = _time() + timeout
 | |
|                 while not self._qsize():
 | |
|                     remaining = endtime - _time()
 | |
|                     if remaining <= 0.0:
 | |
|                         raise Empty
 | |
|                     self.not_empty.wait(remaining)
 | |
|             item = self._get()
 | |
|             self.not_full.notify()
 | |
|             return item
 | |
|         finally:
 | |
|             self.not_empty.release()
 | |
| 
 | |
|     def get_nowait(self):
 | |
|         """Remove and return an item from the queue without blocking.
 | |
| 
 | |
|         Only get an item if one is immediately available. Otherwise
 | |
|         raise the Empty exception.
 | |
|         """
 | |
|         return self.get(False)
 | |
| 
 | |
|     # Override these methods to implement other queue organizations
 | |
|     # (e.g. stack or priority queue).
 | |
|     # These will only be called with appropriate locks held
 | |
| 
 | |
|     # Initialize the queue representation
 | |
|     def _init(self, maxsize):
 | |
|         self.queue = deque()
 | |
| 
 | |
|     def _qsize(self, len=len):
 | |
|         return len(self.queue)
 | |
| 
 | |
|     # Put a new item in the queue
 | |
|     def _put(self, item):
 | |
|         self.queue.append(item)
 | |
| 
 | |
|     # Get an item from the queue
 | |
|     def _get(self):
 | |
|         return self.queue.popleft()
 | |
| 
 | |
| 
 | |
| class PriorityQueue(Queue):
 | |
|     '''Variant of Queue that retrieves open entries in priority order (lowest first).
 | |
| 
 | |
|     Entries are typically tuples of the form:  (priority number, data).
 | |
|     '''
 | |
| 
 | |
|     def _init(self, maxsize):
 | |
|         self.queue = []
 | |
| 
 | |
|     def _qsize(self, len=len):
 | |
|         return len(self.queue)
 | |
| 
 | |
|     def _put(self, item, heappush=heapq.heappush):
 | |
|         heappush(self.queue, item)
 | |
| 
 | |
|     def _get(self, heappop=heapq.heappop):
 | |
|         return heappop(self.queue)
 | |
| 
 | |
| 
 | |
| class LifoQueue(Queue):
 | |
|     '''Variant of Queue that retrieves most recently added entries first.'''
 | |
| 
 | |
|     def _init(self, maxsize):
 | |
|         self.queue = []
 | |
| 
 | |
|     def _qsize(self, len=len):
 | |
|         return len(self.queue)
 | |
| 
 | |
|     def _put(self, item):
 | |
|         self.queue.append(item)
 | |
| 
 | |
|     def _get(self):
 | |
|         return self.queue.pop()
 | 
