# Mirror of https://github.com/python/cpython.git (synced 2025-08-04 08:59:19 +00:00).
#
# Commit note: mp_benchmarks, mp_newtypes and mp_distribution are still broken,
# but the other examples are working properly. We should include the examples
# in our unit test suite.
#
# File size: 311 lines, 6.9 KiB (Python).
#
# A test of the `multiprocessing.Pool` class
#
|
|
|
|
import multiprocessing
|
|
import time
|
|
import random
|
|
import sys
|
|
|
|
#
# Functions used by the test code
#
|
|
|
|
def calculate(func, args):
|
|
result = func(*args)
|
|
return '%s says that %s%s = %s' % (
|
|
multiprocessing.current_process().name,
|
|
func.__name__, args, result
|
|
)
|
|
|
|
def calculatestar(args):
    """Unpack *args* into positional arguments for ``calculate``.

    Lets a single ``(func, func_args)`` item be passed through the
    one-argument ``map``-style pool methods.
    """
    report = calculate(*args)
    return report
|
|
|
|
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
|
|
|
|
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
|
|
|
|
def f(x):
    """Return the reciprocal of ``x - 5.0``.

    Raises ZeroDivisionError when *x* equals 5; the error-handling tests
    rely on that.
    """
    offset = x - 5.0
    return 1.0 / offset
|
|
|
|
def pow3(x):
    """Return *x* raised to the third power."""
    cube = x ** 3
    return cube
|
|
|
|
def noop(x):
    """Ignore *x* and return ``None``; used to measure pool overhead."""
    return None
|
|
|
|
#
# Test code
#
|
|
|
|
def test():
    """Exercise a ``Pool`` end to end, printing progress to stdout.

    The sections run in order: ordered and unordered mapping, two simple
    benchmarks, propagation of worker exceptions, timeouts, callbacks, and
    the ``close()``, ``terminate()`` and garbage-collection shutdown paths.
    Failed expectations raise ``AssertionError``; a callback mismatch is
    only reported on stdout.

    The pool class is looked up on the module-global name
    ``multiprocessing``, which the ``__main__`` block may rebind to
    ``multiprocessing.dummy`` to run the same checks with threads.
    """
    # When the global ``multiprocessing`` has been rebound to
    # ``multiprocessing.dummy`` (thread mode), ``cpu_count`` and
    # ``TimeoutError`` are not available on it in current CPython.  Import
    # them from the real package; this goes through ``sys.modules`` and is
    # unaffected by the rebinding of the global name.
    from multiprocessing import TimeoutError as PoolTimeoutError, cpu_count

    print('cpu_count() = %d\n' % cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print()

    #
    # Tests
    #

    TASKS = ([(mul, (i, 7)) for i in range(10)] +
             [(plus, (i, 8)) for i in range(10)])

    # All three batches are submitted before any result is consumed.
    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t', r.get())
    print()

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t', x)
    print()

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t', x)
    print()

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, TASKS):
        print('\t', x)
    print()

    #
    # Simple benchmarks
    #

    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    A = list(map(pow3, range(N)))
    print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    B = pool.map(pow3, range(N))
    print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    C = list(pool.imap(pow3, range(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    # The serial and both pooled variants must agree element for element.
    assert A == B == C, (len(A), len(B), len(C))
    print()

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    A = list(map(noop, L))
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    B = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert A == B == C, (len(A), len(B), len(C))
    print()

    # Release the large benchmark lists before continuing.
    del A, B, C, L

    #
    # Test error handling
    #

    print('Testing error handling:')

    # f(5) divides by zero; each API below must re-raise that in the caller.
    try:
        print(pool.apply(f, (5,)))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.apply()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(pool.map(f, list(range(10))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from pool.map()')
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print(list(pool.imap(f, list(range(10)))))
    except ZeroDivisionError:
        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, list(range(10)))
    for i in range(10):
        try:
            next(it)
        except ZeroDivisionError:
            # Expected at i == 5.  As in the original script, an error at any
            # other index is tolerated as well.
            pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have produced all ten items (no early StopIteration).
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print()

    #
    # Testing timeouts
    #

    print('Testing ApplyResult.get() with timeout:', end=' ')
    res = pool.apply_async(calculate, TASKS[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except PoolTimeoutError:
            # Not ready yet: print a progress dot and poll again.
            sys.stdout.write('.')
    print()
    print()

    print('Testing IMapIterator.next() with timeout:', end=' ')
    it = pool.imap(calculatestar, TASKS)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except PoolTimeoutError:
            sys.stdout.write('.')
    print()
    print()

    #
    # Testing callback
    #

    print('Testing callback:')

    A = []
    # Expected contents of A: 7 * 8, then the cubes of 0..9.
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, list(range(10)), callback=A.extend)
    r.wait()

    if A == B:
        print('\tcallbacks succeeded\n')
    else:
        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # A task submitted before close() must still complete.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')

    #
    # Check terminate() method
    #

    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])
    # Queue more work than two workers can finish, then terminate.
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')

    #
    # Check garbage collection
    #

    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    # Keep the worker list so the workers can be inspected after the pool
    # object itself has been dropped.
    processes = pool._pool
    pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for _ in range(100)]

    # Drop every reference held here to the pool and its pending results.
    # NOTE(review): this expects the unreferenced pool to shut its workers
    # down within the sleep below -- confirm on the target Python version.
    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run test() with processes (default) or threads,
    # selected by an optional single command-line argument.
    multiprocessing.freeze_support()

    # Reject extra arguments explicitly instead of with ``assert``, which is
    # stripped under ``python -O`` and otherwise shows a bare AssertionError
    # rather than the usage text.
    if len(sys.argv) > 2:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    mode = sys.argv[1] if len(sys.argv) == 2 else 'processes'

    if mode == 'processes':
        print(' Using processes '.center(79, '-'))
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        # Rebind the module-level name so that test() creates its pools from
        # the thread-based implementation.
        import multiprocessing.dummy as multiprocessing
    else:
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    test()
|