import threading import unittest from test import support # The race conditions these tests were written for only happen every now and # then, even with the current numbers. To find rare race conditions, bumping # these up will help, but it makes the test runtime highly variable under # free-threading. Overhead is much higher under ThreadSanitizer, but it's # also much better at detecting certain races, so we don't need as many # items/threads. if support.check_sanitizer(thread=True): NUMITEMS = 1000 NUMTHREADS = 2 else: NUMITEMS = 100000 NUMTHREADS = 5 NUMMUTATORS = 2 class ContendedTupleIterationTest(unittest.TestCase): def make_testdata(self, n): return tuple(range(n)) def assert_iterator_results(self, results, expected): # Most iterators are not atomic (yet?) so they can skip or duplicate # items, but they should not invent new items (like the range # iterator currently does). extra_items = set(results) - set(expected) self.assertEqual(set(), extra_items) def run_threads(self, func, *args, numthreads=NUMTHREADS): threads = [] for _ in range(numthreads): t = threading.Thread(target=func, args=args) t.start() threads.append(t) return threads def test_iteration(self): """Test iteration over a shared container""" seq = self.make_testdata(NUMITEMS) results = [] start = threading.Barrier(NUMTHREADS) def worker(): idx = 0 start.wait() for item in seq: idx += 1 results.append(idx) threads = self.run_threads(worker) for t in threads: t.join() # Each thread has its own iterator, so results should be entirely predictable. self.assertEqual(results, [NUMITEMS] * NUMTHREADS) def test_shared_iterator(self): """Test iteration over a shared iterator""" seq = self.make_testdata(NUMITEMS) it = iter(seq) results = [] start = threading.Barrier(NUMTHREADS) def worker(): items = [] start.wait() # We want a tight loop, so put items in the shared list at the end. for item in it: items.append(item) results.extend(items) threads = self.run_threads(worker) for t in threads: t.join() self.assert_iterator_results(results, seq) class ContendedListIterationTest(ContendedTupleIterationTest): def make_testdata(self, n): return list(range(n)) def test_iteration_while_mutating(self): """Test iteration over a shared mutating container.""" seq = self.make_testdata(NUMITEMS) results = [] start = threading.Barrier(NUMTHREADS + NUMMUTATORS) endmutate = threading.Event() def mutator(): orig = seq[:] # Make changes big enough to cause resizing of the list, with # items shifted around for good measure. replacement = (orig * 3)[NUMITEMS//2:] start.wait() while not endmutate.is_set(): seq.extend(replacement) seq[:0] = orig seq.__imul__(2) seq.extend(seq) seq[:] = orig def worker(): items = [] start.wait() # We want a tight loop, so put items in the shared list at the end. for item in seq: items.append(item) results.extend(items) mutators = () try: threads = self.run_threads(worker) mutators = self.run_threads(mutator, numthreads=NUMMUTATORS) for t in threads: t.join() finally: endmutate.set() for m in mutators: m.join() self.assert_iterator_results(results, list(seq)) class ContendedRangeIterationTest(ContendedTupleIterationTest): def make_testdata(self, n): return range(n) def assert_iterator_results(self, results, expected): # Range iterators that are shared between threads will (right now) # sometimes produce items after the end of the range, sometimes # _far_ after the end of the range. That should be fixed, but for # now, let's just check they're integers that could have resulted # from stepping beyond the range bounds. extra_items = set(results) - set(expected) for item in extra_items: self.assertEqual((item - expected.start) % expected.step, 0)