Merged revisions 64104,64117 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r64104 | benjamin.peterson | 2008-06-10 21:40:25 -0500 (Tue, 10 Jun 2008) | 2 lines

  add the multiprocessing package to fulfill PEP 371
........
  r64117 | benjamin.peterson | 2008-06-11 07:26:31 -0500 (Wed, 11 Jun 2008) | 2 lines

  fix import of multiprocessing by juggling imports
........
This commit is contained in:
Benjamin Peterson 2008-06-11 16:44:04 +00:00
parent eec3d71379
commit e711cafab1
32 changed files with 12513 additions and 1 deletions

View file

@ -0,0 +1,515 @@
/*
* Definition of a `Connection` type.
* Used by `socket_connection.c` and `pipe_connection.c`.
*
* connection.h
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#ifndef CONNECTION_H
#define CONNECTION_H
/*
* Read/write flags
*/
#define READABLE 1
#define WRITABLE 2
#define CHECK_READABLE(self) \
if (!(self->flags & READABLE)) { \
PyErr_SetString(PyExc_IOError, "connection is write-only"); \
return NULL; \
}
#define CHECK_WRITABLE(self) \
if (!(self->flags & WRITABLE)) { \
PyErr_SetString(PyExc_IOError, "connection is read-only"); \
return NULL; \
}
/*
* Allocation and deallocation
*/
static PyObject *
connection_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
ConnectionObject *self;
HANDLE handle;
BOOL readable = TRUE, writable = TRUE;
static char *kwlist[] = {"handle", "readable", "writable", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, F_HANDLE "|ii", kwlist,
&handle, &readable, &writable))
return NULL;
if (handle == INVALID_HANDLE_VALUE || (Py_ssize_t)handle < 0) {
PyErr_Format(PyExc_IOError, "invalid handle %"
PY_FORMAT_SIZE_T "d", (Py_ssize_t)handle);
return NULL;
}
if (!readable && !writable) {
PyErr_SetString(PyExc_ValueError,
"either readable or writable must be true");
return NULL;
}
self = PyObject_New(ConnectionObject, type);
if (self == NULL)
return NULL;
self->weakreflist = NULL;
self->handle = handle;
self->flags = 0;
if (readable)
self->flags |= READABLE;
if (writable)
self->flags |= WRITABLE;
assert(self->flags >= 1 && self->flags <= 3);
return (PyObject*)self;
}
static void
connection_dealloc(ConnectionObject* self)
{
if (self->weakreflist != NULL)
PyObject_ClearWeakRefs((PyObject*)self);
if (self->handle != INVALID_HANDLE_VALUE) {
Py_BEGIN_ALLOW_THREADS
CLOSE(self->handle);
Py_END_ALLOW_THREADS
}
PyObject_Del(self);
}
/*
* Functions for transferring buffers
*/
static PyObject *
connection_sendbytes(ConnectionObject *self, PyObject *args)
{
char *buffer;
Py_ssize_t length, offset=0, size=PY_SSIZE_T_MIN;
int res;
if (!PyArg_ParseTuple(args, F_RBUFFER "#|" F_PY_SSIZE_T F_PY_SSIZE_T,
&buffer, &length, &offset, &size))
return NULL;
CHECK_WRITABLE(self);
if (offset < 0) {
PyErr_SetString(PyExc_ValueError, "offset is negative");
return NULL;
}
if (length < offset) {
PyErr_SetString(PyExc_ValueError, "buffer length < offset");
return NULL;
}
if (size == PY_SSIZE_T_MIN) {
size = length - offset;
} else {
if (size < 0) {
PyErr_SetString(PyExc_ValueError, "size is negative");
return NULL;
}
if (offset + size > length) {
PyErr_SetString(PyExc_ValueError,
"buffer length < offset + size");
return NULL;
}
}
Py_BEGIN_ALLOW_THREADS
res = conn_send_string(self, buffer + offset, size);
Py_END_ALLOW_THREADS
if (res < 0)
return mp_SetError(PyExc_IOError, res);
Py_RETURN_NONE;
}
static PyObject *
connection_recvbytes(ConnectionObject *self, PyObject *args)
{
char *freeme = NULL;
Py_ssize_t res, maxlength = PY_SSIZE_T_MAX;
PyObject *result = NULL;
if (!PyArg_ParseTuple(args, "|" F_PY_SSIZE_T, &maxlength))
return NULL;
CHECK_READABLE(self);
if (maxlength < 0) {
PyErr_SetString(PyExc_ValueError, "maxlength < 0");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
&freeme, maxlength);
Py_END_ALLOW_THREADS
if (res < 0) {
if (res == MP_BAD_MESSAGE_LENGTH) {
if ((self->flags & WRITABLE) == 0) {
Py_BEGIN_ALLOW_THREADS
CLOSE(self->handle);
Py_END_ALLOW_THREADS
self->handle = INVALID_HANDLE_VALUE;
} else {
self->flags = WRITABLE;
}
}
mp_SetError(PyExc_IOError, res);
} else {
if (freeme == NULL) {
result = PyBytes_FromStringAndSize(self->buffer, res);
} else {
result = PyBytes_FromStringAndSize(freeme, res);
PyMem_Free(freeme);
}
}
return result;
}
static PyObject *
connection_recvbytes_into(ConnectionObject *self, PyObject *args)
{
char *freeme = NULL, *buffer = NULL;
Py_ssize_t res, length, offset = 0;
PyObject *result = NULL;
if (!PyArg_ParseTuple(args, "w#|" F_PY_SSIZE_T,
&buffer, &length, &offset))
return NULL;
CHECK_READABLE(self);
if (offset < 0) {
PyErr_SetString(PyExc_ValueError, "negative offset");
return NULL;
}
if (offset > length) {
PyErr_SetString(PyExc_ValueError, "offset too large");
return NULL;
}
Py_BEGIN_ALLOW_THREADS
res = conn_recv_string(self, buffer+offset, length-offset,
&freeme, PY_SSIZE_T_MAX);
Py_END_ALLOW_THREADS
if (res < 0) {
if (res == MP_BAD_MESSAGE_LENGTH) {
if ((self->flags & WRITABLE) == 0) {
Py_BEGIN_ALLOW_THREADS
CLOSE(self->handle);
Py_END_ALLOW_THREADS
self->handle = INVALID_HANDLE_VALUE;
} else {
self->flags = WRITABLE;
}
}
mp_SetError(PyExc_IOError, res);
} else {
if (freeme == NULL) {
result = PyInt_FromSsize_t(res);
} else {
result = PyObject_CallFunction(BufferTooShort,
F_RBUFFER "#",
freeme, res);
PyMem_Free(freeme);
if (result) {
PyErr_SetObject(BufferTooShort, result);
Py_DECREF(result);
}
return NULL;
}
}
return result;
}
/*
* Functions for transferring objects
*/
static PyObject *
connection_send_obj(ConnectionObject *self, PyObject *obj)
{
char *buffer;
int res;
Py_ssize_t length;
PyObject *pickled_string = NULL;
CHECK_WRITABLE(self);
pickled_string = PyObject_CallFunctionObjArgs(pickle_dumps, obj,
pickle_protocol, NULL);
if (!pickled_string)
goto failure;
if (PyBytes_AsStringAndSize(pickled_string, &buffer, &length) < 0)
goto failure;
Py_BEGIN_ALLOW_THREADS
res = conn_send_string(self, buffer, (int)length);
Py_END_ALLOW_THREADS
if (res < 0) {
mp_SetError(PyExc_IOError, res);
goto failure;
}
Py_XDECREF(pickled_string);
Py_RETURN_NONE;
failure:
Py_XDECREF(pickled_string);
return NULL;
}
static PyObject *
connection_recv_obj(ConnectionObject *self)
{
char *freeme = NULL;
Py_ssize_t res;
PyObject *temp = NULL, *result = NULL;
CHECK_READABLE(self);
Py_BEGIN_ALLOW_THREADS
res = conn_recv_string(self, self->buffer, CONNECTION_BUFFER_SIZE,
&freeme, PY_SSIZE_T_MAX);
Py_END_ALLOW_THREADS
if (res < 0) {
if (res == MP_BAD_MESSAGE_LENGTH) {
if ((self->flags & WRITABLE) == 0) {
Py_BEGIN_ALLOW_THREADS
CLOSE(self->handle);
Py_END_ALLOW_THREADS
self->handle = INVALID_HANDLE_VALUE;
} else {
self->flags = WRITABLE;
}
}
mp_SetError(PyExc_IOError, res);
} else {
if (freeme == NULL) {
temp = PyBytes_FromStringAndSize(self->buffer, res);
} else {
temp = PyBytes_FromStringAndSize(freeme, res);
PyMem_Free(freeme);
}
}
if (temp)
result = PyObject_CallFunctionObjArgs(pickle_loads,
temp, NULL);
Py_XDECREF(temp);
return result;
}
/*
* Other functions
*/
static PyObject *
connection_poll(ConnectionObject *self, PyObject *args)
{
PyObject *timeout_obj = NULL;
double timeout = 0.0;
int res;
CHECK_READABLE(self);
if (!PyArg_ParseTuple(args, "|O", &timeout_obj))
return NULL;
if (timeout_obj == NULL) {
timeout = 0.0;
} else if (timeout_obj == Py_None) {
timeout = -1.0; /* block forever */
} else {
timeout = PyFloat_AsDouble(timeout_obj);
if (PyErr_Occurred())
return NULL;
if (timeout < 0.0)
timeout = 0.0;
}
Py_BEGIN_ALLOW_THREADS
res = conn_poll(self, timeout);
Py_END_ALLOW_THREADS
switch (res) {
case TRUE:
Py_RETURN_TRUE;
case FALSE:
Py_RETURN_FALSE;
default:
return mp_SetError(PyExc_IOError, res);
}
}
static PyObject *
connection_fileno(ConnectionObject* self)
{
if (self->handle == INVALID_HANDLE_VALUE) {
PyErr_SetString(PyExc_IOError, "handle is invalid");
return NULL;
}
return PyInt_FromLong((long)self->handle);
}
static PyObject *
connection_close(ConnectionObject *self)
{
if (self->handle != INVALID_HANDLE_VALUE) {
Py_BEGIN_ALLOW_THREADS
CLOSE(self->handle);
Py_END_ALLOW_THREADS
self->handle = INVALID_HANDLE_VALUE;
}
Py_RETURN_NONE;
}
static PyObject *
connection_repr(ConnectionObject *self)
{
static char *conn_type[] = {"read-only", "write-only", "read-write"};
assert(self->flags >= 1 && self->flags <= 3);
return FROM_FORMAT("<%s %s, handle %" PY_FORMAT_SIZE_T "d>",
conn_type[self->flags - 1],
CONNECTION_NAME, (Py_ssize_t)self->handle);
}
/*
* Getters and setters
*/
static PyObject *
connection_closed(ConnectionObject *self, void *closure)
{
return PyBool_FromLong((long)(self->handle == INVALID_HANDLE_VALUE));
}
static PyObject *
connection_readable(ConnectionObject *self, void *closure)
{
return PyBool_FromLong((long)(self->flags & READABLE));
}
static PyObject *
connection_writable(ConnectionObject *self, void *closure)
{
return PyBool_FromLong((long)(self->flags & WRITABLE));
}
/*
* Tables
*/
static PyMethodDef connection_methods[] = {
{"send_bytes", (PyCFunction)connection_sendbytes, METH_VARARGS,
"send the byte data from a readable buffer-like object"},
{"recv_bytes", (PyCFunction)connection_recvbytes, METH_VARARGS,
"receive byte data as a string"},
{"recv_bytes_into",(PyCFunction)connection_recvbytes_into,METH_VARARGS,
"receive byte data into a writeable buffer-like object\n"
"returns the number of bytes read"},
{"send", (PyCFunction)connection_send_obj, METH_O,
"send a (picklable) object"},
{"recv", (PyCFunction)connection_recv_obj, METH_NOARGS,
"receive a (picklable) object"},
{"poll", (PyCFunction)connection_poll, METH_VARARGS,
"whether there is any input available to be read"},
{"fileno", (PyCFunction)connection_fileno, METH_NOARGS,
"file descriptor or handle of the connection"},
{"close", (PyCFunction)connection_close, METH_NOARGS,
"close the connection"},
{NULL} /* Sentinel */
};
static PyGetSetDef connection_getset[] = {
{"closed", (getter)connection_closed, NULL,
"True if the connection is closed", NULL},
{"readable", (getter)connection_readable, NULL,
"True if the connection is readable", NULL},
{"writable", (getter)connection_writable, NULL,
"True if the connection is writable", NULL},
{NULL}
};
/*
* Connection type
*/
PyDoc_STRVAR(connection_doc,
"Connection type whose constructor signature is\n\n"
" Connection(handle, readable=True, writable=True).\n\n"
"The constructor does *not* duplicate the handle.");
PyTypeObject CONNECTION_TYPE = {
PyVarObject_HEAD_INIT(NULL, 0)
/* tp_name */ "_multiprocessing." CONNECTION_NAME,
/* tp_basicsize */ sizeof(ConnectionObject),
/* tp_itemsize */ 0,
/* tp_dealloc */ (destructor)connection_dealloc,
/* tp_print */ 0,
/* tp_getattr */ 0,
/* tp_setattr */ 0,
/* tp_compare */ 0,
/* tp_repr */ (reprfunc)connection_repr,
/* tp_as_number */ 0,
/* tp_as_sequence */ 0,
/* tp_as_mapping */ 0,
/* tp_hash */ 0,
/* tp_call */ 0,
/* tp_str */ 0,
/* tp_getattro */ 0,
/* tp_setattro */ 0,
/* tp_as_buffer */ 0,
/* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_WEAKREFS,
/* tp_doc */ connection_doc,
/* tp_traverse */ 0,
/* tp_clear */ 0,
/* tp_richcompare */ 0,
/* tp_weaklistoffset */ offsetof(ConnectionObject, weakreflist),
/* tp_iter */ 0,
/* tp_iternext */ 0,
/* tp_methods */ connection_methods,
/* tp_members */ 0,
/* tp_getset */ connection_getset,
/* tp_base */ 0,
/* tp_dict */ 0,
/* tp_descr_get */ 0,
/* tp_descr_set */ 0,
/* tp_dictoffset */ 0,
/* tp_init */ 0,
/* tp_alloc */ 0,
/* tp_new */ connection_new,
};
#endif /* CONNECTION_H */

View file

@ -0,0 +1,322 @@
/*
* Extension module used by mutliprocessing package
*
* multiprocessing.c
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
PyObject *create_win32_namespace(void);
PyObject *pickle_dumps, *pickle_loads, *pickle_protocol;
PyObject *ProcessError, *BufferTooShort;
/*
* Function which raises exceptions based on error codes
*/
PyObject *
mp_SetError(PyObject *Type, int num)
{
switch (num) {
#ifdef MS_WINDOWS
case MP_STANDARD_ERROR:
if (Type == NULL)
Type = PyExc_WindowsError;
PyErr_SetExcFromWindowsErr(Type, 0);
break;
case MP_SOCKET_ERROR:
if (Type == NULL)
Type = PyExc_WindowsError;
PyErr_SetExcFromWindowsErr(Type, WSAGetLastError());
break;
#else /* !MS_WINDOWS */
case MP_STANDARD_ERROR:
case MP_SOCKET_ERROR:
if (Type == NULL)
Type = PyExc_OSError;
PyErr_SetFromErrno(Type);
break;
#endif /* !MS_WINDOWS */
case MP_MEMORY_ERROR:
PyErr_NoMemory();
break;
case MP_END_OF_FILE:
PyErr_SetNone(PyExc_EOFError);
break;
case MP_EARLY_END_OF_FILE:
PyErr_SetString(PyExc_IOError,
"got end of file during message");
break;
case MP_BAD_MESSAGE_LENGTH:
PyErr_SetString(PyExc_IOError, "bad message length");
break;
case MP_EXCEPTION_HAS_BEEN_SET:
break;
default:
PyErr_Format(PyExc_RuntimeError,
"unkown error number %d", num);
}
return NULL;
}
/*
* Windows only
*/
#ifdef MS_WINDOWS
/* On Windows we set an event to signal Ctrl-C; compare with timemodule.c */
HANDLE sigint_event = NULL;
static BOOL WINAPI
ProcessingCtrlHandler(DWORD dwCtrlType)
{
SetEvent(sigint_event);
return FALSE;
}
/*
* Unix only
*/
#else /* !MS_WINDOWS */
#if HAVE_FD_TRANSFER
/* Functions for transferring file descriptors between processes.
Reimplements some of the functionality of the fdcred
module at http://www.mca-ltd.com/resources/fdcred_1.tgz. */
static PyObject *
multiprocessing_sendfd(PyObject *self, PyObject *args)
{
int conn, fd, res;
char dummy_char;
char buf[CMSG_SPACE(sizeof(int))];
struct msghdr msg = {0};
struct iovec dummy_iov;
struct cmsghdr *cmsg;
if (!PyArg_ParseTuple(args, "ii", &conn, &fd))
return NULL;
dummy_iov.iov_base = &dummy_char;
dummy_iov.iov_len = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
msg.msg_iov = &dummy_iov;
msg.msg_iovlen = 1;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
msg.msg_controllen = cmsg->cmsg_len;
*(int*)CMSG_DATA(cmsg) = fd;
Py_BEGIN_ALLOW_THREADS
res = sendmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
if (res < 0)
return PyErr_SetFromErrno(PyExc_OSError);
Py_RETURN_NONE;
}
static PyObject *
multiprocessing_recvfd(PyObject *self, PyObject *args)
{
int conn, fd, res;
char dummy_char;
char buf[CMSG_SPACE(sizeof(int))];
struct msghdr msg = {0};
struct iovec dummy_iov;
struct cmsghdr *cmsg;
if (!PyArg_ParseTuple(args, "i", &conn))
return NULL;
dummy_iov.iov_base = &dummy_char;
dummy_iov.iov_len = 1;
msg.msg_control = buf;
msg.msg_controllen = sizeof(buf);
msg.msg_iov = &dummy_iov;
msg.msg_iovlen = 1;
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
msg.msg_controllen = cmsg->cmsg_len;
Py_BEGIN_ALLOW_THREADS
res = recvmsg(conn, &msg, 0);
Py_END_ALLOW_THREADS
if (res < 0)
return PyErr_SetFromErrno(PyExc_OSError);
fd = *(int*)CMSG_DATA(cmsg);
return Py_BuildValue("i", fd);
}
#endif /* HAVE_FD_TRANSFER */
#endif /* !MS_WINDOWS */
/*
* All platforms
*/
static PyObject*
multiprocessing_address_of_buffer(PyObject *self, PyObject *obj)
{
void *buffer;
Py_ssize_t buffer_len;
if (PyObject_AsWriteBuffer(obj, &buffer, &buffer_len) < 0)
return NULL;
return Py_BuildValue("N" F_PY_SSIZE_T,
PyLong_FromVoidPtr(buffer), buffer_len);
}
/*
* Function table
*/
static PyMethodDef module_methods[] = {
{"address_of_buffer", multiprocessing_address_of_buffer, METH_O,
"address_of_buffer(obj) -> int\n"
"Return address of obj assuming obj supports buffer inteface"},
#if HAVE_FD_TRANSFER
{"sendfd", multiprocessing_sendfd, METH_VARARGS,
"sendfd(sockfd, fd) -> None\n"
"Send file descriptor given by fd over the unix domain socket\n"
"whose file decriptor is sockfd"},
{"recvfd", multiprocessing_recvfd, METH_VARARGS,
"recvfd(sockfd) -> fd\n"
"Receive a file descriptor over a unix domain socket\n"
"whose file decriptor is sockfd"},
#endif
{NULL}
};
/*
* Initialize
*/
static struct PyModuleDef multiprocessing_module = {
PyModuleDef_HEAD_INIT,
"_multiprocessing",
NULL,
-1,
module_methods,
NULL,
NULL,
NULL,
NULL
};
PyObject*
PyInit__multiprocessing(void)
{
PyObject *module, *temp;
/* Initialize module */
module = PyModule_Create(&multiprocessing_module);
if (!module)
return NULL;
/* Get copy of objects from pickle */
temp = PyImport_ImportModule(PICKLE_MODULE);
if (!temp)
return NULL;
pickle_dumps = PyObject_GetAttrString(temp, "dumps");
pickle_loads = PyObject_GetAttrString(temp, "loads");
pickle_protocol = PyObject_GetAttrString(temp, "HIGHEST_PROTOCOL");
Py_XDECREF(temp);
/* Get copy of BufferTooShort */
temp = PyImport_ImportModule("multiprocessing");
if (!temp)
return NULL;
BufferTooShort = PyObject_GetAttrString(temp, "BufferTooShort");
Py_XDECREF(temp);
/* Add connection type to module */
if (PyType_Ready(&ConnectionType) < 0)
return NULL;
Py_INCREF(&ConnectionType);
PyModule_AddObject(module, "Connection", (PyObject*)&ConnectionType);
#if defined(MS_WINDOWS) || HAVE_SEM_OPEN
/* Add SemLock type to module */
if (PyType_Ready(&SemLockType) < 0)
return NULL;
Py_INCREF(&SemLockType);
PyDict_SetItemString(SemLockType.tp_dict, "SEM_VALUE_MAX",
Py_BuildValue("i", SEM_VALUE_MAX));
PyModule_AddObject(module, "SemLock", (PyObject*)&SemLockType);
#endif
#ifdef MS_WINDOWS
/* Add PipeConnection to module */
if (PyType_Ready(&PipeConnectionType) < 0)
return NULL;
Py_INCREF(&PipeConnectionType);
PyModule_AddObject(module, "PipeConnection",
(PyObject*)&PipeConnectionType);
/* Initialize win32 class and add to multiprocessing */
temp = create_win32_namespace();
if (!temp)
return NULL;
PyModule_AddObject(module, "win32", temp);
/* Initialize the event handle used to signal Ctrl-C */
sigint_event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!sigint_event) {
PyErr_SetFromWindowsErr(0);
return NULL;
}
if (!SetConsoleCtrlHandler(ProcessingCtrlHandler, TRUE)) {
PyErr_SetFromWindowsErr(0);
return NULL;
}
#endif
/* Add configuration macros */
temp = PyDict_New();
if (!temp)
return NULL;
if (PyModule_AddObject(module, "flags", temp) < 0)
return NULL;
#define ADD_FLAG(name) \
if (PyDict_SetItemString(temp, #name, Py_BuildValue("i", name)) < 0) return NULL
#ifdef HAVE_SEM_OPEN
ADD_FLAG(HAVE_SEM_OPEN);
#endif
#ifdef HAVE_SEM_TIMEDWAIT
ADD_FLAG(HAVE_SEM_TIMEDWAIT);
#endif
#ifdef HAVE_FD_TRANSFER
ADD_FLAG(HAVE_FD_TRANSFER);
#endif
#ifdef HAVE_BROKEN_SEM_GETVALUE
ADD_FLAG(HAVE_BROKEN_SEM_GETVALUE);
#endif
#ifdef HAVE_BROKEN_SEM_UNLINK
ADD_FLAG(HAVE_BROKEN_SEM_UNLINK);
#endif
return module;
}

View file

@ -0,0 +1,163 @@
#ifndef MULTIPROCESSING_H
#define MULTIPROCESSING_H
#define PY_SSIZE_T_CLEAN
#include "Python.h"
#include "structmember.h"
#include "pythread.h"
/*
* Platform includes and definitions
*/
#ifdef MS_WINDOWS
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
# include <winsock2.h>
# include <process.h> /* getpid() */
# define SEM_HANDLE HANDLE
# define SEM_VALUE_MAX LONG_MAX
#else
# include <fcntl.h> /* O_CREAT and O_EXCL */
# include <sys/socket.h>
# include <arpa/inet.h> /* htonl() and ntohl() */
# if HAVE_SEM_OPEN
# include <semaphore.h>
typedef sem_t *SEM_HANDLE;
# endif
# define HANDLE int
# define SOCKET int
# define BOOL int
# define UINT32 uint32_t
# define INT32 int32_t
# define TRUE 1
# define FALSE 0
# define INVALID_HANDLE_VALUE (-1)
#endif
/*
* Make sure Py_ssize_t available
*/
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t;
# define PY_SSIZE_T_MAX INT_MAX
# define PY_SSIZE_T_MIN INT_MIN
# define F_PY_SSIZE_T "i"
# define PY_FORMAT_SIZE_T ""
# define PyInt_FromSsize_t(n) PyInt_FromLong((long)n)
#else
# define F_PY_SSIZE_T "n"
#endif
/*
* Format codes
*/
#if SIZEOF_VOID_P == SIZEOF_LONG
# define F_POINTER "k"
# define T_POINTER T_ULONG
#elif defined(HAVE_LONG_LONG) && (SIZEOF_VOID_P == SIZEOF_LONG_LONG)
# define F_POINTER "K"
# define T_POINTER T_ULONGLONG
#else
# error "can't find format code for unsigned integer of same size as void*"
#endif
#ifdef MS_WINDOWS
# define F_HANDLE F_POINTER
# define T_HANDLE T_POINTER
# define F_SEM_HANDLE F_HANDLE
# define T_SEM_HANDLE T_HANDLE
# define F_DWORD "k"
# define T_DWORD T_ULONG
#else
# define F_HANDLE "i"
# define T_HANDLE T_INT
# define F_SEM_HANDLE F_POINTER
# define T_SEM_HANDLE T_POINTER
#endif
#if PY_VERSION_HEX >= 0x03000000
# define F_RBUFFER "y"
#else
# define F_RBUFFER "s"
#endif
/*
* Error codes which can be returned by functions called without GIL
*/
#define MP_SUCCESS (0)
#define MP_STANDARD_ERROR (-1)
#define MP_MEMORY_ERROR (-1001)
#define MP_END_OF_FILE (-1002)
#define MP_EARLY_END_OF_FILE (-1003)
#define MP_BAD_MESSAGE_LENGTH (-1004)
#define MP_SOCKET_ERROR (-1005)
#define MP_EXCEPTION_HAS_BEEN_SET (-1006)
PyObject *mp_SetError(PyObject *Type, int num);
/*
* Externs - not all will really exist on all platforms
*/
extern PyObject *pickle_dumps;
extern PyObject *pickle_loads;
extern PyObject *pickle_protocol;
extern PyObject *BufferTooShort;
extern PyTypeObject SemLockType;
extern PyTypeObject ConnectionType;
extern PyTypeObject PipeConnectionType;
extern HANDLE sigint_event;
/*
* Py3k compatibility
*/
#if PY_VERSION_HEX >= 0x03000000
# define PICKLE_MODULE "pickle"
# define FROM_FORMAT PyUnicode_FromFormat
# define PyInt_FromLong PyLong_FromLong
# define PyInt_FromSsize_t PyLong_FromSsize_t
#else
# define PICKLE_MODULE "cPickle"
# define FROM_FORMAT PyString_FromFormat
#endif
#ifndef PyVarObject_HEAD_INIT
# define PyVarObject_HEAD_INIT(type, size) PyObject_HEAD_INIT(type) size,
#endif
#ifndef Py_TPFLAGS_HAVE_WEAKREFS
# define Py_TPFLAGS_HAVE_WEAKREFS 0
#endif
/*
* Connection definition
*/
#define CONNECTION_BUFFER_SIZE 1024
typedef struct {
PyObject_HEAD
HANDLE handle;
int flags;
PyObject *weakreflist;
char buffer[CONNECTION_BUFFER_SIZE];
} ConnectionObject;
/*
* Miscellaneous
*/
#define MAX_MESSAGE_LENGTH 0x7fffffff
#ifndef MIN
# define MIN(x, y) ((x) < (y) ? x : y)
# define MAX(x, y) ((x) > (y) ? x : y)
#endif
#endif /* MULTIPROCESSING_H */

View file

@ -0,0 +1,136 @@
/*
* A type which wraps a pipe handle in message oriented mode
*
* pipe_connection.c
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
#define CLOSE(h) CloseHandle(h)
/*
* Send string to the pipe; assumes in message oriented mode
*/
static Py_ssize_t
conn_send_string(ConnectionObject *conn, char *string, size_t length)
{
DWORD amount_written;
return WriteFile(conn->handle, string, length, &amount_written, NULL)
? MP_SUCCESS : MP_STANDARD_ERROR;
}
/*
* Attempts to read into buffer, or if buffer too small into *newbuffer.
*
* Returns number of bytes read. Assumes in message oriented mode.
*/
static Py_ssize_t
conn_recv_string(ConnectionObject *conn, char *buffer,
size_t buflength, char **newbuffer, size_t maxlength)
{
DWORD left, length, full_length, err;
*newbuffer = NULL;
if (ReadFile(conn->handle, buffer, MIN(buflength, maxlength),
&length, NULL))
return length;
err = GetLastError();
if (err != ERROR_MORE_DATA) {
if (err == ERROR_BROKEN_PIPE)
return MP_END_OF_FILE;
return MP_STANDARD_ERROR;
}
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, NULL, &left))
return MP_STANDARD_ERROR;
full_length = length + left;
if (full_length > maxlength)
return MP_BAD_MESSAGE_LENGTH;
*newbuffer = PyMem_Malloc(full_length);
if (*newbuffer == NULL)
return MP_MEMORY_ERROR;
memcpy(*newbuffer, buffer, length);
if (ReadFile(conn->handle, *newbuffer+length, left, &length, NULL)) {
assert(length == left);
return full_length;
} else {
PyMem_Free(*newbuffer);
return MP_STANDARD_ERROR;
}
}
/*
* Check whether any data is available for reading
*/
#define conn_poll(conn, timeout) conn_poll_save(conn, timeout, _save)
static int
conn_poll_save(ConnectionObject *conn, double timeout, PyThreadState *_save)
{
DWORD bytes, deadline, delay;
int difference, res;
BOOL block = FALSE;
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
return MP_STANDARD_ERROR;
if (timeout == 0.0)
return bytes > 0;
if (timeout < 0.0)
block = TRUE;
else
/* XXX does not check for overflow */
deadline = GetTickCount() + (DWORD)(1000 * timeout + 0.5);
Sleep(0);
for (delay = 1 ; ; delay += 1) {
if (!PeekNamedPipe(conn->handle, NULL, 0, NULL, &bytes, NULL))
return MP_STANDARD_ERROR;
else if (bytes > 0)
return TRUE;
if (!block) {
difference = deadline - GetTickCount();
if (difference < 0)
return FALSE;
if ((int)delay > difference)
delay = difference;
}
if (delay > 20)
delay = 20;
Sleep(delay);
/* check for signals */
Py_BLOCK_THREADS
res = PyErr_CheckSignals();
Py_UNBLOCK_THREADS
if (res)
return MP_EXCEPTION_HAS_BEEN_SET;
}
}
/*
* "connection.h" defines the PipeConnection type using the definitions above
*/
#define CONNECTION_NAME "PipeConnection"
#define CONNECTION_TYPE PipeConnectionType
#include "connection.h"

View file

@ -0,0 +1,625 @@
/*
* A type which wraps a semaphore
*
* semaphore.c
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
enum { RECURSIVE_MUTEX, SEMAPHORE };
typedef struct {
PyObject_HEAD
SEM_HANDLE handle;
long last_tid;
int count;
int maxvalue;
int kind;
} SemLockObject;
#define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
#ifdef MS_WINDOWS
/*
* Windows definitions
*/
#define SEM_FAILED NULL
#define SEM_CLEAR_ERROR() SetLastError(0)
#define SEM_GET_LAST_ERROR() GetLastError()
#define SEM_CREATE(name, val, max) CreateSemaphore(NULL, val, max, NULL)
#define SEM_CLOSE(sem) (CloseHandle(sem) ? 0 : -1)
#define SEM_GETVALUE(sem, pval) _GetSemaphoreValue(sem, pval)
#define SEM_UNLINK(name) 0
static int
_GetSemaphoreValue(HANDLE handle, long *value)
{
long previous;
switch (WaitForSingleObject(handle, 0)) {
case WAIT_OBJECT_0:
if (!ReleaseSemaphore(handle, 1, &previous))
return MP_STANDARD_ERROR;
*value = previous + 1;
return 0;
case WAIT_TIMEOUT:
*value = 0;
return 0;
default:
return MP_STANDARD_ERROR;
}
}
static PyObject *
semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
{
int blocking = 1;
double timeout;
PyObject *timeout_obj = Py_None;
DWORD res, full_msecs, msecs, start, ticks;
static char *kwlist[] = {"block", "timeout", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
&blocking, &timeout_obj))
return NULL;
/* calculate timeout */
if (!blocking) {
full_msecs = 0;
} else if (timeout_obj == Py_None) {
full_msecs = INFINITE;
} else {
timeout = PyFloat_AsDouble(timeout_obj);
if (PyErr_Occurred())
return NULL;
timeout *= 1000.0; /* convert to millisecs */
if (timeout < 0.0) {
timeout = 0.0;
} else if (timeout >= 0.5 * INFINITE) { /* 25 days */
PyErr_SetString(PyExc_OverflowError,
"timeout is too large");
return NULL;
}
full_msecs = (DWORD)(timeout + 0.5);
}
/* check whether we already own the lock */
if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
++self->count;
Py_RETURN_TRUE;
}
/* check whether we can acquire without blocking */
if (WaitForSingleObject(self->handle, 0) == WAIT_OBJECT_0) {
self->last_tid = GetCurrentThreadId();
++self->count;
Py_RETURN_TRUE;
}
msecs = full_msecs;
start = GetTickCount();
for ( ; ; ) {
HANDLE handles[2] = {self->handle, sigint_event};
/* do the wait */
Py_BEGIN_ALLOW_THREADS
ResetEvent(sigint_event);
res = WaitForMultipleObjects(2, handles, FALSE, msecs);
Py_END_ALLOW_THREADS
/* handle result */
if (res != WAIT_OBJECT_0 + 1)
break;
/* got SIGINT so give signal handler a chance to run */
Sleep(1);
/* if this is main thread let KeyboardInterrupt be raised */
if (PyErr_CheckSignals())
return NULL;
/* recalculate timeout */
if (msecs != INFINITE) {
ticks = GetTickCount();
if ((DWORD)(ticks - start) >= full_msecs)
Py_RETURN_FALSE;
msecs = full_msecs - (ticks - start);
}
}
/* handle result */
switch (res) {
case WAIT_TIMEOUT:
Py_RETURN_FALSE;
case WAIT_OBJECT_0:
self->last_tid = GetCurrentThreadId();
++self->count;
Py_RETURN_TRUE;
case WAIT_FAILED:
return PyErr_SetFromWindowsErr(0);
default:
PyErr_Format(PyExc_RuntimeError, "WaitForSingleObject() or "
"WaitForMultipleObjects() gave unrecognized "
"value %d", res);
return NULL;
}
}
static PyObject *
semlock_release(SemLockObject *self, PyObject *args)
{
if (self->kind == RECURSIVE_MUTEX) {
if (!ISMINE(self)) {
PyErr_SetString(PyExc_AssertionError, "attempt to "
"release recursive lock not owned "
"by thread");
return NULL;
}
if (self->count > 1) {
--self->count;
Py_RETURN_NONE;
}
assert(self->count == 1);
}
if (!ReleaseSemaphore(self->handle, 1, NULL)) {
if (GetLastError() == ERROR_TOO_MANY_POSTS) {
PyErr_SetString(PyExc_ValueError, "semaphore or lock "
"released too many times");
return NULL;
} else {
return PyErr_SetFromWindowsErr(0);
}
}
--self->count;
Py_RETURN_NONE;
}
#else /* !MS_WINDOWS */
/*
* Unix definitions
*/
#define SEM_CLEAR_ERROR()
#define SEM_GET_LAST_ERROR() 0
#define SEM_CREATE(name, val, max) sem_open(name, O_CREAT | O_EXCL, 0600, val)
#define SEM_CLOSE(sem) sem_close(sem)
#define SEM_GETVALUE(sem, pval) sem_getvalue(sem, pval)
#define SEM_UNLINK(name) sem_unlink(name)
#if HAVE_BROKEN_SEM_UNLINK
# define sem_unlink(name) 0
#endif
#if !HAVE_SEM_TIMEDWAIT
# define sem_timedwait(sem,deadline) sem_timedwait_save(sem,deadline,_save)
int
sem_timedwait_save(sem_t *sem, struct timespec *deadline, PyThreadState *_save)
{
int res;
unsigned long delay, difference;
struct timeval now, tvdeadline, tvdelay;
errno = 0;
tvdeadline.tv_sec = deadline->tv_sec;
tvdeadline.tv_usec = deadline->tv_nsec / 1000;
for (delay = 0 ; ; delay += 1000) {
/* poll */
if (sem_trywait(sem) == 0)
return 0;
else if (errno != EAGAIN)
return MP_STANDARD_ERROR;
/* get current time */
if (gettimeofday(&now, NULL) < 0)
return MP_STANDARD_ERROR;
/* check for timeout */
if (tvdeadline.tv_sec < now.tv_sec ||
(tvdeadline.tv_sec == now.tv_sec &&
tvdeadline.tv_usec <= now.tv_usec)) {
errno = ETIMEDOUT;
return MP_STANDARD_ERROR;
}
/* calculate how much time is left */
difference = (tvdeadline.tv_sec - now.tv_sec) * 1000000 +
(tvdeadline.tv_usec - now.tv_usec);
/* check delay not too long -- maximum is 20 msecs */
if (delay > 20000)
delay = 20000;
if (delay > difference)
delay = difference;
/* sleep */
tvdelay.tv_sec = delay / 1000000;
tvdelay.tv_usec = delay % 1000000;
if (select(0, NULL, NULL, NULL, &tvdelay) < 0)
return MP_STANDARD_ERROR;
/* check for signals */
Py_BLOCK_THREADS
res = PyErr_CheckSignals();
Py_UNBLOCK_THREADS
if (res) {
errno = EINTR;
return MP_EXCEPTION_HAS_BEEN_SET;
}
}
}
#endif /* !HAVE_SEM_TIMEDWAIT */
static PyObject *
semlock_acquire(SemLockObject *self, PyObject *args, PyObject *kwds)
{
int blocking = 1, res;
double timeout;
PyObject *timeout_obj = Py_None;
struct timespec deadline = {0};
struct timeval now;
long sec, nsec;
static char *kwlist[] = {"block", "timeout", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|iO", kwlist,
&blocking, &timeout_obj))
return NULL;
if (self->kind == RECURSIVE_MUTEX && ISMINE(self)) {
++self->count;
Py_RETURN_TRUE;
}
if (timeout_obj != Py_None) {
timeout = PyFloat_AsDouble(timeout_obj);
if (PyErr_Occurred())
return NULL;
if (timeout < 0.0)
timeout = 0.0;
if (gettimeofday(&now, NULL) < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
sec = (long) timeout;
nsec = (long) (1e9 * (timeout - sec) + 0.5);
deadline.tv_sec = now.tv_sec + sec;
deadline.tv_nsec = now.tv_usec * 1000 + nsec;
deadline.tv_sec += (deadline.tv_nsec / 1000000000);
deadline.tv_nsec %= 1000000000;
}
do {
Py_BEGIN_ALLOW_THREADS
if (blocking && timeout_obj == Py_None)
res = sem_wait(self->handle);
else if (!blocking)
res = sem_trywait(self->handle);
else
res = sem_timedwait(self->handle, &deadline);
Py_END_ALLOW_THREADS
if (res == MP_EXCEPTION_HAS_BEEN_SET)
break;
} while (res < 0 && errno == EINTR && !PyErr_CheckSignals());
if (res < 0) {
if (errno == EAGAIN || errno == ETIMEDOUT)
Py_RETURN_FALSE;
else if (errno == EINTR)
return NULL;
else
return PyErr_SetFromErrno(PyExc_OSError);
}
++self->count;
self->last_tid = PyThread_get_thread_ident();
Py_RETURN_TRUE;
}
static PyObject *
semlock_release(SemLockObject *self, PyObject *args)
{
if (self->kind == RECURSIVE_MUTEX) {
if (!ISMINE(self)) {
PyErr_SetString(PyExc_AssertionError, "attempt to "
"release recursive lock not owned "
"by thread");
return NULL;
}
if (self->count > 1) {
--self->count;
Py_RETURN_NONE;
}
assert(self->count == 1);
} else {
#if HAVE_BROKEN_SEM_GETVALUE
/* We will only check properly the maxvalue == 1 case */
if (self->maxvalue == 1) {
/* make sure that already locked */
if (sem_trywait(self->handle) < 0) {
if (errno != EAGAIN) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
/* it is already locked as expected */
} else {
/* it was not locked so undo wait and raise */
if (sem_post(self->handle) < 0) {
PyErr_SetFromErrno(PyExc_OSError);
return NULL;
}
PyErr_SetString(PyExc_ValueError, "semaphore "
"or lock released too many "
"times");
return NULL;
}
}
#else
int sval;
/* This check is not an absolute guarantee that the semaphore
does not rise above maxvalue. */
if (sem_getvalue(self->handle, &sval) < 0) {
return PyErr_SetFromErrno(PyExc_OSError);
} else if (sval >= self->maxvalue) {
PyErr_SetString(PyExc_ValueError, "semaphore or lock "
"released too many times");
return NULL;
}
#endif
}
if (sem_post(self->handle) < 0)
return PyErr_SetFromErrno(PyExc_OSError);
--self->count;
Py_RETURN_NONE;
}
#endif /* !MS_WINDOWS */
/*
* All platforms
*/
static PyObject *
newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
{
SemLockObject *self;
self = PyObject_New(SemLockObject, type);
if (!self)
return NULL;
self->handle = handle;
self->kind = kind;
self->count = 0;
self->last_tid = 0;
self->maxvalue = maxvalue;
return (PyObject*)self;
}
static PyObject *
semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
char buffer[256];
SEM_HANDLE handle = SEM_FAILED;
int kind, maxvalue, value;
PyObject *result;
static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
static int counter = 0;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
&kind, &value, &maxvalue))
return NULL;
if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
PyErr_SetString(PyExc_ValueError, "unrecognized kind");
return NULL;
}
PyOS_snprintf(buffer, sizeof(buffer), "/mp%d-%d", getpid(), counter++);
SEM_CLEAR_ERROR();
handle = SEM_CREATE(buffer, value, maxvalue);
/* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
goto failure;
if (SEM_UNLINK(buffer) < 0)
goto failure;
result = newsemlockobject(type, handle, kind, maxvalue);
if (!result)
goto failure;
return result;
failure:
if (handle != SEM_FAILED)
SEM_CLOSE(handle);
mp_SetError(NULL, MP_STANDARD_ERROR);
return NULL;
}
static PyObject *
semlock_rebuild(PyTypeObject *type, PyObject *args)
{
SEM_HANDLE handle;
int kind, maxvalue;
if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
&handle, &kind, &maxvalue))
return NULL;
return newsemlockobject(type, handle, kind, maxvalue);
}
static void
semlock_dealloc(SemLockObject* self)
{
if (self->handle != SEM_FAILED)
SEM_CLOSE(self->handle);
PyObject_Del(self);
}
static PyObject *
semlock_count(SemLockObject *self)
{
return PyInt_FromLong((long)self->count);
}
static PyObject *
semlock_ismine(SemLockObject *self)
{
/* only makes sense for a lock */
return PyBool_FromLong(ISMINE(self));
}
static PyObject *
semlock_getvalue(SemLockObject *self)
{
#if HAVE_BROKEN_SEM_GETVALUE
PyErr_SetNone(PyExc_NotImplementedError);
return NULL;
#else
int sval;
if (SEM_GETVALUE(self->handle, &sval) < 0)
return mp_SetError(NULL, MP_STANDARD_ERROR);
/* some posix implementations use negative numbers to indicate
the number of waiting threads */
if (sval < 0)
sval = 0;
return PyInt_FromLong((long)sval);
#endif
}
static PyObject *
semlock_iszero(SemLockObject *self)
{
int sval;
#if HAVE_BROKEN_SEM_GETVALUE
if (sem_trywait(self->handle) < 0) {
if (errno == EAGAIN)
Py_RETURN_TRUE;
return mp_SetError(NULL, MP_STANDARD_ERROR);
} else {
if (sem_post(self->handle) < 0)
return mp_SetError(NULL, MP_STANDARD_ERROR);
Py_RETURN_FALSE;
}
#else
if (SEM_GETVALUE(self->handle, &sval) < 0)
return mp_SetError(NULL, MP_STANDARD_ERROR);
return PyBool_FromLong((long)sval == 0);
#endif
}
static PyObject *
semlock_afterfork(SemLockObject *self)
{
self->count = 0;
Py_RETURN_NONE;
}
/*
* Semaphore methods
*/
static PyMethodDef semlock_methods[] = {
{"acquire", (PyCFunction)semlock_acquire, METH_VARARGS | METH_KEYWORDS,
"acquire the semaphore/lock"},
{"release", (PyCFunction)semlock_release, METH_NOARGS,
"release the semaphore/lock"},
{"__enter__", (PyCFunction)semlock_acquire, METH_VARARGS,
"enter the semaphore/lock"},
{"__exit__", (PyCFunction)semlock_release, METH_VARARGS,
"exit the semaphore/lock"},
{"_count", (PyCFunction)semlock_count, METH_NOARGS,
"num of `acquire()`s minus num of `release()`s for this process"},
{"_is_mine", (PyCFunction)semlock_ismine, METH_NOARGS,
"whether the lock is owned by this thread"},
{"_get_value", (PyCFunction)semlock_getvalue, METH_NOARGS,
"get the value of the semaphore"},
{"_is_zero", (PyCFunction)semlock_iszero, METH_NOARGS,
"returns whether semaphore has value zero"},
{"_rebuild", (PyCFunction)semlock_rebuild, METH_VARARGS | METH_CLASS,
""},
{"_after_fork", (PyCFunction)semlock_afterfork, METH_NOARGS,
"rezero the net acquisition count after fork()"},
{NULL}
};
/*
* Member table
*/
static PyMemberDef semlock_members[] = {
{"handle", T_SEM_HANDLE, offsetof(SemLockObject, handle), READONLY,
""},
{"kind", T_INT, offsetof(SemLockObject, kind), READONLY,
""},
{"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
""},
{NULL}
};
/*
* Semaphore type
*/
PyTypeObject SemLockType = {
PyVarObject_HEAD_INIT(NULL, 0)
/* tp_name */ "_multiprocessing.SemLock",
/* tp_basicsize */ sizeof(SemLockObject),
/* tp_itemsize */ 0,
/* tp_dealloc */ (destructor)semlock_dealloc,
/* tp_print */ 0,
/* tp_getattr */ 0,
/* tp_setattr */ 0,
/* tp_compare */ 0,
/* tp_repr */ 0,
/* tp_as_number */ 0,
/* tp_as_sequence */ 0,
/* tp_as_mapping */ 0,
/* tp_hash */ 0,
/* tp_call */ 0,
/* tp_str */ 0,
/* tp_getattro */ 0,
/* tp_setattro */ 0,
/* tp_as_buffer */ 0,
/* tp_flags */ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
/* tp_doc */ "Semaphore/Mutex type",
/* tp_traverse */ 0,
/* tp_clear */ 0,
/* tp_richcompare */ 0,
/* tp_weaklistoffset */ 0,
/* tp_iter */ 0,
/* tp_iternext */ 0,
/* tp_methods */ semlock_methods,
/* tp_members */ semlock_members,
/* tp_getset */ 0,
/* tp_base */ 0,
/* tp_dict */ 0,
/* tp_descr_get */ 0,
/* tp_descr_set */ 0,
/* tp_dictoffset */ 0,
/* tp_init */ 0,
/* tp_alloc */ 0,
/* tp_new */ semlock_new,
};

View file

@ -0,0 +1,180 @@
/*
* A type which wraps a socket
*
* socket_connection.c
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
#ifdef MS_WINDOWS
# define WRITE(h, buffer, length) send((SOCKET)h, buffer, length, 0)
# define READ(h, buffer, length) recv((SOCKET)h, buffer, length, 0)
# define CLOSE(h) closesocket((SOCKET)h)
#else
# define WRITE(h, buffer, length) write(h, buffer, length)
# define READ(h, buffer, length) read(h, buffer, length)
# define CLOSE(h) close(h)
#endif
/*
* Send string to file descriptor
*/
static Py_ssize_t
_conn_sendall(HANDLE h, char *string, size_t length)
{
char *p = string;
Py_ssize_t res;
while (length > 0) {
res = WRITE(h, p, length);
if (res < 0)
return MP_SOCKET_ERROR;
length -= res;
p += res;
}
return MP_SUCCESS;
}
/*
* Receive string of exact length from file descriptor
*/
static Py_ssize_t
_conn_recvall(HANDLE h, char *buffer, size_t length)
{
size_t remaining = length;
Py_ssize_t temp;
char *p = buffer;
while (remaining > 0) {
temp = READ(h, p, remaining);
if (temp <= 0) {
if (temp == 0)
return remaining == length ?
MP_END_OF_FILE : MP_EARLY_END_OF_FILE;
else
return temp;
}
remaining -= temp;
p += temp;
}
return MP_SUCCESS;
}
/*
* Send a string prepended by the string length in network byte order
*/
static Py_ssize_t
conn_send_string(ConnectionObject *conn, char *string, size_t length)
{
/* The "header" of the message is a 32 bit unsigned number (in
network order) which specifies the length of the "body". If
the message is shorter than about 16kb then it is quicker to
combine the "header" and the "body" of the message and send
them at once. */
if (length < (16*1024)) {
char *message;
int res;
message = PyMem_Malloc(length+4);
if (message == NULL)
return MP_MEMORY_ERROR;
*(UINT32*)message = htonl((UINT32)length);
memcpy(message+4, string, length);
res = _conn_sendall(conn->handle, message, length+4);
PyMem_Free(message);
return res;
} else {
UINT32 lenbuff;
if (length > MAX_MESSAGE_LENGTH)
return MP_BAD_MESSAGE_LENGTH;
lenbuff = htonl((UINT32)length);
return _conn_sendall(conn->handle, (char*)&lenbuff, 4) ||
_conn_sendall(conn->handle, string, length);
}
}
/*
* Attempts to read into buffer, or failing that into *newbuffer
*
* Returns number of bytes read.
*/
static Py_ssize_t
conn_recv_string(ConnectionObject *conn, char *buffer,
size_t buflength, char **newbuffer, size_t maxlength)
{
int res;
UINT32 ulength;
*newbuffer = NULL;
res = _conn_recvall(conn->handle, (char*)&ulength, 4);
if (res < 0)
return res;
ulength = ntohl(ulength);
if (ulength > maxlength)
return MP_BAD_MESSAGE_LENGTH;
if (ulength <= buflength) {
res = _conn_recvall(conn->handle, buffer, (size_t)ulength);
return res < 0 ? res : ulength;
} else {
*newbuffer = PyMem_Malloc((size_t)ulength);
if (*newbuffer == NULL)
return MP_MEMORY_ERROR;
res = _conn_recvall(conn->handle, *newbuffer, (size_t)ulength);
return res < 0 ? (Py_ssize_t)res : (Py_ssize_t)ulength;
}
}
/*
* Check whether any data is available for reading -- neg timeout blocks
*/
static int
conn_poll(ConnectionObject *conn, double timeout)
{
int res;
fd_set rfds;
FD_ZERO(&rfds);
FD_SET((SOCKET)conn->handle, &rfds);
if (timeout < 0.0) {
res = select((int)conn->handle+1, &rfds, NULL, NULL, NULL);
} else {
struct timeval tv;
tv.tv_sec = (long)timeout;
tv.tv_usec = (long)((timeout - tv.tv_sec) * 1e6 + 0.5);
res = select((int)conn->handle+1, &rfds, NULL, NULL, &tv);
}
if (res < 0) {
return MP_SOCKET_ERROR;
} else if (FD_ISSET(conn->handle, &rfds)) {
return TRUE;
} else {
assert(res == 0);
return FALSE;
}
}
/*
* "connection.h" defines the Connection type using defs above
*/
#define CONNECTION_NAME "Connection"
#define CONNECTION_TYPE ConnectionType
#include "connection.h"

View file

@ -0,0 +1,260 @@
/*
* Win32 functions used by multiprocessing package
*
* win32_functions.c
*
* Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
*/
#include "multiprocessing.h"
#define WIN32_FUNCTION(func) \
{#func, (PyCFunction)win32_ ## func, METH_VARARGS | METH_STATIC, ""}
#define WIN32_CONSTANT(fmt, con) \
PyDict_SetItemString(Win32Type.tp_dict, #con, Py_BuildValue(fmt, con))
static PyObject *
win32_CloseHandle(PyObject *self, PyObject *args)
{
HANDLE hObject;
BOOL success;
if (!PyArg_ParseTuple(args, F_HANDLE, &hObject))
return NULL;
Py_BEGIN_ALLOW_THREADS
success = CloseHandle(hObject);
Py_END_ALLOW_THREADS
if (!success)
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyObject *
win32_ConnectNamedPipe(PyObject *self, PyObject *args)
{
HANDLE hNamedPipe;
LPOVERLAPPED lpOverlapped;
BOOL success;
if (!PyArg_ParseTuple(args, F_HANDLE F_POINTER,
&hNamedPipe, &lpOverlapped))
return NULL;
Py_BEGIN_ALLOW_THREADS
success = ConnectNamedPipe(hNamedPipe, lpOverlapped);
Py_END_ALLOW_THREADS
if (!success)
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyObject *
win32_CreateFile(PyObject *self, PyObject *args)
{
LPCTSTR lpFileName;
DWORD dwDesiredAccess;
DWORD dwShareMode;
LPSECURITY_ATTRIBUTES lpSecurityAttributes;
DWORD dwCreationDisposition;
DWORD dwFlagsAndAttributes;
HANDLE hTemplateFile;
HANDLE handle;
if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_POINTER
F_DWORD F_DWORD F_HANDLE,
&lpFileName, &dwDesiredAccess, &dwShareMode,
&lpSecurityAttributes, &dwCreationDisposition,
&dwFlagsAndAttributes, &hTemplateFile))
return NULL;
Py_BEGIN_ALLOW_THREADS
handle = CreateFile(lpFileName, dwDesiredAccess,
dwShareMode, lpSecurityAttributes,
dwCreationDisposition,
dwFlagsAndAttributes, hTemplateFile);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE)
return PyErr_SetFromWindowsErr(0);
return Py_BuildValue(F_HANDLE, handle);
}
static PyObject *
win32_CreateNamedPipe(PyObject *self, PyObject *args)
{
LPCTSTR lpName;
DWORD dwOpenMode;
DWORD dwPipeMode;
DWORD nMaxInstances;
DWORD nOutBufferSize;
DWORD nInBufferSize;
DWORD nDefaultTimeOut;
LPSECURITY_ATTRIBUTES lpSecurityAttributes;
HANDLE handle;
if (!PyArg_ParseTuple(args, "s" F_DWORD F_DWORD F_DWORD
F_DWORD F_DWORD F_DWORD F_POINTER,
&lpName, &dwOpenMode, &dwPipeMode,
&nMaxInstances, &nOutBufferSize,
&nInBufferSize, &nDefaultTimeOut,
&lpSecurityAttributes))
return NULL;
Py_BEGIN_ALLOW_THREADS
handle = CreateNamedPipe(lpName, dwOpenMode, dwPipeMode,
nMaxInstances, nOutBufferSize,
nInBufferSize, nDefaultTimeOut,
lpSecurityAttributes);
Py_END_ALLOW_THREADS
if (handle == INVALID_HANDLE_VALUE)
return PyErr_SetFromWindowsErr(0);
return Py_BuildValue(F_HANDLE, handle);
}
static PyObject *
win32_ExitProcess(PyObject *self, PyObject *args)
{
UINT uExitCode;
if (!PyArg_ParseTuple(args, "I", &uExitCode))
return NULL;
ExitProcess(uExitCode);
return NULL;
}
static PyObject *
win32_GetLastError(PyObject *self, PyObject *args)
{
return Py_BuildValue(F_DWORD, GetLastError());
}
static PyObject *
win32_OpenProcess(PyObject *self, PyObject *args)
{
DWORD dwDesiredAccess;
BOOL bInheritHandle;
DWORD dwProcessId;
HANDLE handle;
if (!PyArg_ParseTuple(args, F_DWORD "i" F_DWORD,
&dwDesiredAccess, &bInheritHandle, &dwProcessId))
return NULL;
handle = OpenProcess(dwDesiredAccess, bInheritHandle, dwProcessId);
if (handle == NULL)
return PyErr_SetFromWindowsErr(0);
return Py_BuildValue(F_HANDLE, handle);
}
static PyObject *
win32_SetNamedPipeHandleState(PyObject *self, PyObject *args)
{
HANDLE hNamedPipe;
PyObject *oArgs[3];
DWORD dwArgs[3], *pArgs[3] = {NULL, NULL, NULL};
int i;
if (!PyArg_ParseTuple(args, F_HANDLE "OOO",
&hNamedPipe, &oArgs[0], &oArgs[1], &oArgs[2]))
return NULL;
PyErr_Clear();
for (i = 0 ; i < 3 ; i++) {
if (oArgs[i] != Py_None) {
dwArgs[i] = PyInt_AsUnsignedLongMask(oArgs[i]);
if (PyErr_Occurred())
return NULL;
pArgs[i] = &dwArgs[i];
}
}
if (!SetNamedPipeHandleState(hNamedPipe, pArgs[0], pArgs[1], pArgs[2]))
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyObject *
win32_WaitNamedPipe(PyObject *self, PyObject *args)
{
LPCTSTR lpNamedPipeName;
DWORD nTimeOut;
BOOL success;
if (!PyArg_ParseTuple(args, "s" F_DWORD, &lpNamedPipeName, &nTimeOut))
return NULL;
Py_BEGIN_ALLOW_THREADS
success = WaitNamedPipe(lpNamedPipeName, nTimeOut);
Py_END_ALLOW_THREADS
if (!success)
return PyErr_SetFromWindowsErr(0);
Py_RETURN_NONE;
}
static PyMethodDef win32_methods[] = {
WIN32_FUNCTION(CloseHandle),
WIN32_FUNCTION(GetLastError),
WIN32_FUNCTION(OpenProcess),
WIN32_FUNCTION(ExitProcess),
WIN32_FUNCTION(ConnectNamedPipe),
WIN32_FUNCTION(CreateFile),
WIN32_FUNCTION(CreateNamedPipe),
WIN32_FUNCTION(SetNamedPipeHandleState),
WIN32_FUNCTION(WaitNamedPipe),
{NULL}
};
PyTypeObject Win32Type = {
PyVarObject_HEAD_INIT(NULL, 0)
};
PyObject *
create_win32_namespace(void)
{
Win32Type.tp_name = "_multiprocessing.win32";
Win32Type.tp_methods = win32_methods;
if (PyType_Ready(&Win32Type) < 0)
return NULL;
Py_INCREF(&Win32Type);
WIN32_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_BUSY);
WIN32_CONSTANT(F_DWORD, ERROR_PIPE_CONNECTED);
WIN32_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
WIN32_CONSTANT(F_DWORD, GENERIC_READ);
WIN32_CONSTANT(F_DWORD, GENERIC_WRITE);
WIN32_CONSTANT(F_DWORD, INFINITE);
WIN32_CONSTANT(F_DWORD, NMPWAIT_WAIT_FOREVER);
WIN32_CONSTANT(F_DWORD, OPEN_EXISTING);
WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_DUPLEX);
WIN32_CONSTANT(F_DWORD, PIPE_ACCESS_INBOUND);
WIN32_CONSTANT(F_DWORD, PIPE_READMODE_MESSAGE);
WIN32_CONSTANT(F_DWORD, PIPE_TYPE_MESSAGE);
WIN32_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
WIN32_CONSTANT(F_DWORD, PIPE_WAIT);
WIN32_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
WIN32_CONSTANT("i", NULL);
return (PyObject*)&Win32Type;
}