mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Fix asyncio issue 235 (merge from 3.4).
This commit is contained in:
commit
f75d4a7ad0
3 changed files with 36 additions and 11 deletions
|
@ -53,6 +53,8 @@ class Queue:
|
||||||
self._finished.set()
|
self._finished.set()
|
||||||
self._init(maxsize)
|
self._init(maxsize)
|
||||||
|
|
||||||
|
# These three are overridable in subclasses.
|
||||||
|
|
||||||
def _init(self, maxsize):
|
def _init(self, maxsize):
|
||||||
self._queue = collections.deque()
|
self._queue = collections.deque()
|
||||||
|
|
||||||
|
@ -61,6 +63,11 @@ class Queue:
|
||||||
|
|
||||||
def _put(self, item):
|
def _put(self, item):
|
||||||
self._queue.append(item)
|
self._queue.append(item)
|
||||||
|
|
||||||
|
# End of the overridable methods.
|
||||||
|
|
||||||
|
def __put_internal(self, item):
|
||||||
|
self._put(item)
|
||||||
self._unfinished_tasks += 1
|
self._unfinished_tasks += 1
|
||||||
self._finished.clear()
|
self._finished.clear()
|
||||||
|
|
||||||
|
@ -132,7 +139,7 @@ class Queue:
|
||||||
'queue non-empty, why are getters waiting?')
|
'queue non-empty, why are getters waiting?')
|
||||||
|
|
||||||
getter = self._getters.popleft()
|
getter = self._getters.popleft()
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
|
|
||||||
# getter cannot be cancelled, we just removed done getters
|
# getter cannot be cancelled, we just removed done getters
|
||||||
getter.set_result(self._get())
|
getter.set_result(self._get())
|
||||||
|
@ -144,7 +151,7 @@ class Queue:
|
||||||
yield from waiter
|
yield from waiter
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
|
|
||||||
def put_nowait(self, item):
|
def put_nowait(self, item):
|
||||||
"""Put an item into the queue without blocking.
|
"""Put an item into the queue without blocking.
|
||||||
|
@ -157,7 +164,7 @@ class Queue:
|
||||||
'queue non-empty, why are getters waiting?')
|
'queue non-empty, why are getters waiting?')
|
||||||
|
|
||||||
getter = self._getters.popleft()
|
getter = self._getters.popleft()
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
|
|
||||||
# getter cannot be cancelled, we just removed done getters
|
# getter cannot be cancelled, we just removed done getters
|
||||||
getter.set_result(self._get())
|
getter.set_result(self._get())
|
||||||
|
@ -165,7 +172,7 @@ class Queue:
|
||||||
elif self._maxsize > 0 and self._maxsize <= self.qsize():
|
elif self._maxsize > 0 and self._maxsize <= self.qsize():
|
||||||
raise QueueFull
|
raise QueueFull
|
||||||
else:
|
else:
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
|
|
||||||
@coroutine
|
@coroutine
|
||||||
def get(self):
|
def get(self):
|
||||||
|
@ -179,7 +186,7 @@ class Queue:
|
||||||
if self._putters:
|
if self._putters:
|
||||||
assert self.full(), 'queue not full, why are putters waiting?'
|
assert self.full(), 'queue not full, why are putters waiting?'
|
||||||
item, putter = self._putters.popleft()
|
item, putter = self._putters.popleft()
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
|
|
||||||
# When a getter runs and frees up a slot so this putter can
|
# When a getter runs and frees up a slot so this putter can
|
||||||
# run, we need to defer the put for a tick to ensure that
|
# run, we need to defer the put for a tick to ensure that
|
||||||
|
@ -206,7 +213,7 @@ class Queue:
|
||||||
if self._putters:
|
if self._putters:
|
||||||
assert self.full(), 'queue not full, why are putters waiting?'
|
assert self.full(), 'queue not full, why are putters waiting?'
|
||||||
item, putter = self._putters.popleft()
|
item, putter = self._putters.popleft()
|
||||||
self._put(item)
|
self.__put_internal(item)
|
||||||
# Wake putter on next tick.
|
# Wake putter on next tick.
|
||||||
|
|
||||||
# getter cannot be cancelled, we just removed done putters
|
# getter cannot be cancelled, we just removed done putters
|
||||||
|
|
|
@ -408,14 +408,16 @@ class PriorityQueueTests(_QueueTestBase):
|
||||||
self.assertEqual([1, 2, 3], items)
|
self.assertEqual([1, 2, 3], items)
|
||||||
|
|
||||||
|
|
||||||
class QueueJoinTests(_QueueTestBase):
|
class _QueueJoinTestMixin:
|
||||||
|
|
||||||
|
q_class = None
|
||||||
|
|
||||||
def test_task_done_underflow(self):
|
def test_task_done_underflow(self):
|
||||||
q = asyncio.Queue(loop=self.loop)
|
q = self.q_class(loop=self.loop)
|
||||||
self.assertRaises(ValueError, q.task_done)
|
self.assertRaises(ValueError, q.task_done)
|
||||||
|
|
||||||
def test_task_done(self):
|
def test_task_done(self):
|
||||||
q = asyncio.Queue(loop=self.loop)
|
q = self.q_class(loop=self.loop)
|
||||||
for i in range(100):
|
for i in range(100):
|
||||||
q.put_nowait(i)
|
q.put_nowait(i)
|
||||||
|
|
||||||
|
@ -452,7 +454,7 @@ class QueueJoinTests(_QueueTestBase):
|
||||||
self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))
|
self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))
|
||||||
|
|
||||||
def test_join_empty_queue(self):
|
def test_join_empty_queue(self):
|
||||||
q = asyncio.Queue(loop=self.loop)
|
q = self.q_class(loop=self.loop)
|
||||||
|
|
||||||
# Test that a queue join()s successfully, and before anything else
|
# Test that a queue join()s successfully, and before anything else
|
||||||
# (done twice for insurance).
|
# (done twice for insurance).
|
||||||
|
@ -465,12 +467,24 @@ class QueueJoinTests(_QueueTestBase):
|
||||||
self.loop.run_until_complete(join())
|
self.loop.run_until_complete(join())
|
||||||
|
|
||||||
def test_format(self):
|
def test_format(self):
|
||||||
q = asyncio.Queue(loop=self.loop)
|
q = self.q_class(loop=self.loop)
|
||||||
self.assertEqual(q._format(), 'maxsize=0')
|
self.assertEqual(q._format(), 'maxsize=0')
|
||||||
|
|
||||||
q._unfinished_tasks = 2
|
q._unfinished_tasks = 2
|
||||||
self.assertEqual(q._format(), 'maxsize=0 tasks=2')
|
self.assertEqual(q._format(), 'maxsize=0 tasks=2')
|
||||||
|
|
||||||
|
|
||||||
|
class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
|
||||||
|
q_class = asyncio.Queue
|
||||||
|
|
||||||
|
|
||||||
|
class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
|
||||||
|
q_class = asyncio.LifoQueue
|
||||||
|
|
||||||
|
|
||||||
|
class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
|
||||||
|
q_class = asyncio.PriorityQueue
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|
|
@ -49,6 +49,10 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Fix asyncio issue 235: LifoQueue and PriorityQueue's put didn't
|
||||||
|
increment unfinished tasks (this bug was introduced when
|
||||||
|
JoinableQueue was merged with Queue).
|
||||||
|
|
||||||
- Issue #23908: os functions now reject paths with embedded null character
|
- Issue #23908: os functions now reject paths with embedded null character
|
||||||
on Windows instead of silently truncate them.
|
on Windows instead of silently truncate them.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue