mirror of
https://github.com/python/cpython.git
synced 2025-12-23 09:19:18 +00:00
gh-120321: Make gi_frame_state transitions atomic in FT build (gh-142599)
This makes generator frame state transitions atomic in the free threading build, which avoids segfaults when trying to execute a generator from multiple threads concurrently. There are still a few operations that aren't thread-safe and may crash if performed concurrently on the same generator/coroutine: * Accessing gi_yieldfrom/cr_await/ag_await * Accessing gi_frame/cr_frame/ag_frame * Async generator operations
This commit is contained in:
parent
e2a7db7175
commit
08bc03ff2a
16 changed files with 1124 additions and 883 deletions
|
|
@ -49,3 +49,74 @@ class TestFTGenerators(TestCase):
|
|||
self.concurrent_write_with_func(func=set_gen_name)
|
||||
with self.subTest(func=set_gen_qualname):
|
||||
self.concurrent_write_with_func(func=set_gen_qualname)
|
||||
|
||||
def test_concurrent_send(self):
|
||||
def gen():
|
||||
yield 1
|
||||
yield 2
|
||||
yield 3
|
||||
yield 4
|
||||
yield 5
|
||||
|
||||
def run_test(drive_generator):
|
||||
g = gen()
|
||||
values = []
|
||||
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g, values,))
|
||||
self.assertEqual(sorted(values), [1, 2, 3, 4, 5])
|
||||
|
||||
def call_next(g, values):
|
||||
while True:
|
||||
try:
|
||||
values.append(next(g))
|
||||
except ValueError:
|
||||
continue
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
with self.subTest(method='next'):
|
||||
run_test(call_next)
|
||||
|
||||
def call_send(g, values):
|
||||
while True:
|
||||
try:
|
||||
values.append(g.send(None))
|
||||
except ValueError:
|
||||
continue
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
with self.subTest(method='send'):
|
||||
run_test(call_send)
|
||||
|
||||
def for_iter_gen(g, values):
|
||||
while True:
|
||||
try:
|
||||
for value in g:
|
||||
values.append(value)
|
||||
else:
|
||||
break
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
with self.subTest(method='for'):
|
||||
run_test(for_iter_gen)
|
||||
|
||||
def test_concurrent_close(self):
|
||||
def gen():
|
||||
for i in range(10):
|
||||
yield i
|
||||
time.sleep(0.001)
|
||||
|
||||
def drive_generator(g):
|
||||
while True:
|
||||
try:
|
||||
for value in g:
|
||||
if value == 5:
|
||||
g.close()
|
||||
else:
|
||||
return
|
||||
except ValueError as e:
|
||||
self.assertEqual(e.args[0], "generator already executing")
|
||||
|
||||
g = gen()
|
||||
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue