mirror of
https://github.com/python/cpython.git
synced 2025-07-13 22:35:18 +00:00

Make tuple iteration more thread-safe, and actually test concurrent iteration of tuple, range and list. (This is prep work for enabling specialization of FOR_ITER in free-threaded builds.) The basic premise is: Iterating over a shared iterable (list, tuple or range) should be safe, not involve data races, and behave like iteration normally does. Using a shared iterator should not crash or involve data races, and should only produce items regular iteration would produce. It is not guaranteed to produce all items, or produce each item only once. (This is not the case for range iteration even after this PR.) Providing stronger guarantees is possible for some of these iterators, but it's not always straight-forward and can significantly hamper the common case. Since iterators in general aren't shared between threads, and it's simply impossible to concurrently use many iterators (like generators), better to make sharing iterators without explicit synchronization clearly wrong. Specific issues fixed in order to make the tests pass: - List iteration could occasionally fail an assertion when a shared list was shrunk and an item past the new end was retrieved concurrently. There's still some unsafety when deleting/inserting multiple items through for example slice assignment, which uses memmove/memcpy. - Tuple iteration could occasionally crash when the iterator's reference to the tuple was cleared on exhaustion. Like with list iteration, in free-threaded builds we can't safely and efficiently clear the iterator's reference to the iterable (doing it safely would mean extra, slow refcount operations), so just keep the iterable reference around.
127 lines
4.5 KiB
Python
127 lines
4.5 KiB
Python
import threading
|
|
import unittest
|
|
from test import support
|
|
|
|
# The race conditions these tests were written for only happen every now and
|
|
# then, even with the current numbers. To find rare race conditions, bumping
|
|
# these up will help, but it makes the test runtime highly variable under
|
|
# free-threading. Overhead is much higher under ThreadSanitizer, but it's
|
|
# also much better at detecting certain races, so we don't need as many
|
|
# items/threads.
|
|
if support.check_sanitizer(thread=True):
|
|
NUMITEMS = 1000
|
|
NUMTHREADS = 2
|
|
else:
|
|
NUMITEMS = 100000
|
|
NUMTHREADS = 5
|
|
NUMMUTATORS = 2
|
|
|
|
class ContendedTupleIterationTest(unittest.TestCase):
|
|
def make_testdata(self, n):
|
|
return tuple(range(n))
|
|
|
|
def assert_iterator_results(self, results, expected):
|
|
# Most iterators are not atomic (yet?) so they can skip or duplicate
|
|
# items, but they should not invent new items (like the range
|
|
# iterator currently does).
|
|
extra_items = set(results) - set(expected)
|
|
self.assertEqual(set(), extra_items)
|
|
|
|
def run_threads(self, func, *args, numthreads=NUMTHREADS):
|
|
threads = []
|
|
for _ in range(numthreads):
|
|
t = threading.Thread(target=func, args=args)
|
|
t.start()
|
|
threads.append(t)
|
|
return threads
|
|
|
|
def test_iteration(self):
|
|
"""Test iteration over a shared container"""
|
|
seq = self.make_testdata(NUMITEMS)
|
|
results = []
|
|
start = threading.Barrier(NUMTHREADS)
|
|
def worker():
|
|
idx = 0
|
|
start.wait()
|
|
for item in seq:
|
|
idx += 1
|
|
results.append(idx)
|
|
threads = self.run_threads(worker)
|
|
for t in threads:
|
|
t.join()
|
|
# Each thread has its own iterator, so results should be entirely predictable.
|
|
self.assertEqual(results, [NUMITEMS] * NUMTHREADS)
|
|
|
|
def test_shared_iterator(self):
|
|
"""Test iteration over a shared iterator"""
|
|
seq = self.make_testdata(NUMITEMS)
|
|
it = iter(seq)
|
|
results = []
|
|
start = threading.Barrier(NUMTHREADS)
|
|
def worker():
|
|
items = []
|
|
start.wait()
|
|
# We want a tight loop, so put items in the shared list at the end.
|
|
for item in it:
|
|
items.append(item)
|
|
results.extend(items)
|
|
threads = self.run_threads(worker)
|
|
for t in threads:
|
|
t.join()
|
|
self.assert_iterator_results(results, seq)
|
|
|
|
class ContendedListIterationTest(ContendedTupleIterationTest):
|
|
def make_testdata(self, n):
|
|
return list(range(n))
|
|
|
|
def test_iteration_while_mutating(self):
|
|
"""Test iteration over a shared mutating container."""
|
|
seq = self.make_testdata(NUMITEMS)
|
|
results = []
|
|
start = threading.Barrier(NUMTHREADS + NUMMUTATORS)
|
|
endmutate = threading.Event()
|
|
def mutator():
|
|
orig = seq[:]
|
|
# Make changes big enough to cause resizing of the list, with
|
|
# items shifted around for good measure.
|
|
replacement = (orig * 3)[NUMITEMS//2:]
|
|
start.wait()
|
|
while not endmutate.is_set():
|
|
seq.extend(replacement)
|
|
seq[:0] = orig
|
|
seq.__imul__(2)
|
|
seq.extend(seq)
|
|
seq[:] = orig
|
|
def worker():
|
|
items = []
|
|
start.wait()
|
|
# We want a tight loop, so put items in the shared list at the end.
|
|
for item in seq:
|
|
items.append(item)
|
|
results.extend(items)
|
|
mutators = ()
|
|
try:
|
|
threads = self.run_threads(worker)
|
|
mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
|
|
for t in threads:
|
|
t.join()
|
|
finally:
|
|
endmutate.set()
|
|
for m in mutators:
|
|
m.join()
|
|
self.assert_iterator_results(results, list(seq))
|
|
|
|
|
|
class ContendedRangeIterationTest(ContendedTupleIterationTest):
|
|
def make_testdata(self, n):
|
|
return range(n)
|
|
|
|
def assert_iterator_results(self, results, expected):
|
|
# Range iterators that are shared between threads will (right now)
|
|
# sometimes produce items after the end of the range, sometimes
|
|
# _far_ after the end of the range. That should be fixed, but for
|
|
# now, let's just check they're integers that could have resulted
|
|
# from stepping beyond the range bounds.
|
|
extra_items = set(results) - set(expected)
|
|
for item in extra_items:
|
|
self.assertEqual((item - expected.start) % expected.step, 0)
|