Merge pull request #124 from maxpumperla/mp_travis
Travis build
maxpumperla authored Jan 3, 2019
2 parents dcc9b42 + 98dfb61 commit db90035
Showing 35 changed files with 206 additions and 93 deletions.
4 changes: 3 additions & 1 deletion .gitignore
@@ -77,4 +77,6 @@ java_param_sharing_model.h5
temp.h5

docs/sources
docs/site
docs/site

.vscode
11 changes: 5 additions & 6 deletions .travis.yml
@@ -3,7 +3,7 @@ dist: trusty
language: python
python:
- "2.7"
#- "3.4" # Note that hyperopt currently seems to have issues with 3.4
- "3.4" # Note that hyperopt currently seems to have issues with 3.4
install:
# code below is taken from http://conda.pydata.org/docs/travis.html
# We do this conditionally because it saves us some downloading if the
@@ -19,14 +19,13 @@ install:
- conda config --set always_yes yes --set changeps1 no
- conda update -q conda
- conda info -a
- conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION numpy scipy pytest h5py flask
- travis_retry conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION nose
- source activate test-environment
- pip install pytest-cov python-coveralls
- python setup.py install
- pip install -r requirements.txt
- pip install Cython --install-option="--no-cython-compile"
- pip install -e .[tests]

script:
- python -c "import keras.backend"
- py.test tests/
- py.test --pep8 -m pep8 tests/
after_success:
- coveralls
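
The new "pip install -e .[tests]" step only works if setup.py declares a tests extra. A minimal sketch of such a declaration, assuming (not taken from this commit) that the extra bundles the pytest tooling the script section uses:

# setup.py (sketch; the concrete package list is an assumption)
from setuptools import setup, find_packages

setup(
    name='elephas',
    version='0.0.0',  # placeholder
    packages=find_packages(),
    extras_require={
        'tests': ['pytest', 'pytest-pep8', 'pytest-cov', 'python-coveralls']
    }
)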
11 changes: 7 additions & 4 deletions elephas/dl4j.py
@@ -26,7 +26,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size, averaging
self.save(save_file)
model_file = java_classes.File(save_file)
keras_model_type = model.__class__.__name__
self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type)
self.java_spark_model = dl4j_import(
java_spark_context, model_file, keras_model_type)

def fit_rdd(self, data_set_rdd, epochs):
for _ in range(epochs):
@@ -42,7 +43,7 @@ def get_keras_model(self):

class ParameterSharingModel(SparkModel):
def __init__(self, java_spark_context, model, num_workers, batch_size,
shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3 , workers_per_node=-1,
shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3, workers_per_node=-1,
num_batches_prefetch=0, step_delay=50, step_trigger=0.05, threshold_step=1e-5,
collect_stats=False, save_file='temp.h5', *args, **kwargs):
"""ParameterSharingModel
@@ -73,7 +74,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size,
self.save(save_file)
model_file = java_classes.File(save_file)
keras_model_type = model.__class__.__name__
self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type)
self.java_spark_model = dl4j_import(
java_spark_context, model_file, keras_model_type)

def fit_rdd(self, data_set_rdd, epochs):
for _ in range(epochs):
@@ -101,4 +103,5 @@ def dl4j_import(jsc, model_file, keras_model_type):
except:
print("Couldn't load Keras model into DL4J")
else:
raise Exception("Keras model not understood, got: {}".format(keras_model_type))
raise Exception(
"Keras model not understood, got: {}".format(keras_model_type))
14 changes: 10 additions & 4 deletions elephas/hyperparam.py
@@ -16,6 +16,7 @@ class HyperParamModel(object):
Computes distributed hyper-parameter optimization using Hyperas and
Spark.
"""

def __init__(self, sc, num_workers=4):
self.spark_context = sc
self.num_workers = num_workers
@@ -26,14 +27,16 @@ def compute_trials(self, model, data, max_evals, notebook_name):
hyperas_worker = HyperasWorker(model_string, max_evals)
dummy_rdd = self.spark_context.parallelize([i for i in range(1, 1000)])
dummy_rdd = dummy_rdd.repartition(self.num_workers)
trials_list = dummy_rdd.mapPartitions(hyperas_worker._minimize).collect()
trials_list = dummy_rdd.mapPartitions(
hyperas_worker._minimize).collect()

return trials_list

def minimize(self, model, data, max_evals, notebook_name=None):
global best_model_yaml, best_model_weights

trials_list = self.compute_trials(model, data, max_evals, notebook_name)
trials_list = self.compute_trials(
model, data, max_evals, notebook_name)

best_val = 1e7
for trials in trials_list:
@@ -61,14 +64,16 @@ def best_models(self, nb_models, model, data, max_evals):
nb_models = len(trials_list)
scores = []
for trials in trials_list:
scores = scores + [trial.get('result').get('loss') for trial in trials]
scores = scores + [trial.get('result').get('loss')
for trial in trials]
cut_off = sorted(scores, reverse=True)[nb_models - 1]
model_list = []
for trials in trials_list:
for trial in trials:
if trial.get('result').get('loss') >= cut_off:
model = model_from_yaml(trial.get('result').get('model'))
model.set_weights(pickle.loads(trial.get('result').get('weights')))
model.set_weights(pickle.loads(
trial.get('result').get('weights')))
model_list.append(model)
return model_list

@@ -78,6 +83,7 @@ class HyperasWorker(object):
Executes hyper-parameter search on each worker and returns results.
"""

def __init__(self, bc_model, bc_max_evals):
self.model_string = bc_model
self.max_evals = bc_max_evals
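A hedged usage sketch of HyperParamModel; model_fn and data_fn are assumed to follow the hyperas convention of functions that return a compiled, search-annotated Keras model and the train/test data:

# Sketch only: model_fn and data_fn are assumed hyperas-style functions.
from pyspark import SparkContext
from elephas.hyperparam import HyperParamModel

sc = SparkContext(appName='elephas-hyperparam')
hyperparam_model = HyperParamModel(sc, num_workers=4)
hyperparam_model.minimize(model=model_fn, data=data_fn, max_evals=5)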
2 changes: 1 addition & 1 deletion elephas/java/adapter.py
@@ -49,4 +49,4 @@ def retrieve_keras_weights(java_model):
weight = params.get(key)
np_weight = np.squeeze(to_numpy(weight))
weights.append(np_weight)
return weights
return weights
6 changes: 4 additions & 2 deletions elephas/java/java_classes.py
@@ -44,5 +44,7 @@


# Import
KerasModelImport = autoclass('org.deeplearning4j.nn.modelimport.keras.KerasModelImport')
ElephasModelImport = autoclass('org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport')
KerasModelImport = autoclass(
'org.deeplearning4j.nn.modelimport.keras.KerasModelImport')
ElephasModelImport = autoclass(
'org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport')
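
For context, autoclass is the pyjnius-style lookup that resolves a JVM class by its fully qualified name at runtime. A small sketch of the same mechanism, assuming a JVM with the right classpath is already running:

# Sketch: resolve and use a standard JVM class the same way as above.
from jnius import autoclass

JavaFile = autoclass('java.io.File')
f = JavaFile('temp.h5')
print(f.getAbsolutePath())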
5 changes: 3 additions & 2 deletions elephas/java/ndarray.py
@@ -83,7 +83,8 @@ def _from_numpy(np_array):
# Convert the numpy array to nd4j context dtype
required_dtype = get_np_dtype(get_context_dtype())
if np_array.dtype != required_dtype:
raise Exception("{} is required, got {}".format(repr(required_dtype), repr(np_array.dtype)))
raise Exception("{} is required, got {}".format(
repr(required_dtype), repr(np_array.dtype)))

# Nd4j does not have 1-d vectors.
# So we add a dummy dimension.
@@ -430,4 +431,4 @@ def T(self):


def array(*args, **kwargs):
return ndarray(*args, **kwargs)
return ndarray(*args, **kwargs)
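
The dtype guard in _from_numpy raises instead of converting, so callers must coerce first. A sketch of that caller-side coercion; the concrete context dtype here is an assumption:

# Sketch: coerce a numpy array to the nd4j context dtype up front.
import numpy as np

required_dtype = np.float32           # assumed; really get_np_dtype(get_context_dtype())
x = np.arange(4, dtype=np.float64)
if x.dtype != required_dtype:
    x = x.astype(required_dtype)      # convert so _from_numpy does not raise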
2 changes: 1 addition & 1 deletion elephas/ml/__init__.py
@@ -1,2 +1,2 @@
from .adapter import *
from .params import *
from .params import *
7 changes: 5 additions & 2 deletions elephas/ml/adapter.py
@@ -5,6 +5,7 @@
from ..utils.rdd_utils import from_labeled_point, to_labeled_point, lp_to_simple_rdd
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors


def to_data_frame(sc, features, labels, categorical=False):
"""Convert numpy arrays of features and labels into Spark DataFrame
"""
@@ -30,8 +31,10 @@ def df_to_simple_rdd(df, categorical=False, nb_classes=None, features_col='featu
selected_df = sql_context.sql(
"SELECT {0} AS features, {1} as label from temp_table".format(features_col, label_col))
if isinstance(selected_df.first().features, MLLibVector):
lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, row.features))
lp_rdd = selected_df.rdd.map(
lambda row: LabeledPoint(row.label, row.features))
else:
lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, MLLibVectors.fromML(row.features)))
lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(
row.label, MLLibVectors.fromML(row.features)))
rdd = lp_to_simple_rdd(lp_rdd, categorical, nb_classes)
return rdd
28 changes: 22 additions & 6 deletions elephas/ml/params.py
@@ -7,9 +7,11 @@ class HasKerasModelConfig(Params):
Parameter mixin for Keras model yaml
"""

def __init__(self):
super(HasKerasModelConfig, self).__init__()
self.keras_model_config = Param(self, "keras_model_config", "Serialized Keras model as yaml string")
self.keras_model_config = Param(
self, "keras_model_config", "Serialized Keras model as yaml string")

def set_keras_model_config(self, keras_model_config):
self._paramMap[self.keras_model_config] = keras_model_config
@@ -22,13 +24,13 @@ def get_keras_model_config(self):
class HasElephasOptimizerConfig(Params):
"""Parameter mixin for Elephas optimizer config
"""

def __init__(self):
super(HasElephasOptimizerConfig, self).__init__()
self.elephas_optimizer_config = Param(self, "elephas_optimizer_config",
"Serialized Elephas optimizer properties")
self._setDefault(elephas_optimizer_config=None)


def set_elephas_optimizer_config(self, elephas_optimizer_config):
self._paramMap[self.elephas_optimizer_config] = elephas_optimizer_config
return self
@@ -40,6 +42,7 @@ def get_elephas_optimizer_config(self):
class HasMode(Params):
"""Parameter mixin for Elephas mode
"""

def __init__(self):
super(HasMode, self).__init__()
self.mode = Param(self, "mode", "Elephas mode")
@@ -56,6 +59,7 @@ def get_mode(self):
class HasFrequency(Params):
"""Parameter mixin for Elephas frequency
"""

def __init__(self):
super(HasFrequency, self).__init__()
self.frequency = Param(self, "frequency", "Elephas frequency")
@@ -74,6 +78,7 @@ class HasNumberOfClasses(Params):
Parameter mixin for number of classes
"""

def __init__(self):
super(HasNumberOfClasses, self).__init__()
self.nb_classes = Param(self, "nb_classes", "number of classes")
@@ -92,9 +97,11 @@ class HasCategoricalLabels(Params):
Parameter mixin for setting categorical features
"""

def __init__(self):
super(HasCategoricalLabels, self).__init__()
self.categorical = Param(self, "categorical", "Boolean to indicate if labels are categorical")
self.categorical = Param(
self, "categorical", "Boolean to indicate if labels are categorical")
self._setDefault(categorical=True)

def set_categorical_labels(self, categorical):
@@ -108,6 +115,7 @@ class HasEpochs(Params):
class HasEpochs(Params):
"""Parameter mixin for number of epochs
"""

def __init__(self):
super(HasEpochs, self).__init__()
self.epochs = Param(self, "epochs", "Number of epochs to train")
@@ -124,6 +132,7 @@ def get_epochs(self):
class HasBatchSize(Params):
"""Parameter mixin for batch size
"""

def __init__(self):
super(HasBatchSize, self).__init__()
self.batch_size = Param(self, "batch_size", "Batch size")
@@ -140,6 +149,7 @@ def get_batch_size(self):
class HasVerbosity(Params):
"""Parameter mixin for output verbosity
"""

def __init__(self):
super(HasVerbosity, self).__init__()
self.verbose = Param(self, "verbose", "Stdout verbosity")
@@ -156,9 +166,11 @@ def get_verbosity(self):
class HasValidationSplit(Params):
"""Parameter mixin for validation split percentage
"""

def __init__(self):
super(HasValidationSplit, self).__init__()
self.validation_split = Param(self, "validation_split", "validation split percentage")
self.validation_split = Param(
self, "validation_split", "validation split percentage")
self._setDefault(validation_split=0.1)

def set_validation_split(self, validation_split):
@@ -172,6 +184,7 @@ def get_validation_split(self):
class HasNumberOfWorkers(Params):
"""Parameter mixin for number of workers
"""

def __init__(self):
super(HasNumberOfWorkers, self).__init__()
self.num_workers = Param(self, "num_workers", "number of workers")
@@ -188,9 +201,11 @@ def get_num_workers(self):
class HasKerasOptimizerConfig(Params):
"""Parameter mixin for Keras optimizer config
"""

def __init__(self):
super(HasKerasOptimizerConfig, self).__init__()
self.optimizer_config = Param(self, "optimizer_config", "Serialized Keras optimizer properties")
self.optimizer_config = Param(
self, "optimizer_config", "Serialized Keras optimizer properties")
self._setDefault(optimizer_config=None)

def set_optimizer_config(self, optimizer_config):
@@ -204,12 +219,12 @@ def get_optimizer_config(self):
class HasMetrics(Params):
"""Parameter mixin for Keras metrics
"""

def __init__(self):
super(HasMetrics, self).__init__()
self.metrics = Param(self, "metrics", "Keras metrics")
self._setDefault(metrics=['acc'])


def set_metrics(self, metrics):
self._paramMap[self.metrics] = metrics
return self
@@ -221,6 +236,7 @@ class HasLoss(Params):
class HasLoss(Params):
"""Parameter mixin for Keras metrics
"""

def __init__(self):
super(HasLoss, self).__init__()
self.loss = Param(self, "loss", "Keras loss")
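A minimal sketch of how these Param mixins are meant to compose; the estimator class name is hypothetical, and the setters are assumed to return self, as the ones shown above do:

# Sketch: a hypothetical class mixing in two of the parameter mixins above.
class MyEstimatorParams(HasEpochs, HasBatchSize):
    def __init__(self):
        super(MyEstimatorParams, self).__init__()  # runs each mixin's __init__ via the MRO

params = MyEstimatorParams().set_epochs(10).set_batch_size(32)
assert params.get_epochs() == 10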
18 changes: 11 additions & 7 deletions elephas/ml_model.py
@@ -80,7 +80,7 @@ def _fit(self, df):
elephas_optimizer = None
if self.get_elephas_optimizer_config() is not None:
elephas_optimizer = get({'class_name': self.get_optimizer_config()['class_name'],
'config': self.get_optimizer_config()})
'config': self.get_optimizer_config()})

keras_model = model_from_yaml(self.get_keras_model_config())
metrics = self.get_metrics()
@@ -117,7 +117,8 @@ class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol):
def __init__(self, **kwargs):
super(ElephasTransformer, self).__init__()
if "weights" in kwargs.keys():
self.weights = kwargs.pop('weights') # Strip model weights from parameters to init Transformer
# Strip model weights from parameters to init Transformer
self.weights = kwargs.pop('weights')
self.set_params(**kwargs)

@keyword_only
@@ -142,7 +143,6 @@ def save(self, file_name):
f.flush()
f.close()


def get_model(self):
return model_from_yaml(self.get_keras_model_config())

@@ -155,20 +155,24 @@ def _transform(self, df):
new_schema.add(StructField(output_col, StringType(), True))

rdd = df.rdd.coalesce(1)
features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect())
features = np.asarray(
rdd.map(lambda x: from_vector(x.features)).collect())
# Note that we collect, since executing this on the rdd would require model serialization once again
model = model_from_yaml(self.get_keras_model_config())
model.set_weights(self.weights.value)
predictions = rdd.ctx.parallelize(model.predict_classes(features)).coalesce(1)
predictions = rdd.ctx.parallelize(
model.predict_classes(features)).coalesce(1)
predictions = predictions.map(lambda x: tuple(str(x)))

results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1])
# TODO: Zipping like this is very likely wrong
# results_rdd = rdd.zip(predictions).map(lambda pair: Row(features=to_vector(pair[0].features),
# label=pair[0].label, prediction=float(pair[1])))
results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema)
results_df = results_df.withColumn(output_col, results_df[output_col].cast(DoubleType()))
results_df = results_df.withColumn(label_col, results_df[label_col].cast(DoubleType()))
results_df = results_df.withColumn(
output_col, results_df[output_col].cast(DoubleType()))
results_df = results_df.withColumn(
label_col, results_df[label_col].cast(DoubleType()))

return results_df
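
One way to resolve the zipping TODO above, sketched rather than taken from the repo: pair rows and predictions by an explicit index instead of relying on zip's partition alignment.

# Sketch of an index-based join (not the repo's code).
indexed_rows = rdd.zipWithIndex().map(lambda x: (x[1], x[0]))
indexed_preds = predictions.zipWithIndex().map(lambda x: (x[1], x[0]))
results_rdd = (indexed_rows.join(indexed_preds)
               .sortByKey()
               .map(lambda kv: kv[1][0] + kv[1][1]))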

2 changes: 1 addition & 1 deletion elephas/mllib/__init__.py
@@ -1 +1 @@
from .adapter import *
from .adapter import *