Set eol-style correctly for mp_distributing.py.

This commit is contained in:
Georg Brandl 2009-01-03 19:10:12 +00:00
parent b7e835b820
commit 69bd8d2189

View file

@ -1,364 +1,364 @@
#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']

#
# Imports
#

import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import Queue

# cPickle is the C-accelerated pickler on Python 2; fall back to the
# pure-Python module when it is unavailable.
try:
    import cPickle as pickle
except ImportError:
    import pickle

from multiprocessing import Process, current_process, cpu_count
# NOTE(review): `forking` is a private multiprocessing module of this era;
# it does not exist in Python 3's multiprocessing.
from multiprocessing import util, managers, connection, forking, pool
#
# Logging
#

def get_logger():
    '''
    Return the module's dedicated logger (named "distributing").
    '''
    return _logger

_logger = logging.getLogger('distributing')
# BUG FIX: the original wrote `_logger.propogate = 0`, which merely created
# an unused attribute; the intent was to stop records propagating to the
# root logger, which requires the correctly spelled `propagate`.
_logger.propagate = 0

# Emit records to stderr using multiprocessing's standard format.
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)

# Convenience aliases used throughout this module.
info = _logger.info
debug = _logger.debug
#
# Get number of cpus
#

# Default number of process slots per host: one per local CPU, or 1 if the
# platform cannot report a CPU count.
# BUG FIX: the original caught `NotImplemented` (the comparison singleton,
# not an exception class); `cpu_count()` raises `NotImplementedError`, so
# the except clause could never match and the fallback was dead code.
try:
    slot_count = cpu_count()
except NotImplementedError:
    slot_count = 1
#
# Manager type which spawns subprocesses
#

class HostManager(managers.SyncManager):
    '''
    Manager type used for spawning processes on a (presumably) foreign host
    '''
    def __init__(self, address, authkey):
        managers.SyncManager.__init__(self, address, authkey)
        # Placeholder until `from_address` learns the real host address.
        self._name = 'Host-unknown'

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on this host running `target(*args, **kwargs)`.

        The target and its arguments are pickled here and shipped to the
        remote `_RemoteProcess` proxy; `main_path` lets the remote side
        replicate `__main__` preparation.
        '''
        if hasattr(sys.modules['__main__'], '__file__'):
            main_path = os.path.basename(sys.modules['__main__'].__file__)
        else:
            main_path = None
        data = pickle.dumps((target, args, kwargs))
        p = self._RemoteProcess(data, main_path)
        if name is None:
            # e.g. 'somehost:1234/Process-1:2'
            temp = self._name.split('Host-')[-1] + '/Process-%s'
            name = temp % ':'.join(map(str, p.get_identity()))
        p.set_name(name)
        return p

    @classmethod
    def from_address(cls, address, authkey):
        '''
        Alternate constructor: attach to an already-running host manager.
        '''
        manager = cls(address, authkey)
        # 'dummy' round-trip verifies the server is reachable/authenticated.
        managers.transact(address, authkey, 'dummy')
        manager._state.value = managers.State.STARTED
        manager._name = 'Host-%s:%s' % manager.address
        # Replace shutdown with a finalizer that tells the remote server
        # to shut down; exitpriority -10 runs it late at interpreter exit.
        manager.shutdown = util.Finalize(
            manager, HostManager._finalize_host,
            args=(manager._address, manager._authkey, manager._name),
            exitpriority=-10
            )
        return manager

    @staticmethod
    def _finalize_host(address, authkey, name):
        # Ask the remote manager server to shut itself down.
        managers.transact(address, authkey, 'shutdown')

    def __repr__(self):
        return '<Host(%s)>' % self._name
#
# Process subclass representing a process on (possibly) a remote machine
#

class RemoteProcess(Process):
    '''
    Represents a process started on a remote host

    `data` is the pickled `(target, args, kwargs)` triple; `main_path`
    is the basename of the parent's `__main__` script (or None).
    '''
    def __init__(self, data, main_path):
        # main_path must be a bare filename -- it is resolved relative to
        # the remote temporary directory.
        assert not main_path or os.path.basename(main_path) == main_path
        Process.__init__(self)
        self._data = data
        self._main_path = main_path

    def _bootstrap(self):
        # Recreate the parent's __main__ environment, then unpickle the
        # real target before handing off to the normal Process bootstrap.
        forking.prepare({'main_path': self._main_path})
        self._target, self._args, self._kwargs = pickle.loads(self._data)
        return Process._bootstrap(self)

    def get_identity(self):
        return self._identity

# Expose RemoteProcess through HostManager proxies as `_RemoteProcess`.
HostManager.register('_RemoteProcess', RemoteProcess)
#
# A Pool class that uses a cluster
#

class DistributedPool(pool.Pool):
    '''
    A `multiprocessing.pool.Pool` whose worker processes are allocated
    from the slots of a `Cluster`.
    '''
    def __init__(self, cluster, processes=None, initializer=None, initargs=()):
        self._cluster = cluster
        # Pool spawns workers via self.Process; route through the cluster.
        self.Process = cluster.Process
        pool.Pool.__init__(self, processes or len(cluster),
                           initializer, initargs)

    def _setup_queues(self):
        # Use manager-backed settable queues instead of local SimpleQueues
        # so that remote workers can reach them.
        self._inqueue = self._cluster._SettableQueue()
        self._outqueue = self._cluster._SettableQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        # Flush pending tasks and wake `size` workers with None sentinels
        # so they exit cleanly during pool termination.
        inqueue.set_contents([None] * size)
#
# Manager type which starts host managers on other machines
#

def LocalProcess(**kwds):
    '''
    Factory returning an ordinary local Process whose name is prefixed
    with 'localhost/' to match the naming of remote processes.
    '''
    p = Process(**kwds)
    # NOTE(review): set_name/get_name are the Python 2.6 multiprocessing
    # accessor API; Python 3 uses the `name` property instead.
    p.set_name('localhost/' + p.name)
    return p
class Cluster(managers.SyncManager):
    '''
    Represents collection of slots running on various hosts.

    `Cluster` is a subclass of `SyncManager` so it allows creation of
    various types of shared objects.
    '''
    def __init__(self, hostlist, modules):
        managers.SyncManager.__init__(self, address=('localhost', 0))
        self._hostlist = hostlist
        self._modules = modules
        # This module itself must always be shipped to remote hosts.
        if __name__ not in modules:
            modules.append(__name__)
        files = [sys.modules[name].__file__ for name in modules]
        # Ship sources, not bytecode: map .pyc/.pyo back to the .py file.
        for i, file in enumerate(files):
            if file.endswith('.pyc') or file.endswith('.pyo'):
                files[i] = file[:-4] + '.py'
        self._files = [os.path.abspath(file) for file in files]

    def start(self):
        '''
        Start the local manager, bootstrap a HostManager on every host,
        and build the round-robin slot list.
        '''
        managers.SyncManager.start(self)
        l = connection.Listener(family='AF_INET', authkey=self._authkey)
        for i, host in enumerate(self._hostlist):
            host._start_manager(i, self._authkey, l.address, self._files)
        for host in self._hostlist:
            if host.hostname != 'localhost':
                # Each remote bootstrap connects back and reports
                # (index, manager address, cpu count).
                conn = l.accept()
                i, address, cpus = conn.recv()
                conn.close()
                other_host = self._hostlist[i]
                other_host.manager = HostManager.from_address(address,
                                                              self._authkey)
                other_host.slots = other_host.slots or cpus
                other_host.Process = other_host.manager.Process
            else:
                host.slots = host.slots or slot_count
                host.Process = LocalProcess

        # One Slot per (host, slot) pair; processes are handed out
        # round-robin over this list.
        self._slotlist = [
            Slot(host) for host in self._hostlist for i in range(host.slots)
            ]
        self._slot_iterator = itertools.cycle(self._slotlist)
        # Shadow the inherited shutdown with our own (see below), keeping
        # the original reachable for chaining.
        self._base_shutdown = self.shutdown
        del self.shutdown

    def shutdown(self):
        # Shut down remote host managers first, then the local manager.
        for host in self._hostlist:
            if host.hostname != 'localhost':
                host.manager.shutdown()
        self._base_shutdown()

    def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
        '''
        Create a process on the next slot in round-robin order.
        '''
        slot = self._slot_iterator.next()
        return slot.Process(
            group=group, target=target, name=name, args=args, kwargs=kwargs
            )

    def Pool(self, processes=None, initializer=None, initargs=()):
        '''
        Return a pool whose workers are spread over the cluster's slots.
        '''
        return DistributedPool(self, processes, initializer, initargs)

    def __getitem__(self, i):
        return self._slotlist[i]

    def __len__(self):
        return len(self._slotlist)

    def __iter__(self):
        return iter(self._slotlist)
#
# Queue subclass used by distributed pool
#

class SettableQueue(Queue.Queue):
    '''
    Queue whose entire contents can be replaced atomically; used for the
    task/result queues of `DistributedPool`.
    '''
    def empty(self):
        # Cheaper than the base implementation: inspect the deque directly.
        return not self.queue

    def full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    def set_contents(self, contents):
        # length of contents must be at least as large as the number of
        # threads which have potentially called get()
        self.not_empty.acquire()
        try:
            self.queue.clear()
            self.queue.extend(contents)
            # notify_all (not the deprecated camelCase notifyAll alias)
            # has been available since Python 2.6, which this module
            # already requires for `multiprocessing`.
            self.not_empty.notify_all()
        finally:
            self.not_empty.release()
Cluster.register('_SettableQueue', SettableQueue) Cluster.register('_SettableQueue', SettableQueue)
#
# Class representing a notional cpu in the cluster
#

class Slot(object):
    '''
    One schedulable process slot on a host; caches the host's Process
    factory for direct use by `Cluster.Process`.
    '''
    def __init__(self, host):
        self.host = host
        self.Process = host.Process
#
# Host
#

class Host(object):
    '''
    Represents a host to use as a node in a cluster.

    `hostname` gives the name of the host.  If hostname is not
    "localhost" then ssh is used to log in to the host.  To log in as
    a different user use a host name of the form
    "username@somewhere.org"

    `slots` is used to specify the number of slots for processes on
    the host.  This affects how often processes will be allocated to
    this host.  Normally this should be equal to the number of cpus on
    that host.
    '''
    def __init__(self, hostname, slots=None):
        self.hostname = hostname
        # None means "ask the host for its cpu count at start time".
        self.slots = slots

    def _start_manager(self, index, authkey, address, files):
        '''
        Bootstrap a host manager on this (remote) host: copy the startup
        files over ssh, launch `distributing.main()` there, and feed it
        its pickled configuration on stdin.
        '''
        if self.hostname != 'localhost':
            tempdir = copy_to_remote_temporary_directory(self.hostname, files)
            debug('startup files copied to %s:%s', self.hostname, tempdir)
            p = subprocess.Popen(
                ['ssh', self.hostname, 'python', '-c',
                 '"import os; os.chdir(%r); '
                 'from distributing import main; main()"' % tempdir],
                stdin=subprocess.PIPE
                )
            data = dict(
                # NOTE(review): 'BoostrappingHost' looks like a typo for
                # 'BootstrappingHost', but it is a runtime value and is
                # kept byte-identical here.
                name='BoostrappingHost', index=index,
                dist_log_level=_logger.getEffectiveLevel(),
                dir=tempdir, authkey=str(authkey), parent_address=address
                )
            pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
            p.stdin.close()
#
# Copy files to remote directory, returning name of directory
#

# Program text executed remotely via `ssh ... python -c`: creates a
# temporary directory, untars a gzipped stream from stdin into it, and
# prints the directory name back to the parent.  The surrounding double
# quotes protect the snippet from the remote shell.
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
    tf.extract(ti)
print tempdir
"'''
def copy_to_remote_temporary_directory(host, files):
    '''
    Copy `files` to a fresh temporary directory on `host` (over ssh) and
    return that directory's name.

    The files are streamed as a gzipped tar archive into the stdin of a
    remote python process running `unzip_code`, which prints the name of
    the directory it unpacked into.
    '''
    p = subprocess.Popen(
        ['ssh', host, 'python', '-c', unzip_code],
        stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
    for name in files:
        # Flatten paths: only the basename is stored in the archive.
        tf.add(name, os.path.basename(name))
    tf.close()
    p.stdin.close()
    # The remote side prints the temp directory name; strip the newline.
    return p.stdout.read().rstrip()
#
# Code which runs a host manager
#

def main():
    '''
    Entry point run on a remote host: read the pickled bootstrap
    configuration from stdin, start a `HostManager` server, report its
    address back to the parent, and serve forever.
    '''
    # get data from parent over stdin
    data = pickle.load(sys.stdin)
    sys.stdin.close()

    # set some stuff
    _logger.setLevel(data['dist_log_level'])
    forking.prepare(data)

    # create server for a `HostManager` object
    server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
    current_process()._server = server

    # report server address and number of cpus back to parent
    conn = connection.Client(data['parent_address'], authkey=data['authkey'])
    conn.send((data['index'], server.address, slot_count))
    conn.close()

    # set name etc
    current_process().set_name('Host-%s:%s' % server.address)
    util._run_after_forkers()

    # register a cleanup function to remove the temporary directory the
    # startup files were unpacked into (runs at interpreter exit).
    def cleanup(directory):
        debug('removing directory %s', directory)
        shutil.rmtree(directory)
        debug('shutting down host manager')
    util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)

    # start host manager
    debug('remote host manager starting in %s', data['dir'])
    server.serve_forever()