mirror of
https://github.com/python/cpython.git
synced 2025-10-02 13:22:19 +00:00
bpo-33078 - Fix queue size on pickling error (GH-6119) (GH-6178)
(cherry picked from commit e2f33add63
)
Co-authored-by: Thomas Moreau <thomas.moreau.2010@gmail.com>
This commit is contained in:
parent
22136c94b6
commit
bb5b529197
3 changed files with 23 additions and 2 deletions
|
@ -161,7 +161,7 @@ class Queue(object):
|
||||||
target=Queue._feed,
|
target=Queue._feed,
|
||||||
args=(self._buffer, self._notempty, self._send_bytes,
|
args=(self._buffer, self._notempty, self._send_bytes,
|
||||||
self._wlock, self._writer.close, self._ignore_epipe,
|
self._wlock, self._writer.close, self._ignore_epipe,
|
||||||
self._on_queue_feeder_error),
|
self._on_queue_feeder_error, self._sem),
|
||||||
name='QueueFeederThread'
|
name='QueueFeederThread'
|
||||||
)
|
)
|
||||||
self._thread.daemon = True
|
self._thread.daemon = True
|
||||||
|
@ -203,7 +203,7 @@ class Queue(object):
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
|
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
|
||||||
onerror):
|
onerror, queue_sem):
|
||||||
debug('starting thread to feed data to pipe')
|
debug('starting thread to feed data to pipe')
|
||||||
nacquire = notempty.acquire
|
nacquire = notempty.acquire
|
||||||
nrelease = notempty.release
|
nrelease = notempty.release
|
||||||
|
@ -255,6 +255,12 @@ class Queue(object):
|
||||||
info('error in queue thread: %s', e)
|
info('error in queue thread: %s', e)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
|
# Since the object has not been sent in the queue, we need
|
||||||
|
# to decrease the size of the queue. The error acts as
|
||||||
|
# if the object had been silently removed from the queue
|
||||||
|
# and this step is necessary to have a properly working
|
||||||
|
# queue.
|
||||||
|
queue_sem.release()
|
||||||
onerror(e, obj)
|
onerror(e, obj)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
@ -1056,6 +1056,19 @@ class _TestQueue(BaseTestCase):
|
||||||
self.assertTrue(q.get(timeout=1.0))
|
self.assertTrue(q.get(timeout=1.0))
|
||||||
close_queue(q)
|
close_queue(q)
|
||||||
|
|
||||||
|
with test.support.captured_stderr():
|
||||||
|
# bpo-33078: verify that the queue size is correctly handled
|
||||||
|
# on errors.
|
||||||
|
q = self.Queue(maxsize=1)
|
||||||
|
q.put(NotSerializable())
|
||||||
|
q.put(True)
|
||||||
|
self.assertEqual(q.qsize(), 1)
|
||||||
|
# bpo-30595: use a timeout of 1 second for slow buildbots
|
||||||
|
self.assertTrue(q.get(timeout=1.0))
|
||||||
|
# Check that the size of the queue is correct
|
||||||
|
self.assertEqual(q.qsize(), 0)
|
||||||
|
close_queue(q)
|
||||||
|
|
||||||
def test_queue_feeder_on_queue_feeder_error(self):
|
def test_queue_feeder_on_queue_feeder_error(self):
|
||||||
# bpo-30006: verify feeder handles exceptions using the
|
# bpo-30006: verify feeder handles exceptions using the
|
||||||
# _on_queue_feeder_error hook.
|
# _on_queue_feeder_error hook.
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Fix the size handling in multiprocessing.Queue when a pickling error
|
||||||
|
occurs.
|
Loading…
Add table
Add a link
Reference in a new issue