gh-133485: Use interpreters.Interpreter in InterpreterPoolExecutor (gh-133957)

Most importantly, this resolves the issues with functions and types defined in __main__.
It also expands the number of supported objects and simplifies the implementation.
Author: Eric Snow, 2025-06-18 17:57:14 -06:00 (committed via GitHub)
parent 15f2bac02c
commit 725da50520
5 changed files with 266 additions and 205 deletions
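
The headline change is easiest to see from the caller's side. A minimal sketch (assuming Python 3.14+ with this patch; the function name `square` is illustrative): callables defined in `__main__`, which previously could not be pickled, now run in worker interpreters:

    from concurrent.futures import InterpreterPoolExecutor

    def square(x):
        # Defined in __main__: this used to fail with a PicklingError.
        return x * x

    if __name__ == '__main__':
        with InterpreterPoolExecutor(max_workers=2) as exe:
            print(list(exe.map(square, range(5))))  # [0, 1, 4, 9, 16]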

Doc/library/concurrent.futures.rst

@@ -265,7 +265,7 @@ Each worker's interpreter is isolated from all the other interpreters.
 "Isolated" means each interpreter has its own runtime state and
 operates completely independently. For example, if you redirect
 :data:`sys.stdout` in one interpreter, it will not be automatically
-redirected any other interpreter. If you import a module in one
+redirected to any other interpreter. If you import a module in one
 interpreter, it is not automatically imported in any other. You
 would need to import the module separately in interpreter where
 you need it. In fact, each module imported in an interpreter is
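
The isolation described here can be demonstrated directly with the underlying `concurrent.interpreters` API (a sketch, assuming Python 3.14+; `csv` is just an example of a module unlikely to be imported already):

    import sys
    from concurrent import interpreters

    interp = interpreters.create()
    interp.exec('import csv')    # imported only inside interp

    # The parent interpreter's module state is untouched:
    print('csv' in sys.modules)  # False, unless imported here too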
@@ -287,7 +287,7 @@ efficient alternative is to serialize with :mod:`pickle` and then send
 the bytes over a shared :mod:`socket <socket>` or
 :func:`pipe <os.pipe>`.
 
-.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)
+.. class:: InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
 
    A :class:`ThreadPoolExecutor` subclass that executes calls asynchronously
   using a pool of at most *max_workers* threads. Each thread runs
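
A sketch of the pickle-over-pipe pattern the docs recommend above (names like `producer` are illustrative; it relies on this patch, since `producer` lives in `__main__`):

    import os
    import pickle
    from concurrent.futures import InterpreterPoolExecutor

    def producer(wfd):
        payload = pickle.dumps({'answer': 42})
        # Length-prefix the pickle so the reader knows how much to read.
        os.write(wfd, len(payload).to_bytes(4, 'big') + payload)

    if __name__ == '__main__':
        r, w = os.pipe()
        with InterpreterPoolExecutor() as exe:
            exe.submit(producer, w).result()
        size = int.from_bytes(os.read(r, 4), 'big')
        print(pickle.loads(os.read(r, size)))  # {'answer': 42}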
@@ -304,21 +304,10 @@ the bytes over a shared :mod:`socket <socket>` or
    and *initargs* using :mod:`pickle` when sending them to the worker's
    interpreter.
 
-   .. note::
-
-      Functions defined in the ``__main__`` module cannot be pickled
-      and thus cannot be used.
-
    .. note::
      The executor may replace uncaught exceptions from *initializer*
      with :class:`~concurrent.futures.interpreter.ExecutionFailed`.
 
-   The optional *shared* argument is a :class:`dict` of objects that all
-   interpreters in the pool share. The *shared* items are added to each
-   interpreter's ``__main__`` module. Not all objects are shareable.
-   Shareable objects include the builtin singletons, :class:`str`
-   and :class:`bytes`, and :class:`memoryview`. See :pep:`734`
-   for more info.
-
    Other caveats from parent :class:`ThreadPoolExecutor` apply here.
 
 :meth:`~Executor.submit` and :meth:`~Executor.map` work like normal,
@@ -326,10 +315,6 @@ except the worker serializes the callable and arguments using
 :mod:`pickle` when sending them to its interpreter. The worker
 likewise serializes the return value when sending it back.
 
-.. note::
-
-   Functions defined in the ``__main__`` module cannot be pickled
-   and thus cannot be used.
-
 When a worker's current task raises an uncaught exception, the worker
 always tries to preserve the exception as-is. If that is successful
 then it also sets the ``__cause__`` to a corresponding
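
A sketch of the exception behavior described above (assuming this patch; `fail` is illustrative): the original exception comes back as-is, with `__cause__` set to the interpreter-side wrapper:

    from concurrent.futures import InterpreterPoolExecutor

    def fail():
        raise ValueError('spam')

    if __name__ == '__main__':
        with InterpreterPoolExecutor() as exe:
            fut = exe.submit(fail)
            try:
                fut.result()
            except ValueError as exc:
                print(repr(exc))            # ValueError('spam')
                print(type(exc.__cause__))  # the ExecutionFailed wrapper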

Lib/concurrent/futures/interpreter.py

@@ -1,56 +1,39 @@
 """Implements InterpreterPoolExecutor."""
 
-import contextlib
-import pickle
+from concurrent import interpreters
+import sys
 import textwrap
 from . import thread as _thread
-import _interpreters
-import _interpqueues
+import traceback
 
 
-class ExecutionFailed(_interpreters.InterpreterError):
-    """An unhandled exception happened during execution."""
-
-    def __init__(self, excinfo):
-        msg = excinfo.formatted
-        if not msg:
-            if excinfo.type and excinfo.msg:
-                msg = f'{excinfo.type.__name__}: {excinfo.msg}'
-            else:
-                msg = excinfo.type.__name__ or excinfo.msg
-        super().__init__(msg)
-        self.excinfo = excinfo
-
-    def __str__(self):
+def do_call(results, func, args, kwargs):
+    try:
+        return func(*args, **kwargs)
+    except BaseException as exc:
+        # Send the captured exception out on the results queue,
+        # but still leave it unhandled for the interpreter to handle.
         try:
-            formatted = self.excinfo.errdisplay
-        except Exception:
-            return super().__str__()
-        else:
-            return textwrap.dedent(f"""
-{super().__str__()}
-
-Uncaught in the interpreter:
-
-{formatted}
-                """.strip())
+            results.put(exc)
+        except interpreters.NotShareableError:
+            # The exception is not shareable.
+            print('exception is not shareable:', file=sys.stderr)
+            traceback.print_exception(exc)
+            results.put(None)
+        raise  # re-raise
 
 
 class WorkerContext(_thread.WorkerContext):
 
     @classmethod
-    def prepare(cls, initializer, initargs, shared):
+    def prepare(cls, initializer, initargs):
         def resolve_task(fn, args, kwargs):
             if isinstance(fn, str):
                 # XXX Circle back to this later.
                 raise TypeError('scripts not supported')
             else:
-                # Functions defined in the __main__ module can't be pickled,
-                # so they can't be used here.  In the future, we could possibly
-                # borrow from multiprocessing to work around this.
                 task = (fn, args, kwargs)
-                data = pickle.dumps(task)
-            return data
+            return task
 
         if initializer is not None:
             try:
@@ -62,68 +45,24 @@ class WorkerContext(_thread.WorkerContext):
         else:
             initdata = None
 
         def create_context():
-            return cls(initdata, shared)
+            return cls(initdata)
         return create_context, resolve_task
 
-    @classmethod
-    @contextlib.contextmanager
-    def _capture_exc(cls, resultsid):
-        try:
-            yield
-        except BaseException as exc:
-            # Send the captured exception out on the results queue,
-            # but still leave it unhandled for the interpreter to handle.
-            _interpqueues.put(resultsid, (None, exc))
-            raise  # re-raise
-
-    @classmethod
-    def _send_script_result(cls, resultsid):
-        _interpqueues.put(resultsid, (None, None))
-
-    @classmethod
-    def _call(cls, func, args, kwargs, resultsid):
-        with cls._capture_exc(resultsid):
-            res = func(*args or (), **kwargs or {})
-        # Send the result back.
-        with cls._capture_exc(resultsid):
-            _interpqueues.put(resultsid, (res, None))
-
-    @classmethod
-    def _call_pickled(cls, pickled, resultsid):
-        with cls._capture_exc(resultsid):
-            fn, args, kwargs = pickle.loads(pickled)
-        cls._call(fn, args, kwargs, resultsid)
-
-    def __init__(self, initdata, shared=None):
+    def __init__(self, initdata):
         self.initdata = initdata
-        self.shared = dict(shared) if shared else None
-        self.interpid = None
-        self.resultsid = None
+        self.interp = None
+        self.results = None
 
     def __del__(self):
-        if self.interpid is not None:
+        if self.interp is not None:
             self.finalize()
 
-    def _exec(self, script):
-        assert self.interpid is not None
-        excinfo = _interpreters.exec(self.interpid, script, restrict=True)
-        if excinfo is not None:
-            raise ExecutionFailed(excinfo)
-
     def initialize(self):
-        assert self.interpid is None, self.interpid
-        self.interpid = _interpreters.create(reqrefs=True)
+        assert self.interp is None, self.interp
+        self.interp = interpreters.create()
         try:
-            _interpreters.incref(self.interpid)
-
             maxsize = 0
-            self.resultsid = _interpqueues.create(maxsize)
-
-            self._exec(f'from {__name__} import WorkerContext')
-
-            if self.shared:
-                _interpreters.set___main___attrs(
-                    self.interpid, self.shared, restrict=True)
+            self.results = interpreters.create_queue(maxsize)
 
             if self.initdata:
                 self.run(self.initdata)
@@ -132,53 +71,25 @@ class WorkerContext(_thread.WorkerContext):
             raise  # re-raise
 
     def finalize(self):
-        interpid = self.interpid
-        resultsid = self.resultsid
-        self.resultsid = None
-        self.interpid = None
-        if resultsid is not None:
-            try:
-                _interpqueues.destroy(resultsid)
-            except _interpqueues.QueueNotFoundError:
-                pass
-        if interpid is not None:
-            try:
-                _interpreters.decref(interpid)
-            except _interpreters.InterpreterNotFoundError:
-                pass
+        interp = self.interp
+        results = self.results
+        self.results = None
+        self.interp = None
+        if results is not None:
+            del results
+        if interp is not None:
+            interp.close()
 
     def run(self, task):
-        data = task
-        script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
-
         try:
-            self._exec(script)
-        except ExecutionFailed as exc:
-            exc_wrapper = exc
-        else:
-            exc_wrapper = None
-
-        # Return the result, or raise the exception.
-        while True:
-            try:
-                obj = _interpqueues.get(self.resultsid)
-            except _interpqueues.QueueNotFoundError:
+            return self.interp.call(do_call, self.results, *task)
+        except interpreters.ExecutionFailed as wrapper:
+            # Wait for the exception data to show up.
+            exc = self.results.get()
+            if exc is None:
+                # The exception must have been not shareable.
                 raise  # re-raise
-            except _interpqueues.QueueError:
-                continue
-            except ModuleNotFoundError:
-                # interpreters._queues doesn't exist, which means
-                # QueueEmpty doesn't.  Act as though it does.
-                continue
-            else:
-                break
-        (res, exc), unboundop = obj
-        assert unboundop is None, unboundop
-        if exc is not None:
-            assert res is None, res
-            assert exc_wrapper is not None
-            raise exc from exc_wrapper
-        return res
+            raise exc from wrapper
 
 
 class BrokenInterpreterPool(_thread.BrokenThreadPool):
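
On the fallback path in `do_call()` and `run()` above: when a task raises an exception that cannot be sent back (`results.put(exc)` fails with `NotShareableError`), the worker prints the traceback to stderr, puts `None` on the queue, and the caller re-raises the `ExecutionFailed` wrapper instead. A hedged, unverified sketch of triggering that path (the unpicklable lambda argument is illustrative):

    from concurrent.futures import InterpreterPoolExecutor

    def fail():
        # A lambda in args makes the exception unpicklable,
        # hence not shareable across interpreters.
        raise ValueError(lambda: None)

    if __name__ == '__main__':
        with InterpreterPoolExecutor() as exe:
            fut = exe.submit(fail)
            # The traceback is printed in the worker; the future's
            # exception is expected to be the ExecutionFailed wrapper.
            print(type(fut.exception()))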
@@ -192,11 +103,11 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
     BROKEN = BrokenInterpreterPool
 
     @classmethod
-    def prepare_context(cls, initializer, initargs, shared):
-        return WorkerContext.prepare(initializer, initargs, shared)
+    def prepare_context(cls, initializer, initargs):
+        return WorkerContext.prepare(initializer, initargs)
 
     def __init__(self, max_workers=None, thread_name_prefix='',
-                 initializer=None, initargs=(), shared=None):
+                 initializer=None, initargs=()):
         """Initializes a new InterpreterPoolExecutor instance.
 
         Args:
@@ -206,8 +117,6 @@ class InterpreterPoolExecutor(_thread.ThreadPoolExecutor):
             initializer: A callable or script used to initialize
                 each worker interpreter.
             initargs: A tuple of arguments to pass to the initializer.
-            shared: A mapping of shareabled objects to be inserted into
-                each worker interpreter.
         """
         super().__init__(max_workers, thread_name_prefix,
-                         initializer, initargs, shared=shared)
+                         initializer, initargs)
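
With the *shared* argument gone, the way to hand workers a shared object is to pass a shareable object (such as an interpreter queue) as a task argument, as the updated test_saturation below does with `executor.submit(blocker.get)`. A sketch (assuming Python 3.14+):

    from concurrent import interpreters
    from concurrent.futures import InterpreterPoolExecutor

    if __name__ == '__main__':
        queue = interpreters.create_queue()
        with InterpreterPoolExecutor(max_workers=2) as exe:
            exe.submit(queue.put, 'spam').result()  # runs in a worker
            print(queue.get())                      # 'spam'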

Lib/test/test_concurrent_futures/executor.py

@@ -20,6 +20,10 @@ INITIALIZER_STATUS = 'uninitialized'
 def init(x):
     global INITIALIZER_STATUS
     INITIALIZER_STATUS = x
+    # InterpreterPoolInitializerTest.test_initializer fails
+    # if we don't have a LOAD_GLOBAL.  (It could be any global.)
+    # We will address this separately.
+    INITIALIZER_STATUS
 
 def get_init_status():
     return INITIALIZER_STATUS

Lib/test/test_concurrent_futures/test_interpreter_pool.py

@@ -2,35 +2,78 @@ import asyncio
 import contextlib
 import io
 import os
-import pickle
+import sys
 import time
 import unittest
-from concurrent.futures.interpreter import (
-    ExecutionFailed, BrokenInterpreterPool,
-)
+from concurrent.futures.interpreter import BrokenInterpreterPool
+from concurrent import interpreters
 from concurrent.interpreters import _queues as queues
 import _interpreters
 from test import support
+from test.support import os_helper
+from test.support import script_helper
 import test.test_asyncio.utils as testasyncio_utils
 
 from .executor import ExecutorTest, mul
 from .util import BaseTestCase, InterpreterPoolMixin, setup_module
 
+WINDOWS = sys.platform.startswith('win')
+
+
+@contextlib.contextmanager
+def nonblocking(fd):
+    blocking = os.get_blocking(fd)
+    if blocking:
+        os.set_blocking(fd, False)
+    try:
+        yield
+    finally:
+        if blocking:
+            os.set_blocking(fd, blocking)
+
+
+def read_file_with_timeout(fd, nbytes, timeout):
+    with nonblocking(fd):
+        end = time.time() + timeout
+        try:
+            return os.read(fd, nbytes)
+        except BlockingIOError:
+            pass
+        while time.time() < end:
+            try:
+                return os.read(fd, nbytes)
+            except BlockingIOError:
+                continue
+        else:
+            raise TimeoutError('nothing to read')
+
+
+if not WINDOWS:
+    import select
+    def read_file_with_timeout(fd, nbytes, timeout):
+        r, _, _ = select.select([fd], [], [], timeout)
+        if fd not in r:
+            raise TimeoutError('nothing to read')
+        return os.read(fd, nbytes)
+
 
 def noop():
     pass
 
 
 def write_msg(fd, msg):
-    import os
     os.write(fd, msg + b'\0')
 
 
-def read_msg(fd):
+def read_msg(fd, timeout=10.0):
     msg = b''
-    while ch := os.read(fd, 1):
-        if ch == b'\0':
-            return msg
+    ch = read_file_with_timeout(fd, 1, timeout)
+    while ch != b'\0':
         msg += ch
+        ch = os.read(fd, 1)
+    return msg
 
 
 def get_current_name():
@@ -113,6 +156,38 @@ class InterpreterPoolExecutorTest(
         self.assertEqual(before, b'\0')
         self.assertEqual(after, msg)
 
+    def test_init_with___main___global(self):
+        # See https://github.com/python/cpython/pull/133957#issuecomment-2927415311.
+        text = """if True:
+            from concurrent.futures import InterpreterPoolExecutor
+
+            INITIALIZER_STATUS = 'uninitialized'
+
+            def init(x):
+                global INITIALIZER_STATUS
+                INITIALIZER_STATUS = x
+                INITIALIZER_STATUS
+
+            def get_init_status():
+                return INITIALIZER_STATUS
+
+            if __name__ == "__main__":
+                exe = InterpreterPoolExecutor(initializer=init,
+                                              initargs=('initialized',))
+                fut = exe.submit(get_init_status)
+                print(fut.result())  # 'initialized'
+                exe.shutdown(wait=True)
+                print(INITIALIZER_STATUS)  # 'uninitialized'
+            """
+        with os_helper.temp_dir() as tempdir:
+            filename = script_helper.make_script(tempdir, 'my-script', text)
+            res = script_helper.assert_python_ok(filename)
+        stdout = res.out.decode('utf-8').strip()
+        self.assertEqual(stdout.splitlines(), [
+            'initialized',
+            'uninitialized',
+        ])
+
     def test_init_closure(self):
         count = 0
         def init1():
@@ -121,10 +196,19 @@ class InterpreterPoolExecutorTest(
             nonlocal count
             count += 1
 
-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=init1)
-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=init2)
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=init1) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()
+
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=init2) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()
 
     def test_init_instance_method(self):
         class Spam:
@@ -132,26 +216,12 @@ class InterpreterPoolExecutorTest(
                 raise NotImplementedError
         spam = Spam()
 
-        with self.assertRaises(pickle.PicklingError):
-            self.executor_type(initializer=spam.initializer)
-
-    def test_init_shared(self):
-        msg = b'eggs'
-        r, w = self.pipe()
-        script = f"""if True:
-            import os
-            if __name__ != '__main__':
-                import __main__
-                spam = __main__.spam
-            os.write({w}, spam + b'\\0')
-            """
-
-        executor = self.executor_type(shared={'spam': msg})
-        fut = executor.submit(exec, script)
-        fut.result()
-        after = read_msg(r)
-        self.assertEqual(after, msg)
+        with contextlib.redirect_stderr(io.StringIO()) as stderr:
+            with self.executor_type(initializer=spam.initializer) as executor:
+                fut = executor.submit(lambda: None)
+        self.assertIn('NotShareableError', stderr.getvalue())
+        with self.assertRaises(BrokenInterpreterPool):
+            fut.result()
 
     @unittest.expectedFailure
     def test_init_exception_in_script(self):
@@ -178,8 +248,6 @@ class InterpreterPoolExecutorTest(
         stderr = stderr.getvalue()
         self.assertIn('ExecutionFailed: Exception: spam', stderr)
         self.assertIn('Uncaught in the interpreter:', stderr)
-        self.assertIn('The above exception was the direct cause of the following exception:',
-                      stderr)
 
     @unittest.expectedFailure
     def test_submit_script(self):
@@ -208,10 +276,14 @@ class InterpreterPoolExecutorTest(
             return spam
 
         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(task1)
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(task2)
+
+        fut = executor.submit(task1)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()
+
+        fut = executor.submit(task2)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()
 
     def test_submit_local_instance(self):
         class Spam:
@@ -219,8 +291,9 @@ class InterpreterPoolExecutorTest(
                 self.value = True
 
         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(Spam)
+        fut = executor.submit(Spam)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()
 
     def test_submit_instance_method(self):
         class Spam:
@@ -229,8 +302,9 @@ class InterpreterPoolExecutorTest(
         spam = Spam()
 
         executor = self.executor_type()
-        with self.assertRaises(pickle.PicklingError):
-            executor.submit(spam.run)
+        fut = executor.submit(spam.run)
+        with self.assertRaises(_interpreters.NotShareableError):
+            fut.result()
 
     def test_submit_func_globals(self):
         executor = self.executor_type()
@@ -242,13 +316,14 @@ class InterpreterPoolExecutorTest(
 
     @unittest.expectedFailure
     def test_submit_exception_in_script(self):
+        # Scripts are not supported currently.
         fut = self.executor.submit('raise Exception("spam")')
         with self.assertRaises(Exception) as captured:
             fut.result()
         self.assertIs(type(captured.exception), Exception)
         self.assertEqual(str(captured.exception), 'spam')
         cause = captured.exception.__cause__
-        self.assertIs(type(cause), ExecutionFailed)
+        self.assertIs(type(cause), interpreters.ExecutionFailed)
         for attr in ('__name__', '__qualname__', '__module__'):
             self.assertEqual(getattr(cause.excinfo.type, attr),
                              getattr(Exception, attr))
@@ -261,7 +336,7 @@ class InterpreterPoolExecutorTest(
         self.assertIs(type(captured.exception), Exception)
         self.assertEqual(str(captured.exception), 'spam')
         cause = captured.exception.__cause__
-        self.assertIs(type(cause), ExecutionFailed)
+        self.assertIs(type(cause), interpreters.ExecutionFailed)
         for attr in ('__name__', '__qualname__', '__module__'):
             self.assertEqual(getattr(cause.excinfo.type, attr),
                              getattr(Exception, attr))
@@ -269,16 +344,93 @@ class InterpreterPoolExecutorTest(
 
     def test_saturation(self):
         blocker = queues.create()
-        executor = self.executor_type(4, shared=dict(blocker=blocker))
+        executor = self.executor_type(4)
 
         for i in range(15 * executor._max_workers):
-            executor.submit(exec, 'import __main__; __main__.blocker.get()')
-            #executor.submit('blocker.get()')
+            executor.submit(blocker.get)
         self.assertEqual(len(executor._threads), executor._max_workers)
         for i in range(15 * executor._max_workers):
             blocker.put_nowait(None)
         executor.shutdown(wait=True)
 
+    def test_blocking(self):
+        # There is no guarantee that a worker will be created for every
+        # submitted task.  That's because there's a race between:
+        #
+        # * a new worker thread, created when task A was just submitted,
+        #   becoming non-idle when it picks up task A
+        # * after task B is added to the queue, a new worker thread
+        #   is started only if there are no idle workers
+        #   (the check in ThreadPoolExecutor._adjust_thread_count())
+        #
+        # That means we must not block waiting for *all* tasks to report
+        # "ready" before we unblock the known-ready workers.
+        ready = queues.create()
+        blocker = queues.create()
+
+        def run(taskid, ready, blocker):
+            # There can't be any globals here.
+            ready.put_nowait(taskid)
+            blocker.get()  # blocking
+
+        numtasks = 10
+        futures = []
+        with self.executor_type() as executor:
+            # Request the jobs.
+            for i in range(numtasks):
+                fut = executor.submit(run, i, ready, blocker)
+                futures.append(fut)
+            pending = numtasks
+            while pending > 0:
+                # Wait for any to be ready.
+                done = 0
+                for _ in range(pending):
+                    try:
+                        ready.get(timeout=1)  # blocking
+                    except interpreters.QueueEmpty:
+                        pass
+                    else:
+                        done += 1
+                pending -= done
+                # Unblock the workers.
+                for _ in range(done):
+                    blocker.put_nowait(None)
+
+    def test_blocking_with_limited_workers(self):
+        # This is essentially the same as test_blocking,
+        # but we explicitly force a limited number of workers,
+        # instead of it happening implicitly sometimes due to a race.
+        ready = queues.create()
+        blocker = queues.create()
+
+        def run(taskid, ready, blocker):
+            # There can't be any globals here.
+            ready.put_nowait(taskid)
+            blocker.get()  # blocking
+
+        numtasks = 10
+        futures = []
+        with self.executor_type(4) as executor:
+            # Request the jobs.
+            for i in range(numtasks):
+                fut = executor.submit(run, i, ready, blocker)
+                futures.append(fut)
+            pending = numtasks
+            while pending > 0:
+                # Wait for any to be ready.
+                done = 0
+                for _ in range(pending):
+                    try:
+                        ready.get(timeout=1)  # blocking
+                    except interpreters.QueueEmpty:
+                        pass
+                    else:
+                        done += 1
+                pending -= done
+                # Unblock the workers.
+                for _ in range(done):
+                    blocker.put_nowait(None)
+
     @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
     def test_idle_thread_reuse(self):
         executor = self.executor_type()
@@ -289,12 +441,21 @@ class InterpreterPoolExecutorTest(
         executor.shutdown(wait=True)
 
     def test_pickle_errors_propagate(self):
-        # GH-125864: Pickle errors happen before the script tries to execute, so the
-        # queue used to wait infinitely.
+        # GH-125864: Pickle errors happen before the script tries to execute,
+        # so the queue used to wait infinitely.
         fut = self.executor.submit(PickleShenanigans(0))
-        with self.assertRaisesRegex(RuntimeError, "gotcha"):
+        expected = interpreters.NotShareableError
+        with self.assertRaisesRegex(expected, 'args not shareable') as cm:
             fut.result()
+        self.assertRegex(str(cm.exception.__cause__), 'unpickled')
+
+    def test_no_stale_references(self):
+        # Weak references don't cross between interpreters.
+        raise unittest.SkipTest('not applicable')
+
+    def test_free_reference(self):
+        # Weak references don't cross between interpreters.
+        raise unittest.SkipTest('not applicable')
 
 
 class AsyncioTest(InterpretersMixin, testasyncio_utils.TestCase):

Python/crossinterp.c

@@ -674,7 +674,9 @@ _PyPickle_Loads(struct _unpickle_context *ctx, PyObject *pickled)
 finally:
     if (exc != NULL) {
-        sync_module_capture_exc(tstate, &ctx->main);
+        if (_PyErr_Occurred(tstate)) {
+            sync_module_capture_exc(tstate, &ctx->main);
+        }
         // We restore the original exception.
         // It might make sense to chain it (__context__).
         _PyErr_SetRaisedException(tstate, exc);