import unittest from threading import Thread, Barrier from itertools import batched, chain, cycle from test.support import threading_helper threading_helper.requires_working_threading(module=True) class ItertoolsThreading(unittest.TestCase): @threading_helper.reap_threads def test_batched(self): number_of_threads = 10 number_of_iterations = 20 barrier = Barrier(number_of_threads) def work(it): barrier.wait() while True: try: next(it) except StopIteration: break data = tuple(range(1000)) for it in range(number_of_iterations): batch_iterator = batched(data, 2) worker_threads = [] for ii in range(number_of_threads): worker_threads.append( Thread(target=work, args=[batch_iterator])) with threading_helper.start_threads(worker_threads): pass barrier.reset() @threading_helper.reap_threads def test_cycle(self): number_of_threads = 6 number_of_iterations = 10 number_of_cycles = 400 barrier = Barrier(number_of_threads) def work(it): barrier.wait() for _ in range(number_of_cycles): try: next(it) except StopIteration: pass data = (1, 2, 3, 4) for it in range(number_of_iterations): cycle_iterator = cycle(data) worker_threads = [] for ii in range(number_of_threads): worker_threads.append( Thread(target=work, args=[cycle_iterator])) with threading_helper.start_threads(worker_threads): pass barrier.reset() @threading_helper.reap_threads def test_chain(self): number_of_threads = 6 number_of_iterations = 20 barrier = Barrier(number_of_threads) def work(it): barrier.wait() while True: try: next(it) except StopIteration: break data = [(1, )] * 200 for it in range(number_of_iterations): chain_iterator = chain(*data) worker_threads = [] for ii in range(number_of_threads): worker_threads.append( Thread(target=work, args=[chain_iterator])) with threading_helper.start_threads(worker_threads): pass barrier.reset() if __name__ == "__main__": unittest.main()