Issue #25233: Rewrite the guts of Queue to be more understandable and correct.

This commit is contained in:
Guido van Rossum 2015-09-28 07:42:34 -07:00
parent 16a1f28194
commit 99f96c5451
3 changed files with 91 additions and 118 deletions

View file

@ -47,7 +47,7 @@ class Queue:
# Futures.
self._getters = collections.deque()
# Futures
# Futures.
self._putters = collections.deque()
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
@ -67,10 +67,13 @@ class Queue:
# End of the overridable methods.
def __put_internal(self, item):
self._put(item)
self._unfinished_tasks += 1
self._finished.clear()
def _wakeup_next(self, waiters):
# Wake up the next waiter (if any) that isn't cancelled.
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
break
def __repr__(self):
return '<{} at {:#x} {}>'.format(
@ -91,16 +94,6 @@ class Queue:
result += ' tasks={}'.format(self._unfinished_tasks)
return result
def _consume_done_getters(self):
# Delete waiters at the head of the get() queue who've timed out.
while self._getters and self._getters[0].done():
self._getters.popleft()
def _consume_done_putters(self):
# Delete waiters at the head of the put() queue who've timed out.
while self._putters and self._putters[0].done():
self._putters.popleft()
def qsize(self):
"""Number of items in the queue."""
return len(self._queue)
@ -134,47 +127,31 @@ class Queue:
This method is a coroutine.
"""
self._consume_done_getters()
if self._getters:
assert not self._queue, (
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get())
elif self._maxsize > 0 and self._maxsize <= self.qsize():
waiter = futures.Future(loop=self._loop)
self._putters.append(waiter)
yield from waiter
self._put(item)
else:
self.__put_internal(item)
while self.full():
putter = futures.Future(loop=self._loop)
self._putters.append(putter)
try:
yield from putter
except:
putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
return self.put_nowait(item)
def put_nowait(self, item):
"""Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
"""
self._consume_done_getters()
if self._getters:
assert not self._queue, (
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(self._get())
elif self._maxsize > 0 and self._maxsize <= self.qsize():
if self.full():
raise QueueFull
else:
self.__put_internal(item)
self._put(item)
self._unfinished_tasks += 1
self._finished.clear()
self._wakeup_next(self._getters)
@coroutine
def get(self):
@ -184,77 +161,30 @@ class Queue:
This method is a coroutine.
"""
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
putter = self._putters.popleft()
# When a getter runs and frees up a slot so this putter can
# run, we need to defer the put for a tick to ensure that
# getters and putters alternate perfectly. See
# ChannelTest.test_wait.
self._loop.call_soon(putter._set_result_unless_cancelled, None)
return self._get()
elif self.qsize():
return self._get()
else:
waiter = futures.Future(loop=self._loop)
self._getters.append(waiter)
while self.empty():
getter = futures.Future(loop=self._loop)
self._getters.append(getter)
try:
return (yield from waiter)
except futures.CancelledError:
# if we get CancelledError, it means someone cancelled this
# get() coroutine. But there is a chance that the waiter
# already is ready and contains an item that has just been
# removed from the queue. In this case, we need to put the item
# back into the front of the queue. This get() must either
# succeed without fault or, if it gets cancelled, it must be as
# if it never happened.
if waiter.done():
self._put_it_back(waiter.result())
yield from getter
except:
getter.cancel() # Just in case getter is not done yet.
if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise
def _put_it_back(self, item):
"""
This is called when we have a waiter to get() an item and this waiter
gets cancelled. In this case, we put the item back: wake up another
waiter or put it in the _queue.
"""
self._consume_done_getters()
if self._getters:
assert not self._queue, (
'queue non-empty, why are getters waiting?')
getter = self._getters.popleft()
self.__put_internal(item)
# getter cannot be cancelled, we just removed done getters
getter.set_result(item)
else:
self._queue.appendleft(item)
return self.get_nowait()
def get_nowait(self):
"""Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
"""
self._consume_done_putters()
if self._putters:
assert self.full(), 'queue not full, why are putters waiting?'
putter = self._putters.popleft()
# Wake putter on next tick.
# getter cannot be cancelled, we just removed done putters
putter.set_result(None)
return self._get()
elif self.qsize():
return self._get()
else:
if self.empty():
raise QueueEmpty
item = self._get()
self._wakeup_next(self._putters)
return item
def task_done(self):
"""Indicate that a formerly enqueued task is complete.