darn! I converted half of the files the wrong way.

Benjamin Peterson 2008-06-13 19:20:48 +00:00
parent dfd79494ce
commit 7f03ea77bf
12 changed files with 4726 additions and 4726 deletions

multiprocessing/__init__.py

@@ -1,271 +1,271 @@
#
# Package analogous to 'threading.py' but using processes
#
# multiprocessing/__init__.py
#
# This package is intended to duplicate the functionality (and much of
# the API) of threading.py but uses processes instead of threads.  A
# subpackage 'multiprocessing.dummy' has the same API but is a simple
# wrapper for 'threading'.
#
# Try calling `multiprocessing.doc.main()` to read the html
# documentation in a webbrowser.
#
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
__version__ = '0.70a1'

__all__ = [
    'Process', 'current_process', 'active_children', 'freeze_support',
    'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
    'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
    'Event', 'Queue', 'JoinableQueue', 'Pool', 'Value', 'Array',
    'RawValue', 'RawArray'
    ]

__author__ = 'R. Oudkerk (r.m.oudkerk@gmail.com)'

#
# Imports
#

import os
import sys

from multiprocessing.process import Process, current_process, active_children

#
# Exceptions
#

class ProcessError(Exception):
    pass

class BufferTooShort(ProcessError):
    pass

class TimeoutError(ProcessError):
    pass

class AuthenticationError(ProcessError):
    pass

# This is down here because _multiprocessing uses BufferTooShort
import _multiprocessing

#
# Definitions not depending on native semaphores
#

def Manager():
    '''
    Returns a manager associated with a running server process

    The manager's methods such as `Lock()`, `Condition()` and `Queue()`
    can be used to create shared objects.
    '''
    from multiprocessing.managers import SyncManager
    m = SyncManager()
    m.start()
    return m

def Pipe(duplex=True):
    '''
    Returns two connection objects connected by a pipe
    '''
    from multiprocessing.connection import Pipe
    return Pipe(duplex)

def cpu_count():
    '''
    Returns the number of CPUs in the system
    '''
    if sys.platform == 'win32':
        try:
            num = int(os.environ['NUMBER_OF_PROCESSORS'])
        except (ValueError, KeyError):
            num = 0
    elif sys.platform == 'darwin':
        try:
            num = int(os.popen('sysctl -n hw.ncpu').read())
        except ValueError:
            num = 0
    else:
        try:
            num = os.sysconf('SC_NPROCESSORS_ONLN')
        except (ValueError, OSError, AttributeError):
            num = 0

    if num >= 1:
        return num
    else:
        raise NotImplementedError('cannot determine number of cpus')

def freeze_support():
    '''
    Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from multiprocessing.forking import freeze_support
        freeze_support()

def get_logger():
    '''
    Return package logger -- if it does not already exist then it is created
    '''
    from multiprocessing.util import get_logger
    return get_logger()

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    from multiprocessing.util import log_to_stderr
    return log_to_stderr(level)

def allow_connection_pickling():
    '''
    Install support for sending connections and sockets between processes
    '''
    from multiprocessing import reduction

#
# Definitions depending on native semaphores
#

def Lock():
    '''
    Returns a non-recursive lock object
    '''
    from multiprocessing.synchronize import Lock
    return Lock()

def RLock():
    '''
    Returns a recursive lock object
    '''
    from multiprocessing.synchronize import RLock
    return RLock()

def Condition(lock=None):
    '''
    Returns a condition object
    '''
    from multiprocessing.synchronize import Condition
    return Condition(lock)

def Semaphore(value=1):
    '''
    Returns a semaphore object
    '''
    from multiprocessing.synchronize import Semaphore
    return Semaphore(value)

def BoundedSemaphore(value=1):
    '''
    Returns a bounded semaphore object
    '''
    from multiprocessing.synchronize import BoundedSemaphore
    return BoundedSemaphore(value)

def Event():
    '''
    Returns an event object
    '''
    from multiprocessing.synchronize import Event
    return Event()

def Queue(maxsize=0):
    '''
    Returns a queue object
    '''
    from multiprocessing.queues import Queue
    return Queue(maxsize)

def JoinableQueue(maxsize=0):
    '''
    Returns a queue object
    '''
    from multiprocessing.queues import JoinableQueue
    return JoinableQueue(maxsize)

def Pool(processes=None, initializer=None, initargs=()):
    '''
    Returns a process pool object
    '''
    from multiprocessing.pool import Pool
    return Pool(processes, initializer, initargs)

def RawValue(typecode_or_type, *args):
    '''
    Returns a shared object
    '''
    from multiprocessing.sharedctypes import RawValue
    return RawValue(typecode_or_type, *args)

def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a shared array
    '''
    from multiprocessing.sharedctypes import RawArray
    return RawArray(typecode_or_type, size_or_initializer)

def Value(typecode_or_type, *args, **kwds):
    '''
    Returns a synchronized shared object
    '''
    from multiprocessing.sharedctypes import Value
    return Value(typecode_or_type, *args, **kwds)

def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Returns a synchronized shared array
    '''
    from multiprocessing.sharedctypes import Array
    return Array(typecode_or_type, size_or_initializer, **kwds)

#
#
#

if sys.platform == 'win32':

    def set_executable(executable):
        '''
        Sets the path to a python.exe or pythonw.exe binary used to run
        child processes on Windows instead of sys.executable.
        Useful for people embedding Python.
        '''
        from multiprocessing.forking import set_executable
        set_executable(executable)

    __all__ += ['set_executable']
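
The factory functions above are the package's entire public, threading-flavoured surface; everything else is lazily imported from submodules. A minimal usage sketch follows. It is illustrative rather than part of the commit: the worker function, queue names, and values are invented, written against the Python 2-era API shown above.

    # Sketch only: exercise the Process/Queue/cpu_count API defined above.
    from multiprocessing import Process, Queue, cpu_count

    def worker(inbox, outbox):
        # pull work until the None sentinel arrives, mirroring a
        # threading.Thread worker loop
        for item in iter(inbox.get, None):
            outbox.put(item * item)

    if __name__ == '__main__':          # required on Windows; see forking.py
        inbox, outbox = Queue(), Queue()
        p = Process(target=worker, args=(inbox, outbox))
        p.start()
        n = cpu_count()
        for i in range(n):
            inbox.put(i)
        inbox.put(None)                 # sentinel: tell the worker to stop
        results = [outbox.get() for i in range(n)]
        p.join()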

multiprocessing/connection.py

@@ -1,425 +1,425 @@
#
# A higher level module for using sockets (or Windows named pipes)
#
# multiprocessing/connection.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

__all__ = [ 'Client', 'Listener', 'Pipe' ]

import os
import sys
import socket
import time
import tempfile
import itertools

import _multiprocessing
from multiprocessing import current_process
from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
from multiprocessing.forking import duplicate, close

#
#
#

BUFSIZE = 8192

_mmap_counter = itertools.count()

default_family = 'AF_INET'
families = ['AF_INET']

if hasattr(socket, 'AF_UNIX'):
    default_family = 'AF_UNIX'
    families += ['AF_UNIX']

if sys.platform == 'win32':
    default_family = 'AF_PIPE'
    families += ['AF_PIPE']

#
#
#

def arbitrary_address(family):
    '''
    Return an arbitrary free address for the given family
    '''
    if family == 'AF_INET':
        return ('localhost', 0)
    elif family == 'AF_UNIX':
        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
    elif family == 'AF_PIPE':
        return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                               (os.getpid(), _mmap_counter.next()))
    else:
        raise ValueError('unrecognized family')

def address_type(address):
    '''
    Return the type of the address

    This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
    '''
    if type(address) == tuple:
        return 'AF_INET'
    elif type(address) is str and address.startswith('\\\\'):
        return 'AF_PIPE'
    elif type(address) is str:
        return 'AF_UNIX'
    else:
        raise ValueError('address type of %r unrecognized' % address)

#
# Public functions
#

class Listener(object):
    '''
    Returns a listener object.

    This is a wrapper for a bound socket which is 'listening' for
    connections, or for a Windows named pipe.
    '''
    def __init__(self, address=None, family=None, backlog=1, authkey=None):
        family = family or (address and address_type(address)) \
                 or default_family
        address = address or arbitrary_address(family)

        if family == 'AF_PIPE':
            self._listener = PipeListener(address, backlog)
        else:
            self._listener = SocketListener(address, family, backlog)

        if authkey is not None and not isinstance(authkey, bytes):
            raise TypeError, 'authkey should be a byte string'

        self._authkey = authkey

    def accept(self):
        '''
        Accept a connection on the bound socket or named pipe of `self`.

        Returns a `Connection` object.
        '''
        c = self._listener.accept()
        if self._authkey:
            deliver_challenge(c, self._authkey)
            answer_challenge(c, self._authkey)
        return c

    def close(self):
        '''
        Close the bound socket or named pipe of `self`.
        '''
        return self._listener.close()

    address = property(lambda self: self._listener._address)
    last_accepted = property(lambda self: self._listener._last_accepted)


def Client(address, family=None, authkey=None):
    '''
    Returns a connection to the address of a `Listener`
    '''
    family = family or address_type(address)
    if family == 'AF_PIPE':
        c = PipeClient(address)
    else:
        c = SocketClient(address)

    if authkey is not None and not isinstance(authkey, bytes):
        raise TypeError, 'authkey should be a byte string'

    if authkey is not None:
        answer_challenge(c, authkey)
        deliver_challenge(c, authkey)

    return c


if sys.platform != 'win32':

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2

else:

    from ._multiprocessing import win32

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        address = arbitrary_address('AF_PIPE')
        if duplex:
            openmode = win32.PIPE_ACCESS_DUPLEX
            access = win32.GENERIC_READ | win32.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = win32.PIPE_ACCESS_INBOUND
            access = win32.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        h1 = win32.CreateNamedPipe(
            address, openmode,
            win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
            win32.PIPE_WAIT,
            1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
            )
        h2 = win32.CreateFile(
            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
            )
        win32.SetNamedPipeHandleState(
            h2, win32.PIPE_READMODE_MESSAGE, None, None
            )

        try:
            win32.ConnectNamedPipe(h1, win32.NULL)
        except WindowsError, e:
            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                raise

        c1 = _multiprocessing.PipeConnection(h1, writable=duplex)
        c2 = _multiprocessing.PipeConnection(h2, readable=duplex)

        return c1, c2

#
# Definitions for connections based on sockets
#

class SocketListener(object):
    '''
    Representation of a socket which is bound to an address and listening
    '''
    def __init__(self, address, family, backlog=1):
        self._socket = socket.socket(getattr(socket, family))
        self._socket.bind(address)
        self._socket.listen(backlog)
        address = self._socket.getsockname()
        if type(address) is tuple:
            address = (socket.getfqdn(address[0]),) + address[1:]
        self._address = address
        self._family = family
        self._last_accepted = None

        sub_debug('listener bound to address %r', self._address)

        if family == 'AF_UNIX':
            self._unlink = Finalize(
                self, os.unlink, args=(self._address,), exitpriority=0
                )
        else:
            self._unlink = None

    def accept(self):
        s, self._last_accepted = self._socket.accept()
        fd = duplicate(s.fileno())
        conn = _multiprocessing.Connection(fd)
        s.close()
        return conn

    def close(self):
        self._socket.close()
        if self._unlink is not None:
            self._unlink()


def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`
    '''
    family = address_type(address)
    s = socket.socket( getattr(socket, family) )

    while 1:
        try:
            s.connect(address)
        except socket.error, e:
            if e.args[0] != 10061:    # 10061 => connection refused
                debug('failed to connect to address %s', address)
                raise
            time.sleep(0.01)
        else:
            break
    else:
        raise

    fd = duplicate(s.fileno())
    conn = _multiprocessing.Connection(fd)
    s.close()
    return conn

#
# Definitions for connections based on named pipes
#

if sys.platform == 'win32':

    class PipeListener(object):
        '''
        Representation of a named pipe
        '''
        def __init__(self, address, backlog=None):
            self._address = address
            handle = win32.CreateNamedPipe(
                address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue = [handle]
            self._last_accepted = None

            sub_debug('listener created with address=%r', self._address)

            self.close = Finalize(
                self, PipeListener._finalize_pipe_listener,
                args=(self._handle_queue, self._address), exitpriority=0
                )

        def accept(self):
            newhandle = win32.CreateNamedPipe(
                self._address, win32.PIPE_ACCESS_DUPLEX,
                win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
                win32.PIPE_WAIT,
                win32.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE,
                win32.NMPWAIT_WAIT_FOREVER, win32.NULL
                )
            self._handle_queue.append(newhandle)
            handle = self._handle_queue.pop(0)
            try:
                win32.ConnectNamedPipe(handle, win32.NULL)
            except WindowsError, e:
                if e.args[0] != win32.ERROR_PIPE_CONNECTED:
                    raise
            return _multiprocessing.PipeConnection(handle)

        @staticmethod
        def _finalize_pipe_listener(queue, address):
            sub_debug('closing listener with address=%r', address)
            for handle in queue:
                close(handle)

    def PipeClient(address):
        '''
        Return a connection object connected to the pipe given by `address`
        '''
        while 1:
            try:
                win32.WaitNamedPipe(address, 1000)
                h = win32.CreateFile(
                    address, win32.GENERIC_READ | win32.GENERIC_WRITE,
                    0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
                    )
            except WindowsError, e:
                if e.args[0] not in (win32.ERROR_SEM_TIMEOUT,
                                     win32.ERROR_PIPE_BUSY):
                    raise
            else:
                break
        else:
            raise

        win32.SetNamedPipeHandleState(
            h, win32.PIPE_READMODE_MESSAGE, None, None
            )
        return _multiprocessing.PipeConnection(h)

#
# Authentication stuff
#

MESSAGE_LENGTH = 20

CHALLENGE = '#CHALLENGE#'
WELCOME = '#WELCOME#'
FAILURE = '#FAILURE#'

if sys.version_info >= (3, 0):    # XXX can use bytes literals in 2.6/3.0
    CHALLENGE = CHALLENGE.encode('ascii')
    WELCOME = WELCOME.encode('ascii')
    FAILURE = FAILURE.encode('ascii')

def deliver_challenge(connection, authkey):
    import hmac
    assert isinstance(authkey, bytes)
    message = os.urandom(MESSAGE_LENGTH)
    connection.send_bytes(CHALLENGE + message)
    digest = hmac.new(authkey, message).digest()
    response = connection.recv_bytes(256)        # reject large message
    if response == digest:
        connection.send_bytes(WELCOME)
    else:
        connection.send_bytes(FAILURE)
        raise AuthenticationError('digest received was wrong')

def answer_challenge(connection, authkey):
    import hmac
    assert isinstance(authkey, bytes)
    message = connection.recv_bytes(256)         # reject large message
    assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message
    message = message[len(CHALLENGE):]
    digest = hmac.new(authkey, message).digest()
    connection.send_bytes(digest)
    response = connection.recv_bytes(256)        # reject large message
    if response != WELCOME:
        raise AuthenticationError('digest sent was rejected')

#
# Support for using xmlrpclib for serialization
#

class ConnectionWrapper(object):
    def __init__(self, conn, dumps, loads):
        self._conn = conn
        self._dumps = dumps
        self._loads = loads
        for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
            obj = getattr(conn, attr)
            setattr(self, attr, obj)
    def send(self, obj):
        s = self._dumps(obj)
        self._conn.send_bytes(s)
    def recv(self):
        s = self._conn.recv_bytes()
        return self._loads(s)

def _xml_dumps(obj):
    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')

def _xml_loads(s):
    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
    return obj

class XmlListener(Listener):
    def accept(self):
        global xmlrpclib
        import xmlrpclib
        obj = Listener.accept(self)
        return ConnectionWrapper(obj, _xml_dumps, _xml_loads)

def XmlClient(*args, **kwds):
    global xmlrpclib
    import xmlrpclib
    return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads)
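
Listener, Client, and the HMAC challenge/response above compose into a small authenticated handshake: accept() runs deliver_challenge/answer_challenge before handing back a Connection. The sketch below is illustrative only; the address, port, authkey, and payload are invented, and the authkey must be a byte string, as the type checks above enforce.

    # Sketch only: one server turn and one client turn over a socket.
    from multiprocessing.connection import Listener, Client

    def serve_once(address=('localhost', 6000), authkey='secret'):
        listener = Listener(address, authkey=authkey)
        conn = listener.accept()        # challenge/response happens here
        conn.send('hello')              # object is pickled by Connection
        conn.close()
        listener.close()

    def fetch(address=('localhost', 6000), authkey='secret'):
        conn = Client(address, authkey=authkey)
        try:
            return conn.recv()
        finally:
            conn.close()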

multiprocessing/forking.py

@@ -1,429 +1,429 @@
#
# Module for starting a process object using os.fork() or CreateProcess()
#
# multiprocessing/forking.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

import os
import sys
import signal

from multiprocessing import util, process

__all__ = ['Popen', 'assert_spawning', 'exit', 'duplicate', 'close']

#
# Check that the current thread is spawning a child process
#

def assert_spawning(self):
    if not Popen.thread_is_spawning():
        raise RuntimeError(
            '%s objects should only be shared between processes'
            ' through inheritance' % type(self).__name__
            )

#
# Unix
#

if sys.platform != 'win32':
    import time

    exit = os._exit
    duplicate = os.dup
    close = os.close

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):

        def __init__(self, process_obj):
            sys.stdout.flush()
            sys.stderr.flush()
            self.returncode = None

            self.pid = os.fork()
            if self.pid == 0:
                if 'random' in sys.modules:
                    import random
                    random.seed()
                code = process_obj._bootstrap()
                sys.stdout.flush()
                sys.stderr.flush()
                os._exit(code)

        def poll(self, flag=os.WNOHANG):
            if self.returncode is None:
                pid, sts = os.waitpid(self.pid, flag)
                if pid == self.pid:
                    if os.WIFSIGNALED(sts):
                        self.returncode = -os.WTERMSIG(sts)
                    else:
                        assert os.WIFEXITED(sts)
                        self.returncode = os.WEXITSTATUS(sts)
            return self.returncode

        def wait(self, timeout=None):
            if timeout is None:
                return self.poll(0)
            deadline = time.time() + timeout
            delay = 0.0005
            while 1:
                res = self.poll()
                if res is not None:
                    break
                remaining = deadline - time.time()
                if remaining <= 0:
                    break
                delay = min(delay * 2, remaining, 0.05)
                time.sleep(delay)
            return res

        def terminate(self):
            if self.returncode is None:
                try:
                    os.kill(self.pid, signal.SIGTERM)
                except OSError, e:
                    if self.wait(timeout=0.1) is None:
                        raise

        @staticmethod
        def thread_is_spawning():
            return False

#
# Windows
#

else:
    import thread
    import msvcrt
    import _subprocess
    import copy_reg
    import time

    from ._multiprocessing import win32, Connection, PipeConnection
    from .util import Finalize

    try:
        from cPickle import dump, load, HIGHEST_PROTOCOL
    except ImportError:
        from pickle import dump, load, HIGHEST_PROTOCOL

    #
    #
    #

    TERMINATE = 0x10000
    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))

    exit = win32.ExitProcess
    close = win32.CloseHandle

    #
    # _python_exe is the assumed path to the python executable.
    # People embedding Python want to modify it.
    #

    if sys.executable.lower().endswith('pythonservice.exe'):
        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
    else:
        _python_exe = sys.executable

    def set_executable(exe):
        global _python_exe
        _python_exe = exe

    #
    #
    #

    def duplicate(handle, target_process=None, inheritable=False):
        if target_process is None:
            target_process = _subprocess.GetCurrentProcess()
        return _subprocess.DuplicateHandle(
            _subprocess.GetCurrentProcess(), handle, target_process,
            0, inheritable, _subprocess.DUPLICATE_SAME_ACCESS
            ).Detach()

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
            os.close(rfd)

            # start process
            cmd = get_command_line() + [rhandle]
            cmd = ' '.join('"%s"' % x for x in cmd)
            hp, ht, pid, tid = _subprocess.CreateProcess(
                _python_exe, cmd, None, None, 1, 0, None, None, None
                )
            ht.Close()
            close(rhandle)

            # set attributes of self
            self.pid = pid
            self.returncode = None
            self._handle = hp

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

        @staticmethod
        def thread_is_spawning():
            return getattr(Popen._tls, 'process_handle', None) is not None

        @staticmethod
        def duplicate_for_child(handle):
            return duplicate(handle, Popen._tls.process_handle)

        def wait(self, timeout=None):
            if self.returncode is None:
                if timeout is None:
                    msecs = _subprocess.INFINITE
                else:
                    msecs = max(0, int(timeout * 1000 + 0.5))

                res = _subprocess.WaitForSingleObject(int(self._handle), msecs)
                if res == _subprocess.WAIT_OBJECT_0:
                    code = _subprocess.GetExitCodeProcess(self._handle)
                    if code == TERMINATE:
                        code = -signal.SIGTERM
                    self.returncode = code

            return self.returncode

        def poll(self):
            return self.wait(timeout=0)

        def terminate(self):
            if self.returncode is None:
                try:
                    _subprocess.TerminateProcess(int(self._handle), TERMINATE)
                except WindowsError:
                    if self.wait(timeout=0.1) is None:
                        raise

    #
    #
    #

    def is_forking(argv):
        '''
        Return whether commandline indicates we are forking
        '''
        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
            assert len(argv) == 3
            return True
        else:
            return False

    def freeze_support():
        '''
        Run code for process object if this is not the main process
        '''
        if is_forking(sys.argv):
            main()
            sys.exit()

    def get_command_line():
        '''
        Returns prefix of command line used for spawning a child process
        '''
        if process.current_process()._identity==() and is_forking(sys.argv):
            raise RuntimeError('''
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.

            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:

                if __name__ == '__main__':
                    freeze_support()
                    ...

            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.''')

        if getattr(sys, 'frozen', False):
            return [sys.executable, '--multiprocessing-fork']
        else:
            prog = 'from multiprocessing.forking import main; main()'
            return [_python_exe, '-c', prog, '--multiprocessing-fork']

    def main():
        '''
        Run code specified by data received over pipe
        '''
        assert is_forking(sys.argv)

        handle = int(sys.argv[-1])
        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
        from_parent = os.fdopen(fd, 'rb')

        process.current_process()._inheriting = True
        preparation_data = load(from_parent)
        prepare(preparation_data)
        self = load(from_parent)
        process.current_process()._inheriting = False

        from_parent.close()

        exitcode = self._bootstrap()
        exit(exitcode)

    def get_preparation_data(name):
        '''
        Return info about parent needed by child to unpickle process object
        '''
        from .util import _logger, _log_to_stderr

        d = dict(
            name=name,
            sys_path=sys.path,
            sys_argv=sys.argv,
            log_to_stderr=_log_to_stderr,
            orig_dir=process.ORIGINAL_DIR,
            authkey=process.current_process().get_authkey(),
            )

        if _logger is not None:
            d['log_level'] = _logger.getEffectiveLevel()

        if not WINEXE:
            main_path = getattr(sys.modules['__main__'], '__file__', None)
            if not main_path and sys.argv[0] not in ('', '-c'):
                main_path = sys.argv[0]
            if main_path is not None:
                if not os.path.isabs(main_path) and \
                                          process.ORIGINAL_DIR is not None:
                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
                d['main_path'] = os.path.normpath(main_path)

        return d

    #
    # Make (Pipe)Connection picklable
    #

    def reduce_connection(conn):
        if not Popen.thread_is_spawning():
            raise RuntimeError(
                'By default %s objects can only be shared between processes\n'
                'using inheritance' % type(conn).__name__
                )
        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
                            conn.readable, conn.writable)

    copy_reg.pickle(Connection, reduce_connection)
    copy_reg.pickle(PipeConnection, reduce_connection)

#
# Prepare current process
#

old_main_modules = []

def prepare(data):
    '''
    Try to get current process ready to unpickle process object
    '''
    old_main_modules.append(sys.modules['__main__'])

    if 'name' in data:
        process.current_process().set_name(data['name'])

    if 'authkey' in data:
        process.current_process()._authkey = data['authkey']

    if 'log_to_stderr' in data and data['log_to_stderr']:
        util.log_to_stderr()

    if 'log_level' in data:
        util.get_logger().setLevel(data['log_level'])

    if 'sys_path' in data:
        sys.path = data['sys_path']

    if 'sys_argv' in data:
        sys.argv = data['sys_argv']

    if 'dir' in data:
        os.chdir(data['dir'])

    if 'orig_dir' in data:
        process.ORIGINAL_DIR = data['orig_dir']

    if 'main_path' in data:
        main_path = data['main_path']
        main_name = os.path.splitext(os.path.basename(main_path))[0]
        if main_name == '__init__':
            main_name = os.path.basename(os.path.dirname(main_path))

        if main_name != 'ipython':
            import imp

            if main_path is None:
                dirs = None
            elif os.path.basename(main_path).startswith('__init__.py'):
                dirs = [os.path.dirname(os.path.dirname(main_path))]
            else:
                dirs = [os.path.dirname(main_path)]

            assert main_name not in sys.modules, main_name
            file, path_name, etc = imp.find_module(main_name, dirs)
            try:
                # We would like to do "imp.load_module('__main__', ...)"
                # here.  However, that would cause 'if __name__ ==
                # "__main__"' clauses to be executed.
                main_module = imp.load_module(
                    '__parents_main__', file, path_name, etc
                    )
            finally:
                if file:
                    file.close()

            sys.modules['__main__'] = main_module
            main_module.__name__ = '__main__'

            # Try to make the potentially picklable objects in
            # sys.modules['__main__'] realize they are in the main
            # module -- somewhat ugly.
            for obj in main_module.__dict__.values():
                try:
                    if obj.__module__ == '__parents_main__':
                        obj.__module__ = '__main__'
                except Exception:
                    pass
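
The RuntimeError text in get_command_line() above spells out the contract this module imposes on Windows: the child process re-imports the parent's main module (see prepare() and main()), so anything that spawns processes must sit behind an __main__ guard. A minimal sketch of that idiom, with an invented child function:

    # Sketch only: the main-module idiom required by the code above.
    from multiprocessing import Process, freeze_support

    def child():
        pass                            # real work would go here

    if __name__ == '__main__':
        freeze_support()                # no-op unless frozen on Windows
        p = Process(target=child)
        p.start()
        p.join()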

multiprocessing/heap.py

@@ -1,201 +1,201 @@
#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

import bisect
import mmap
import tempfile
import os
import sys
import threading
import itertools

import _multiprocessing
from multiprocessing.util import Finalize, info
from multiprocessing.forking import assert_spawning

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
# #
if sys.platform == 'win32': if sys.platform == 'win32':
from ._multiprocessing import win32 from ._multiprocessing import win32
class Arena(object): class Arena(object):
_counter = itertools.count() _counter = itertools.count()
def __init__(self, size): def __init__(self, size):
self.size = size self.size = size
self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next()) self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next())
self.buffer = mmap.mmap(-1, self.size, tagname=self.name) self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
assert win32.GetLastError() == 0, 'tagname already in use' assert win32.GetLastError() == 0, 'tagname already in use'
self._state = (self.size, self.name) self._state = (self.size, self.name)
def __getstate__(self): def __getstate__(self):
assert_spawning(self) assert_spawning(self)
return self._state return self._state
def __setstate__(self, state): def __setstate__(self, state):
self.size, self.name = self._state = state self.size, self.name = self._state = state
self.buffer = mmap.mmap(-1, self.size, tagname=self.name) self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
else: else:
class Arena(object): class Arena(object):
def __init__(self, size): def __init__(self, size):
self.buffer = mmap.mmap(-1, size) self.buffer = mmap.mmap(-1, size)
self.size = size self.size = size
self.name = None self.name = None
# #
# Class allowing allocation of chunks of memory from arenas # Class allowing allocation of chunks of memory from arenas
# #
class Heap(object): class Heap(object):
_alignment = 8 _alignment = 8
def __init__(self, size=mmap.PAGESIZE): def __init__(self, size=mmap.PAGESIZE):
self._lastpid = os.getpid() self._lastpid = os.getpid()
self._lock = threading.Lock() self._lock = threading.Lock()
self._size = size self._size = size
self._lengths = [] self._lengths = []
self._len_to_seq = {} self._len_to_seq = {}
self._start_to_block = {} self._start_to_block = {}
self._stop_to_block = {} self._stop_to_block = {}
self._allocated_blocks = set() self._allocated_blocks = set()
self._arenas = [] self._arenas = []
@staticmethod @staticmethod
def _roundup(n, alignment): def _roundup(n, alignment):
# alignment must be a power of 2 # alignment must be a power of 2
mask = alignment - 1 mask = alignment - 1
return (n + mask) & ~mask return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def free(self, block):
        # free a block returned by malloc()
        assert os.getpid() == self._lastpid
        self._lock.acquire()
        try:
            self._allocated_blocks.remove(block)
            self._free(block)
        finally:
            self._lock.release()

    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        assert 0 <= size < sys.maxint
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        self._lock.acquire()
        try:
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
        finally:
            self._lock.release()

#
# Class representing a chunk of an mmap -- can be inherited
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxint
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        Finalize(self, BufferWrapper._heap.free, args=(block,))

    def get_address(self):
        (arena, start, stop), size = self._state
        address, length = _multiprocessing.address_of_buffer(arena.buffer)
        assert size <= length
        return address + start

    def get_size(self):
        return self._state[1]
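
A rough usage sketch for the allocator above (BufferWrapper is normally only used indirectly, via multiprocessing.sharedctypes; the size here is arbitrary):

    from multiprocessing.heap import BufferWrapper

    wrapper = BufferWrapper(100)        # reserve 100 bytes from the shared heap
    address = wrapper.get_address()     # usable with ctypes.from_address()
    assert wrapper.get_size() == 100    # requested size; the underlying block
                                        # is rounded up to 8-byte alignment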

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,302 +1,302 @@
#
# Module providing the `Process` class which emulates `threading.Thread`
#
# multiprocessing/process.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Process', 'current_process', 'active_children']

#
# Imports
#

import os
import sys
import signal
import itertools

#
#
#

try:
    ORIGINAL_DIR = os.path.abspath(os.getcwd())
except OSError:
    ORIGINAL_DIR = None

try:
    bytes
except NameError:
    bytes = str                  # XXX not needed in Py2.6 and Py3.0

#
# Public functions
#

def current_process():
    '''
    Return process object representing the current process
    '''
    return _current_process

def active_children():
    '''
    Return list of process objects corresponding to live child processes
    '''
    _cleanup()
    return list(_current_process._children)

#
#
#

def _cleanup():
    # check for processes which have finished
    for p in list(_current_process._children):
        if p._popen.poll() is not None:
            _current_process._children.discard(p)

#
# The `Process` class
#

class Process(object):
    '''
    Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
    '''
    _Popen = None

    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        assert group is None, 'group argument must be None for now'
        count = _current_process._counter.next()
        self._identity = _current_process._identity + (count,)
        self._authkey = _current_process._authkey
        self._daemonic = _current_process._daemonic
        self._tempdir = _current_process._tempdir
        self._parent_pid = os.getpid()
        self._popen = None
        self._target = target
        self._args = tuple(args)
        self._kwargs = dict(kwargs)
        self._name = name or type(self).__name__ + '-' + \
                     ':'.join(str(i) for i in self._identity)

    def run(self):
        '''
        Method to be run in sub-process; can be overridden in sub-class
        '''
        if self._target:
            self._target(*self._args, **self._kwargs)

    def start(self):
        '''
        Start child process
        '''
        assert self._popen is None, 'cannot start a process twice'
        assert self._parent_pid == os.getpid(), \
               'can only start a process object created by current process'
        assert not _current_process._daemonic, \
               'daemonic processes are not allowed to have children'
        _cleanup()
        if self._Popen is not None:
            Popen = self._Popen
        else:
            from .forking import Popen
        self._popen = Popen(self)
        _current_process._children.add(self)

    def terminate(self):
        '''
        Terminate process; sends SIGTERM signal or uses TerminateProcess()
        '''
        self._popen.terminate()

    def join(self, timeout=None):
        '''
        Wait until child process terminates
        '''
        assert self._parent_pid == os.getpid(), 'can only join a child process'
        assert self._popen is not None, 'can only join a started process'
        res = self._popen.wait(timeout)
        if res is not None:
            _current_process._children.discard(self)

    def is_alive(self):
        '''
        Return whether process is alive
        '''
        if self is _current_process:
            return True
        assert self._parent_pid == os.getpid(), 'can only test a child process'
        if self._popen is None:
            return False
        self._popen.poll()
        return self._popen.returncode is None

    def get_name(self):
        '''
        Return name of process
        '''
        return self._name

    def set_name(self, name):
        '''
        Set name of process
        '''
        assert isinstance(name, str), 'name must be a string'
        self._name = name

    def is_daemon(self):
        '''
        Return whether process is a daemon
        '''
        return self._daemonic

    def set_daemon(self, daemonic):
        '''
        Set whether process is a daemon
        '''
        assert self._popen is None, 'process has already started'
        self._daemonic = daemonic

    def get_authkey(self):
        '''
        Return authorization key of process
        '''
        return self._authkey

    def set_authkey(self, authkey):
        '''
        Set authorization key of process
        '''
        self._authkey = AuthenticationString(authkey)

    def get_exitcode(self):
        '''
        Return exit code of process or `None` if it has yet to stop
        '''
        if self._popen is None:
            return self._popen
        return self._popen.poll()

    def get_ident(self):
        '''
        Return identifier (PID) of process or `None` if it has yet to start
        '''
        if self is _current_process:
            return os.getpid()
        else:
            return self._popen and self._popen.pid

    pid = property(get_ident)

    def __repr__(self):
        if self is _current_process:
            status = 'started'
        elif self._parent_pid != os.getpid():
            status = 'unknown'
        elif self._popen is None:
            status = 'initial'
        else:
            if self._popen.poll() is not None:
                status = self.get_exitcode()
            else:
                status = 'started'

        if type(status) is int:
            if status == 0:
                status = 'stopped'
            else:
                status = 'stopped[%s]' % _exitcode_to_name.get(status, status)

        return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
                                   status, self._daemonic and ' daemon' or '')

    ##

    def _bootstrap(self):
        from . import util
        global _current_process

        try:
            self._children = set()
            self._counter = itertools.count(1)
            try:
                os.close(sys.stdin.fileno())
            except (OSError, ValueError):
                pass
            _current_process = self
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()
        except SystemExit, e:
            if not e.args:
                exitcode = 1
            elif type(e.args[0]) is int:
                exitcode = e.args[0]
            else:
                sys.stderr.write(e.args[0] + '\n')
                sys.stderr.flush()
                exitcode = 1
        except:
            exitcode = 1
            import traceback
            sys.stderr.write('Process %s:\n' % self.get_name())
            sys.stderr.flush()
            traceback.print_exc()

        util.info('process exiting with exitcode %d' % exitcode)
        return exitcode

#
# We subclass bytes to avoid accidental transmission of auth keys over network
#

class AuthenticationString(bytes):
    def __reduce__(self):
        from .forking import Popen
        if not Popen.thread_is_spawning():
            raise TypeError(
                'Pickling an AuthenticationString object is '
                'disallowed for security reasons'
                )
        return AuthenticationString, (bytes(self),)

#
# Create object representing the main process
#

class _MainProcess(Process):

    def __init__(self):
        self._identity = ()
        self._daemonic = False
        self._name = 'MainProcess'
        self._parent_pid = None
        self._popen = None
        self._counter = itertools.count(1)
        self._children = set()
        self._authkey = AuthenticationString(os.urandom(32))
        self._tempdir = None

_current_process = _MainProcess()
del _MainProcess

#
# Give names to some return codes
#

_exitcode_to_name = {}

for name, signum in signal.__dict__.items():
    if name[:3]=='SIG' and '_' not in name:
        _exitcode_to_name[-signum] = name
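
A minimal usage sketch of the accessor-style API above (the greet function and its names are illustrative only; the __main__ guard matters on Windows, where the child re-imports the parent module):

    from multiprocessing import Process, current_process

    def greet(who):
        print 'hello %s from %s' % (who, current_process().get_name())

    if __name__ == '__main__':
        p = Process(target=greet, args=('world',))
        p.start()                  # spawn the child, which runs p.run()
        p.join()                   # wait for it to finish
        print p.get_exitcode()     # 0 after a clean exit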


@@ -1,356 +1,356 @@
#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

__all__ = ['Queue', 'SimpleQueue']

import sys
import os
import threading
import collections
import time
import atexit
import weakref

from Queue import Empty, Full
import _multiprocessing
from multiprocessing import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        if block and timeout is None:
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if not self._poll(block and (deadline-time.time()) or 0.0):
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        # Raises NotImplementedError on Mac OS X because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        self._closed = True
        self._reader.close()
        if self._close:
            self._close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.set_daemon(True)

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        #
        # However, if this process created the queue then all
        # processes which use the queue will be descendants of this
        # process.  Therefore waiting for the queue to be flushed
        # is pointless once all the child processes have been joined.
        created_by_this_process = (self._opid == os.getpid())
        if not self._joincancelled and not created_by_this_process:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        debug('starting thread to feed data to pipe')
        from .util import is_exiting

        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while 1:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
        except Exception, e:
            # Since this runs in a daemon thread the resources it uses
            # may become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass

_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):

    def __init__(self, maxsize=0):
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, item, block=True, timeout=None):
        Queue.put(self, item, block, timeout)
        self._unfinished_tasks.release()

    def task_done(self):
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):

    def __init__(self):
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

    def empty(self):
        return not self._reader.poll()

    def __getstate__(self):
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._make_methods()

    def _make_methods(self):
        recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release
        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()
        self.get = get

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
        else:
            send = self._writer.send
            wacquire, wrelease = self._wlock.acquire, self._wlock.release
            def put(obj):
                wacquire()
                try:
                    return send(obj)
                finally:
                    wrelease()
            self.put = put
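
A sketch of the task_done()/join() protocol warned about above (the worker logic is hypothetical; note that JoinableQueue is defined here even though it is missing from __all__):

    from multiprocessing import Process
    from multiprocessing.queues import JoinableQueue

    def worker(q):
        while True:
            item = q.get()
            # ... handle item ...
            q.task_done()          # exactly one task_done() per get()

    if __name__ == '__main__':
        q = JoinableQueue()
        w = Process(target=worker, args=(q,))
        w.set_daemon(True)         # so the worker dies with the main process
        w.start()
        for i in range(10):
            q.put(i)
        q.join()                   # returns once all ten items are task_done()'d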


@@ -1,190 +1,190 @@
#
# Module to allow connection and socket objects to be transferred
# between processes
#
# multiprocessing/reduction.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

__all__ = []

import os
import sys
import socket
import threading
import copy_reg

import _multiprocessing
from multiprocessing import current_process
from multiprocessing.forking import Popen, duplicate, close
from multiprocessing.util import register_after_fork, debug, sub_debug
from multiprocessing.connection import Client, Listener

#
#
#

if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')):
    raise ImportError('pickling of connections not supported')

#
# Platform specific definitions
#

if sys.platform == 'win32':
    import _subprocess
    from ._multiprocessing import win32

    def send_handle(conn, handle, destination_pid):
        process_handle = win32.OpenProcess(
            win32.PROCESS_ALL_ACCESS, False, destination_pid
            )
        try:
            new_handle = duplicate(handle, process_handle)
            conn.send(new_handle)
        finally:
            close(process_handle)

    def recv_handle(conn):
        return conn.recv()

else:
    def send_handle(conn, handle, destination_pid):
        _multiprocessing.sendfd(conn.fileno(), handle)

    def recv_handle(conn):
        return _multiprocessing.recvfd(conn.fileno())

#
# Support for a per-process server thread which caches pickled handles
#

_cache = set()

def _reset(obj):
    global _lock, _listener, _cache
    for h in _cache:
        close(h)
    _cache.clear()
    _lock = threading.Lock()
    _listener = None

_reset(None)
register_after_fork(_reset, _reset)

def _get_listener():
    global _listener

    if _listener is None:
        _lock.acquire()
        try:
            if _listener is None:
                debug('starting listener and thread for sending handles')
                _listener = Listener(authkey=current_process().get_authkey())
                t = threading.Thread(target=_serve)
                t.set_daemon(True)
                t.start()
        finally:
            _lock.release()

    return _listener

def _serve():
    from .util import is_exiting, sub_warning

    while 1:
        try:
            conn = _listener.accept()
            handle_wanted, destination_pid = conn.recv()
            _cache.remove(handle_wanted)
            send_handle(conn, handle_wanted, destination_pid)
            close(handle_wanted)
            conn.close()
        except:
            if not is_exiting():
                import traceback
                sub_warning(
                    'thread for sharing handles raised exception :\n' +
                    '-'*79 + '\n' + traceback.format_exc() + '-'*79
                    )

#
# Functions to be used for pickling/unpickling objects with handles
#

def reduce_handle(handle):
    if Popen.thread_is_spawning():
        return (None, Popen.duplicate_for_child(handle), True)
    dup_handle = duplicate(handle)
    _cache.add(dup_handle)
    sub_debug('reducing handle %d', handle)
    return (_get_listener().address, dup_handle, False)

def rebuild_handle(pickled_data):
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().get_authkey())
    conn.send((handle, os.getpid()))
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle

#
# Register `_multiprocessing.Connection` with `copy_reg`
#

def reduce_connection(conn):
    rh = reduce_handle(conn.fileno())
    return rebuild_connection, (rh, conn.readable, conn.writable)

def rebuild_connection(reduced_handle, readable, writable):
    handle = rebuild_handle(reduced_handle)
    return _multiprocessing.Connection(
        handle, readable=readable, writable=writable
        )

copy_reg.pickle(_multiprocessing.Connection, reduce_connection)

#
# Register `socket.socket` with `copy_reg`
#

def fromfd(fd, family, type_, proto=0):
    s = socket.fromfd(fd, family, type_, proto)
    if s.__class__ is not socket.socket:
        s = socket.socket(_sock=s)
    return s

def reduce_socket(s):
    reduced_handle = reduce_handle(s.fileno())
    return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)

def rebuild_socket(reduced_handle, family, type_, proto):
    fd = rebuild_handle(reduced_handle)
    _sock = fromfd(fd, family, type_, proto)
    close(fd)
    return _sock

copy_reg.pickle(socket.socket, reduce_socket)

#
# Register `_multiprocessing.PipeConnection` with `copy_reg`
#

if sys.platform == 'win32':
    def reduce_pipe_connection(conn):
        rh = reduce_handle(conn.fileno())
        return rebuild_pipe_connection, (rh, conn.readable, conn.writable)

    def rebuild_pipe_connection(reduced_handle, readable, writable):
        handle = rebuild_handle(reduced_handle)
        return _multiprocessing.PipeConnection(
            handle, readable=readable, writable=writable
            )

    copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection)
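
A POSIX-only sketch of what registering these reducers buys you: once this module has been imported, an ordinary socket can be sent through a pipe and is rebuilt, via the handle-serving thread, in the receiving process. The socketpair() demo objects are purely illustrative:

    import socket
    from multiprocessing import Process, Pipe
    import multiprocessing.reduction     # importing registers the picklers

    def child(conn):
        s = conn.recv()                  # socket rebuilt from the reduced handle
        s.sendall('hi from the child\n')
        s.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        Process(target=child, args=(child_conn,)).start()
        a, b = socket.socketpair()       # Unix only
        parent_conn.send(b)              # picklable thanks to reduce_socket
        print a.recv(100)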


@@ -1,234 +1,234 @@
#
# Module which supports allocation of ctypes objects from shared memory
#
# multiprocessing/sharedctypes.py
#
# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
#

import sys
import ctypes
import weakref
import copy_reg

from multiprocessing import heap, RLock
from multiprocessing.forking import assert_spawning

__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']

#
#
#

typecode_to_type = {
    'c': ctypes.c_char,  'u': ctypes.c_wchar,
    'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int,   'I': ctypes.c_uint,
    'l': ctypes.c_long,  'L': ctypes.c_ulong,
    'f': ctypes.c_float, 'd': ctypes.c_double
    }

#
#
#

def _new_value(type_):
    size = ctypes.sizeof(type_)
    wrapper = heap.BufferWrapper(size)
    return rebuild_ctype(type_, wrapper, None)

def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    obj = _new_value(type_)
    ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
    obj.__init__(*args)
    return obj

def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    if isinstance(size_or_initializer, int):
        type_ = type_ * size_or_initializer
        return _new_value(type_)
    else:
        type_ = type_ * len(size_or_initializer)
        result = _new_value(type_)
        result.__init__(*size_or_initializer)
        return result

def Value(typecode_or_type, *args, **kwds):
    '''
    Return a synchronization wrapper for a Value
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
    obj = RawValue(typecode_or_type, *args)
    if lock is None:
        lock = RLock()
    assert hasattr(lock, 'acquire')
    return synchronized(obj, lock)

def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % kwds.keys())
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is None:
        lock = RLock()
    assert hasattr(lock, 'acquire')
    return synchronized(obj, lock)

def copy(obj):
    new_obj = _new_value(type(obj))
    ctypes.pointer(new_obj)[0] = obj
    return new_obj

def synchronized(obj, lock=None):
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)
    elif isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)
    else:
        cls = type(obj)
        try:
            scls = class_cache[cls]
        except KeyError:
            names = [field[0] for field in cls._fields_]
            d = dict((name, make_property(name)) for name in names)
            classname = 'Synchronized' + cls.__name__
            scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
        return scls(obj, lock)

#
# Functions for pickling/unpickling
#

def reduce_ctype(obj):
    assert_spawning(obj)
    if isinstance(obj, ctypes.Array):
        return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
    else:
        return rebuild_ctype, (type(obj), obj._wrapper, None)

def rebuild_ctype(type_, wrapper, length):
    if length is not None:
        type_ = type_ * length
    if sys.platform == 'win32' and type_ not in copy_reg.dispatch_table:
        copy_reg.pickle(type_, reduce_ctype)
    obj = type_.from_address(wrapper.get_address())
    obj._wrapper = wrapper
    return obj

#
# Function to create properties
#

def make_property(name):
    try:
        return prop_cache[name]
    except KeyError:
        d = {}
        exec template % ((name,)*7) in d
        prop_cache[name] = d[name]
        return d[name]

template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

prop_cache = {}
class_cache = weakref.WeakKeyDictionary()

#
# Synchronized wrappers
#

class SynchronizedBase(object):

    def __init__(self, obj, lock=None):
        self._obj = obj
        self._lock = lock or RLock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __reduce__(self):
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        return self._obj

    def get_lock(self):
        return self._lock

    def __repr__(self):
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)

class Synchronized(SynchronizedBase):
    value = make_property('value')

class SynchronizedArray(SynchronizedBase):

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            return self._obj[i]
        finally:
            self.release()

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            return self._obj[start:stop]
        finally:
            self.release()

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()

class SynchronizedString(SynchronizedArray):
    value = make_property('value')
    raw = make_property('raw')
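
A short sketch of the synchronized wrappers in use (names are illustrative). The explicit acquire/release matters: the generated value property locks each get and each set individually, so a read-modify-write still needs the outer lock:

    from multiprocessing import Process
    from multiprocessing.sharedctypes import Value, Array

    def bump(counter, samples):
        counter.acquire()
        try:
            counter.value += 1           # read-modify-write under the RLock
        finally:
            counter.release()
        samples[0] = 3.14                # __setitem__ locks by itself

    if __name__ == '__main__':
        counter = Value('i', 0)          # synchronized ctypes.c_int
        samples = Array('d', 5)          # synchronized array of 5 doubles
        p = Process(target=bump, args=(counter, samples))
        p.start()
        p.join()
        print counter.value, samples[:]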


@@ -1,294 +1,294 @@
# #
# Module implementing synchronization primitives # Module implementing synchronization primitives
# #
# multiprocessing/synchronize.py # multiprocessing/synchronize.py
# #
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
# #
__all__ = [ __all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
] ]
import threading import threading
import os import os
import sys import sys
from time import time as _time, sleep as _sleep from time import time as _time, sleep as _sleep
import _multiprocessing import _multiprocessing
from multiprocessing.process import current_process from multiprocessing.process import current_process
from multiprocessing.util import Finalize, register_after_fork, debug from multiprocessing.util import Finalize, register_after_fork, debug
from multiprocessing.forking import assert_spawning, Popen from multiprocessing.forking import assert_spawning, Popen
# #
# Constants # Constants
# #
RECURSIVE_MUTEX, SEMAPHORE = range(2) RECURSIVE_MUTEX, SEMAPHORE = range(2)
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
# #
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
# #
class SemLock(object): class SemLock(object):
def __init__(self, kind, value, maxvalue): def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug('created semlock with handle %s' % sl.handle) debug('created semlock with handle %s' % sl.handle)
self._make_methods() self._make_methods()
if sys.platform != 'win32': if sys.platform != 'win32':
def _after_fork(obj): def _after_fork(obj):
obj._semlock._after_fork() obj._semlock._after_fork()
register_after_fork(self, _after_fork) register_after_fork(self, _after_fork)
def _make_methods(self): def _make_methods(self):
self.acquire = self._semlock.acquire self.acquire = self._semlock.acquire
self.release = self._semlock.release self.release = self._semlock.release
self.__enter__ = self._semlock.__enter__ self.__enter__ = self._semlock.__enter__
self.__exit__ = self._semlock.__exit__ self.__exit__ = self._semlock.__exit__
def __getstate__(self): def __getstate__(self):
assert_spawning(self) assert_spawning(self)
sl = self._semlock sl = self._semlock
return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
def __setstate__(self, state): def __setstate__(self, state):
self._semlock = _multiprocessing.SemLock._rebuild(*state) self._semlock = _multiprocessing.SemLock._rebuild(*state)
debug('recreated blocker with handle %r' % state[0]) debug('recreated blocker with handle %r' % state[0])
self._make_methods() self._make_methods()
# #
# Semaphore # Semaphore
# #
class Semaphore(SemLock): class Semaphore(SemLock):
def __init__(self, value=1): def __init__(self, value=1):
SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
def get_value(self): def get_value(self):
return self._semlock._get_value() return self._semlock._get_value()
def __repr__(self): def __repr__(self):
try: try:
value = self._semlock._get_value() value = self._semlock._get_value()
except Exception: except Exception:
value = 'unknown' value = 'unknown'
return '<Semaphore(value=%s)>' % value return '<Semaphore(value=%s)>' % value
#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):

    def __init__(self, value=1):
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
               (value, self._semlock.maxvalue)

#
# Non-recursive lock
#

class Lock(SemLock):

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        try:
            if self._semlock._is_mine():
                name = current_process().get_name()
                if threading.current_thread().get_name() != 'MainThread':
                    name += '|' + threading.current_thread().get_name()
            elif self._semlock._get_value() == 1:
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except Exception:
            name = 'unknown'
        return '<Lock(owner=%s)>' % name

#
# Recursive lock
#

class RLock(SemLock):

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        try:
            if self._semlock._is_mine():
                name = current_process().get_name()
                if threading.current_thread().get_name() != 'MainThread':
                    name += '|' + threading.current_thread().get_name()
                count = self._semlock._count()
            elif self._semlock._get_value() == 1:
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (name, count)
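
# Editor's sketch (not in the original source): the difference between
# Lock and RLock is reentrancy within a single process/thread.
#
#     from multiprocessing import Lock, RLock
#
#     rl = RLock()
#     rl.acquire(); rl.acquire()      # fine: recursion count is now 2
#     rl.release(); rl.release()
#
#     l = Lock()
#     l.acquire()
#     l.acquire(False)                # returns False rather than deadlocking
#     l.release()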
#
# Condition variable
#

class Condition(object):

    def __init__(self, lock=None):
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def _make_methods(self):
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        self.__enter__ = self._lock.__enter__
        self.__exit__ = self._lock.__exit__

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)
    def wait(self, timeout=None):
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock
        count = self._lock._semlock._count()
        for i in xrange(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in xrange(count):
                self._lock.acquire()

    def notify(self):
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False):    # try grabbing a sleeper
            self._wait_semaphore.release()         # wake up one sleeper
            self._woken_count.acquire()            # wait for the sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notify_all(self):
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()         # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in xrange(sleepers):
                self._woken_count.acquire()        # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass
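
# Editor's sketch (not in the original module) of the wait()/notify()
# protocol implemented above: the condition must be acquired around both
# wait() and notify(), and the awaited state should be rechecked after
# every wakeup because wait() may also return on timeout.  'items' is
# assumed to be a shared sequence (e.g. from a manager).
#
#     def consumer(cond, items):
#         cond.acquire()
#         try:
#             while len(items) == 0:     # recheck after every wakeup
#                 cond.wait(1.0)
#             item = items.pop()
#         finally:
#             cond.release()
#
#     # producer side:
#     #     cond.acquire(); items.append(x); cond.notify(); cond.release()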
#
# Event
#

class Event(object):

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def is_set(self):
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()

    def set(self):
        self._cond.acquire()
        try:
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        self._cond.acquire()
        try:
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)
        finally:
            self._cond.release()
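
# Editor's sketch (not in the original file): Event is the simplest way
# to signal "go" from one process to another.
#
#     from multiprocessing import Process, Event
#
#     def waiter(ev):
#         ev.wait()               # returns once some process calls set()
#
#     if __name__ == '__main__':
#         ev = Event()
#         p = Process(target=waiter, args=(ev,))
#         p.start()
#         ev.set()
#         p.join()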

View file

@@ -1,336 +1,336 @@
#
# Module providing various facilities to other parts of the package
#
# multiprocessing/util.py
#
# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
#

import itertools
import weakref
import copy_reg
import atexit
import threading        # we want threading to install its
                        # cleanup function before multiprocessing does

from multiprocessing.process import current_process, active_children

__all__ = [
    'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
    'log_to_stderr', 'get_temp_dir', 'register_after_fork',
    'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal'
    ]
#
# Logging
#

NOTSET = 0
SUBDEBUG = 5
DEBUG = 10
INFO = 20
SUBWARNING = 25

LOGGER_NAME = 'multiprocessing'
DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'

_logger = None
_log_to_stderr = False

def sub_debug(msg, *args):
    if _logger:
        _logger.log(SUBDEBUG, msg, *args)

def debug(msg, *args):
    if _logger:
        _logger.log(DEBUG, msg, *args)

def info(msg, *args):
    if _logger:
        _logger.log(INFO, msg, *args)

def sub_warning(msg, *args):
    if _logger:
        _logger.log(SUBWARNING, msg, *args)

def get_logger():
    '''
    Returns logger used by multiprocessing
    '''
    global _logger

    if not _logger:
        import logging, atexit

        # XXX multiprocessing should cleanup before logging
        if hasattr(atexit, 'unregister'):
            atexit.unregister(_exit_function)
            atexit.register(_exit_function)
        else:
            atexit._exithandlers.remove((_exit_function, (), {}))
            atexit._exithandlers.append((_exit_function, (), {}))

        _check_logger_class()
        _logger = logging.getLogger(LOGGER_NAME)

    return _logger

def _check_logger_class():
    '''
    Make sure process name is recorded when loggers are used
    '''
    # XXX This function is unnecessary once logging is patched
    import logging
    if hasattr(logging, 'multiprocessing'):
        return

    logging._acquireLock()
    try:
        OldLoggerClass = logging.getLoggerClass()
        if not getattr(OldLoggerClass, '_process_aware', False):
            class ProcessAwareLogger(OldLoggerClass):
                _process_aware = True
                def makeRecord(self, *args, **kwds):
                    record = OldLoggerClass.makeRecord(self, *args, **kwds)
                    record.processName = current_process()._name
                    return record
            logging.setLoggerClass(ProcessAwareLogger)
    finally:
        logging._releaseLock()

def log_to_stderr(level=None):
    '''
    Turn on logging and add a handler which prints to stderr
    '''
    global _log_to_stderr
    import logging
    logger = get_logger()
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    if level is not None:
        logger.setLevel(level)
    _log_to_stderr = True
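
# Editor's sketch of typical use of the helpers above (not in the
# original file); SUBDEBUG and SUBWARNING are the extra levels defined
# in this module.
#
#     import logging
#     import multiprocessing
#
#     multiprocessing.log_to_stderr(logging.INFO)
#     multiprocessing.get_logger().info('doing some work')
#     # prints: [INFO/MainProcess] doing some work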
#
# Function returning a temp directory which will be removed on exit
#

def get_temp_dir():
    # get name of a temp directory which will be automatically cleaned up
    if current_process()._tempdir is None:
        import shutil, tempfile
        tempdir = tempfile.mkdtemp(prefix='pymp-')
        info('created temp directory %s', tempdir)
        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
        current_process()._tempdir = tempdir
    return current_process()._tempdir

#
# Support for reinitialization of objects when bootstrapping a child process
#

_afterfork_registry = weakref.WeakValueDictionary()
_afterfork_counter = itertools.count()

def _run_after_forkers():
    items = list(_afterfork_registry.items())
    items.sort()
    for (index, ident, func), obj in items:
        try:
            func(obj)
        except Exception, e:
            info('after forker raised exception %s', e)

def register_after_fork(obj, func):
    _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
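
# Editor's sketch (hypothetical, not in the original file): an object
# holding per-process state can ask for a reset in forked children;
# register_after_fork() arranges for func(obj) to be called there.
#
#     class Cache(object):
#         def __init__(self):
#             self._data = {}
#             register_after_fork(self, Cache._clear)
#         def _clear(self):           # runs in the child after fork()
#             self._data = {}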
#
# Finalization using weakrefs
#

_finalizer_registry = {}
_finalizer_counter = itertools.count()

class Finalize(object):
    '''
    Class which supports object finalization using weakrefs
    '''
    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
        assert exitpriority is None or type(exitpriority) is int

        if obj is not None:
            self._weakref = weakref.ref(obj, self)
        else:
            assert exitpriority is not None

        self._callback = callback
        self._args = args
        self._kwargs = kwargs or {}
        self._key = (exitpriority, _finalizer_counter.next())

        _finalizer_registry[self._key] = self

    def __call__(self, wr=None):
        '''
        Run the callback unless it has already been called or cancelled
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            sub_debug('finalizer no longer registered')
        else:
            sub_debug('finalizer calling %s with args %s and kwargs %s',
                      self._callback, self._args, self._kwargs)
            res = self._callback(*self._args, **self._kwargs)
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None
            return res
    def cancel(self):
        '''
        Cancel finalization of the object
        '''
        try:
            del _finalizer_registry[self._key]
        except KeyError:
            pass
        else:
            self._weakref = self._callback = self._args = \
                            self._kwargs = self._key = None

    def still_active(self):
        '''
        Return whether this finalizer is still waiting to invoke callback
        '''
        return self._key in _finalizer_registry

    def __repr__(self):
        try:
            obj = self._weakref()
        except (AttributeError, TypeError):
            obj = None

        if obj is None:
            return '<Finalize object, dead>'

        x = '<Finalize object, callback=%s' % \
            getattr(self._callback, '__name__', self._callback)
        if self._args:
            x += ', args=' + str(self._args)
        if self._kwargs:
            x += ', kwargs=' + str(self._kwargs)
        if self._key[0] is not None:
            x += ', exitpriority=' + str(self._key[0])
        return x + '>'
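
# Editor's sketch (not in the original file) of typical Finalize usage,
# mirroring get_temp_dir() above; 'conn' is a hypothetical object with a
# close() method.
#
#     f = Finalize(conn, conn.close, exitpriority=10)
#     # ... later, optionally run the cleanup early, at most once:
#     if f.still_active():
#         f()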
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
    if minpriority is None:
        f = lambda p : p[0][0] is not None
    else:
        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority

    items = [x for x in _finalizer_registry.items() if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

    if minpriority is None:
        _finalizer_registry.clear()

#
# Clean up on exit
#

def is_exiting():
    '''
    Returns true if the process is shutting down
    '''
    return _exiting or _exiting is None

_exiting = False

def _exit_function():
    global _exiting

    info('process shutting down')
    debug('running all "atexit" finalizers with priority >= 0')
    _run_finalizers(0)

    for p in active_children():
        if p._daemonic:
            info('calling terminate() for daemon %s', p.get_name())
            p._popen.terminate()

    for p in active_children():
        info('calling join() for process %s', p.get_name())
        p.join()

    debug('running the remaining "atexit" finalizers')
    _run_finalizers()

atexit.register(_exit_function)
#
# Some fork aware types
#

class ForkAwareThreadLock(object):
    def __init__(self):
        self._lock = threading.Lock()
        self.acquire = self._lock.acquire
        self.release = self._lock.release
        register_after_fork(self, ForkAwareThreadLock.__init__)

class ForkAwareLocal(threading.local):
    def __init__(self):
        register_after_fork(self, lambda obj : obj.__dict__.clear())
    def __reduce__(self):
        return type(self), ()

#
# Try making some callable types picklable
#

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(type(Finalize.__init__), _reduce_method)

def _reduce_method_descriptor(m):
    return getattr, (m.__objclass__, m.__name__)
copy_reg.pickle(type(list.append), _reduce_method_descriptor)
copy_reg.pickle(type(int.__add__), _reduce_method_descriptor)

def _reduce_builtin_function_or_method(m):
    return getattr, (m.__self__, m.__name__)
copy_reg.pickle(type(list().append), _reduce_builtin_function_or_method)
copy_reg.pickle(type(int().__add__), _reduce_builtin_function_or_method)

try:
    from functools import partial
except ImportError:
    pass
else:
    def _reduce_partial(p):
        return _rebuild_partial, (p.func, p.args, p.keywords or {})
    def _rebuild_partial(func, args, keywords):
        return partial(func, *args, **keywords)
    copy_reg.pickle(partial, _reduce_partial)
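
# Editor's sketch (not in the original file): once this module is
# imported, bound methods and partials, which the stock pickler rejects,
# round-trip through pickle via the reducers registered above.
#
#     import pickle
#     import multiprocessing.util
#
#     class A(object):
#         def f(self):
#             return 1
#
#     a = A()
#     m = pickle.loads(pickle.dumps(a.f))   # rebuilt as getattr(a, 'f')
#     assert m() == 1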