-
Notifications
You must be signed in to change notification settings - Fork 4
/
simpleMapReduce.py
51 lines (40 loc) · 1.89 KB
/
simpleMapReduce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import collections
import multiprocessing
class SimpleMapReduce(object):
    """Minimal map/reduce driver built on a ``multiprocessing.Pool``.

    The instance owns a pool of worker processes. Release it with
    :meth:`close`, or use the instance as a context manager::

        with SimpleMapReduce(map_func, reduce_func) as mapper:
            results = mapper(inputs)
    """

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func
          Function to map inputs to intermediate data. Takes as argument one
          input value and returns an iterable of (key, value) pairs to be
          reduced (``partition`` iterates over every pair of every result).

        reduce_func
          Function to reduce partitioned version of intermediate data to final
          output. Takes as its single argument a (key, values) tuple, where
          key is as produced by map_func and values is the list of all values
          associated with that key.

        num_workers
          The number of workers to create in the pool. Defaults to the number
          of CPUs available on the current host.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def close(self):
        """Shut down the worker pool and wait for the workers to exit.

        After this call the instance can no longer process inputs.
        """
        # close() stops new work from being submitted; join() then waits for
        # the worker processes so they are not left behind.
        self.pool.close()
        self.pool.join()

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the worker pool when leaving the ``with`` block."""
        self.close()
        # Returning False lets any exception raised in the block propagate.
        return False

    def partition(self, mapped_values):
        """Organize the mapped values by their key.

        mapped_values
          An iterable with one entry per input; each entry is itself an
          iterable of (key, value) pairs as returned by map_func.

        Returns a dictionary (a ``collections.defaultdict``) mapping each key
        to the list of values seen for it, in encounter order.
        """
        partitioned_data = collections.defaultdict(list)
        for sublist in mapped_values:
            for key, value in sublist:
                partitioned_data[key].append(value)
        return partitioned_data

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.

        inputs
          An iterable containing the input data to be processed.

        chunksize=1
          The portion of the input data to hand to each worker. This
          can be used to tune performance during the mapping phase.

        Returns a list with one reduce_func result per distinct key.
        """
        mapped_values = self.pool.map(self.map_func, inputs, chunksize=chunksize)
        partitioned_data = self.partition(mapped_values)
        # Each reduce call receives one (key, values) tuple.
        reduced_values = self.pool.map(self.reduce_func, partitioned_data.items())
        return reduced_values