[3.12] gh-104812: Run Pending Calls in any Thread (gh-104813) (gh-105752)

For a while now, pending calls only run in the main thread (in the main interpreter).  This PR changes things to allow any thread run a pending call, unless the pending call was explicitly added for the main thread to run.
(cherry picked from commit 757b402)
This commit is contained in:
Eric Snow 2023-06-13 18:50:08 -06:00 committed by GitHub
parent 75239d5ec1
commit 33d3069c45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1342 additions and 689 deletions

View file

@ -2,18 +2,21 @@
# these are all functions _testcapi exports whose name begins with 'test_'.
import _thread
from collections import OrderedDict
from collections import OrderedDict, deque
import contextlib
import importlib.machinery
import importlib.util
import json
import os
import pickle
import queue
import random
import subprocess
import sys
import textwrap
import threading
import time
import types
import unittest
import warnings
import weakref
@ -37,6 +40,10 @@ try:
import _testsinglephase
except ImportError:
_testsinglephase = None
try:
import _xxsubinterpreters as _interpreters
except ModuleNotFoundError:
_interpreters = None
# Skip this test if the _testcapi module isn't available.
_testcapi = import_helper.import_module('_testcapi')
@ -48,6 +55,12 @@ def decode_stderr(err):
return err.decode('utf-8', 'replace').replace('\r', '')
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(_interpreters is None,
'subinterpreters required')(meth)
def testfunction(self):
"""some doc"""
return self
@ -1260,6 +1273,10 @@ class TestHeapTypeRelative(unittest.TestCase):
class TestPendingCalls(unittest.TestCase):
# See the comment in ceval.c (at the "handle_eval_breaker" label)
# about when pending calls get run. This is especially relevant
# here for creating deterministic tests.
def pendingcalls_submit(self, l, n):
def callback():
#this function can be interrupted by thread switching so let's
@ -1342,6 +1359,390 @@ class TestPendingCalls(unittest.TestCase):
gen = genf()
self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
class PendingTask(types.SimpleNamespace):
_add_pending = _testinternalcapi.pending_threadfunc
def __init__(self, req, taskid=None, notify_done=None):
self.id = taskid
self.req = req
self.notify_done = notify_done
self.creator_tid = threading.get_ident()
self.requester_tid = None
self.runner_tid = None
self.result = None
def run(self):
assert self.result is None
self.runner_tid = threading.get_ident()
self._run()
if self.notify_done is not None:
self.notify_done()
def _run(self):
self.result = self.req
def run_in_pending_call(self, worker_tids):
assert self._add_pending is _testinternalcapi.pending_threadfunc
self.requester_tid = threading.get_ident()
def callback():
assert self.result is None
# It can be tricky to control which thread handles
# the eval breaker, so we take a naive approach to
# make sure.
if threading.get_ident() not in worker_tids:
self._add_pending(callback, ensure_added=True)
return
self.run()
self._add_pending(callback, ensure_added=True)
def create_thread(self, worker_tids):
return threading.Thread(
target=self.run_in_pending_call,
args=(worker_tids,),
)
def wait_for_result(self):
while self.result is None:
time.sleep(0.01)
@threading_helper.requires_working_threading()
def test_subthreads_can_handle_pending_calls(self):
payload = 'Spam spam spam spam. Lovely spam! Wonderful spam!'
task = self.PendingTask(payload)
def do_the_work():
tid = threading.get_ident()
t = task.create_thread({tid})
with threading_helper.start_threads([t]):
task.wait_for_result()
t = threading.Thread(target=do_the_work)
with threading_helper.start_threads([t]):
pass
self.assertEqual(task.result, payload)
@threading_helper.requires_working_threading()
def test_many_subthreads_can_handle_pending_calls(self):
main_tid = threading.get_ident()
self.assertEqual(threading.main_thread().ident, main_tid)
# We can't use queue.Queue since it isn't reentrant relative
# to pending calls.
_queue = deque()
_active = deque()
_done_lock = threading.Lock()
def queue_put(task):
_queue.append(task)
_active.append(True)
def queue_get():
try:
task = _queue.popleft()
except IndexError:
raise queue.Empty
return task
def queue_task_done():
_active.pop()
if not _active:
try:
_done_lock.release()
except RuntimeError:
assert not _done_lock.locked()
def queue_empty():
return not _queue
def queue_join():
_done_lock.acquire()
_done_lock.release()
tasks = []
for i in range(20):
task = self.PendingTask(
req=f'request {i}',
taskid=i,
notify_done=queue_task_done,
)
tasks.append(task)
queue_put(task)
# This will be released once all the tasks have finished.
_done_lock.acquire()
def add_tasks(worker_tids):
while True:
if done:
return
try:
task = queue_get()
except queue.Empty:
break
task.run_in_pending_call(worker_tids)
done = False
def run_tasks():
while not queue_empty():
if done:
return
time.sleep(0.01)
# Give the worker a chance to handle any remaining pending calls.
while not done:
time.sleep(0.01)
# Start the workers and wait for them to finish.
worker_threads = [threading.Thread(target=run_tasks)
for _ in range(3)]
with threading_helper.start_threads(worker_threads):
try:
# Add a pending call for each task.
worker_tids = [t.ident for t in worker_threads]
threads = [threading.Thread(target=add_tasks, args=(worker_tids,))
for _ in range(3)]
with threading_helper.start_threads(threads):
try:
pass
except BaseException:
done = True
raise # re-raise
# Wait for the pending calls to finish.
queue_join()
# Notify the workers that they can stop.
done = True
except BaseException:
done = True
raise # re-raise
runner_tids = [t.runner_tid for t in tasks]
self.assertNotIn(main_tid, runner_tids)
for task in tasks:
with self.subTest(f'task {task.id}'):
self.assertNotEqual(task.requester_tid, main_tid)
self.assertNotEqual(task.requester_tid, task.runner_tid)
self.assertNotIn(task.requester_tid, runner_tids)
@requires_subinterpreters
def test_isolated_subinterpreter(self):
# We exercise the most important permutations.
# This test relies on pending calls getting called
# (eval breaker tripped) at each loop iteration
# and at each call.
maxtext = 250
main_interpid = 0
interpid = _interpreters.create()
_interpreters.run_string(interpid, f"""if True:
import json
import os
import threading
import time
import _testinternalcapi
from test.support import threading_helper
""")
def create_pipe():
r, w = os.pipe()
self.addCleanup(lambda: os.close(r))
self.addCleanup(lambda: os.close(w))
return r, w
with self.subTest('add in main, run in subinterpreter'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
global done, waiting
waiting = True
os_read({r_done}, 1)
done = True
t = threading.Thread(target=wait)
with threading_helper.start_threads([t]):
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
""")
t = threading.Thread(target=do_work)
with threading_helper.start_threads([t]):
os.read(r_ready, 1)
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify(interpid)
# Signal the subinterpreter to stop.
os.write(w_done, b'\0')
self.assertEqual(actual, int(interpid))
with self.subTest('add in main, run in subinterpreter sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
waiting = False
done = False
def subthread():
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
t = threading.Thread(target=subthread)
with threading_helper.start_threads([t]):
# Wait until this interp has handled the pending call.
waiting = True
os.read({r_done}, 1)
done = True
""")
t = threading.Thread(target=do_work)
with threading_helper.start_threads([t]):
os.read(r_ready, 1)
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify(interpid)
# Signal the subinterpreter to stop.
os.write(w_done, b'\0')
self.assertEqual(actual, int(interpid))
with self.subTest('add in subinterpreter, run in main'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def add_job():
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({main_interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
nonlocal done, waiting
waiting = True
os_read(r_done, 1)
done = True
t1 = threading.Thread(target=add_job)
t2 = threading.Thread(target=wait)
with threading_helper.start_threads([t1, t2]):
while not waiting:
pass
os.write(w_ready, b'\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > timeout:
raise Exception('timed out!')
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(main_interpid))
with self.subTest('add in subinterpreter, run in sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def add_job():
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({main_interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
# Wait until this interp has handled the pending call.
waiting = False
done = False
def wait(os_read=os.read):
nonlocal done, waiting
waiting = True
os_read(r_done, 1)
done = True
def subthread():
while not waiting:
pass
os.write(w_ready, b'\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > timeout:
raise Exception('timed out!')
t1 = threading.Thread(target=add_job)
t2 = threading.Thread(target=wait)
t3 = threading.Thread(target=subthread)
with threading_helper.start_threads([t1, t2, t3]):
pass
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(main_interpid))
# XXX We can't use the rest until gh-105716 is fixed.
return
with self.subTest('add in subinterpreter, run in subinterpreter sub-thread'):
r_ready, w_ready = create_pipe()
r_done, w_done= create_pipe()
r_data, w_data= create_pipe()
timeout = time.time() + 30 # seconds
def do_work():
_interpreters.run_string(interpid, f"""if True:
waiting = False
done = False
def subthread():
while not waiting:
pass
os.write({w_ready}, b'\\0')
# Loop to trigger the eval breaker.
while not done:
time.sleep(0.01)
if time.time() > {timeout}:
raise Exception('timed out!')
t = threading.Thread(target=subthread)
with threading_helper.start_threads([t]):
# Wait until this interp has handled the pending call.
waiting = True
os.read({r_done}, 1)
done = True
""")
t = threading.Thread(target=do_work)
#with threading_helper.start_threads([t]):
t.start()
if True:
os.read(r_ready, 1)
_interpreters.run_string(interpid, f"""if True:
# Add the pending call and wait for it to finish.
actual = _testinternalcapi.pending_identify({interpid})
# Signal the subinterpreter to stop.
os.write({w_done}, b'\\0')
os.write({w_data}, actual.to_bytes(1, 'little'))
""")
t.join()
text = os.read(r_data, 1)
actual = int.from_bytes(text, 'little')
self.assertEqual(actual, int(interpid))
class SubinterpreterTest(unittest.TestCase):