gh-138253: Fix compatibility of sub-interpreters queues with queue.Queue (GH-138256)
Some checks are pending
Tests / Change detection (push) Waiting to run
Tests / Docs (push) Blocked by required conditions
Tests / Check if Autoconf files are up to date (push) Blocked by required conditions
Tests / Check if generated files are up to date (push) Blocked by required conditions
Tests / (push) Blocked by required conditions
Tests / Windows MSI (push) Blocked by required conditions
Tests / Ubuntu SSL tests with OpenSSL (push) Blocked by required conditions
Tests / Ubuntu SSL tests with AWS-LC (push) Blocked by required conditions
Tests / Android (aarch64) (push) Blocked by required conditions
Tests / Android (x86_64) (push) Blocked by required conditions
Tests / WASI (push) Blocked by required conditions
Tests / Hypothesis tests on Ubuntu (push) Blocked by required conditions
Tests / Address sanitizer (push) Blocked by required conditions
Tests / Sanitizers (push) Blocked by required conditions
Tests / Cross build Linux (push) Blocked by required conditions
Tests / CIFuzz (push) Blocked by required conditions
Tests / All required checks pass (push) Blocked by required conditions
Lint / lint (push) Waiting to run
mypy / Run mypy on Lib/_pyrepl (push) Waiting to run
mypy / Run mypy on Lib/test/libregrtest (push) Waiting to run
mypy / Run mypy on Lib/tomllib (push) Waiting to run
mypy / Run mypy on Tools/build (push) Waiting to run
mypy / Run mypy on Tools/cases_generator (push) Waiting to run
mypy / Run mypy on Tools/clinic (push) Waiting to run
mypy / Run mypy on Tools/jit (push) Waiting to run
mypy / Run mypy on Tools/peg_generator (push) Waiting to run

Add the block parameter in the put() and get() methods of
the concurrent.interpreters queues for compatibility with the
queue.Queue interface.
This commit is contained in:
Serhiy Storchaka 2025-09-02 08:59:20 +03:00 committed by GitHub
parent a2ba0a7552
commit cb18269e1b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 24 additions and 5 deletions

View file

@ -170,13 +170,13 @@ class Queue:
def qsize(self):
return _queues.get_count(self._id)
def put(self, obj, timeout=None, *,
def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
This blocks while the queue is full.
If "block" is true, this blocks while the queue is full.
For most objects, the object received through Queue.get() will
be a new one, equivalent to the original and not sharing any
@ -209,6 +209,8 @@ class Queue:
If "unbounditems" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
if not block:
return self.put_nowait(obj, unbounditems=unbounditems)
if unbounditems is None:
unboundop = -1
else:
@ -235,17 +237,19 @@ class Queue:
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
def get(self, timeout=None, *,
def get(self, block=True, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
"""Return the next object from the queue.
This blocks while the queue is empty.
If "block" is true, this blocks while the queue is empty.
If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
"unbounditems" argument to put().
"""
if not block:
return self.get_nowait()
if timeout is not None:
timeout = int(timeout)
if timeout < 0:

View file

@ -11,7 +11,7 @@ from concurrent import interpreters
from concurrent.interpreters import _queues as queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase
HUGE_TIMEOUT = 3600
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
@ -306,6 +306,8 @@ class TestQueueOps(TestBase):
queue.put(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1)
with self.assertRaises(queues.QueueFull):
queue.put(None, HUGE_TIMEOUT, 0.1)
queue.get()
queue.put(None)
@ -315,6 +317,10 @@ class TestQueueOps(TestBase):
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, False)
with self.assertRaises(queues.QueueFull):
queue.put(None, False, timeout=HUGE_TIMEOUT)
queue.get()
queue.put_nowait(None)
@ -345,11 +351,17 @@ class TestQueueOps(TestBase):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get(timeout=0.1)
with self.assertRaises(queues.QueueEmpty):
queue.get(HUGE_TIMEOUT, 0.1)
def test_get_nowait(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
with self.assertRaises(queues.QueueEmpty):
queue.get(False)
with self.assertRaises(queues.QueueEmpty):
queue.get(False, timeout=HUGE_TIMEOUT)
def test_put_get_full_fallback(self):
expected = list(range(20))

View file

@ -0,0 +1,3 @@
Add the *block* parameter in the :meth:`!put` and :meth:`!get` methods
of the :mod:`concurrent.interpreters` queues for compatibility with the
:class:`queue.Queue` interface.