mirror of
				https://github.com/python/cpython.git
				synced 2025-10-22 06:32:43 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			205 lines
		
	
	
	
		
			5.4 KiB
		
	
	
	
		
			ReStructuredText
		
	
	
	
	
	
			
		
		
	
	
			205 lines
		
	
	
	
		
			5.4 KiB
		
	
	
	
		
			ReStructuredText
		
	
	
	
	
	
| .. currentmodule:: asyncio
 | |
| 
 | |
| .. _asyncio-queues:
 | |
| 
 | |
| ======
 | |
| Queues
 | |
| ======
 | |
| 
 | |
| **Source code:** :source:`Lib/asyncio/queues.py`
 | |
| 
 | |
| ------------------------------------------------
 | |
| 
 | |
| asyncio queues are designed to be similar to classes of the
 | |
| :mod:`queue` module.  Although asyncio queues are not thread-safe,
 | |
| they are designed to be used specifically in async/await code.
 | |
| 
 | |
| Note that methods of asyncio queues don't have a *timeout* parameter;
 | |
| use :func:`asyncio.wait_for` function to do queue operations with a
 | |
| timeout.
 | |
| 
 | |
| See also the `Examples`_ section below.
 | |
| 
 | |
| Queue
 | |
| =====
 | |
| 
 | |
| .. class:: Queue(maxsize=0)
 | |
| 
 | |
|    A first in, first out (FIFO) queue.
 | |
| 
 | |
|    If *maxsize* is less than or equal to zero, the queue size is
 | |
|    infinite.  If it is an integer greater than ``0``, then
 | |
|    ``await put()`` blocks when the queue reaches *maxsize*
 | |
|    until an item is removed by :meth:`get`.
 | |
| 
 | |
|    Unlike the standard library threading :mod:`queue`, the size of
 | |
|    the queue is always known and can be returned by calling the
 | |
|    :meth:`qsize` method.
 | |
| 
 | |
| 
 | |
|    This class is :ref:`not thread safe <asyncio-multithreading>`.
 | |
| 
 | |
|    .. attribute:: maxsize
 | |
| 
 | |
|       Number of items allowed in the queue.
 | |
| 
 | |
|    .. method:: empty()
 | |
| 
 | |
|       Return ``True`` if the queue is empty, ``False`` otherwise.
 | |
| 
 | |
|    .. method:: full()
 | |
| 
 | |
|       Return ``True`` if there are :attr:`maxsize` items in the queue.
 | |
| 
 | |
|       If the queue was initialized with ``maxsize=0`` (the default),
 | |
|       then :meth:`full()` never returns ``True``.
 | |
| 
 | |
|    .. coroutinemethod:: get()
 | |
| 
 | |
|       Remove and return an item from the queue. If queue is empty,
 | |
|       wait until an item is available.
 | |
| 
 | |
|    .. method:: get_nowait()
 | |
| 
 | |
|       Return an item if one is immediately available, else raise
 | |
|       :exc:`QueueEmpty`.
 | |
| 
 | |
|    .. coroutinemethod:: join()
 | |
| 
 | |
|       Block until all items in the queue have been received and processed.
 | |
| 
 | |
|       The count of unfinished tasks goes up whenever an item is added
 | |
|       to the queue. The count goes down whenever a consumer coroutine calls
 | |
|       :meth:`task_done` to indicate that the item was retrieved and all
 | |
|       work on it is complete.  When the count of unfinished tasks drops
 | |
|       to zero, :meth:`join` unblocks.
 | |
| 
 | |
|    .. coroutinemethod:: put(item)
 | |
| 
 | |
|       Put an item into the queue. If the queue is full, wait until a
 | |
|       free slot is available before adding the item.
 | |
| 
 | |
|    .. method:: put_nowait(item)
 | |
| 
 | |
|       Put an item into the queue without blocking.
 | |
| 
 | |
|       If no free slot is immediately available, raise :exc:`QueueFull`.
 | |
| 
 | |
|    .. method:: qsize()
 | |
| 
 | |
|       Return the number of items in the queue.
 | |
| 
 | |
|    .. method:: task_done()
 | |
| 
 | |
|       Indicate that a formerly enqueued task is complete.
 | |
| 
 | |
|       Used by queue consumers. For each :meth:`~Queue.get` used to
 | |
|       fetch a task, a subsequent call to :meth:`task_done` tells the
 | |
|       queue that the processing on the task is complete.
 | |
| 
 | |
|       If a :meth:`join` is currently blocking, it will resume when all
 | |
|       items have been processed (meaning that a :meth:`task_done`
 | |
|       call was received for every item that had been :meth:`~Queue.put`
 | |
|       into the queue).
 | |
| 
 | |
|       Raises :exc:`ValueError` if called more times than there were
 | |
|       items placed in the queue.
 | |
| 
 | |
| 
 | |
| Priority Queue
 | |
| ==============
 | |
| 
 | |
| .. class:: PriorityQueue
 | |
| 
 | |
|    A variant of :class:`Queue`; retrieves entries in priority order
 | |
|    (lowest first).
 | |
| 
 | |
|    Entries are typically tuples of the form
 | |
|    ``(priority_number, data)``.
 | |
| 
 | |
| 
 | |
| LIFO Queue
 | |
| ==========
 | |
| 
 | |
| .. class:: LifoQueue
 | |
| 
 | |
|    A variant of :class:`Queue` that retrieves most recently added
 | |
|    entries first (last in, first out).
 | |
| 
 | |
| 
 | |
| Exceptions
 | |
| ==========
 | |
| 
 | |
| .. exception:: QueueEmpty
 | |
| 
 | |
|    This exception is raised when the :meth:`~Queue.get_nowait` method
 | |
|    is called on an empty queue.
 | |
| 
 | |
| 
 | |
| .. exception:: QueueFull
 | |
| 
 | |
|    Exception raised when the :meth:`~Queue.put_nowait` method is called
 | |
|    on a queue that has reached its *maxsize*.
 | |
| 
 | |
| 
 | |
| Examples
 | |
| ========
 | |
| 
 | |
| .. _asyncio_example_queue_dist:
 | |
| 
 | |
| Queues can be used to distribute workload between several
 | |
| concurrent tasks::
 | |
| 
 | |
|    import asyncio
 | |
|    import random
 | |
|    import time
 | |
| 
 | |
| 
 | |
|    async def worker(name, queue):
 | |
|        while True:
 | |
|            # Get a "work item" out of the queue.
 | |
|            sleep_for = await queue.get()
 | |
| 
 | |
|            # Sleep for the "sleep_for" seconds.
 | |
|            await asyncio.sleep(sleep_for)
 | |
| 
 | |
|            # Notify the queue that the "work item" has been processed.
 | |
|            queue.task_done()
 | |
| 
 | |
|            print(f'{name} has slept for {sleep_for:.2f} seconds')
 | |
| 
 | |
| 
 | |
|    async def main():
 | |
|        # Create a queue that we will use to store our "workload".
 | |
|        queue = asyncio.Queue()
 | |
| 
 | |
|        # Generate random timings and put them into the queue.
 | |
|        total_sleep_time = 0
 | |
|        for _ in range(20):
 | |
|            sleep_for = random.uniform(0.05, 1.0)
 | |
|            total_sleep_time += sleep_for
 | |
|            queue.put_nowait(sleep_for)
 | |
| 
 | |
|        # Create three worker tasks to process the queue concurrently.
 | |
|        tasks = []
 | |
|        for i in range(3):
 | |
|            task = asyncio.create_task(worker(f'worker-{i}', queue))
 | |
|            tasks.append(task)
 | |
| 
 | |
|        # Wait until the queue is fully processed.
 | |
|        started_at = time.monotonic()
 | |
|        await queue.join()
 | |
|        total_slept_for = time.monotonic() - started_at
 | |
| 
 | |
|        # Cancel our worker tasks.
 | |
|        for task in tasks:
 | |
|            task.cancel()
 | |
|        # Wait until all worker tasks are cancelled.
 | |
|        await asyncio.gather(*tasks, return_exceptions=True)
 | |
| 
 | |
|        print('====')
 | |
|        print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
 | |
|        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
 | |
| 
 | |
| 
 | |
|    asyncio.run(main())
 | 
