diff --git a/RidgeRegression/RidgeRegressionMapReduce.py b/RidgeRegression/RidgeRegressionMapReduce.py index 00e2b43..25ed5ee 100644 --- a/RidgeRegression/RidgeRegressionMapReduce.py +++ b/RidgeRegression/RidgeRegressionMapReduce.py @@ -18,22 +18,77 @@ def cholesky_solution_least_squares(part_one, part_two): z = np.linalg.solve(R,part_two) theta = np.linalg.solve(np.transpose(R),z) return theta - - + + +class PrioritySampler(object): + + def __init__(self,sample_size): + self.sample_size = sample_size + self.sample = [] + + def process_observation(self,observation): + if len(self.sample) < self.sample_size: + self.sample.append(observation) + if len(self.sample) == self.sample_size: + heapq.heapify(self.sample_cv) + else: + if observation[0] > self.sample[0][0]: + heapq.heapreplace(self.sample,observation) + + def process_observations(self,observations): + for observation in observations: + self.process_observation(observation) + + class RidgeRegressionHoldOutCV(object): - def __init__(self,lambdas): + def __init__(self,lambdas, data): self.lambdas = lambdas - self.data = [] - + self.data = data - def add_mapper_output(self,data): - pass + def run_ridge_regression(self, lambda_ridge , scaling = None): + + def scaler(x, column_scaler): + m = np.shape(x)[1] + for i in range(m): + x[:,i] = column_scaler(x[:,i]) + return x + + X,Y = [],[] + for observation in self.data: + features , y = observation[1:] + X.append(features) + Y.append(y) + X = np.array(X) + Y = np.array(Y) + if scaling == "max-min": + X = scaler(X,lambda x: x/(np.max(x) - np.min(x))) + elif scaling == "z-score": + X = scaler(X,lambda x: (x - np.mean(x))/np.std(x)) + # scale y to account for bias term + Y = Y - np.mean(Y) + # in case of max-min and no scaling, we need to substract mean from features + if scaling != "z-score": + X = scaler(X, lambda x: x-np.mean(x)) + + def cv(self, scaling = None): + err = [ self.run_ridge_regression(lambda_ridge, scaling) for lambda_ridge in self.lambdas] + lambda_best, err = min([ (self.lambdas[i],err[i]) for i in range(len(self.lambdas)) ], key = lambda t: t[1]) + return lambda_best + + + +class DimensionMismatch(Exception): - def cv(self): - pass + def __init__(self,expected,observed): + self.exp = expected + self.obs = observed + + def __str__(self): + err = "Expected number of observations: "+self.exp+" , observed: "+self.obs + return err @@ -52,11 +107,12 @@ class RidgeRegression(MRJob): Options: ----------- - -- dimension - (int) number of explanatory variables - -- bias - (bool) if True regression wil include bias term - - - + --dimension - (int) number of explanatory variables + --scaling - (str) 'z-score' or 'max-min' + --hold-out-sample-size - (int) size of hold out cross validation set + --cv-lambdas - (str) name of file containing set of regularisation + parameters for cross validation + ''' INPUT_PROTOCOL = RawValueProtocol @@ -76,7 +132,7 @@ def __init__(self,*args,**kwargs): self.x_t_y = [0]*self.dim self.n = 0 self.lambdas_cv = self.read_lambdas(self.options.cv_lambdas) - self.sample_cv = [] + self.sampler = Sampler(self.cv_size) #------------------------------------------- load & configure options ---------------------------------------# @@ -192,13 +248,8 @@ def mapper_ridge(self,_,line): # make sample for hold out cross validation set rand_priority = random.randrange(start = 0, stop = 100000000) observation = (rand_priority,features,y) - if len(self.sample_cv) < self.cv_size: - self.sample_cv.append(observation) - if len(self.sample_cv) == self.cv_size: - heapq.heapify(self.sample_cv) - else: - if observation[0] > self.sample_cv[0][0]: - heapq.heapreplace(self.sample_cv,observation) + self.sampler.process_observation(observation) + def mapper_ridge_final(self): @@ -207,18 +258,21 @@ def mapper_ridge_final(self): "x_" "x_t_x": x_t_x, "y_av": self.y_av, - "n": self.n} + "n": self.n + } if self.options.scaling == "max-min": intermediate_stats["max"] = self.max intermediate_stats["min"] = self.min - yield None, ("stats",intermediate_stats) - yield None, ("hold_out_cv",self.sample_cv) + yield None, ("hold_out_cv",self.sampler.sample) def reducer_ridge(self, key, vals): - ridge = RidgeRegressionHoldOutCV(self.lambdas) + ''' + + ''' + sampler = Sampler(self.cv_size) final_summary_stats = {"mu": [0]*self.dim, "x_t_x": [[0]*self.dim for i in range(self.dim)], "x_t_y": [0]*self.dim, @@ -229,9 +283,10 @@ def reducer_ridge(self, key, vals): mapper_summary = val[1] final_summary_stats = self.join_mapper_intermediate_stats(final_summary_stats,mapper_summary) else: - ridge.add_mapper_output(val[1]) + sampler.process_observations(val[1]) # for each scaling type use cross validation to verify best lambda # then use it on all data (including cv set) to find parameters + ridge = RidgeRegressionHoldOutCV(self.lambdas, sampler.sample) best_lambda = ridge.cv(self.options.scaling) yield None, self.estimate_params(final_summary_stats,best_lambda,self.options.scaling)