-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmapreduce
73 lines (64 loc) · 1.79 KB
/
mapreduce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import sys
from numpy import mat,mean,power
def read_input(file):
    """Yield each line of *file* with trailing whitespace removed.

    *file* is any iterable of strings (the script passes ``sys.stdin``).
    Lines are produced lazily, one at a time.
    """
    for line in file:
        # rstrip() with no argument drops the newline and any other
        # trailing whitespace.
        yield line.rstrip()
# Mapper: condense the numbers read from stdin (one per line) into a single
# "count<TAB>mean<TAB>mean-of-squares" record on stdout.
# Blank lines are skipped; float() still raises ValueError on any other
# non-numeric line.
values = [float(line) for line in read_input(sys.stdin) if line]
numInputs = len(values)
if numInputs:
    # Only emit a record when there is data: the mean of an empty matrix is
    # nan, which would poison the weighted sums computed downstream.
    inputMat = mat(values)
    sqInput = power(inputMat, 2)
    print('%d\t%f\t%f' % (numInputs, mean(inputMat), mean(sqInput)))
# The progress note goes to stderr so it never mixes with the record on stdout.
print("report:still alive", file=sys.stderr)
import sys
from numpy import mat,mean,power
def read_input(file):
    """Lazily yield the lines of *file*, each stripped of trailing whitespace.

    *file* may be any iterable of strings; the script passes ``sys.stdin``.
    """
    for line in file:
        # No-argument rstrip() removes the newline as well as trailing
        # spaces and tabs.
        yield line.rstrip()
# Reducer: combine "count<TAB>mean<TAB>mean-of-squares" records from stdin
# into one global "count<TAB>mean<TAB>variance" record on stdout.
# Blank lines are skipped.
mapperOut = [line.split('\t') for line in read_input(sys.stdin) if line]
cumVal = 0.0
cumSumSq = 0.0
cumN = 0.0
for instance in mapperOut:
    nj = float(instance[0])
    cumN += nj
    # Each record carries means, so weight by its count to recover the sums.
    cumVal += nj * float(instance[1])
    cumSumSq += nj * float(instance[2])
if cumN:
    # True division: floor division would truncate the mean.
    # Named meanVal so the numpy `mean` imported above is not shadowed.
    meanVal = cumVal / cumN
    # sum((x - m)^2) / N expanded as (sum(x^2) - 2*m*sum(x) + N*m^2) / N
    varSum = (cumSumSq - 2 * meanVal * cumVal + cumN * meanVal * meanVal) / cumN
    print("%d\t%f\t%f" % (cumN, meanVal, varSum))
# The progress note goes to stderr so it never mixes with the record on stdout.
print("report:still alive", file=sys.stderr)
# Setup (run in a shell or notebook cell, not as Python): pip install mrjob
from mrjob.job import MRJob
class MRmean(MRJob):
    """mrjob job that computes the mean and variance of numeric input values.

    Each mapper task accumulates a count, sum and sum of squares for the
    values it sees and emits one summary from ``map_final``.  Every summary
    is emitted under the same key (1), and ``reduce`` combines the summaries
    it receives for a key into a single ``(mean, variance)`` pair.
    """

    def __init__(self, *args, **kwargs):
        super(MRmean, self).__init__(*args, **kwargs)
        # Running totals over the values passed to map() on this instance.
        self.inCount = 0
        self.inSum = 0
        self.inSqSum = 0

    def map(self, key, val):
        """Fold one input value into the running totals; yields nothing.

        ``key`` is ignored; ``val`` must be convertible with ``float()``.
        """
        if False:
            # Never executed: the bare yield only makes this a generator
            # function (presumably because mrjob iterates over the mapper's
            # result -- confirm against the mrjob docs).
            yield
        inVal = float(val)
        self.inCount += 1
        self.inSum += inVal
        self.inSqSum += inVal * inVal

    def map_final(self):
        """Emit ``(1, [count, mean, mean_of_squares])`` for the values seen.

        Emits nothing when no value was seen, which avoids dividing by zero.
        """
        if not self.inCount:
            return
        mn = self.inSum / self.inCount
        mnSq = self.inSqSum / self.inCount
        yield (1, [self.inCount, mn, mnSq])

    def reduce(self, key, packedValues):
        """Combine ``[count, mean, mean_of_squares]`` summaries.

        Yields one ``(mean, variance)`` pair, or nothing when the combined
        count is zero.
        """
        cumVal = 0.0
        cumSumSq = 0.0
        cumN = 0.0
        for valArr in packedValues:
            nj = float(valArr[0])
            cumN += nj
            # Summaries carry means; weight by count to recover the sums.
            cumVal += nj * float(valArr[1])
            cumSumSq += nj * float(valArr[2])
        if not cumN:
            return
        mean = cumVal / cumN
        # sum((x - m)^2) / N expanded as (sum(x^2) - 2*m*sum(x) + N*m^2) / N
        var = (cumSumSq - 2 * mean * cumVal + cumN * mean * mean) / cumN
        yield (mean, var)

    def steps(self):
        """Return the job's single step: map, map_final, then reduce."""
        # Imported locally so this class does not depend on a new
        # file-level import; MRStep replaces the older MRJob.mr() helper.
        from mrjob.step import MRStep
        return [MRStep(mapper=self.map,
                       mapper_final=self.map_final,
                       reducer=self.reduce)]
if __name__ == '__main__':
    # Start the job only when this file is executed as a script.
    MRmean.run()