
Commit

added hold out cross validation step
AmazaspShumik committed Jun 6, 2015
1 parent c2d5b55 commit 42613a2
Showing 1 changed file with 82 additions and 27 deletions.
109 changes: 82 additions & 27 deletions RidgeRegression/RidgeRegressionMapReduce.py
@@ -18,22 +18,77 @@ def cholesky_solution_least_squares(part_one, part_two):
    z = np.linalg.solve(R,part_two)
    theta = np.linalg.solve(np.transpose(R),z)
    return theta
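
(Aside, not part of this commit: assuming the elided lines above compute R = np.linalg.cholesky(part_one), the two triangular solves recover theta from R * R^T * theta = part_two, i.e. they solve the ridge normal equations (X^T X + lambda*I) * theta = X^T y whenever part_one = X^T X + lambda*I and part_two = X^T y.)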




class PrioritySampler(object):
    ''' Maintains a fixed-size random sample of a stream of observations,
        where each observation is a tuple whose first element is a random priority. '''

    def __init__(self,sample_size):
        self.sample_size = sample_size
        self.sample = []

    def process_observation(self,observation):
        # fill the sample until it reaches the required size
        if len(self.sample) < self.sample_size:
            self.sample.append(observation)
            if len(self.sample) == self.sample_size:
                heapq.heapify(self.sample)
        else:
            # evict the lowest-priority element if the new observation ranks higher
            if observation[0] > self.sample[0][0]:
                heapq.heapreplace(self.sample,observation)

    def process_observations(self,observations):
        for observation in observations:
            self.process_observation(observation)
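
Not part of the commit, but a quick illustration of how the sampler is meant to be used: each observation is tagged with a random priority, and keeping the observations with the largest priorities yields a uniform random subsample of the stream (the hold-out cross validation set below). This assumes heapq and random are imported at module level, as the rest of the file already requires.

import random

sampler = PrioritySampler(sample_size = 3)
for features, y in [([1.0, 2.0], 3.0), ([0.5, 1.5], 2.0), ([2.0, 0.1], 1.0),
                    ([1.1, 1.1], 2.2), ([0.9, 0.4], 1.3)]:
    priority = random.randrange(start = 0, stop = 100000000)
    sampler.process_observation((priority, features, y))
# sampler.sample now holds 3 of the 5 observations, chosen uniformly at random
print(sampler.sample)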



class RidgeRegressionHoldOutCV(object):

    def __init__(self,lambdas, data):
        self.lambdas = lambdas
        self.data = data

    def run_ridge_regression(self, lambda_ridge, scaling = None):

        def scaler(x, column_scaler):
            m = np.shape(x)[1]
            for i in range(m):
                x[:,i] = column_scaler(x[:,i])
            return x

        X,Y = [],[]
        for observation in self.data:
            features, y = observation[1:]
            X.append(features)
            Y.append(y)
        X = np.array(X)
        Y = np.array(Y)
        if scaling == "max-min":
            X = scaler(X, lambda x: x/(np.max(x) - np.min(x)))
        elif scaling == "z-score":
            X = scaler(X, lambda x: (x - np.mean(x))/np.std(x))
        # scale y to account for bias term
        Y = Y - np.mean(Y)
        # in case of max-min and no scaling, we need to subtract mean from features
        if scaling != "z-score":
            X = scaler(X, lambda x: x - np.mean(x))

    def cv(self, scaling = None):
        err = [ self.run_ridge_regression(lambda_ridge, scaling) for lambda_ridge in self.lambdas ]
        lambda_best, err = min([ (self.lambdas[i], err[i]) for i in range(len(self.lambdas)) ], key = lambda t: t[1])
        return lambda_best
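
As committed, run_ridge_regression stops after scaling and returns nothing, so cv ends up comparing None values; the fitting and scoring step is evidently still to come. A minimal sketch of how the missing tail of the method could look, assuming a simple 70/30 train/validation split of the hold-out sample and the closed-form ridge solution (the split ratio and variable names are made up for illustration):

        # hypothetical continuation of run_ridge_regression, not part of this commit
        n_obs = np.shape(X)[0]
        split = int(0.7 * n_obs)
        X_train, X_val = X[:split, :], X[split:, :]
        Y_train, Y_val = Y[:split], Y[split:]
        # closed-form ridge solution: (X'X + lambda*I) * theta = X'Y
        part_one = np.dot(np.transpose(X_train), X_train) + lambda_ridge * np.eye(np.shape(X_train)[1])
        part_two = np.dot(np.transpose(X_train), Y_train)
        theta = np.linalg.solve(part_one, part_two)
        # mean squared error on the validation part of the hold-out sample
        return np.mean((Y_val - np.dot(X_val, theta)) ** 2)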



class DimensionMismatch(Exception):

    def __init__(self,expected,observed):
        self.exp = expected
        self.obs = observed

    def __str__(self):
        err = "Expected number of observations: "+str(self.exp)+" , observed: "+str(self.obs)
        return err



@@ -52,11 +107,12 @@ class RidgeRegression(MRJob):
    Options:
    -----------
    --dimension - (int) number of explanatory variables
    --scaling - (str) 'z-score' or 'max-min'
    --hold-out-sample-size - (int) size of hold out cross validation set
    --cv-lambdas - (str) name of file containing set of regularisation
                   parameters for cross validation
    '''
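
An invocation might then look like the following (illustrative only: the exact flag spellings come from option definitions elided from this diff, and the file names are made up):

python RidgeRegressionMapReduce.py --dimension=5 --scaling=z-score \
       --hold-out-sample-size=1000 --cv-lambdas=lambdas.txt data.txt > coefficients.txt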

    INPUT_PROTOCOL = RawValueProtocol
Expand All @@ -76,7 +132,7 @@ def __init__(self,*args,**kwargs):
        self.x_t_y = [0]*self.dim
        self.n = 0
        self.lambdas_cv = self.read_lambdas(self.options.cv_lambdas)
        self.sampler = PrioritySampler(self.cv_size)

#------------------------------------------- load & configure options ---------------------------------------#

@@ -192,13 +248,8 @@ def mapper_ridge(self,_,line):
        # make sample for hold out cross validation set
        rand_priority = random.randrange(start = 0, stop = 100000000)
        observation = (rand_priority,features,y)
        self.sampler.process_observation(observation)



    def mapper_ridge_final(self):
@@ -207,18 +258,21 @@ def mapper_ridge_final(self):
                              "x_t_y": self.x_t_y,
                              "x_t_x": x_t_x,
                              "y_av": self.y_av,
                              "n": self.n
                              }
        if self.options.scaling == "max-min":
            intermediate_stats["max"] = self.max
            intermediate_stats["min"] = self.min

        yield None, ("stats",intermediate_stats)
        yield None, ("hold_out_cv",self.sampler.sample)



    def reducer_ridge(self, key, vals):
        '''
        Aggregates intermediate statistics from all mappers, chooses the best
        regularisation parameter on the hold-out sample and estimates final parameters.
        '''
        sampler = PrioritySampler(self.cv_size)
        final_summary_stats = {"mu": [0]*self.dim,
                               "x_t_x": [[0]*self.dim for i in range(self.dim)],
                               "x_t_y": [0]*self.dim,
@@ -229,9 +283,10 @@ def reducer_ridge(self, key, vals):
                mapper_summary = val[1]
                final_summary_stats = self.join_mapper_intermediate_stats(final_summary_stats,mapper_summary)
            else:
                sampler.process_observations(val[1])
        # use hold-out cross validation to choose the best lambda, then use all
        # the data (including the cv set) to estimate the final parameters
        ridge = RidgeRegressionHoldOutCV(self.lambdas_cv, sampler.sample)
        best_lambda = ridge.cv(self.options.scaling)
        yield None, self.estimate_params(final_summary_stats,best_lambda,self.options.scaling)

