gh-89240: Enable multiprocessing on Windows to use large process pools (GH-107873)

We add _winapi.BatchedWaitForMultipleObjects to wait for larger numbers of handles.
This is an internal module, hence undocumented, and should be used with caution.
Check the docstring for info before using BatchedWaitForMultipleObjects.
This commit is contained in:
Steve Dower 2024-02-13 00:28:35 +00:00 committed by GitHub
parent 2f0778675a
commit ea25f32d5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1195 additions and 6 deletions

View file

@ -438,6 +438,39 @@ _winapi_ConnectNamedPipe_impl(PyObject *module, HANDLE handle,
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.CreateEventW -> HANDLE
security_attributes: LPSECURITY_ATTRIBUTES
manual_reset: BOOL
initial_state: BOOL
name: LPCWSTR(accept={str, NoneType})
[clinic start generated code]*/
static HANDLE
_winapi_CreateEventW_impl(PyObject *module,
LPSECURITY_ATTRIBUTES security_attributes,
BOOL manual_reset, BOOL initial_state,
LPCWSTR name)
/*[clinic end generated code: output=2d4c7d5852ecb298 input=4187cee28ac763f8]*/
{
HANDLE handle;
if (PySys_Audit("_winapi.CreateEventW", "bbu", manual_reset, initial_state, name) < 0) {
return INVALID_HANDLE_VALUE;
}
Py_BEGIN_ALLOW_THREADS
handle = CreateEventW(security_attributes, manual_reset, initial_state, name);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE) {
PyErr_SetFromWindowsErr(0);
}
return handle;
}
/*[clinic input]
_winapi.CreateFile -> HANDLE
@ -674,6 +707,37 @@ cleanup:
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.CreateMutexW -> HANDLE
security_attributes: LPSECURITY_ATTRIBUTES
initial_owner: BOOL
name: LPCWSTR(accept={str, NoneType})
[clinic start generated code]*/
static HANDLE
_winapi_CreateMutexW_impl(PyObject *module,
LPSECURITY_ATTRIBUTES security_attributes,
BOOL initial_owner, LPCWSTR name)
/*[clinic end generated code: output=31b9ee8fc37e49a5 input=7d54b921e723254a]*/
{
HANDLE handle;
if (PySys_Audit("_winapi.CreateMutexW", "bu", initial_owner, name) < 0) {
return INVALID_HANDLE_VALUE;
}
Py_BEGIN_ALLOW_THREADS
handle = CreateMutexW(security_attributes, initial_owner, name);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE) {
PyErr_SetFromWindowsErr(0);
}
return handle;
}
/*[clinic input]
_winapi.CreateNamedPipe -> HANDLE
@ -1590,6 +1654,67 @@ _winapi_UnmapViewOfFile_impl(PyObject *module, LPCVOID address)
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.OpenEventW -> HANDLE
desired_access: DWORD
inherit_handle: BOOL
name: LPCWSTR
[clinic start generated code]*/
static HANDLE
_winapi_OpenEventW_impl(PyObject *module, DWORD desired_access,
BOOL inherit_handle, LPCWSTR name)
/*[clinic end generated code: output=c4a45e95545a4bd2 input=dec26598748d35aa]*/
{
HANDLE handle;
if (PySys_Audit("_winapi.OpenEventW", "Iu", desired_access, name) < 0) {
return INVALID_HANDLE_VALUE;
}
Py_BEGIN_ALLOW_THREADS
handle = OpenEventW(desired_access, inherit_handle, name);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE) {
PyErr_SetFromWindowsErr(0);
}
return handle;
}
/*[clinic input]
_winapi.OpenMutexW -> HANDLE
desired_access: DWORD
inherit_handle: BOOL
name: LPCWSTR
[clinic start generated code]*/
static HANDLE
_winapi_OpenMutexW_impl(PyObject *module, DWORD desired_access,
BOOL inherit_handle, LPCWSTR name)
/*[clinic end generated code: output=dda39d7844397bf0 input=f3a7b466c5307712]*/
{
HANDLE handle;
if (PySys_Audit("_winapi.OpenMutexW", "Iu", desired_access, name) < 0) {
return INVALID_HANDLE_VALUE;
}
Py_BEGIN_ALLOW_THREADS
handle = OpenMutexW(desired_access, inherit_handle, name);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE) {
PyErr_SetFromWindowsErr(0);
}
return handle;
}
/*[clinic input]
_winapi.OpenFileMapping -> HANDLE
@ -1820,6 +1945,75 @@ _winapi_ReadFile_impl(PyObject *module, HANDLE handle, DWORD size,
return Py_BuildValue("NI", buf, err);
}
/*[clinic input]
_winapi.ReleaseMutex
mutex: HANDLE
[clinic start generated code]*/
static PyObject *
_winapi_ReleaseMutex_impl(PyObject *module, HANDLE mutex)
/*[clinic end generated code: output=5b9001a72dd8af37 input=49e9d20de3559d84]*/
{
int err = 0;
Py_BEGIN_ALLOW_THREADS
if (!ReleaseMutex(mutex)) {
err = GetLastError();
}
Py_END_ALLOW_THREADS
if (err) {
return PyErr_SetFromWindowsErr(err);
}
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.ResetEvent
event: HANDLE
[clinic start generated code]*/
static PyObject *
_winapi_ResetEvent_impl(PyObject *module, HANDLE event)
/*[clinic end generated code: output=81c8501d57c0530d input=e2d42d990322e87a]*/
{
int err = 0;
Py_BEGIN_ALLOW_THREADS
if (!ResetEvent(event)) {
err = GetLastError();
}
Py_END_ALLOW_THREADS
if (err) {
return PyErr_SetFromWindowsErr(err);
}
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.SetEvent
event: HANDLE
[clinic start generated code]*/
static PyObject *
_winapi_SetEvent_impl(PyObject *module, HANDLE event)
/*[clinic end generated code: output=c18ba09eb9aa774d input=e660e830a37c09f8]*/
{
int err = 0;
Py_BEGIN_ALLOW_THREADS
if (!SetEvent(event)) {
err = GetLastError();
}
Py_END_ALLOW_THREADS
if (err) {
return PyErr_SetFromWindowsErr(err);
}
Py_RETURN_NONE;
}
/*[clinic input]
_winapi.SetNamedPipeHandleState
@ -1942,6 +2136,310 @@ _winapi_WaitNamedPipe_impl(PyObject *module, LPCTSTR name, DWORD timeout)
Py_RETURN_NONE;
}
typedef struct {
HANDLE handles[MAXIMUM_WAIT_OBJECTS];
HANDLE cancel_event;
DWORD handle_base;
DWORD handle_count;
HANDLE thread;
volatile DWORD result;
} BatchedWaitData;
static DWORD WINAPI
_batched_WaitForMultipleObjects_thread(LPVOID param)
{
BatchedWaitData *data = (BatchedWaitData *)param;
data->result = WaitForMultipleObjects(
data->handle_count,
data->handles,
FALSE,
INFINITE
);
if (data->result == WAIT_FAILED) {
DWORD err = GetLastError();
SetEvent(data->cancel_event);
return err;
} else if (data->result >= WAIT_ABANDONED_0 && data->result < WAIT_ABANDONED_0 + MAXIMUM_WAIT_OBJECTS) {
data->result = WAIT_FAILED;
SetEvent(data->cancel_event);
return ERROR_ABANDONED_WAIT_0;
}
return 0;
}
/*[clinic input]
_winapi.BatchedWaitForMultipleObjects
handle_seq: object
wait_all: BOOL
milliseconds: DWORD(c_default='INFINITE') = _winapi.INFINITE
Supports a larger number of handles than WaitForMultipleObjects
Note that the handles may be waited on other threads, which could cause
issues for objects like mutexes that become associated with the thread
that was waiting for them. Objects may also be left signalled, even if
the wait fails.
It is recommended to use WaitForMultipleObjects whenever possible, and
only switch to BatchedWaitForMultipleObjects for scenarios where you
control all the handles involved, such as your own thread pool or
files, and all wait objects are left unmodified by a wait (for example,
manual reset events, threads, and files/pipes).
Overlapped handles returned from this module use manual reset events.
[clinic start generated code]*/
static PyObject *
_winapi_BatchedWaitForMultipleObjects_impl(PyObject *module,
PyObject *handle_seq,
BOOL wait_all, DWORD milliseconds)
/*[clinic end generated code: output=d21c1a4ad0a252fd input=7e196f29005dc77b]*/
{
Py_ssize_t thread_count = 0, handle_count = 0, i, j;
Py_ssize_t nhandles;
BatchedWaitData *thread_data[MAXIMUM_WAIT_OBJECTS];
HANDLE handles[MAXIMUM_WAIT_OBJECTS];
HANDLE sigint_event = NULL;
HANDLE cancel_event = NULL;
DWORD result;
const Py_ssize_t _MAXIMUM_TOTAL_OBJECTS = (MAXIMUM_WAIT_OBJECTS - 1) * (MAXIMUM_WAIT_OBJECTS - 1);
if (!PySequence_Check(handle_seq)) {
PyErr_Format(PyExc_TypeError,
"sequence type expected, got '%s'",
Py_TYPE(handle_seq)->tp_name);
return NULL;
}
nhandles = PySequence_Length(handle_seq);
if (nhandles == -1) {
return NULL;
}
if (nhandles == 0) {
return wait_all ? Py_NewRef(Py_None) : PyList_New(0);
}
/* If this is the main thread then make the wait interruptible
by Ctrl-C. When waiting for *all* handles, it is only checked
in between batches. */
if (_PyOS_IsMainThread()) {
sigint_event = _PyOS_SigintEvent();
assert(sigint_event != NULL);
}
if (nhandles < 0 || nhandles > _MAXIMUM_TOTAL_OBJECTS) {
PyErr_Format(PyExc_ValueError,
"need at most %zd handles, got a sequence of length %zd",
_MAXIMUM_TOTAL_OBJECTS, nhandles);
return NULL;
}
if (!wait_all) {
cancel_event = CreateEventW(NULL, TRUE, FALSE, NULL);
if (!cancel_event) {
PyErr_SetExcFromWindowsErr(PyExc_OSError, 0);
return NULL;
}
}
i = 0;
while (i < nhandles) {
BatchedWaitData *data = (BatchedWaitData*)PyMem_Malloc(sizeof(BatchedWaitData));
if (!data) {
goto error;
}
thread_data[thread_count++] = data;
data->thread = NULL;
data->cancel_event = cancel_event;
data->handle_base = Py_SAFE_DOWNCAST(i, Py_ssize_t, DWORD);
data->handle_count = Py_SAFE_DOWNCAST(nhandles - i, Py_ssize_t, DWORD);
if (data->handle_count > MAXIMUM_WAIT_OBJECTS - 1) {
data->handle_count = MAXIMUM_WAIT_OBJECTS - 1;
}
for (j = 0; j < data->handle_count; ++i, ++j) {
PyObject *v = PySequence_GetItem(handle_seq, i);
if (!v || !PyArg_Parse(v, F_HANDLE, &data->handles[j])) {
Py_XDECREF(v);
goto error;
}
Py_DECREF(v);
}
if (!wait_all) {
data->handles[data->handle_count++] = cancel_event;
}
}
DWORD err = 0;
/* We need to use different strategies when waiting for ALL handles
as opposed to ANY handle. This is because there is no way to
(safely) interrupt a thread that is waiting for all handles in a
group. So for ALL handles, we loop over each set and wait. For
ANY handle, we use threads and wait on them. */
if (wait_all) {
Py_BEGIN_ALLOW_THREADS
long long deadline = 0;
if (milliseconds != INFINITE) {
deadline = (long long)GetTickCount64() + milliseconds;
}
for (i = 0; !err && i < thread_count; ++i) {
DWORD timeout = milliseconds;
if (deadline) {
long long time_to_deadline = deadline - GetTickCount64();
if (time_to_deadline <= 0) {
err = WAIT_TIMEOUT;
break;
} else if (time_to_deadline < UINT_MAX) {
timeout = (DWORD)time_to_deadline;
}
}
result = WaitForMultipleObjects(thread_data[i]->handle_count,
thread_data[i]->handles, TRUE, timeout);
// ABANDONED is not possible here because we own all the handles
if (result == WAIT_FAILED) {
err = GetLastError();
} else if (result == WAIT_TIMEOUT) {
err = WAIT_TIMEOUT;
}
if (!err && sigint_event) {
result = WaitForSingleObject(sigint_event, 0);
if (result == WAIT_OBJECT_0) {
err = ERROR_CONTROL_C_EXIT;
} else if (result == WAIT_FAILED) {
err = GetLastError();
}
}
}
CloseHandle(cancel_event);
Py_END_ALLOW_THREADS
} else {
Py_BEGIN_ALLOW_THREADS
for (i = 0; i < thread_count; ++i) {
BatchedWaitData *data = thread_data[i];
data->thread = CreateThread(
NULL,
1, // smallest possible initial stack
_batched_WaitForMultipleObjects_thread,
(LPVOID)data,
CREATE_SUSPENDED,
NULL
);
if (!data->thread) {
err = GetLastError();
break;
}
handles[handle_count++] = data->thread;
}
Py_END_ALLOW_THREADS
if (err) {
PyErr_SetExcFromWindowsErr(PyExc_OSError, err);
goto error;
}
if (handle_count > MAXIMUM_WAIT_OBJECTS - 1) {
// basically an assert, but stronger
PyErr_SetString(PyExc_SystemError, "allocated too many wait objects");
goto error;
}
Py_BEGIN_ALLOW_THREADS
// Once we start resuming threads, can no longer "goto error"
for (i = 0; i < thread_count; ++i) {
ResumeThread(thread_data[i]->thread);
}
if (sigint_event) {
handles[handle_count++] = sigint_event;
}
result = WaitForMultipleObjects((DWORD)handle_count, handles, wait_all, milliseconds);
// ABANDONED is not possible here because we own all the handles
if (result == WAIT_FAILED) {
err = GetLastError();
} else if (result == WAIT_TIMEOUT) {
err = WAIT_TIMEOUT;
} else if (sigint_event && result == WAIT_OBJECT_0 + handle_count) {
err = ERROR_CONTROL_C_EXIT;
}
SetEvent(cancel_event);
// Wait for all threads to finish before we start freeing their memory
if (sigint_event) {
handle_count -= 1;
}
WaitForMultipleObjects((DWORD)handle_count, handles, TRUE, INFINITE);
for (i = 0; i < thread_count; ++i) {
if (!err && thread_data[i]->result == WAIT_FAILED) {
if (!GetExitCodeThread(thread_data[i]->thread, &err)) {
err = GetLastError();
}
}
CloseHandle(thread_data[i]->thread);
}
CloseHandle(cancel_event);
Py_END_ALLOW_THREADS
}
PyObject *triggered_indices;
if (sigint_event != NULL && err == ERROR_CONTROL_C_EXIT) {
errno = EINTR;
PyErr_SetFromErrno(PyExc_OSError);
triggered_indices = NULL;
} else if (err) {
PyErr_SetExcFromWindowsErr(PyExc_OSError, err);
triggered_indices = NULL;
} else if (wait_all) {
triggered_indices = Py_NewRef(Py_None);
} else {
triggered_indices = PyList_New(0);
if (triggered_indices) {
for (i = 0; i < thread_count; ++i) {
Py_ssize_t triggered = (Py_ssize_t)thread_data[i]->result - WAIT_OBJECT_0;
if (triggered >= 0 && triggered < thread_data[i]->handle_count - 1) {
PyObject *v = PyLong_FromSsize_t(thread_data[i]->handle_base + triggered);
if (!v || PyList_Append(triggered_indices, v) < 0) {
Py_XDECREF(v);
Py_CLEAR(triggered_indices);
break;
}
Py_DECREF(v);
}
}
}
}
for (i = 0; i < thread_count; ++i) {
PyMem_Free((void *)thread_data[i]);
}
return triggered_indices;
error:
// We should only enter here before any threads start running.
// Once we start resuming threads, different cleanup is required
CloseHandle(cancel_event);
while (--thread_count >= 0) {
HANDLE t = thread_data[thread_count]->thread;
if (t) {
TerminateThread(t, WAIT_ABANDONED_0);
CloseHandle(t);
}
PyMem_Free((void *)thread_data[thread_count]);
}
return NULL;
}
/*[clinic input]
_winapi.WaitForMultipleObjects
@ -2335,8 +2833,10 @@ _winapi_CopyFile2_impl(PyObject *module, LPCWSTR existing_file_name,
static PyMethodDef winapi_functions[] = {
_WINAPI_CLOSEHANDLE_METHODDEF
_WINAPI_CONNECTNAMEDPIPE_METHODDEF
_WINAPI_CREATEEVENTW_METHODDEF
_WINAPI_CREATEFILE_METHODDEF
_WINAPI_CREATEFILEMAPPING_METHODDEF
_WINAPI_CREATEMUTEXW_METHODDEF
_WINAPI_CREATENAMEDPIPE_METHODDEF
_WINAPI_CREATEPIPE_METHODDEF
_WINAPI_CREATEPROCESS_METHODDEF
@ -2350,17 +2850,23 @@ static PyMethodDef winapi_functions[] = {
_WINAPI_GETSTDHANDLE_METHODDEF
_WINAPI_GETVERSION_METHODDEF
_WINAPI_MAPVIEWOFFILE_METHODDEF
_WINAPI_OPENEVENTW_METHODDEF
_WINAPI_OPENFILEMAPPING_METHODDEF
_WINAPI_OPENMUTEXW_METHODDEF
_WINAPI_OPENPROCESS_METHODDEF
_WINAPI_PEEKNAMEDPIPE_METHODDEF
_WINAPI_LCMAPSTRINGEX_METHODDEF
_WINAPI_READFILE_METHODDEF
_WINAPI_RELEASEMUTEX_METHODDEF
_WINAPI_RESETEVENT_METHODDEF
_WINAPI_SETEVENT_METHODDEF
_WINAPI_SETNAMEDPIPEHANDLESTATE_METHODDEF
_WINAPI_TERMINATEPROCESS_METHODDEF
_WINAPI_UNMAPVIEWOFFILE_METHODDEF
_WINAPI_VIRTUALQUERYSIZE_METHODDEF
_WINAPI_WAITNAMEDPIPE_METHODDEF
_WINAPI_WAITFORMULTIPLEOBJECTS_METHODDEF
_WINAPI_BATCHEDWAITFORMULTIPLEOBJECTS_METHODDEF
_WINAPI_WAITFORSINGLEOBJECT_METHODDEF
_WINAPI_WRITEFILE_METHODDEF
_WINAPI_GETACP_METHODDEF