mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
gh-123471: make concurrent iteration over itertools.cycle
safe under free-threading (#131212)
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
This commit is contained in:
parent
b6237c3602
commit
26a1cd4e8c
3 changed files with 46 additions and 14 deletions
|
@ -1,15 +1,15 @@
|
|||
import unittest
|
||||
from threading import Thread, Barrier
|
||||
from itertools import batched
|
||||
from itertools import batched, cycle
|
||||
from test.support import threading_helper
|
||||
|
||||
|
||||
threading_helper.requires_working_threading(module=True)
|
||||
|
||||
class EnumerateThreading(unittest.TestCase):
|
||||
class ItertoolsThreading(unittest.TestCase):
|
||||
|
||||
@threading_helper.reap_threads
|
||||
def test_threading(self):
|
||||
def test_batched(self):
|
||||
number_of_threads = 10
|
||||
number_of_iterations = 20
|
||||
barrier = Barrier(number_of_threads)
|
||||
|
@ -34,5 +34,31 @@ class EnumerateThreading(unittest.TestCase):
|
|||
|
||||
barrier.reset()
|
||||
|
||||
@threading_helper.reap_threads
|
||||
def test_cycle(self):
|
||||
number_of_threads = 6
|
||||
number_of_iterations = 10
|
||||
number_of_cycles = 400
|
||||
|
||||
barrier = Barrier(number_of_threads)
|
||||
def work(it):
|
||||
barrier.wait()
|
||||
for _ in range(number_of_cycles):
|
||||
_ = next(it)
|
||||
|
||||
data = (1, 2, 3, 4)
|
||||
for it in range(number_of_iterations):
|
||||
cycle_iterator = cycle(data)
|
||||
worker_threads = []
|
||||
for ii in range(number_of_threads):
|
||||
worker_threads.append(
|
||||
Thread(target=work, args=[cycle_iterator]))
|
||||
|
||||
with threading_helper.start_threads(worker_threads):
|
||||
pass
|
||||
|
||||
barrier.reset()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1 @@
|
|||
Make concurrent iterations over :class:`itertools.cycle` safe under free-threading.
|
|
@ -1124,7 +1124,6 @@ typedef struct {
|
|||
PyObject *it;
|
||||
PyObject *saved;
|
||||
Py_ssize_t index;
|
||||
int firstpass;
|
||||
} cycleobject;
|
||||
|
||||
#define cycleobject_CAST(op) ((cycleobject *)(op))
|
||||
|
@ -1165,8 +1164,7 @@ itertools_cycle_impl(PyTypeObject *type, PyObject *iterable)
|
|||
}
|
||||
lz->it = it;
|
||||
lz->saved = saved;
|
||||
lz->index = 0;
|
||||
lz->firstpass = 0;
|
||||
lz->index = -1;
|
||||
|
||||
return (PyObject *)lz;
|
||||
}
|
||||
|
@ -1199,11 +1197,11 @@ cycle_next(PyObject *op)
|
|||
cycleobject *lz = cycleobject_CAST(op);
|
||||
PyObject *item;
|
||||
|
||||
if (lz->it != NULL) {
|
||||
Py_ssize_t index = FT_ATOMIC_LOAD_SSIZE_RELAXED(lz->index);
|
||||
|
||||
if (index < 0) {
|
||||
item = PyIter_Next(lz->it);
|
||||
if (item != NULL) {
|
||||
if (lz->firstpass)
|
||||
return item;
|
||||
if (PyList_Append(lz->saved, item)) {
|
||||
Py_DECREF(item);
|
||||
return NULL;
|
||||
|
@ -1213,15 +1211,22 @@ cycle_next(PyObject *op)
|
|||
/* Note: StopIteration is already cleared by PyIter_Next() */
|
||||
if (PyErr_Occurred())
|
||||
return NULL;
|
||||
index = 0;
|
||||
FT_ATOMIC_STORE_SSIZE_RELAXED(lz->index, 0);
|
||||
#ifndef Py_GIL_DISABLED
|
||||
Py_CLEAR(lz->it);
|
||||
#endif
|
||||
}
|
||||
if (PyList_GET_SIZE(lz->saved) == 0)
|
||||
return NULL;
|
||||
item = PyList_GET_ITEM(lz->saved, lz->index);
|
||||
lz->index++;
|
||||
if (lz->index >= PyList_GET_SIZE(lz->saved))
|
||||
lz->index = 0;
|
||||
return Py_NewRef(item);
|
||||
item = PyList_GetItemRef(lz->saved, index);
|
||||
assert(item);
|
||||
index++;
|
||||
if (index >= PyList_GET_SIZE(lz->saved)) {
|
||||
index = 0;
|
||||
}
|
||||
FT_ATOMIC_STORE_SSIZE_RELAXED(lz->index, index);
|
||||
return item;
|
||||
}
|
||||
|
||||
static PyType_Slot cycle_slots[] = {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue