[3.14] gh-132775: Use _PyObject_GetXIData (With Fallback) (gh-134507)

This change includes some semi-related refactoring of queues and channels.

(cherry picked from commit d0eedfa10e, gh-134440)

Co-authored-by: Eric Snow <ericsnowcurrently@gmail.com>
This commit is contained in:
Miss Islington (bot) 2025-05-22 15:21:05 +02:00 committed by GitHub
parent 59cb829eb9
commit 85c8c0a003
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 485 additions and 437 deletions

View file

@ -36,9 +36,6 @@ Uncaught in the interpreter:
""".strip())
UNBOUND = 2 # error; this should not happen.
class WorkerContext(_thread.WorkerContext):
@classmethod
@ -47,23 +44,13 @@ class WorkerContext(_thread.WorkerContext):
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
if args or kwargs:
raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
data = textwrap.dedent(fn)
kind = 'script'
# Make sure the script compiles.
# Ideally we wouldn't throw away the resulting code
# object. However, there isn't much to be done until
# code objects are shareable and/or we do a better job
# of supporting code objects in _interpreters.exec().
compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
data = pickle.dumps((fn, args, kwargs))
kind = 'function'
return (data, kind)
task = (fn, args, kwargs)
data = pickle.dumps(task)
return data
if initializer is not None:
try:
@ -86,24 +73,20 @@ class WorkerContext(_thread.WorkerContext):
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
err = pickle.dumps(exc)
_interpqueues.put(resultsid, (None, err), 1, UNBOUND)
_interpqueues.put(resultsid, (None, exc))
raise # re-raise
@classmethod
def _send_script_result(cls, resultsid):
_interpqueues.put(resultsid, (None, None), 0, UNBOUND)
_interpqueues.put(resultsid, (None, None))
@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
try:
_interpqueues.put(resultsid, (res, None), 0, UNBOUND)
except _interpreters.NotShareableError:
res = pickle.dumps(res)
_interpqueues.put(resultsid, (res, None), 1, UNBOUND)
with cls._capture_exc(resultsid):
_interpqueues.put(resultsid, (res, None))
@classmethod
def _call_pickled(cls, pickled, resultsid):
@ -134,8 +117,7 @@ class WorkerContext(_thread.WorkerContext):
_interpreters.incref(self.interpid)
maxsize = 0
fmt = 0
self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
self.resultsid = _interpqueues.create(maxsize)
self._exec(f'from {__name__} import WorkerContext')
@ -166,17 +148,8 @@ class WorkerContext(_thread.WorkerContext):
pass
def run(self, task):
data, kind = task
if kind == 'script':
raise NotImplementedError('script kind disabled')
script = f"""
with WorkerContext._capture_exc({self.resultsid}):
{textwrap.indent(data, ' ')}
WorkerContext._send_script_result({self.resultsid})"""
elif kind == 'function':
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
else:
raise NotImplementedError(kind)
data = task
script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
try:
self._exec(script)
@ -199,15 +172,13 @@ WorkerContext._send_script_result({self.resultsid})"""
continue
else:
break
(res, excdata), pickled, unboundop = obj
(res, exc), unboundop = obj
assert unboundop is None, unboundop
if excdata is not None:
if exc is not None:
assert res is None, res
assert pickled
assert exc_wrapper is not None
exc = pickle.loads(excdata)
raise exc from exc_wrapper
return pickle.loads(res) if pickled else res
return res
class BrokenInterpreterPool(_thread.BrokenThreadPool):