-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathreducer.py
47 lines (40 loc) · 1.93 KB
/
reducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
# ---------------------------------------------------------------
# Word-count reducer for Hadoop Streaming.
#
# Reads "<word> TAB <count>" lines from stdin and writes one
# "<word> TAB <total-count>" line to stdout for each run of
# consecutive input lines that share the same word.
#
# Hadoop Streaming sorts mapper output by key before the reduce
# phase, so all counts for one word arrive next to each other; a
# run-length sum over adjacent keys is therefore sufficient.
# ---------------------------------------------------------------
import sys
from itertools import groupby
from operator import itemgetter


def parse_records(lines, err=None):
    """Yield (key, count) pairs from "<key> TAB <count>" text lines.

    Each line is stripped of surrounding whitespace and split on its
    first tab (the Hadoop Streaming default key/value separator).

    Blank lines are skipped silently. Lines without a tab, or whose
    count is not an integer, are reported on *err* (``sys.stderr`` when
    None) and skipped, so a single bad record does not fail the task.
    """
    if err is None:
        err = sys.stderr
    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue  # e.g. an extra newline at the end of the input
        key, sep, value = line.partition("\t")
        if not sep:
            err.write(
                "reducer: skipping line without tab: {0!r}\n".format(line))
            continue
        try:
            count = int(value)
        except ValueError:
            err.write(
                "reducer: skipping line with non-integer count: {0!r}\n"
                .format(line))
            continue
        yield key, count


def reduce_counts(lines, out=None, err=None):
    """Sum the counts of each run of equal, adjacent keys.

    Parameters:
        lines: iterable of "<key> TAB <count>" strings, grouped by key.
        out:   writable text stream for results (``sys.stdout`` if None).
        err:   writable text stream for warnings (``sys.stderr`` if None).

    Writes one "<key> TAB <total>" line per run. A key that reappears
    after a different key is emitted again as a separate total, exactly
    as a "last key seen" loop would. Empty input produces no output.
    """
    if out is None:
        out = sys.stdout
    # groupby merges only *adjacent* equal keys, which matches the
    # "has the key changed?" logic without needing an end-of-input
    # special case (the source of the crash on empty input).
    for key, records in groupby(parse_records(lines, err), key=itemgetter(0)):
        total = sum(count for _, count in records)
        # Hadoop expects key and value separated by a tab.
        out.write("{0}\t{1}\n".format(key, total))


def main():
    """Hadoop Streaming entry point: reduce stdin, write to stdout."""
    reduce_counts(sys.stdin)


if __name__ == "__main__":
    main()