From 1c9fb10f5e4284b7e34bc659dfb4d3328c32e905 Mon Sep 17 00:00:00 2001 From: "Carlos A. Ledezma" Date: Fri, 18 Jan 2019 09:51:27 +0000 Subject: [PATCH] Add shuffling (and a bit of housekeeping) (#92) Introduce training using shuffling per batch and using part of the training set as the per-epoch validation set. A future PR should thoroughly revise the code to either deprecate --validfile or make it an option to use --validfile or a portion of the training set. --- .circleci/config.yml | 6 +-- requirements.txt | 17 ++++--- toupee/common.py | 88 +++++++-------------------------- toupee/ensemble_methods.py | 60 ++--------------------- toupee/mlp.py | 99 +++++++++++++++----------------------- 5 files changed, 75 insertions(+), 195 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index a20d964..01bd17b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -18,9 +18,9 @@ jobs: # Download and cache dependencies - restore_cache: keys: - - v2-dependencies-{{ checksum "requirements.txt" }} + - v3-dependencies-{{ checksum "requirements.txt" }} # fallback to using the latest cache if no exact match is found - - v2-dependencies- + - v3-dependencies- - run: name: install dependencies @@ -31,7 +31,7 @@ jobs: - save_cache: paths: - ./venv - key: v2-dependencies-{{ checksum "requirements.txt" }} + key: v3-dependencies-{{ checksum "requirements.txt" }} - run: diff --git a/requirements.txt b/requirements.txt index 8fa8923..cf8d032 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ -pytest -dill -sklearn +numpy==1.15.2 +pytest==4.0.1 +dill==0.2.8.2 +sklearn==0.0 theano -scikit-image -pymongo -keras -pyyaml -tensorflow +scikit-image==0.14.1 +pymongo==3.7.1 +keras==2.2.4 +PyYAML==3.13 +tensorflow==1.11.0 diff --git a/toupee/common.py b/toupee/common.py index c7ea81f..51847db 100644 --- a/toupee/common.py +++ b/toupee/common.py @@ -15,6 +15,7 @@ import math import time import h5py +import warnings from keras.callbacks import Callback from keras.utils import Sequence @@ -41,7 +42,6 @@ def __init__(self, monitor='val_loss', verbose=0, mode='auto'): super(ModelCheckpointInMemory, self).__init__() self.monitor = monitor self.verbose = verbose - self.best_model = h5py.File("/dev/null", driver = 'core', backing_store = False) self.best_epoch = None @@ -84,7 +84,6 @@ def on_epoch_end(self, epoch, logs={}): if self.verbose > 0: print('Epoch %05d: %s did not improve' % (epoch, self.monitor)) - @@ -92,22 +91,18 @@ def on_epoch_end(self, epoch, logs={}): # to multiple complications (especially with resampled data). # But it can decrease train&test time - do it in the future! class DataGenerator(Sequence): - ''' - Data holder generator class for .npz/.h5 data + ''' + Data holder generator class for .npz/.h5 data -- keras Sequence based (for better generator performance) [requires __len__(self) and __getitem__(self, idx)] ''' - def __init__(self, data_file, batch_size, sampled_indexes, hold_y = True): - #define x if 'x' in data_file: xlabel = 'x' elif 'X' in data_file: xlabel = 'X' - self.data_x = data_file[xlabel] - #auxiliary variables self.sampled_indexes = sampled_indexes @@ -117,8 +112,6 @@ def __init__(self, data_file, batch_size, sampled_indexes, hold_y = True): self.num_examples = self.data_x.shape[0] self.batch_size = batch_size self.number_of_batches = math.ceil(self.num_examples/self.batch_size) - - # define y if needed self.hold_y = hold_y if hold_y: @@ -128,97 +121,75 @@ def __init__(self, data_file, batch_size, sampled_indexes, hold_y = True): assert self.n_classes > 1 self.data_y = data_file['y'] - def sequential_batch(self, step): #sequential iteration over the data - #defines the indexes for this batch if (step+1) == self.number_of_batches: #<- last batch - batch_indexes = list(range(step*self.batch_size, + batch_indexes = list(range(step*self.batch_size, self.num_examples)) else: - batch_indexes = list(range(step*self.batch_size, + batch_indexes = list(range(step*self.batch_size, (step+1)*self.batch_size)) - - if self.hold_y: # Return the arrays in the shape that fit_gen uses (data, target) return (self.data_x[batch_indexes, ...], self.data_y[batch_indexes, ...]) # else: # Return the arrays in the shape that predict_generator uses (data) - return (self.data_x[batch_indexes, ...]) - - + return (self.data_x[batch_indexes, ...]) + def sliced_batch(self, step): #problem with returning the "sampled_indexes" only: # H5 can only slice given i) a sequencial list of integers or ii) a # boolean array [i.e. there is no fancy slicing, as in numpy] # since ii) might need a giant boolean array, let's do i) and then # filter stuff - #gets the desired indexes for this batch if (step+1) == self.number_of_batches: #<- last batch - batch_indexes = (self.sampled_indexes[step*self.batch_size : + batch_indexes = (self.sampled_indexes[step*self.batch_size : self.num_examples]) else: - batch_indexes = (self.sampled_indexes[step*self.batch_size : + batch_indexes = (self.sampled_indexes[step*self.batch_size : (step+1)*self.batch_size]) - first_index = batch_indexes[0] last_index = batch_indexes[-1] - #if the samples are too far appart, loads one by one if last_index - first_index > 4096: #<- magic number - data_x = [] for i in batch_indexes: - data_x.append(self.data_x[i, ...]) + data_x.append(self.data_x[i, ...]) data_x = numpy.asarray(data_x) - if self.hold_y: data_y = [] for i in batch_indexes: - data_y.append(self.data_y[i, ...]) + data_y.append(self.data_y[i, ...]) data_y = numpy.asarray(data_y) #otherwise, loads the interval and then filters else: batch_indexes = batch_indexes - first_index - data_x = self.data_x[first_index:last_index+1, ...] data_x = data_x[batch_indexes, ...] - if self.hold_y: data_y = self.data_y[first_index:last_index+1, ...] data_y = data_y[batch_indexes, ...] - - - if self.hold_y: + if self.hold_y: return(data_x, data_y) - # else: return(data_x) - - + def __len__(self): #returns the dataset length return self.number_of_batches - - + def __getitem__(self, step): #gets a batch if self.sampled_indexes is None: return self.sequential_batch(step) else: return self.sliced_batch(step) - - - - class Toupee: - def __init__(self): self.reset() @@ -300,19 +271,15 @@ def get_probabilities(classifier, file_object, batch_size): """ Predicts the train set using the trained model """ - x_holder = DataGenerator(file_object, batch_size, None, hold_y = False) - #applies the correct method, depending on the classifier class if hasattr(classifier, 'predict_generator'): class_proba = classifier.predict_generator(x_holder, max_queue_size=1000) else: class_proba = classifier.predict_proba(x_holder) - return class_proba - def errors(classifier, file_object, batch_size): """ Gets the model's binary error status for each sample @@ -331,7 +298,6 @@ def errors(classifier, file_object, batch_size): end = 0 r = numpy.empty(n_samples) while end < n_samples: - start = end end += 131072 # magic number, power of 2 :D if end > n_samples: @@ -344,49 +310,38 @@ def errors(classifier, file_object, batch_size): r[start:end] = r[start:end].astype('int32') return r - - + def accuracy(classifier, file_object, batch_size): - e = errors(classifier, file_object, batch_size) - return 1.0 - (float(e.sum()) / float(file_object['y'].shape[0])) - - + #TODO: this is kinda a redefinition of data.py's one_hot # -> take care of the duplicates! def one_hot(data, n_classes): b = numpy.zeros((data.size, n_classes),dtype='float32') b[numpy.arange(data.size), data] = 1. return b - - + def count_classes(file_object): """Counts the number of entries on each class""" n_samples, n_classes = file_object['y'].shape sample_count = numpy.asarray([0]*n_classes) - end = 0 while end < n_samples: start = end end += 131072 # magic number, power of 2 :D if end > n_samples: end = n_samples - data_y = numpy.asarray(file_object['y'][start:end]) sample_count += numpy.sum(data_y, axis = 0) - return sample_count - - + def confidence(classifier, file_object, batch_size): """ Returns the model's confidence for the true label """ - class_proba = get_probabilities(classifier, file_object, batch_size) n_samples = file_object['y'].shape[0] - end = 0 h = numpy.empty(n_samples) while end < n_samples: @@ -394,15 +349,10 @@ def confidence(classifier, file_object, batch_size): end += 131072 # magic number, power of 2 :D if end > n_samples: end = n_samples - data_y = numpy.asarray(file_object['y'][start:end]).argmax(axis=-1) - for i in range(end - start): h[start + i] = class_proba[start + i][data_y[i]] - return h - - #---------------------------------------------------------- #for regression problems: # def distance(predictor, test_set_x, test_set_y): @@ -417,11 +367,9 @@ def confidence(classifier, file_object, batch_size): # euclidian_distance_squared = distance(predictor, test_set_x, test_set_y) # euclidian_distance = numpy.sqrt(euclidian_distance_squared) # return(numpy.sum(euclidian_distance) / float(test_set_y.shape[0])) - # def relative_distance(predictor, test_set_x, test_set_y): # relative_distance = distance(y-y_pred) / sqrt(y^2) [sqrt(y^2) = L2 norm] # euclidian_distance_squared = distance(predictor, test_set_x, test_set_y) # y_squared = numpy.sum(numpy.square(test_set_y), axis = 1) #both this and the previous line will need a sqrt, which can be done after the division # relative_distance = numpy.sqrt(euclidian_distance_squared / y_squared) # return(numpy.sum(relative_distance) / float(test_set_y.shape[0])) - diff --git a/toupee/ensemble_methods.py b/toupee/ensemble_methods.py index 194db00..231258a 100644 --- a/toupee/ensemble_methods.py +++ b/toupee/ensemble_methods.py @@ -24,7 +24,6 @@ #------------------------------------------------------------------------------ #Aggregators: - class Aggregator: """ Base class for all aggregating methods @@ -46,7 +45,6 @@ def predict(self, X): m = np.argmax(a,axis=1) return np.eye(self.out_shape[1])[m] - class MajorityVotingRunner(Aggregator): """ Take an ensemble and produce the majority vote output on a dataset @@ -61,20 +59,17 @@ def predict_proba(self,X): for (m_yaml, m_weights) in self.members: m = keras.models.model_from_yaml(m_yaml) m.set_weights(m_weights) - if isinstance(X, np.ndarray): #To test the ensemble with ndarrays p = m.predict_proba(X, batch_size = self.params.batch_size) else: p = m.predict_generator(X, max_queue_size=1000) - prob.append(p) self.out_shape = m.layers[-1].output_shape prob_arr = np.array(prob) a = np.sum(prob_arr,axis=0) / float(len(self.members)) m = np.argmax(a,axis=1) return np.eye(self.out_shape[1])[m] - - + class AveragingRunner(Aggregator): """ Take an ensemble and produce the average @@ -90,12 +85,10 @@ def predict_proba(self, X): for (m_yaml, m_weights) in self.members: m = keras.models.model_from_yaml(m_yaml) m.set_weights(m_weights) - if isinstance(X, np.ndarray): #To test the ensemble with ndarrays p = m.predict_proba(X, batch_size = self.params.batch_size) else: p = m.predict_generator(X, max_queue_size=1000) - if self.wrapper is not None: p = self.wrapper(p) prob.append(p) @@ -122,9 +115,8 @@ def predict_proba(self,X): m_yaml, m_weights = self.members[i] m = keras.models.model_from_yaml(m_yaml) m.set_weights(m_weights) - if isinstance(X, np.ndarray): #To test the ensemble with ndarrays - p = m.predict_proba(X, batch_size = self.params.batch_size) + p = m.predict_proba(X, batch_size = self.params.batch_size) else: p = m.predict_generator(X, max_queue_size=1000) @@ -132,8 +124,7 @@ def predict_proba(self,X): prob_arr = np.array(prob) / np.sum(self.weights) a = np.sum(prob_arr,axis=0) return a - - + # class WeightedAveragingRunner_Regression(Aggregator): # """ # Take an Ensemble and produce a weighted average, usually done in AdaBoost @@ -154,7 +145,6 @@ def predict_proba(self,X): # m.set_weights(m_weights) # r = m.predict(data, batch_size = self.params.batch_size) # result.append(r * self.weights[i]) - # result_arr = np.array(result) / np.sum(self.weights) # final_result = np.sum(result_arr,axis=0) / float(len(self.members)) @@ -186,7 +176,7 @@ def load_weights(self,weights,x,y,index): raise "needs fixing" for w in weights: rng = numpy.random.RandomState(self.params.random_seed) - m = mlp.MLP(params=self.params, rng=rng, input=x, index=index, + m = mlp.MLP(params=self.params, rng=rng, input=x, index=index, x=x, y=y) m.set_weights(w) self.members.append(m) @@ -207,28 +197,23 @@ def create_aggregator(self,params,members,train_set,valid_set): return WeightedAveragingRunner(members,self.alphas,params) def create_member(self, data_files): - #Gets the training indexes if self.member_number > 0: train_indexes = \ self.resampler.make_new_train(self.params.resample_size) else: train_indexes = [None,None] - #Packs the needed data dataset = [ train_indexes, data_files ] - #Trains the model m = mlp.sequential_model(dataset, self.params, member_number = self.member_number) - #Gets the errors for the train set and updates the weights print('Getting the train errors and updating the weights') errors = common.errors(m, data_files[0], self.params.batch_size) - e = np.sum((errors * self.D)) if e > 0: n_classes = data_files[0]['y'].shape[1] @@ -249,7 +234,6 @@ def create_member(self, data_files): self.member_number += 1 return (m.to_yaml(), m.get_weights()) - def prepare(self, params, train_size): self.params = params self.train_size = train_size @@ -260,9 +244,7 @@ def prepare(self, params, train_size): def serialize(self): return 'AdaBoostM1' - - class AdaBoost_MA(EnsembleMethod): """ Create an AdaBoost MA Ensemble from parameters @@ -270,12 +252,10 @@ class AdaBoost_MA(EnsembleMethod): """ yaml_tag = '!AdaBoostMA' - def create_aggregator(self,params,members,train_set,valid_set): return WeightedAveragingRunner(members,self.alphas,params) def create_member(self, data_files): - #Gets the training indexes and defines c, if needed if self.member_number > 0: train_indexes = \ @@ -284,21 +264,17 @@ def create_member(self, data_files): train_indexes = [None,None] sample_counts = common.count_classes(data_files[0]) self.c = np.sum(np.square(sample_counts/self.train_size)) - #Packs the needed data dataset = [ train_indexes, data_files ] - #Trains the model m = mlp.sequential_model(dataset, self.params, member_number = self.member_number) - #Gets the errors for the train set and updates the weights print('Getting the confidence and updating the weights') h = common.confidence(m, data_files[0], self.params.batch_size) - r = np.sum((h * self.D)) if r > self.c: alpha = math.log(((1-self.c)*r)/(self.c*(1-r))) @@ -318,7 +294,6 @@ def create_member(self, data_files): self.member_number += 1 return (m.to_yaml(), m.get_weights()) - def prepare(self, params, train_size): self.params = params self.train_size = train_size @@ -329,8 +304,7 @@ def prepare(self, params, train_size): def serialize(self): return 'AdaBoostMA' - - + class Bagging(EnsembleMethod): """ Create a Bagging Runner from parameters @@ -341,7 +315,6 @@ class Bagging(EnsembleMethod): def __init__(self,voting=False): self.voting = voting self.resampler = None - def create_aggregator(self,params,members,train_set,valid_set): if 'voting' in self.__dict__ and self.voting: return MajorityVotingRunner(members,params) @@ -349,23 +322,19 @@ def create_aggregator(self,params,members,train_set,valid_set): return AveragingRunner(members,params) def create_member(self, data_files): - #gets the training indexes if self.member_number > 0: train_indexes = self.resampler.make_new_train(self.params.resample_size) else: train_indexes = [None,None] - #packs the needed data dataset = [ train_indexes, data_files ] - #trains the model m = mlp.sequential_model(dataset, self.params, member_number = self.member_number) - self.member_number += 1 return (m.to_yaml(), m.get_weights()) @@ -381,7 +350,6 @@ def serialize(self): #-------------------------------------------------------------------------------------- # To update: [these emsebles are not adapted for the generator class] - class DIB(EnsembleMethod): """ Create Deep Incremental Boosting Ensemble from parameters @@ -796,14 +764,6 @@ def serialize(self): self.incremental_index, self.incremental_layers) - - - - - - - - #class StackingRunner(Aggregator): # """ # Take an ensemble and produce the stacked output on a dataset @@ -853,7 +813,6 @@ def serialize(self): # self.errors = self.stack_head.errors(y) # # - #class Stacking(EnsembleMethod): # """ # Create a Stacking Runner from parameters @@ -903,7 +862,6 @@ def serialize(self): # def serialize(self): # return 'Stacking' - # class AdaBoost_Regression(EnsembleMethod): # """ @@ -933,23 +891,16 @@ def serialize(self): # m = mlp.sequential_model(resampled, self.params, # <--- trains the new model(step 2 in [1]) # member_number = self.member_number) # orig_train = self.resampler.get_train() - # distance = common.distance(m, orig_train[0], orig_train[1]) # <--- loss for each element (steps 3 and 4 in [1]) - # max_dist = distance.max() # distance_norm = distance / max_dist # <--- the loss function is now normalized in range [0,1] - # weighted_dist = np.sum((distance_norm * self.D)) # <--- average weighted loss (step 5 in [1]) - # beta = weighted_dist / (1 - weighted_dist) # <--- computation of the confidence in the predictor (step 6 in [1]) # [low beta = good prediction] - # w = self.D * (beta ** (1 - distance_norm)) # <--- updates the weights for each sample (step 7 in [1]) # self.D = w / w.sum() - # alpha = the better the model is (smaller beta), the bigger alpha will be [alpha is computed to maintain consistency with other models] # alpha = 0.5 * math.log(1/beta) - # self.resampler.update_weights(self.D) # self.alphas.append(alpha) # self.member_number += 1 @@ -965,4 +916,3 @@ def serialize(self): # def serialize(self): # return 'AdaBoostRegression' - diff --git a/toupee/mlp.py b/toupee/mlp.py index 2356904..473410f 100644 --- a/toupee/mlp.py +++ b/toupee/mlp.py @@ -34,9 +34,8 @@ from keras import backend as K -def initialize_model(params, sample_weight, model_config, model_yaml, +def initialize_model(params, sample_weight, model_config, model_yaml, model_weights, frozen_layers): - print("loading model...") if sample_weight is not None: print("using sample weights...") @@ -58,26 +57,20 @@ def initialize_model(params, sample_weight, model_config, model_yaml, model.layers[i].set_weights(model_weights[i]) print(("total weight count: {0}".format(total_weights))) - if frozen_layers is None: frozen_layers = [] - for l in frozen_layers: model.layers[l].trainable = False - return(model, total_weights) - def initialize_metrics(params): - - if params.classification == True: + if params.classification == True: scorer_name = 'accuracy' monitor_type = 'val_acc' else: scorer_name = 'mean_squared_error' monitor_type = 'val_loss' - metrics = [scorer_name] if 'additional_metrics' in params.__dict__: metrics = metrics + additional_metrics @@ -85,11 +78,8 @@ def initialize_metrics(params): checkpointer = common.ModelCheckpointInMemory(verbose=1, monitor = monitor_type, mode = 'max') - return(metrics, checkpointer) - - def callbacks_with_lr_scheduler(schedule, model, callbacks): def scheduler(epoch): if epoch in schedule: @@ -98,11 +88,9 @@ def scheduler(epoch): model.optimizer.lr = K.variable(value = schedule[epoch]) # return float(model.optimizer.lr.get_value()) #<--- old keras-fork version return float(K.eval(model.optimizer.lr)) - return callbacks + [keras.callbacks.LearningRateScheduler(scheduler)] - + return callbacks + [keras.callbacks.LearningRateScheduler(scheduler)] def print_results(model, train_metrics, valid_metrics, test_metrics): - for metrics_name,metrics in ( ('train', train_metrics), ('valid', valid_metrics), @@ -112,7 +100,6 @@ def print_results(model, train_metrics, valid_metrics, test_metrics): for i in range(len(metrics)): print((" {0} = {1}".format(model.metrics_names[i], metrics[i]))) - def sequential_model(dataset, params, pretraining_set = None, @@ -130,25 +117,19 @@ def sequential_model(dataset, """ #_ was "total_weights" before - model, _ = initialize_model(params, sample_weight, model_config, + model, _ = initialize_model(params, sample_weight, model_config, model_yaml, model_weights, frozen_layers) if return_results: results = common.Results(params) - #3-4 data holders: (1) sampled train data, (2-3) eval data - train/valid/[test] sets sampled_indexes = dataset[0][0] if sampled_indexes is not None: sampled_indexes.sort() files = dataset[1] - - train_holder = common.DataGenerator(files[0], params.batch_size, sampled_indexes) - train_eval_holder = common.DataGenerator(files[0], params.batch_size, None) - valid_holder = common.DataGenerator(files[1], params.batch_size, None) - test_holder = common.DataGenerator(files[2], params.batch_size, None) - + train_holder = files[0] + test_holder = files[2] start_time = time.clock() - metrics, checkpointer = initialize_metrics(params) callbacks = [checkpointer] @@ -162,15 +143,10 @@ def sequential_model(dataset, lr_schedule = params.optimizer['config']['lr'] params.optimizer['config']['lr'] = lr_schedule[0] optimizer = keras.optimizers.deserialize(params.optimizer) - model.compile(optimizer = optimizer, loss = params.cost_function, metrics = metrics, - - #theano stuff: #<--- old keras-fork version - # update_inputs = params.update_inputs, - # update_inputs_lr = params.update_inputs_lr - ) + ) #TODO - Joao: I think this if branch needs to be updated with the new data holder if params.online_transform is not None: @@ -182,51 +158,56 @@ def sequential_model(dataset, if lr_schedule is not None: callbacks = callbacks_with_lr_scheduler(lr_schedule, model, callbacks) if return_results: - hist = model.fit_generator(train_holder, - epochs = params.n_epochs, - validation_data = valid_holder, - callbacks = callbacks, - max_queue_size=1000, - shuffle=False, - verbose=params.verbose, - use_multiprocessing=False) #<------------ Don't use more than 1 worker! Will crash [Gen class must be upgraded] - #the old keras-fork version had more parameters here + hist = model.fit( + x = train_holder['X'], + y = train_holder['y'], + epochs = params.n_epochs, + batch_size = params.batch_size, + shuffle = 'batch', + validation_split=0.1, + callbacks = callbacks, + verbose = params.verbose, + ) else: - print('Verbosity level:', params.verbose) - model.fit_generator(train_holder, - epochs = params.n_epochs, - validation_data = valid_holder, - callbacks = callbacks, - max_queue_size=1000, - shuffle=False, - verbose=params.verbose, - use_multiprocessing=False) #<------------ Don't use more than 1 worker! Will crash [Gen class must be upgraded] - #the old keras-fork version had more parameters here - + model.fit( + x = train_holder['X'], + y = train_holder['y'], + epochs = params.n_epochs, + batch_size = params.batch_size, + shuffle = 'batch', + validation_split=0.1, + callbacks = callbacks, + verbose = params.verbose, + ) + model.set_weights(checkpointer.best_model) - #evals everything with a generator print('\nGetting the train metrics...') - train_metrics = model.evaluate_generator(train_eval_holder) + train_metrics = model.evaluate( + x=train_holder['X'], + y=train_holder['y'], + batch_size=params.batch_size, + ) print('Getting the validation metrics...') - valid_metrics = model.evaluate_generator(valid_holder) + valid_metrics = train_metrics # This is just here until we decide whether to keep a validation + # dataset print('Getting the test metrics...') - test_metrics = model.evaluate_generator(test_holder) - + test_metrics = model.evaluate( + x=test_holder['X'], + y=test_holder['y'], + batch_size=params.batch_size, + ) print_results(model, train_metrics, valid_metrics, test_metrics) if return_results: results.set_history(hist) - end_time = time.clock() - print((('Optimization complete.\nBest valid: %f \n' 'Obtained at epoch: %i\nTest: %f ') % (valid_metrics[1], checkpointer.best_epoch + 1, test_metrics[1]))) print(('The code for ' + os.path.split(__file__)[1] + ' ran for %.2fm' % ((end_time - start_time) / 60.))) - if return_results: results.set_final_observation(valid_metrics[1], test_metrics[1],