-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmultiprocess.py
88 lines (67 loc) · 2.21 KB
/
multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
.. module:: multiprocess.py
:platform: Unix
:synopsis: Generic multiprocessing class.
.. moduleauthor:: David Vine <[email protected]>
.. licence:: GPLv2
.. version:: 1.0
"""
import functools
import multiprocessing as mp
import pdb
class multiprocess(object):
    """Wrap an arbitrary task in a set of worker processes fed by a job queue.

    Call sequence:
        1) Instantiate a multiprocess object.
        2) Add jobs to job queue using add_job method.
        3) Call multiprocess objects close_out method.
        4) Deal with the results in whatever foul, depraved way you deem fit.
    """

    def __init__(self, target_func, num_processes=mp.cpu_count(), **kwargs):
        """Create the job/result queues and start the worker processes.

        :param target_func: callable used as the target of every process; it
            is invoked as ``target_func(jobs, results)`` where ``jobs`` is a
            ``JoinableQueue`` and ``results`` is a ``Queue``.
        :param num_processes: number of processes to start. The default,
            ``mp.cpu_count()``, is evaluated once when the class is defined.
        :param kwargs: accepted for call compatibility; not used.
        """
        self.total_jobs = 0
        self.jobs = mp.JoinableQueue()
        self.results = mp.Queue()
        self.num_processes = num_processes
        queues = (self.jobs, self.results)
        self.p = [mp.Process(target=target_func, args=queues)
                  for _ in range(num_processes)]
        for process in self.p:
            process.start()

    def add_job(self, job):
        """Put ``job`` on the job queue and count it as an expected result."""
        self.total_jobs += 1
        self.jobs.put(job)

    def close_out(self):
        """Stop the workers, collect the results and release the queues.

        One poison pill ``(None,)`` is queued per process, then exactly
        ``total_jobs`` results are read from the results queue. The call
        blocks until that many results have arrived, so the target is
        expected to put one result per job.

        :returns: list of results in the order they were read from the
            results queue (an empty list when no job was added).
        """
        # Add Poison Pills
        for _ in range(self.num_processes):
            self.jobs.put((None,))

        # Blocking get() instead of polling empty(): no busy-wait, and with
        # zero jobs the collection is simply skipped instead of looping
        # forever. Results are drained before the processes are joined so a
        # worker is never left blocked flushing data into the results queue.
        res_list = [self.results.get() for _ in range(self.total_jobs)]

        self.jobs.join()
        self.jobs.close()
        self.results.close()
        for process in self.p:
            process.join()
        return res_list
def worker(func):
    """Decorator that turns a per-job function into a queue-consuming loop.

    The returned wrapper is meant to be called as ``wrapper(jobs, results)``.
    It repeatedly takes a job from ``jobs``, calls ``func(job)`` with the job
    object as its single argument, marks the job done and puts the return
    value on ``results``. A job whose first element is ``None`` is the poison
    pill: it is acknowledged with ``task_done()`` and ends the loop.

    ``functools.wraps`` copies ``func``'s name, qualified name and docstring
    onto the wrapper. Besides nicer introspection, this lets a module-level
    function decorated with ``@worker`` be pickled by reference, which
    process start methods other than ``fork`` require for a Process target.

    :param func: callable taking one job and returning its result.
    :returns: the wrapper function.
    """
    @functools.wraps(func)
    def worker2(*args, **kwargs):
        # Only the first two positional arguments are used: job queue and
        # result queue. Extra arguments and kwargs are ignored.
        jobs, results = args[0], args[1]
        while True:
            job_args = jobs.get()
            if job_args[0] is None:  # Deal with Poison Pill
                jobs.task_done()
                break
            res = func(job_args)
            # The job is marked done before its result is queued.
            jobs.task_done()
            results.put(res)
        # The wrapper returns itself once the loop ends; kept as in the
        # original (the value is discarded when run as a Process target).
        return worker2
    return worker2