From 98dfb610a3982c02d7c56deb78fbffe3ab0fb788 Mon Sep 17 00:00:00 2001
From: Max Pumperla
Date: Thu, 3 Jan 2019 17:44:55 +0100
Subject: [PATCH] pep8

---
 elephas/dl4j.py | 11 +++++++----
 elephas/hyperparam.py | 14 ++++++++++----
 elephas/java/adapter.py | 2 +-
 elephas/java/java_classes.py | 6 ++++--
 elephas/java/ndarray.py | 5 +++--
 elephas/ml/__init__.py | 2 +-
 elephas/ml/adapter.py | 7 +++++--
 elephas/ml/params.py | 28 ++++++++++++++++++++++------
 elephas/ml_model.py | 18 +++++++++++-------
 elephas/mllib/__init__.py | 2 +-
 elephas/optimizers.py | 10 +++++++++-
 elephas/parameter/__init__.py | 2 +-
 elephas/parameter/client.py | 2 ++
 elephas/parameter/server.py | 6 ++++--
 elephas/spark_model.py | 32 ++++++++++++++++++++------------
 elephas/utils/__init__.py | 2 +-
 elephas/utils/rdd_utils.py | 9 ++++++---
 elephas/utils/rwlock.py | 1 +
 elephas/worker.py | 23 ++++++++++++++++-------
 19 files changed, 125 insertions(+), 57 deletions(-)

diff --git a/elephas/dl4j.py b/elephas/dl4j.py
index bcefedc..8bbac8e 100644
--- a/elephas/dl4j.py
+++ b/elephas/dl4j.py
@@ -26,7 +26,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size, averaging
         self.save(save_file)
         model_file = java_classes.File(save_file)
         keras_model_type = model.__class__.__name__
-        self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type)
+        self.java_spark_model = dl4j_import(
+            java_spark_context, model_file, keras_model_type)
 
     def fit_rdd(self, data_set_rdd, epochs):
         for _ in range(epochs):
@@ -42,7 +43,7 @@ def get_keras_model(self):
 
 class ParameterSharingModel(SparkModel):
     def __init__(self, java_spark_context, model, num_workers, batch_size,
-                 shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3 , workers_per_node=-1,
+                 shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3, workers_per_node=-1,
                  num_batches_prefetch=0, step_delay=50, step_trigger=0.05, threshold_step=1e-5,
                  collect_stats=False, save_file='temp.h5', *args, **kwargs):
         """ParameterSharingModel
@@ -73,7 +74,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size,
         self.save(save_file)
         model_file = java_classes.File(save_file)
         keras_model_type = model.__class__.__name__
-        self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type)
+        self.java_spark_model = dl4j_import(
+            java_spark_context, model_file, keras_model_type)
 
     def fit_rdd(self, data_set_rdd, epochs):
         for _ in range(epochs):
@@ -101,4 +103,5 @@ def dl4j_import(jsc, model_file, keras_model_type):
         except:
             print("Couldn't load Keras model into DL4J")
     else:
-        raise Exception("Keras model not understood, got: {}".format(keras_model_type))
\ No newline at end of file
+        raise Exception(
+            "Keras model not understood, got: {}".format(keras_model_type))
diff --git a/elephas/hyperparam.py b/elephas/hyperparam.py
index ae00289..c97abf4 100644
--- a/elephas/hyperparam.py
+++ b/elephas/hyperparam.py
@@ -16,6 +16,7 @@ class HyperParamModel(object):
 
     Computes distributed hyper-parameter optimization using Hyperas and Spark.
""" + def __init__(self, sc, num_workers=4): self.spark_context = sc self.num_workers = num_workers @@ -26,14 +27,16 @@ def compute_trials(self, model, data, max_evals, notebook_name): hyperas_worker = HyperasWorker(model_string, max_evals) dummy_rdd = self.spark_context.parallelize([i for i in range(1, 1000)]) dummy_rdd = dummy_rdd.repartition(self.num_workers) - trials_list = dummy_rdd.mapPartitions(hyperas_worker._minimize).collect() + trials_list = dummy_rdd.mapPartitions( + hyperas_worker._minimize).collect() return trials_list def minimize(self, model, data, max_evals, notebook_name=None): global best_model_yaml, best_model_weights - trials_list = self.compute_trials(model, data, max_evals, notebook_name) + trials_list = self.compute_trials( + model, data, max_evals, notebook_name) best_val = 1e7 for trials in trials_list: @@ -61,14 +64,16 @@ def best_models(self, nb_models, model, data, max_evals): nb_models = len(trials_list) scores = [] for trials in trials_list: - scores = scores + [trial.get('result').get('loss') for trial in trials] + scores = scores + [trial.get('result').get('loss') + for trial in trials] cut_off = sorted(scores, reverse=True)[nb_models - 1] model_list = [] for trials in trials_list: for trial in trials: if trial.get('result').get('loss') >= cut_off: model = model_from_yaml(trial.get('result').get('model')) - model.set_weights(pickle.loads(trial.get('result').get('weights'))) + model.set_weights(pickle.loads( + trial.get('result').get('weights'))) model_list.append(model) return model_list @@ -78,6 +83,7 @@ class HyperasWorker(object): Executes hyper-parameter search on each worker and returns results. """ + def __init__(self, bc_model, bc_max_evals): self.model_string = bc_model self.max_evals = bc_max_evals diff --git a/elephas/java/adapter.py b/elephas/java/adapter.py index 2588a37..ef9b99b 100644 --- a/elephas/java/adapter.py +++ b/elephas/java/adapter.py @@ -49,4 +49,4 @@ def retrieve_keras_weights(java_model): weight = params.get(key) np_weight = np.squeeze(to_numpy(weight)) weights.append(np_weight) - return weights \ No newline at end of file + return weights diff --git a/elephas/java/java_classes.py b/elephas/java/java_classes.py index 624fb4d..cc8bedb 100644 --- a/elephas/java/java_classes.py +++ b/elephas/java/java_classes.py @@ -44,5 +44,7 @@ # Import -KerasModelImport = autoclass('org.deeplearning4j.nn.modelimport.keras.KerasModelImport') -ElephasModelImport = autoclass('org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport') \ No newline at end of file +KerasModelImport = autoclass( + 'org.deeplearning4j.nn.modelimport.keras.KerasModelImport') +ElephasModelImport = autoclass( + 'org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport') diff --git a/elephas/java/ndarray.py b/elephas/java/ndarray.py index 4fc27bb..eafa8cd 100644 --- a/elephas/java/ndarray.py +++ b/elephas/java/ndarray.py @@ -83,7 +83,8 @@ def _from_numpy(np_array): # Convert the numpy array to nd4j context dtype required_dtype = get_np_dtype(get_context_dtype()) if np_array.dtype != required_dtype: - raise Exception("{} is required, got {}".format(repr(required_dtype), repr(np_array.dtype))) + raise Exception("{} is required, got {}".format( + repr(required_dtype), repr(np_array.dtype))) # Nd4j does not have 1-d vectors. # So we add a dummy dimension. 
@@ -430,4 +431,4 @@ def T(self):
 
 
 def array(*args, **kwargs):
-    return ndarray(*args, **kwargs)
\ No newline at end of file
+    return ndarray(*args, **kwargs)
diff --git a/elephas/ml/__init__.py b/elephas/ml/__init__.py
index a6390b6..e20d51a 100644
--- a/elephas/ml/__init__.py
+++ b/elephas/ml/__init__.py
@@ -1,2 +1,2 @@
 from .adapter import *
-from .params import *
\ No newline at end of file
+from .params import *
diff --git a/elephas/ml/adapter.py b/elephas/ml/adapter.py
index 80b20fb..030d30c 100644
--- a/elephas/ml/adapter.py
+++ b/elephas/ml/adapter.py
@@ -5,6 +5,7 @@
 from ..utils.rdd_utils import from_labeled_point, to_labeled_point, lp_to_simple_rdd
 from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
 
+
 def to_data_frame(sc, features, labels, categorical=False):
     """Convert numpy arrays of features and labels into Spark DataFrame
     """
@@ -30,8 +31,10 @@ def df_to_simple_rdd(df, categorical=False, nb_classes=None, features_col='featu
     selected_df = sql_context.sql(
         "SELECT {0} AS features, {1} as label from temp_table".format(features_col, label_col))
     if isinstance(selected_df.first().features, MLLibVector):
-        lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, row.features))
+        lp_rdd = selected_df.rdd.map(
+            lambda row: LabeledPoint(row.label, row.features))
     else:
-        lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, MLLibVectors.fromML(row.features)))
+        lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(
+            row.label, MLLibVectors.fromML(row.features)))
     rdd = lp_to_simple_rdd(lp_rdd, categorical, nb_classes)
     return rdd
diff --git a/elephas/ml/params.py b/elephas/ml/params.py
index bc83644..0cf62ce 100644
--- a/elephas/ml/params.py
+++ b/elephas/ml/params.py
@@ -7,9 +7,11 @@ class HasKerasModelConfig(Params):
 
     Parameter mixin for Keras model yaml
     """
+
     def __init__(self):
         super(HasKerasModelConfig, self).__init__()
-        self.keras_model_config = Param(self, "keras_model_config", "Serialized Keras model as yaml string")
+        self.keras_model_config = Param(
+            self, "keras_model_config", "Serialized Keras model as yaml string")
 
     def set_keras_model_config(self, keras_model_config):
         self._paramMap[self.keras_model_config] = keras_model_config
@@ -22,13 +24,13 @@ def get_keras_model_config(self):
 class HasElephasOptimizerConfig(Params):
     """Parameter mixin for Elephas optimizer config
     """
+
     def __init__(self):
         super(HasElephasOptimizerConfig, self).__init__()
         self.elephas_optimizer_config = Param(self, "elephas_optimizer_config",
                                               "Serialized Elephas optimizer properties")
         self._setDefault(elephas_optimizer_config=None)
 
-
     def set_elephas_optimizer_config(self, elephas_optimizer_config):
         self._paramMap[self.elephas_optimizer_config] = elephas_optimizer_config
         return self
@@ -40,6 +42,7 @@ def get_elephas_optimizer_config(self):
 class HasMode(Params):
     """Parameter mixin for Elephas mode
     """
+
     def __init__(self):
         super(HasMode, self).__init__()
         self.mode = Param(self, "mode", "Elephas mode")
@@ -56,6 +59,7 @@ def get_mode(self):
 class HasFrequency(Params):
     """Parameter mixin for Elephas frequency
     """
+
     def __init__(self):
         super(HasFrequency, self).__init__()
         self.frequency = Param(self, "frequency", "Elephas frequency")
@@ -74,6 +78,7 @@ class HasNumberOfClasses(Params):
 
     Parameter mixin for number of classes
     """
+
     def __init__(self):
         super(HasNumberOfClasses, self).__init__()
         self.nb_classes = Param(self, "nb_classes", "number of classes")
@@ -92,9 +97,11 @@ class HasCategoricalLabels(Params):
 
     Parameter mixin for setting categorical features
     """
+
     def __init__(self):
         super(HasCategoricalLabels, self).__init__()
-        self.categorical = Param(self, "categorical", "Boolean to indicate if labels are categorical")
+        self.categorical = Param(
+            self, "categorical", "Boolean to indicate if labels are categorical")
         self._setDefault(categorical=True)
 
     def set_categorical_labels(self, categorical):
@@ -108,6 +115,7 @@ def get_categorical_labels(self):
 class HasEpochs(Params):
     """Parameter mixin for number of epochs
     """
+
     def __init__(self):
         super(HasEpochs, self).__init__()
         self.epochs = Param(self, "epochs", "Number of epochs to train")
@@ -124,6 +132,7 @@ def get_epochs(self):
 class HasBatchSize(Params):
     """Parameter mixin for batch size
     """
+
     def __init__(self):
         super(HasBatchSize, self).__init__()
         self.batch_size = Param(self, "batch_size", "Batch size")
@@ -140,6 +149,7 @@ def get_batch_size(self):
 class HasVerbosity(Params):
     """Parameter mixin for output verbosity
     """
+
     def __init__(self):
         super(HasVerbosity, self).__init__()
         self.verbose = Param(self, "verbose", "Stdout verbosity")
@@ -156,9 +166,11 @@ def get_verbosity(self):
 class HasValidationSplit(Params):
     """Parameter mixin for validation split percentage
     """
+
     def __init__(self):
         super(HasValidationSplit, self).__init__()
-        self.validation_split = Param(self, "validation_split", "validation split percentage")
+        self.validation_split = Param(
+            self, "validation_split", "validation split percentage")
         self._setDefault(validation_split=0.1)
 
     def set_validation_split(self, validation_split):
@@ -172,6 +184,7 @@ def get_validation_split(self):
 class HasNumberOfWorkers(Params):
     """Parameter mixin for number of workers
     """
+
     def __init__(self):
         super(HasNumberOfWorkers, self).__init__()
         self.num_workers = Param(self, "num_workers", "number of workers")
@@ -188,9 +201,11 @@ def get_num_workers(self):
 class HasKerasOptimizerConfig(Params):
     """Parameter mixin for Keras optimizer config
     """
+
     def __init__(self):
         super(HasKerasOptimizerConfig, self).__init__()
-        self.optimizer_config = Param(self, "optimizer_config", "Serialized Keras optimizer properties")
+        self.optimizer_config = Param(
+            self, "optimizer_config", "Serialized Keras optimizer properties")
         self._setDefault(optimizer_config=None)
 
     def set_optimizer_config(self, optimizer_config):
@@ -204,12 +219,12 @@ def get_optimizer_config(self):
 class HasMetrics(Params):
     """Parameter mixin for Keras metrics
     """
+
     def __init__(self):
         super(HasMetrics, self).__init__()
         self.metrics = Param(self, "metrics", "Keras metrics")
         self._setDefault(metrics=['acc'])
 
-
     def set_metrics(self, metrics):
         self._paramMap[self.metrics] = metrics
         return self
@@ -221,6 +236,7 @@ def get_metrics(self):
 class HasLoss(Params):
     """Parameter mixin for Keras metrics
     """
+
     def __init__(self):
         super(HasLoss, self).__init__()
         self.loss = Param(self, "loss", "Keras loss")
diff --git a/elephas/ml_model.py b/elephas/ml_model.py
index 90b5438..5f91c0d 100644
--- a/elephas/ml_model.py
+++ b/elephas/ml_model.py
@@ -80,7 +80,7 @@ def _fit(self, df):
         elephas_optimizer = None
         if self.get_elephas_optimizer_config() is not None:
             elephas_optimizer = get({'class_name': self.get_optimizer_config()['class_name'],
-                                    'config': self.get_optimizer_config()})
+                                     'config': self.get_optimizer_config()})
 
         keras_model = model_from_yaml(self.get_keras_model_config())
         metrics = self.get_metrics()
@@ -117,7 +117,8 @@ class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol):
     def __init__(self, **kwargs):
         super(ElephasTransformer, self).__init__()
         if "weights" in kwargs.keys():
-            self.weights = kwargs.pop('weights')  # Strip model weights from parameters to init Transformer
+            # Strip model weights from parameters to init Transformer
+            self.weights = kwargs.pop('weights')
         self.set_params(**kwargs)
 
     @keyword_only
@@ -142,7 +143,6 @@ def save(self, file_name):
         f.flush()
         f.close()
 
-
     def get_model(self):
         return model_from_yaml(self.get_keras_model_config())
 
@@ -155,11 +155,13 @@ def _transform(self, df):
         new_schema.add(StructField(output_col, StringType(), True))
 
         rdd = df.rdd.coalesce(1)
-        features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect())
+        features = np.asarray(
+            rdd.map(lambda x: from_vector(x.features)).collect())
         # Note that we collect, since executing this on the rdd would require model serialization once again
         model = model_from_yaml(self.get_keras_model_config())
         model.set_weights(self.weights.value)
-        predictions = rdd.ctx.parallelize(model.predict_classes(features)).coalesce(1)
+        predictions = rdd.ctx.parallelize(
+            model.predict_classes(features)).coalesce(1)
         predictions = predictions.map(lambda x: tuple(str(x)))
 
         results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1])
@@ -167,8 +169,10 @@ def _transform(self, df):
         # results_rdd = rdd.zip(predictions).map(lambda pair: Row(features=to_vector(pair[0].features),
         #                                                         label=pair[0].label, prediction=float(pair[1])))
         results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema)
-        results_df = results_df.withColumn(output_col, results_df[output_col].cast(DoubleType()))
-        results_df = results_df.withColumn(label_col, results_df[label_col].cast(DoubleType()))
+        results_df = results_df.withColumn(
+            output_col, results_df[output_col].cast(DoubleType()))
+        results_df = results_df.withColumn(
+            label_col, results_df[label_col].cast(DoubleType()))
         return results_df
 
 
diff --git a/elephas/mllib/__init__.py b/elephas/mllib/__init__.py
index 56ac82f..38fc832 100644
--- a/elephas/mllib/__init__.py
+++ b/elephas/mllib/__init__.py
@@ -1 +1 @@
-from .adapter import *
\ No newline at end of file
+from .adapter import *
diff --git a/elephas/optimizers.py b/elephas/optimizers.py
index fbb49e0..a938d4b 100644
--- a/elephas/optimizers.py
+++ b/elephas/optimizers.py
@@ -31,6 +31,7 @@ class Optimizer(object):
 
     """Optimizer for elephas models, adapted from respective Keras module.
""" + def __init__(self, **kwargs): self.__dict__.update(kwargs) self.updates = [] @@ -67,6 +68,7 @@ def get_config(self): class SGD(Optimizer): """SGD, optionally with nesterov momentum """ + def __init__(self, lr=0.01, momentum=0., decay=0., nesterov=False, *args, **kwargs): super(SGD, self).__init__(**kwargs) @@ -103,6 +105,7 @@ def get_config(self): class RMSprop(Optimizer): """Reference: www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf """ + def __init__(self, lr=0.001, rho=0.9, epsilon=1e-6, *args, **kwargs): super(RMSprop, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -132,6 +135,7 @@ def get_config(self): class Adagrad(Optimizer): """Reference: http://www.magicbroom.info/Papers/DuchiHaSi10.pdf """ + def __init__(self, lr=0.01, epsilon=1e-6, *args, **kwargs): super(Adagrad, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -156,6 +160,7 @@ def get_config(self): class Adadelta(Optimizer): """Reference: http://arxiv.org/abs/1212.5701 """ + def __init__(self, lr=1.0, rho=0.95, epsilon=1e-6, *args, **kwargs): super(Adadelta, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -190,6 +195,7 @@ class Adam(Optimizer): """Reference: http://arxiv.org/abs/1412.6980v8 Default parameters follow those provided in the original paper. """ + def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8, *args, **kwargs): super(Adam, self).__init__(**kwargs) @@ -221,6 +227,7 @@ def get_config(self): "beta_2": self.beta_2, "epsilon": self.epsilon} + # aliases sgd = SGD rmsprop = RMSprop @@ -228,6 +235,7 @@ def get_config(self): adadelta = Adadelta adam = Adam + def serialize(optimizer): return serialize_keras_object(optimizer) @@ -286,4 +294,4 @@ def get(identifier): return identifier else: raise ValueError('Could not interpret optimizer identifier:', - identifier) \ No newline at end of file + identifier) diff --git a/elephas/parameter/__init__.py b/elephas/parameter/__init__.py index a62f33e..ad9c21a 100644 --- a/elephas/parameter/__init__.py +++ b/elephas/parameter/__init__.py @@ -1,2 +1,2 @@ from .server import * -from .client import * \ No newline at end of file +from .client import * diff --git a/elephas/parameter/client.py b/elephas/parameter/client.py index 997b082..e171864 100644 --- a/elephas/parameter/client.py +++ b/elephas/parameter/client.py @@ -44,6 +44,7 @@ class HttpClient(BaseParameterClient): namely HttpServer. The HTTP server provides two endpoints, `/parameters` to get parameters and `/update` to update the server's parameters. """ + def __init__(self, port=4000): self.master_url = determine_master(port=port) @@ -68,6 +69,7 @@ class SocketClient(BaseParameterClient): The socket server listens to two types of events. Those with a `g` prefix indicate a get-request, those with a `u` indicate a parameter update. 
""" + def __init__(self, host='0.0.0.0', port=4000): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/elephas/parameter/server.py b/elephas/parameter/server.py index 31ae633..07c376f 100644 --- a/elephas/parameter/server.py +++ b/elephas/parameter/server.py @@ -12,6 +12,7 @@ from ..utils.rwlock import RWLock as Lock from ..utils.notebook_utils import is_running_in_notebook + class BaseParameterServer(object): """BaseParameterServer @@ -125,9 +126,10 @@ def handle_update_parameters(): if not self.master_network.built: self.master_network.build() - base_constraint = lambda a: a + def base_constraint(a): return a constraints = [base_constraint for _ in self.weights] - self.weights = self.optimizer.get_updates(self.weights, constraints, delta) + self.weights = self.optimizer.get_updates( + self.weights, constraints, delta) if self.mode == 'asynchronous': self.lock.release() return 'Update done' diff --git a/elephas/spark_model.py b/elephas/spark_model.py index 28cb2fd..90e5adf 100644 --- a/elephas/spark_model.py +++ b/elephas/spark_model.py @@ -37,7 +37,8 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser self._master_network = model if not hasattr(model, "loss"): - raise Exception("Compile your Keras model before initializing an Elephas model with it") + raise Exception( + "Compile your Keras model before initializing an Elephas model with it") metrics = model.metrics loss = model.loss optimizer = serialize_optimizer(model.optimizer) @@ -66,7 +67,8 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser self.serialized_model = model_to_dict(self.master_network) # TODO only set this for async/hogwild mode if self.parameter_server_mode == 'http': - self.parameter_server = HttpServer(self.serialized_model, self.optimizer, self.mode) + self.parameter_server = HttpServer( + self.serialized_model, self.optimizer, self.mode) self.client = HttpClient() elif self.parameter_server_mode == 'socket': self.parameter_server = SocketServer(self.serialized_model) @@ -150,7 +152,8 @@ def fit(self, rdd, epochs=10, batch_size=32, if self.mode in ['asynchronous', 'synchronous', 'hogwild']: self._fit(rdd, epochs, batch_size, verbose, validation_split) else: - raise ValueError("Choose from one of the modes: asynchronous, synchronous or hogwild") + raise ValueError( + "Choose from one of the modes: asynchronous, synchronous or hogwild") def _fit(self, rdd, epochs, batch_size, verbose, validation_split): """Protected train method to make wrapping of modes easier @@ -160,7 +163,8 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): metrics=self.master_metrics) if self.mode in ['asynchronous', 'hogwild']: self.start_server() - train_config = self.get_train_config(epochs, batch_size, verbose, validation_split) + train_config = self.get_train_config( + epochs, batch_size, verbose, validation_split) mode = self.parameter_server_mode freq = self.frequency optimizer = self.master_optimizer @@ -173,18 +177,21 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): parameters = rdd.context.broadcast(init) if self.mode in ['asynchronous', 'hogwild']: - worker = AsynchronousSparkWorker(yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom) + worker = AsynchronousSparkWorker( + yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom) rdd.mapPartitions(worker.train).collect() new_parameters = self.client.get_parameters() elif self.mode == 'synchronous': - worker = 
+            worker = SparkWorker(yaml, parameters, train_config,
+                                 optimizer, loss, metrics, custom)
             deltas = rdd.mapPartitions(worker.train).collect()
             new_parameters = self.master_network.get_weights()
             for delta in deltas:
-                base_constraint = lambda a: a
+                def base_constraint(a): return a
                 constraints = [base_constraint for _ in self.weights]
-                new_parameters = self.optimizer.get_updates(self.weights, constraints, delta)
+                new_parameters = self.optimizer.get_updates(
+                    self.weights, constraints, delta)
         else:
             raise ValueError("Unsupported mode {}".format(self.mode))
         self.master_network.set_weights(new_parameters)
@@ -227,12 +234,13 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_serv
                                          batch_size=batch_size, *args, **kwargs)
 
     def fit(self, labeled_points, epochs=10, batch_size=32, verbose=0, validation_split=0.1,
-           categorical=False, nb_classes=None):
+            categorical=False, nb_classes=None):
         """Train an elephas model on an RDD of LabeledPoints
         """
         rdd = lp_to_simple_rdd(labeled_points, categorical, nb_classes)
         rdd = rdd.repartition(self.num_workers)
-        self._fit(rdd=rdd, epochs=epochs, batch_size=batch_size, verbose=verbose, validation_split=validation_split)
+        self._fit(rdd=rdd, epochs=epochs, batch_size=batch_size,
+                  verbose=verbose, validation_split=validation_split)
 
     def predict(self, mllib_data):
         """Predict probabilities for an RDD of features
@@ -242,5 +250,5 @@ def predict(self, mllib_data):
         elif isinstance(mllib_data, pyspark.mllib.linalg.Vector):
             return to_vector(self.master_network.predict(from_vector(mllib_data)))
         else:
-            raise ValueError('Provide either an MLLib matrix or vector, got {}'.format(mllib_data.__name__))
-
+            raise ValueError(
+                'Provide either an MLLib matrix or vector, got {}'.format(mllib_data.__name__))
diff --git a/elephas/utils/__init__.py b/elephas/utils/__init__.py
index 0c657db..38d40d8 100644
--- a/elephas/utils/__init__.py
+++ b/elephas/utils/__init__.py
@@ -2,4 +2,4 @@
 from .rdd_utils import *
 from .serialization import *
 from .sockets import *
-from .rwlock import *
\ No newline at end of file
+from .rwlock import *
diff --git a/elephas/utils/rdd_utils.py b/elephas/utils/rdd_utils.py
index a2062d4..1765dbc 100644
--- a/elephas/utils/rdd_utils.py
+++ b/elephas/utils/rdd_utils.py
@@ -76,7 +76,8 @@ def from_labeled_point(rdd, categorical=False, nb_classes=None):
     :param nb_classes: optional int, indicating the number of class labels
     :return: pair of numpy arrays, features and labels
     """
-    features = np.asarray(rdd.map(lambda lp: from_vector(lp.features)).collect())
+    features = np.asarray(
+        rdd.map(lambda lp: from_vector(lp.features)).collect())
     labels = np.asarray(rdd.map(lambda lp: lp.label).collect(), dtype='int32')
     if categorical:
         if not nb_classes:
@@ -110,9 +111,11 @@ def lp_to_simple_rdd(lp_rdd, categorical=False, nb_classes=None):
     """
     if categorical:
         if not nb_classes:
-            labels = np.asarray(lp_rdd.map(lambda lp: lp.label).collect(), dtype='int32')
+            labels = np.asarray(lp_rdd.map(
+                lambda lp: lp.label).collect(), dtype='int32')
             nb_classes = np.max(labels) + 1
-        rdd = lp_rdd.map(lambda lp: (from_vector(lp.features), encode_label(lp.label, nb_classes)))
+        rdd = lp_rdd.map(lambda lp: (from_vector(lp.features),
+                                     encode_label(lp.label, nb_classes)))
     else:
         rdd = lp_rdd.map(lambda lp: (from_vector(lp.features), lp.label))
     return rdd
diff --git a/elephas/utils/rwlock.py b/elephas/utils/rwlock.py
index 42fa069..f1e1e86 100644
--- a/elephas/utils/rwlock.py
+++ b/elephas/utils/rwlock.py
@@ -14,6 +14,7 @@ class RWLock:
     simultaneously, XOR one writer. Write locks have priority over reads to
     prevent write starvation.
     """
+
     def __init__(self):
         self.rwlock = 0
         self.writers_waiting = 0
diff --git a/elephas/worker.py b/elephas/worker.py
index 437d4b7..9b84a1e 100644
--- a/elephas/worker.py
+++ b/elephas/worker.py
@@ -11,6 +11,7 @@ class SparkWorker(object):
 
     """Synchronous Spark worker. This code will be executed on workers.
     """
+
     def __init__(self, yaml, parameters, train_config, master_optimizer,
                  master_loss, master_metrics, custom_objects):
         self.yaml = yaml
@@ -27,25 +28,29 @@ def train(self, data_iterator):
         """
         optimizer = get_optimizer(self.master_optimizer)
         self.model = model_from_yaml(self.yaml, self.custom_objects)
-        self.model.compile(optimizer=optimizer, loss=self.master_loss, metrics=self.master_metrics)
+        self.model.compile(optimizer=optimizer,
+                           loss=self.master_loss, metrics=self.master_metrics)
         self.model.set_weights(self.parameters.value)
 
         feature_iterator, label_iterator = tee(data_iterator, 2)
         x_train = np.asarray([x for x, y in feature_iterator])
         y_train = np.asarray([y for x, y in label_iterator])
 
-        self.model.compile(optimizer=self.master_optimizer, loss=self.master_loss, metrics=self.master_metrics)
+        self.model.compile(optimizer=self.master_optimizer,
+                           loss=self.master_loss, metrics=self.master_metrics)
 
         weights_before_training = self.model.get_weights()
         if x_train.shape[0] > self.train_config.get('batch_size'):
             self.model.fit(x_train, y_train, **self.train_config)
         weights_after_training = self.model.get_weights()
-        deltas = subtract_params(weights_before_training, weights_after_training)
+        deltas = subtract_params(
+            weights_before_training, weights_after_training)
         yield deltas
 
 
 class AsynchronousSparkWorker(object):
     """Asynchronous Spark worker. This code will be executed on workers.
     """
+
     def __init__(self, yaml, parameters, parameter_server_mode, train_config, frequency,
                  master_optimizer, master_loss, master_metrics, custom_objects):
@@ -80,7 +85,8 @@ def train(self, data_iterator):
 
         optimizer = get_optimizer(self.master_optimizer)
         self.model = model_from_yaml(self.yaml, self.custom_objects)
-        self.model.compile(optimizer=optimizer, loss=self.master_loss, metrics=self.master_metrics)
+        self.model.compile(optimizer=optimizer,
+                           loss=self.master_loss, metrics=self.master_metrics)
         self.model.set_weights(self.parameters.value)
 
         epochs = self.train_config['epochs']
@@ -102,7 +108,8 @@ def train(self, data_iterator):
                 self.model.fit(x_train, y_train, **self.train_config)
                 self.train_config['epochs'] = epochs
                 weights_after_training = self.model.get_weights()
-                deltas = subtract_params(weights_before_training, weights_after_training)
+                deltas = subtract_params(
+                    weights_before_training, weights_after_training)
                 self.client.update_parameters(deltas)
         elif self.frequency == 'batch':
             for epoch in range(epochs):
@@ -115,8 +122,10 @@ def train(self, data_iterator):
                     y = slice_arrays(y_train, batch_ids)
                     self.model.train_on_batch(x, y)
                 weights_after_training = self.model.get_weights()
-                deltas = subtract_params(weights_before_training, weights_after_training)
+                deltas = subtract_params(
+                    weights_before_training, weights_after_training)
                 self.client.update_parameters(deltas)
         else:
-            raise ValueError('frequency parameter can be `epoch` or `batch, got {}'.format(self.frequency))
+            raise ValueError(
+                'frequency parameter can be `epoch` or `batch, got {}'.format(self.frequency))
         yield []
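
A minimal usage sketch of the API touched by this patch, to make the reformatted entry points concrete. It assumes a local Spark setup and only the signatures visible in the hunks above (SparkModel(model, mode=..., frequency=...), fit(rdd, epochs=..., batch_size=..., verbose=..., validation_split=...), and elephas.utils.rdd_utils.to_simple_rdd); the toy Keras model and random data are illustrative only, not part of the patch.

    import numpy as np
    from pyspark import SparkConf, SparkContext
    from keras.models import Sequential
    from keras.layers import Dense
    from elephas.spark_model import SparkModel
    from elephas.utils.rdd_utils import to_simple_rdd

    sc = SparkContext(conf=SparkConf().setAppName('elephas-sketch').setMaster('local[4]'))

    # SparkModel expects a *compiled* Keras model, as the spark_model.py hunk enforces.
    model = Sequential()
    model.add(Dense(16, activation='relu', input_dim=10))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer='sgd', loss='binary_crossentropy', metrics=['acc'])

    # Toy data; to_simple_rdd turns numpy arrays into an RDD of (features, label) pairs.
    x_train = np.random.rand(1000, 10)
    y_train = np.random.randint(0, 2, (1000, 1))
    rdd = to_simple_rdd(sc, x_train, y_train)

    spark_model = SparkModel(model, mode='asynchronous', frequency='epoch')
    spark_model.fit(rdd, epochs=10, batch_size=32, verbose=0, validation_split=0.1)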