-
Notifications
You must be signed in to change notification settings - Fork 2
/
test_reduce.py
68 lines (45 loc) · 1.63 KB
/
test_reduce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#http://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
import multiprocessing
import multiprocessing.pool
import math
def parallel_reduce(func, iterable, processes= 4, args=(), kwargs={}):
#iterable = range(1,11)
comp_stack = list(iterable)
pair_list = []
pool = multiprocessing.pool.Pool(processes)
#print comp_stack
while len(comp_stack) > 1:
while len(comp_stack) > 1:
pair_list.append((comp_stack.pop(), comp_stack.pop()))
#print pair_list
results = []
while len(pair_list) > 0:
pair = pair_list.pop()
results.append(pool.apply_async(func, [pair]))
#print results
while True:
if all([result.ready() for result in results]): break
comp_stack = [result.get() for result in results]
return comp_stack
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class RecursivePool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def test_recursive_parallel_reduce(workers = 5):
pool = RecursivePool()
ranges = [range(1, 5), range(2, 9), range(3, 7)]
print ranges
results = []
for myrange in ranges:
pool.apply_async(parallel_reduce, [sum, myrange],
callback= results.append)
pool.close()
pool.join()
print results
if __name__ == '__main__':
test_recursive_parallel_reduce()