add the multiprocessing package to fulfill PEP 371

This commit is contained in:
Benjamin Peterson 2008-06-11 02:40:25 +00:00
parent d5299866f9
commit 190d56e009
32 changed files with 12500 additions and 0 deletions

View file

@ -0,0 +1,235 @@
#
# Simple benchmarks for the multiprocessing package
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()

View file

@ -0,0 +1,362 @@
#
# Module to allow spawning of processes on foreign host
#
# Depends on `multiprocessing` package -- tested with `processing-0.60`
#
__all__ = ['Cluster', 'Host', 'get_logger', 'current_process']
#
# Imports
#
import sys
import os
import tarfile
import shutil
import subprocess
import logging
import itertools
import Queue
try:
import cPickle as pickle
except ImportError:
import pickle
from multiprocessing import Process, current_process, cpu_count
from multiprocessing import util, managers, connection, forking, pool
#
# Logging
#
def get_logger():
return _logger
_logger = logging.getLogger('distributing')
_logger.propogate = 0
util.fix_up_logger(_logger)
_formatter = logging.Formatter(util.DEFAULT_LOGGING_FORMAT)
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
_logger.addHandler(_handler)
info = _logger.info
debug = _logger.debug
#
# Get number of cpus
#
try:
slot_count = cpu_count()
except NotImplemented:
slot_count = 1
#
# Manager type which spawns subprocesses
#
class HostManager(managers.SyncManager):
'''
Manager type used for spawning processes on a (presumably) foreign host
'''
def __init__(self, address, authkey):
managers.SyncManager.__init__(self, address, authkey)
self._name = 'Host-unknown'
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
if hasattr(sys.modules['__main__'], '__file__'):
main_path = os.path.basename(sys.modules['__main__'].__file__)
else:
main_path = None
data = pickle.dumps((target, args, kwargs))
p = self._RemoteProcess(data, main_path)
if name is None:
temp = self._name.split('Host-')[-1] + '/Process-%s'
name = temp % ':'.join(map(str, p.get_identity()))
p.set_name(name)
return p
@classmethod
def from_address(cls, address, authkey):
manager = cls(address, authkey)
managers.transact(address, authkey, 'dummy')
manager._state.value = managers.State.STARTED
manager._name = 'Host-%s:%s' % manager.address
manager.shutdown = util.Finalize(
manager, HostManager._finalize_host,
args=(manager._address, manager._authkey, manager._name),
exitpriority=-10
)
return manager
@staticmethod
def _finalize_host(address, authkey, name):
managers.transact(address, authkey, 'shutdown')
def __repr__(self):
return '<Host(%s)>' % self._name
#
# Process subclass representing a process on (possibly) a remote machine
#
class RemoteProcess(Process):
'''
Represents a process started on a remote host
'''
def __init__(self, data, main_path):
assert not main_path or os.path.basename(main_path) == main_path
Process.__init__(self)
self._data = data
self._main_path = main_path
def _bootstrap(self):
forking.prepare({'main_path': self._main_path})
self._target, self._args, self._kwargs = pickle.loads(self._data)
return Process._bootstrap(self)
def get_identity(self):
return self._identity
HostManager.register('_RemoteProcess', RemoteProcess)
#
# A Pool class that uses a cluster
#
class DistributedPool(pool.Pool):
def __init__(self, cluster, processes=None, initializer=None, initargs=()):
self._cluster = cluster
self.Process = cluster.Process
pool.Pool.__init__(self, processes or len(cluster),
initializer, initargs)
def _setup_queues(self):
self._inqueue = self._cluster._SettableQueue()
self._outqueue = self._cluster._SettableQueue()
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
inqueue.set_contents([None] * size)
#
# Manager type which starts host managers on other machines
#
def LocalProcess(**kwds):
p = Process(**kwds)
p.set_name('localhost/' + p.get_name())
return p
class Cluster(managers.SyncManager):
'''
Represents collection of slots running on various hosts.
`Cluster` is a subclass of `SyncManager` so it allows creation of
various types of shared objects.
'''
def __init__(self, hostlist, modules):
managers.SyncManager.__init__(self, address=('localhost', 0))
self._hostlist = hostlist
self._modules = modules
if __name__ not in modules:
modules.append(__name__)
files = [sys.modules[name].__file__ for name in modules]
for i, file in enumerate(files):
if file.endswith('.pyc') or file.endswith('.pyo'):
files[i] = file[:-4] + '.py'
self._files = [os.path.abspath(file) for file in files]
def start(self):
managers.SyncManager.start(self)
l = connection.Listener(family='AF_INET', authkey=self._authkey)
for i, host in enumerate(self._hostlist):
host._start_manager(i, self._authkey, l.address, self._files)
for host in self._hostlist:
if host.hostname != 'localhost':
conn = l.accept()
i, address, cpus = conn.recv()
conn.close()
other_host = self._hostlist[i]
other_host.manager = HostManager.from_address(address,
self._authkey)
other_host.slots = other_host.slots or cpus
other_host.Process = other_host.manager.Process
else:
host.slots = host.slots or slot_count
host.Process = LocalProcess
self._slotlist = [
Slot(host) for host in self._hostlist for i in range(host.slots)
]
self._slot_iterator = itertools.cycle(self._slotlist)
self._base_shutdown = self.shutdown
del self.shutdown
def shutdown(self):
for host in self._hostlist:
if host.hostname != 'localhost':
host.manager.shutdown()
self._base_shutdown()
def Process(self, group=None, target=None, name=None, args=(), kwargs={}):
slot = self._slot_iterator.next()
return slot.Process(
group=group, target=target, name=name, args=args, kwargs=kwargs
)
def Pool(self, processes=None, initializer=None, initargs=()):
return DistributedPool(self, processes, initializer, initargs)
def __getitem__(self, i):
return self._slotlist[i]
def __len__(self):
return len(self._slotlist)
def __iter__(self):
return iter(self._slotlist)
#
# Queue subclass used by distributed pool
#
class SettableQueue(Queue.Queue):
def empty(self):
return not self.queue
def full(self):
return self.maxsize > 0 and len(self.queue) == self.maxsize
def set_contents(self, contents):
# length of contents must be at least as large as the number of
# threads which have potentially called get()
self.not_empty.acquire()
try:
self.queue.clear()
self.queue.extend(contents)
self.not_empty.notifyAll()
finally:
self.not_empty.release()
Cluster.register('_SettableQueue', SettableQueue)
#
# Class representing a notional cpu in the cluster
#
class Slot(object):
def __init__(self, host):
self.host = host
self.Process = host.Process
#
# Host
#
class Host(object):
'''
Represents a host to use as a node in a cluster.
`hostname` gives the name of the host. If hostname is not
"localhost" then ssh is used to log in to the host. To log in as
a different user use a host name of the form
"username@somewhere.org"
`slots` is used to specify the number of slots for processes on
the host. This affects how often processes will be allocated to
this host. Normally this should be equal to the number of cpus on
that host.
'''
def __init__(self, hostname, slots=None):
self.hostname = hostname
self.slots = slots
def _start_manager(self, index, authkey, address, files):
if self.hostname != 'localhost':
tempdir = copy_to_remote_temporary_directory(self.hostname, files)
debug('startup files copied to %s:%s', self.hostname, tempdir)
p = subprocess.Popen(
['ssh', self.hostname, 'python', '-c',
'"import os; os.chdir(%r); '
'from distributing import main; main()"' % tempdir],
stdin=subprocess.PIPE
)
data = dict(
name='BoostrappingHost', index=index,
dist_log_level=_logger.getEffectiveLevel(),
dir=tempdir, authkey=str(authkey), parent_address=address
)
pickle.dump(data, p.stdin, pickle.HIGHEST_PROTOCOL)
p.stdin.close()
#
# Copy files to remote directory, returning name of directory
#
unzip_code = '''"
import tempfile, os, sys, tarfile
tempdir = tempfile.mkdtemp(prefix='distrib-')
os.chdir(tempdir)
tf = tarfile.open(fileobj=sys.stdin, mode='r|gz')
for ti in tf:
tf.extract(ti)
print tempdir
"'''
def copy_to_remote_temporary_directory(host, files):
p = subprocess.Popen(
['ssh', host, 'python', '-c', unzip_code],
stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
tf = tarfile.open(fileobj=p.stdin, mode='w|gz')
for name in files:
tf.add(name, os.path.basename(name))
tf.close()
p.stdin.close()
return p.stdout.read().rstrip()
#
# Code which runs a host manager
#
def main():
# get data from parent over stdin
data = pickle.load(sys.stdin)
sys.stdin.close()
# set some stuff
_logger.setLevel(data['dist_log_level'])
forking.prepare(data)
# create server for a `HostManager` object
server = managers.Server(HostManager._registry, ('', 0), data['authkey'])
current_process()._server = server
# report server address and number of cpus back to parent
conn = connection.Client(data['parent_address'], authkey=data['authkey'])
conn.send((data['index'], server.address, slot_count))
conn.close()
# set name etc
current_process().set_name('Host-%s:%s' % server.address)
util._run_after_forkers()
# register a cleanup function
def cleanup(directory):
debug('removing directory %s', directory)
shutil.rmtree(directory)
debug('shutting down host manager')
util.Finalize(None, cleanup, args=[data['dir']], exitpriority=0)
# start host manager
debug('remote host manager starting in %s', data['dir'])
server.serve_forever()

View file

@ -0,0 +1,98 @@
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()

311
Doc/includes/mp_pool.py Normal file
View file

@ -0,0 +1,311 @@
#
# A test of `multiprocessing.Pool` class
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().get_name(),
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError, 'expected ZeroDivisionError'
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError, 'expected ZeroDivisionError'
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError, 'expected ZeroDivisionError'
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError, 'expected ZeroDivisionError'
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()

View file

@ -0,0 +1,273 @@
#
# A test file for the `multiprocessing` package
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.get_exitcode() == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError, 'there should be no positive refcounts left'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit, 2
test(namespace)

View file

@ -0,0 +1,67 @@
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().get_name(),format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()

View file

@ -0,0 +1,87 @@
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().get_name(), func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()