From 1e8499e1a5a7c27e169ff33f3cd8521c74cf2830 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:27:07 +0100 Subject: [PATCH 1/6] extra requirements --- .gitignore | 4 +++- setup.py | 18 ++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 1975cef..97be343 100644 --- a/.gitignore +++ b/.gitignore @@ -77,4 +77,6 @@ java_param_sharing_model.h5 temp.h5 docs/sources -docs/site \ No newline at end of file +docs/site + +.vscode \ No newline at end of file diff --git a/setup.py b/setup.py index 6fd583b..049ee02 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,21 @@ download_url='https://github.com/maxpumperla/elephas/tarball/0.4', author='Max Pumperla', author_email='max.pumperla@googlemail.com', - install_requires=['cython', 'keras', 'tensorflow', 'hyperas', 'flask', 'six', 'pyspark', 'pydl4j>=0.1.3'], + install_requires=['cython', 'tensorflow', 'keras', 'hyperas', 'flask', 'six', 'pyspark'], + extras_require={ + 'java': ['pydl4j>=0.1.3'], + 'tests': ['pytest', 'pytest-pep8', 'pytest-cov', 'mock'] + }, packages=find_packages(), license='MIT', - zip_safe=False) + zip_safe=False, + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'Environment :: Console', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: OS Independent', + 'Programming Language :: Python', + 'Programming Language :: Python :: 2', + 'Programming Language :: Python :: 3' + ]) From 93e739e8dcda5cbbe3cb2f9d0f4aead78e5def36 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:30:44 +0100 Subject: [PATCH 2/6] travis --- .travis.yml | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 638bb9b..67cd968 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,12 +19,10 @@ install: - conda config --set always_yes yes --set changeps1 no - conda update -q conda - conda info -a - - conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION numpy scipy pytest h5py flask + - travis_retry conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION nose - source activate test-environment - - pip install pytest-cov python-coveralls - - python setup.py install - - pip install -r requirements.txt - + - pip install Cython --install-option="--no-cython-compile" + - pip install -e .[tests] script: - python -c "import keras.backend" - py.test tests/ From 79171ff73feb2a28662f9f37fb6a00ca3d9b89e6 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:39:39 +0100 Subject: [PATCH 3/6] py3.4 --- .travis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 67cd968..eed22b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ dist: trusty language: python python: - "2.7" - #- "3.4" # Note that hyperopt currently seems to have issues with 3.4 + - "3.4" # Note that hyperopt currently seems to have issues with 3.4 install: # code below is taken from http://conda.pydata.org/docs/travis.html # We do this conditionally because it saves us some downloading if the @@ -23,8 +23,9 @@ install: - source activate test-environment - pip install Cython --install-option="--no-cython-compile" - pip install -e .[tests] + script: - python -c "import keras.backend" - - py.test tests/ + - py.test --pep8 -m pep8 tests/ after_success: - coveralls From dfc90f9613bf7d951d961459da0b5e1d14926660 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:44:25 +0100 Subject: [PATCH 4/6] 
pytest ini --- pytest.ini | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 pytest.ini diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..bca914c --- /dev/null +++ b/pytest.ini @@ -0,0 +1,10 @@ +[pytest] + +norecursedirs= build + +# PEP-8 The following are ignored: +# E501 line too long (82 > 79 characters) +# W503 line break occurred before a binary operator + +pep8ignore=* E501 \ + * W503 \ No newline at end of file From bee3376305c3babf4e4fdca294f3a0f12fe94e19 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:44:38 +0100 Subject: [PATCH 5/6] pep8 --- tests/conftest.py | 6 ++++-- tests/ml/test_adapter.py | 15 ++++++++++----- tests/ml/test_params.py | 4 ++-- tests/mllib/test_adapter.py | 2 +- tests/parameter/test_client.py | 2 +- tests/parameter/test_server.py | 2 +- tests/test_hyperparam.py | 3 ++- tests/test_mllib_model.py | 9 ++++++--- tests/test_model_serialization.py | 6 ++++-- tests/test_spark_model.py | 21 ++++++++++++++------- tests/utils/test_rwlock.py | 2 +- tests/utils/test_sockets.py | 2 +- 12 files changed, 47 insertions(+), 27 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index df5ee50..961724a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,8 @@ def spark_context(request): Args: request: pytest.FixtureRequest object """ - conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")) + conf = (SparkConf().setMaster("local[2]").setAppName( + "pytest-pyspark-local-testing")) sc = SparkContext(conf=conf) request.addfinalizer(lambda: sc.stop()) @@ -30,7 +31,8 @@ def sql_context(request): Args: request: pytest.FixtureRequest object """ - conf = (SparkConf().setMaster("local[2]").setAppName("pytest-pyspark-local-testing")) + conf = (SparkConf().setMaster("local[2]").setAppName( + "pytest-pyspark-local-testing")) sc = SparkContext(conf=conf) sql_context = SQLContext(sc) request.addfinalizer(lambda: sc.stop()) diff --git a/tests/ml/test_adapter.py b/tests/ml/test_adapter.py index 2017db9..cfb4db3 100644 --- a/tests/ml/test_adapter.py +++ b/tests/ml/test_adapter.py @@ -8,7 +8,8 @@ def test_to_data_frame(spark_context): features = np.ones((2, 10)) labels = np.asarray([[2.0], [1.0]]) - data_frame = adapter.to_data_frame(spark_context, features, labels, categorical=False) + data_frame = adapter.to_data_frame( + spark_context, features, labels, categorical=False) assert data_frame.count() == 2 @@ -16,7 +17,8 @@ def test_to_data_frame_cat(spark_context): features = np.ones((2, 10)) labels = np.asarray([[0, 0, 1.0], [0, 1.0, 0]]) - data_frame = adapter.to_data_frame(spark_context, features, labels, categorical=True) + data_frame = adapter.to_data_frame( + spark_context, features, labels, categorical=True) assert data_frame.count() == 2 @@ -24,7 +26,8 @@ def test_from_data_frame(spark_context): features = np.ones((2, 10)) labels = np.asarray([[2.0], [1.0]]).reshape((2,)) - data_frame = adapter.to_data_frame(spark_context, features, labels, categorical=False) + data_frame = adapter.to_data_frame( + spark_context, features, labels, categorical=False) x, y = adapter.from_data_frame(data_frame, categorical=False) assert features.shape == x.shape @@ -35,7 +38,8 @@ def test_from_data_frame_cat(spark_context): features = np.ones((2, 10)) labels = np.asarray([[0, 0, 1.0], [0, 1.0, 0]]) - data_frame = adapter.to_data_frame(spark_context, features, labels, categorical=True) + data_frame = adapter.to_data_frame( + spark_context, features, labels, categorical=True) x, y = 
adapter.from_data_frame(data_frame, categorical=True, nb_classes=3) assert features.shape == x.shape @@ -46,7 +50,8 @@ def test_df_to_simple_rdd(spark_context): features = np.ones((2, 10)) labels = np.asarray([[2.0], [1.0]]).reshape((2,)) - data_frame = adapter.to_data_frame(spark_context, features, labels, categorical=False) + data_frame = adapter.to_data_frame( + spark_context, features, labels, categorical=False) rdd = adapter.df_to_simple_rdd(data_frame, False) assert rdd.count() == 2 diff --git a/tests/ml/test_params.py b/tests/ml/test_params.py index 5a51ee1..ea37619 100644 --- a/tests/ml/test_params.py +++ b/tests/ml/test_params.py @@ -5,7 +5,7 @@ def test_has_keras_model_config(): param = HasKerasModelConfig() conf = {"foo": "bar"} param.set_keras_model_config(conf) - assert conf == param.get_keras_model_config() + assert conf == param.get_keras_model_config() def test_has_elephas_optimizer_config(): @@ -91,4 +91,4 @@ def test_has_number_of_workers(): assert param.get_num_workers() == 8 workers = 12 param.set_num_workers(workers) - assert param.get_num_workers() == workers \ No newline at end of file + assert param.get_num_workers() == workers diff --git a/tests/mllib/test_adapter.py b/tests/mllib/test_adapter.py index 3956aac..07b3b3f 100644 --- a/tests/mllib/test_adapter.py +++ b/tests/mllib/test_adapter.py @@ -25,4 +25,4 @@ def test_from_vector(): def test_to_vector(): vector = Vectors.dense([4, 2]) x = from_vector(vector) - assert x.shape == (2,) \ No newline at end of file + assert x.shape == (2,) diff --git a/tests/parameter/test_client.py b/tests/parameter/test_client.py index 0cbdcac..96c610a 100644 --- a/tests/parameter/test_client.py +++ b/tests/parameter/test_client.py @@ -1 +1 @@ -# TODO test clients \ No newline at end of file +# TODO test clients diff --git a/tests/parameter/test_server.py b/tests/parameter/test_server.py index 4c0e0a7..26fb11c 100644 --- a/tests/parameter/test_server.py +++ b/tests/parameter/test_server.py @@ -1 +1 @@ -# TODO test servers \ No newline at end of file +# TODO test servers diff --git a/tests/test_hyperparam.py b/tests/test_hyperparam.py index 5f9ff48..3942094 100644 --- a/tests/test_hyperparam.py +++ b/tests/test_hyperparam.py @@ -40,7 +40,8 @@ def model(x_train, y_train, x_test, y_test): keras_model.add(Activation('softmax')) rms = RMSprop() - keras_model.compile(loss='categorical_crossentropy', optimizer=rms, metrics=['acc']) + keras_model.compile(loss='categorical_crossentropy', + optimizer=rms, metrics=['acc']) keras_model.fit(x_train, y_train, batch_size={{choice([64, 128])}}, diff --git a/tests/test_mllib_model.py b/tests/test_mllib_model.py index 305f00d..d2b3006 100644 --- a/tests/test_mllib_model.py +++ b/tests/test_mllib_model.py @@ -47,17 +47,20 @@ def test_serialization(): - spark_model = SparkMLlibModel(model, frequency='epoch', mode='synchronous', num_workers=2) + spark_model = SparkMLlibModel( + model, frequency='epoch', mode='synchronous', num_workers=2) spark_model.save("test.h5") load_spark_model("test.h5") def test_mllib_model(spark_context): # Build RDD from numpy features and labels - lp_rdd = to_labeled_point(spark_context, x_train, y_train, categorical=True) + lp_rdd = to_labeled_point(spark_context, x_train, + y_train, categorical=True) # Initialize SparkModel from Keras model and Spark context - spark_model = SparkMLlibModel(model=model, frequency='epoch', mode='synchronous') + spark_model = SparkMLlibModel( + model=model, frequency='epoch', mode='synchronous') # Train Spark model spark_model.fit(lp_rdd, 
epochs=5, batch_size=32, verbose=0, diff --git a/tests/test_model_serialization.py b/tests/test_model_serialization.py index 437b428..b962be9 100644 --- a/tests/test_model_serialization.py +++ b/tests/test_model_serialization.py @@ -23,7 +23,8 @@ def test_sequential_serialization(): seq_model.add(Dense(10)) seq_model.add(Activation('softmax')) - seq_model.compile(optimizer="sgd", loss="categorical_crossentropy", metrics=["acc"]) + seq_model.compile( + optimizer="sgd", loss="categorical_crossentropy", metrics=["acc"]) spark_model = SparkModel(seq_model, frequency='epoch', mode='synchronous') spark_model.save("elephas_sequential.h5") @@ -44,7 +45,8 @@ def test_model_serialization(): loss='categorical_crossentropy', metrics=['accuracy']) - spark_model = SparkModel(model, frequency='epoch', mode='synchronous', foo="bar") + spark_model = SparkModel(model, frequency='epoch', + mode='synchronous', foo="bar") spark_model.save("elephas_model.h5") diff --git a/tests/test_spark_model.py b/tests/test_spark_model.py index 3387dbd..2829967 100644 --- a/tests/test_spark_model.py +++ b/tests/test_spark_model.py @@ -46,32 +46,39 @@ model.add(Dense(10)) model.add(Activation('softmax')) -model.compile(optimizer="sgd", loss="categorical_crossentropy", metrics=["acc"]) +model.compile(optimizer="sgd", + loss="categorical_crossentropy", metrics=["acc"]) def test_spark_model_end_to_end(spark_context): rdd = to_simple_rdd(spark_context, x_train, y_train) # sync epoch - spark_model = SparkModel(model, frequency='epoch', mode='synchronous', num_workers=2) - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=2, validation_split=0.1) + spark_model = SparkModel(model, frequency='epoch', + mode='synchronous', num_workers=2) + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=2, validation_split=0.1) score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) print('Test accuracy:', score[1]) # sync batch - spark_model = SparkModel(model, frequency='batch', mode='synchronous', num_workers=2) - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=2, validation_split=0.1) + spark_model = SparkModel(model, frequency='batch', + mode='synchronous', num_workers=2) + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=2, validation_split=0.1) score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) print('Test accuracy:', score[1]) # async epoch spark_model = SparkModel(model, frequency='epoch', mode='asynchronous') - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=2, validation_split=0.1) + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=2, validation_split=0.1) score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) print('Test accuracy:', score[1]) # hog wild epoch spark_model = SparkModel(model, frequency='epoch', mode='hogwild') - spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, verbose=2, validation_split=0.1) + spark_model.fit(rdd, epochs=epochs, batch_size=batch_size, + verbose=2, validation_split=0.1) score = spark_model.master_network.evaluate(x_test, y_test, verbose=2) print('Test accuracy:', score[1]) diff --git a/tests/utils/test_rwlock.py b/tests/utils/test_rwlock.py index 2bc8f73..6efef1e 100644 --- a/tests/utils/test_rwlock.py +++ b/tests/utils/test_rwlock.py @@ -1 +1 @@ -# TODO test lock \ No newline at end of file +# TODO test lock diff --git a/tests/utils/test_sockets.py b/tests/utils/test_sockets.py index 253355a..d09264f 100644 --- 
a/tests/utils/test_sockets.py +++ b/tests/utils/test_sockets.py @@ -1 +1 @@ -# TODO test sockets \ No newline at end of file +# TODO test sockets From 98dfb610a3982c02d7c56deb78fbffe3ab0fb788 Mon Sep 17 00:00:00 2001 From: Max Pumperla Date: Thu, 3 Jan 2019 17:44:55 +0100 Subject: [PATCH 6/6] pep8 --- elephas/dl4j.py | 11 +++++++---- elephas/hyperparam.py | 14 ++++++++++---- elephas/java/adapter.py | 2 +- elephas/java/java_classes.py | 6 ++++-- elephas/java/ndarray.py | 5 +++-- elephas/ml/__init__.py | 2 +- elephas/ml/adapter.py | 7 +++++-- elephas/ml/params.py | 28 ++++++++++++++++++++++------ elephas/ml_model.py | 18 +++++++++++------- elephas/mllib/__init__.py | 2 +- elephas/optimizers.py | 10 +++++++++- elephas/parameter/__init__.py | 2 +- elephas/parameter/client.py | 2 ++ elephas/parameter/server.py | 6 ++++-- elephas/spark_model.py | 32 ++++++++++++++++++++------------ elephas/utils/__init__.py | 2 +- elephas/utils/rdd_utils.py | 9 ++++++--- elephas/utils/rwlock.py | 1 + elephas/worker.py | 23 ++++++++++++++++------- 19 files changed, 125 insertions(+), 57 deletions(-) diff --git a/elephas/dl4j.py b/elephas/dl4j.py index bcefedc..8bbac8e 100644 --- a/elephas/dl4j.py +++ b/elephas/dl4j.py @@ -26,7 +26,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size, averaging self.save(save_file) model_file = java_classes.File(save_file) keras_model_type = model.__class__.__name__ - self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type) + self.java_spark_model = dl4j_import( + java_spark_context, model_file, keras_model_type) def fit_rdd(self, data_set_rdd, epochs): for _ in range(epochs): @@ -42,7 +43,7 @@ def get_keras_model(self): class ParameterSharingModel(SparkModel): def __init__(self, java_spark_context, model, num_workers, batch_size, - shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3 , workers_per_node=-1, + shake_frequency=0, min_threshold=1e-5, update_threshold=1e-3, workers_per_node=-1, num_batches_prefetch=0, step_delay=50, step_trigger=0.05, threshold_step=1e-5, collect_stats=False, save_file='temp.h5', *args, **kwargs): """ParameterSharingModel @@ -73,7 +74,8 @@ def __init__(self, java_spark_context, model, num_workers, batch_size, self.save(save_file) model_file = java_classes.File(save_file) keras_model_type = model.__class__.__name__ - self.java_spark_model = dl4j_import(java_spark_context, model_file, keras_model_type) + self.java_spark_model = dl4j_import( + java_spark_context, model_file, keras_model_type) def fit_rdd(self, data_set_rdd, epochs): for _ in range(epochs): @@ -101,4 +103,5 @@ def dl4j_import(jsc, model_file, keras_model_type): except: print("Couldn't load Keras model into DL4J") else: - raise Exception("Keras model not understood, got: {}".format(keras_model_type)) \ No newline at end of file + raise Exception( + "Keras model not understood, got: {}".format(keras_model_type)) diff --git a/elephas/hyperparam.py b/elephas/hyperparam.py index ae00289..c97abf4 100644 --- a/elephas/hyperparam.py +++ b/elephas/hyperparam.py @@ -16,6 +16,7 @@ class HyperParamModel(object): Computes distributed hyper-parameter optimization using Hyperas and Spark. 
""" + def __init__(self, sc, num_workers=4): self.spark_context = sc self.num_workers = num_workers @@ -26,14 +27,16 @@ def compute_trials(self, model, data, max_evals, notebook_name): hyperas_worker = HyperasWorker(model_string, max_evals) dummy_rdd = self.spark_context.parallelize([i for i in range(1, 1000)]) dummy_rdd = dummy_rdd.repartition(self.num_workers) - trials_list = dummy_rdd.mapPartitions(hyperas_worker._minimize).collect() + trials_list = dummy_rdd.mapPartitions( + hyperas_worker._minimize).collect() return trials_list def minimize(self, model, data, max_evals, notebook_name=None): global best_model_yaml, best_model_weights - trials_list = self.compute_trials(model, data, max_evals, notebook_name) + trials_list = self.compute_trials( + model, data, max_evals, notebook_name) best_val = 1e7 for trials in trials_list: @@ -61,14 +64,16 @@ def best_models(self, nb_models, model, data, max_evals): nb_models = len(trials_list) scores = [] for trials in trials_list: - scores = scores + [trial.get('result').get('loss') for trial in trials] + scores = scores + [trial.get('result').get('loss') + for trial in trials] cut_off = sorted(scores, reverse=True)[nb_models - 1] model_list = [] for trials in trials_list: for trial in trials: if trial.get('result').get('loss') >= cut_off: model = model_from_yaml(trial.get('result').get('model')) - model.set_weights(pickle.loads(trial.get('result').get('weights'))) + model.set_weights(pickle.loads( + trial.get('result').get('weights'))) model_list.append(model) return model_list @@ -78,6 +83,7 @@ class HyperasWorker(object): Executes hyper-parameter search on each worker and returns results. """ + def __init__(self, bc_model, bc_max_evals): self.model_string = bc_model self.max_evals = bc_max_evals diff --git a/elephas/java/adapter.py b/elephas/java/adapter.py index 2588a37..ef9b99b 100644 --- a/elephas/java/adapter.py +++ b/elephas/java/adapter.py @@ -49,4 +49,4 @@ def retrieve_keras_weights(java_model): weight = params.get(key) np_weight = np.squeeze(to_numpy(weight)) weights.append(np_weight) - return weights \ No newline at end of file + return weights diff --git a/elephas/java/java_classes.py b/elephas/java/java_classes.py index 624fb4d..cc8bedb 100644 --- a/elephas/java/java_classes.py +++ b/elephas/java/java_classes.py @@ -44,5 +44,7 @@ # Import -KerasModelImport = autoclass('org.deeplearning4j.nn.modelimport.keras.KerasModelImport') -ElephasModelImport = autoclass('org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport') \ No newline at end of file +KerasModelImport = autoclass( + 'org.deeplearning4j.nn.modelimport.keras.KerasModelImport') +ElephasModelImport = autoclass( + 'org.deeplearning4j.spark.parameterserver.modelimport.elephas.ElephasModelImport') diff --git a/elephas/java/ndarray.py b/elephas/java/ndarray.py index 4fc27bb..eafa8cd 100644 --- a/elephas/java/ndarray.py +++ b/elephas/java/ndarray.py @@ -83,7 +83,8 @@ def _from_numpy(np_array): # Convert the numpy array to nd4j context dtype required_dtype = get_np_dtype(get_context_dtype()) if np_array.dtype != required_dtype: - raise Exception("{} is required, got {}".format(repr(required_dtype), repr(np_array.dtype))) + raise Exception("{} is required, got {}".format( + repr(required_dtype), repr(np_array.dtype))) # Nd4j does not have 1-d vectors. # So we add a dummy dimension. 
@@ -430,4 +431,4 @@ def T(self): def array(*args, **kwargs): - return ndarray(*args, **kwargs) \ No newline at end of file + return ndarray(*args, **kwargs) diff --git a/elephas/ml/__init__.py b/elephas/ml/__init__.py index a6390b6..e20d51a 100644 --- a/elephas/ml/__init__.py +++ b/elephas/ml/__init__.py @@ -1,2 +1,2 @@ from .adapter import * -from .params import * \ No newline at end of file +from .params import * diff --git a/elephas/ml/adapter.py b/elephas/ml/adapter.py index 80b20fb..030d30c 100644 --- a/elephas/ml/adapter.py +++ b/elephas/ml/adapter.py @@ -5,6 +5,7 @@ from ..utils.rdd_utils import from_labeled_point, to_labeled_point, lp_to_simple_rdd from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors + def to_data_frame(sc, features, labels, categorical=False): """Convert numpy arrays of features and labels into Spark DataFrame """ @@ -30,8 +31,10 @@ def df_to_simple_rdd(df, categorical=False, nb_classes=None, features_col='featu selected_df = sql_context.sql( "SELECT {0} AS features, {1} as label from temp_table".format(features_col, label_col)) if isinstance(selected_df.first().features, MLLibVector): - lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, row.features)) + lp_rdd = selected_df.rdd.map( + lambda row: LabeledPoint(row.label, row.features)) else: - lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint(row.label, MLLibVectors.fromML(row.features))) + lp_rdd = selected_df.rdd.map(lambda row: LabeledPoint( + row.label, MLLibVectors.fromML(row.features))) rdd = lp_to_simple_rdd(lp_rdd, categorical, nb_classes) return rdd diff --git a/elephas/ml/params.py b/elephas/ml/params.py index bc83644..0cf62ce 100644 --- a/elephas/ml/params.py +++ b/elephas/ml/params.py @@ -7,9 +7,11 @@ class HasKerasModelConfig(Params): Parameter mixin for Keras model yaml """ + def __init__(self): super(HasKerasModelConfig, self).__init__() - self.keras_model_config = Param(self, "keras_model_config", "Serialized Keras model as yaml string") + self.keras_model_config = Param( + self, "keras_model_config", "Serialized Keras model as yaml string") def set_keras_model_config(self, keras_model_config): self._paramMap[self.keras_model_config] = keras_model_config @@ -22,13 +24,13 @@ def get_keras_model_config(self): class HasElephasOptimizerConfig(Params): """Parameter mixin for Elephas optimizer config """ + def __init__(self): super(HasElephasOptimizerConfig, self).__init__() self.elephas_optimizer_config = Param(self, "elephas_optimizer_config", "Serialized Elephas optimizer properties") self._setDefault(elephas_optimizer_config=None) - def set_elephas_optimizer_config(self, elephas_optimizer_config): self._paramMap[self.elephas_optimizer_config] = elephas_optimizer_config return self @@ -40,6 +42,7 @@ def get_elephas_optimizer_config(self): class HasMode(Params): """Parameter mixin for Elephas mode """ + def __init__(self): super(HasMode, self).__init__() self.mode = Param(self, "mode", "Elephas mode") @@ -56,6 +59,7 @@ def get_mode(self): class HasFrequency(Params): """Parameter mixin for Elephas frequency """ + def __init__(self): super(HasFrequency, self).__init__() self.frequency = Param(self, "frequency", "Elephas frequency") @@ -74,6 +78,7 @@ class HasNumberOfClasses(Params): Parameter mixin for number of classes """ + def __init__(self): super(HasNumberOfClasses, self).__init__() self.nb_classes = Param(self, "nb_classes", "number of classes") @@ -92,9 +97,11 @@ class HasCategoricalLabels(Params): Parameter mixin for setting categorical features """ + 
def __init__(self): super(HasCategoricalLabels, self).__init__() - self.categorical = Param(self, "categorical", "Boolean to indicate if labels are categorical") + self.categorical = Param( + self, "categorical", "Boolean to indicate if labels are categorical") self._setDefault(categorical=True) def set_categorical_labels(self, categorical): @@ -108,6 +115,7 @@ def get_categorical_labels(self): class HasEpochs(Params): """Parameter mixin for number of epochs """ + def __init__(self): super(HasEpochs, self).__init__() self.epochs = Param(self, "epochs", "Number of epochs to train") @@ -124,6 +132,7 @@ def get_epochs(self): class HasBatchSize(Params): """Parameter mixin for batch size """ + def __init__(self): super(HasBatchSize, self).__init__() self.batch_size = Param(self, "batch_size", "Batch size") @@ -140,6 +149,7 @@ def get_batch_size(self): class HasVerbosity(Params): """Parameter mixin for output verbosity """ + def __init__(self): super(HasVerbosity, self).__init__() self.verbose = Param(self, "verbose", "Stdout verbosity") @@ -156,9 +166,11 @@ def get_verbosity(self): class HasValidationSplit(Params): """Parameter mixin for validation split percentage """ + def __init__(self): super(HasValidationSplit, self).__init__() - self.validation_split = Param(self, "validation_split", "validation split percentage") + self.validation_split = Param( + self, "validation_split", "validation split percentage") self._setDefault(validation_split=0.1) def set_validation_split(self, validation_split): @@ -172,6 +184,7 @@ def get_validation_split(self): class HasNumberOfWorkers(Params): """Parameter mixin for number of workers """ + def __init__(self): super(HasNumberOfWorkers, self).__init__() self.num_workers = Param(self, "num_workers", "number of workers") @@ -188,9 +201,11 @@ def get_num_workers(self): class HasKerasOptimizerConfig(Params): """Parameter mixin for Keras optimizer config """ + def __init__(self): super(HasKerasOptimizerConfig, self).__init__() - self.optimizer_config = Param(self, "optimizer_config", "Serialized Keras optimizer properties") + self.optimizer_config = Param( + self, "optimizer_config", "Serialized Keras optimizer properties") self._setDefault(optimizer_config=None) def set_optimizer_config(self, optimizer_config): @@ -204,12 +219,12 @@ def get_optimizer_config(self): class HasMetrics(Params): """Parameter mixin for Keras metrics """ + def __init__(self): super(HasMetrics, self).__init__() self.metrics = Param(self, "metrics", "Keras metrics") self._setDefault(metrics=['acc']) - def set_metrics(self, metrics): self._paramMap[self.metrics] = metrics return self @@ -221,6 +236,7 @@ def get_metrics(self): class HasLoss(Params): """Parameter mixin for Keras metrics """ + def __init__(self): super(HasLoss, self).__init__() self.loss = Param(self, "loss", "Keras loss") diff --git a/elephas/ml_model.py b/elephas/ml_model.py index 90b5438..5f91c0d 100644 --- a/elephas/ml_model.py +++ b/elephas/ml_model.py @@ -80,7 +80,7 @@ def _fit(self, df): elephas_optimizer = None if self.get_elephas_optimizer_config() is not None: elephas_optimizer = get({'class_name': self.get_optimizer_config()['class_name'], - 'config': self.get_optimizer_config()}) + 'config': self.get_optimizer_config()}) keras_model = model_from_yaml(self.get_keras_model_config()) metrics = self.get_metrics() @@ -117,7 +117,8 @@ class ElephasTransformer(Model, HasKerasModelConfig, HasLabelCol, HasOutputCol): def __init__(self, **kwargs): super(ElephasTransformer, self).__init__() if "weights" in kwargs.keys(): - 
self.weights = kwargs.pop('weights') # Strip model weights from parameters to init Transformer + # Strip model weights from parameters to init Transformer + self.weights = kwargs.pop('weights') self.set_params(**kwargs) @keyword_only @@ -142,7 +143,6 @@ def save(self, file_name): f.flush() f.close() - def get_model(self): return model_from_yaml(self.get_keras_model_config()) @@ -155,11 +155,13 @@ def _transform(self, df): new_schema.add(StructField(output_col, StringType(), True)) rdd = df.rdd.coalesce(1) - features = np.asarray(rdd.map(lambda x: from_vector(x.features)).collect()) + features = np.asarray( + rdd.map(lambda x: from_vector(x.features)).collect()) # Note that we collect, since executing this on the rdd would require model serialization once again model = model_from_yaml(self.get_keras_model_config()) model.set_weights(self.weights.value) - predictions = rdd.ctx.parallelize(model.predict_classes(features)).coalesce(1) + predictions = rdd.ctx.parallelize( + model.predict_classes(features)).coalesce(1) predictions = predictions.map(lambda x: tuple(str(x))) results_rdd = rdd.zip(predictions).map(lambda x: x[0] + x[1]) @@ -167,8 +169,10 @@ def _transform(self, df): # results_rdd = rdd.zip(predictions).map(lambda pair: Row(features=to_vector(pair[0].features), # label=pair[0].label, prediction=float(pair[1]))) results_df = df.sql_ctx.createDataFrame(results_rdd, new_schema) - results_df = results_df.withColumn(output_col, results_df[output_col].cast(DoubleType())) - results_df = results_df.withColumn(label_col, results_df[label_col].cast(DoubleType())) + results_df = results_df.withColumn( + output_col, results_df[output_col].cast(DoubleType())) + results_df = results_df.withColumn( + label_col, results_df[label_col].cast(DoubleType())) return results_df diff --git a/elephas/mllib/__init__.py b/elephas/mllib/__init__.py index 56ac82f..38fc832 100644 --- a/elephas/mllib/__init__.py +++ b/elephas/mllib/__init__.py @@ -1 +1 @@ -from .adapter import * \ No newline at end of file +from .adapter import * diff --git a/elephas/optimizers.py b/elephas/optimizers.py index fbb49e0..a938d4b 100644 --- a/elephas/optimizers.py +++ b/elephas/optimizers.py @@ -31,6 +31,7 @@ class Optimizer(object): """Optimizer for elephas models, adapted from respective Keras module. 
""" + def __init__(self, **kwargs): self.__dict__.update(kwargs) self.updates = [] @@ -67,6 +68,7 @@ def get_config(self): class SGD(Optimizer): """SGD, optionally with nesterov momentum """ + def __init__(self, lr=0.01, momentum=0., decay=0., nesterov=False, *args, **kwargs): super(SGD, self).__init__(**kwargs) @@ -103,6 +105,7 @@ def get_config(self): class RMSprop(Optimizer): """Reference: www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf """ + def __init__(self, lr=0.001, rho=0.9, epsilon=1e-6, *args, **kwargs): super(RMSprop, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -132,6 +135,7 @@ def get_config(self): class Adagrad(Optimizer): """Reference: http://www.magicbroom.info/Papers/DuchiHaSi10.pdf """ + def __init__(self, lr=0.01, epsilon=1e-6, *args, **kwargs): super(Adagrad, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -156,6 +160,7 @@ def get_config(self): class Adadelta(Optimizer): """Reference: http://arxiv.org/abs/1212.5701 """ + def __init__(self, lr=1.0, rho=0.95, epsilon=1e-6, *args, **kwargs): super(Adadelta, self).__init__(**kwargs) self.__dict__.update(locals()) @@ -190,6 +195,7 @@ class Adam(Optimizer): """Reference: http://arxiv.org/abs/1412.6980v8 Default parameters follow those provided in the original paper. """ + def __init__(self, lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8, *args, **kwargs): super(Adam, self).__init__(**kwargs) @@ -221,6 +227,7 @@ def get_config(self): "beta_2": self.beta_2, "epsilon": self.epsilon} + # aliases sgd = SGD rmsprop = RMSprop @@ -228,6 +235,7 @@ def get_config(self): adadelta = Adadelta adam = Adam + def serialize(optimizer): return serialize_keras_object(optimizer) @@ -286,4 +294,4 @@ def get(identifier): return identifier else: raise ValueError('Could not interpret optimizer identifier:', - identifier) \ No newline at end of file + identifier) diff --git a/elephas/parameter/__init__.py b/elephas/parameter/__init__.py index a62f33e..ad9c21a 100644 --- a/elephas/parameter/__init__.py +++ b/elephas/parameter/__init__.py @@ -1,2 +1,2 @@ from .server import * -from .client import * \ No newline at end of file +from .client import * diff --git a/elephas/parameter/client.py b/elephas/parameter/client.py index 997b082..e171864 100644 --- a/elephas/parameter/client.py +++ b/elephas/parameter/client.py @@ -44,6 +44,7 @@ class HttpClient(BaseParameterClient): namely HttpServer. The HTTP server provides two endpoints, `/parameters` to get parameters and `/update` to update the server's parameters. """ + def __init__(self, port=4000): self.master_url = determine_master(port=port) @@ -68,6 +69,7 @@ class SocketClient(BaseParameterClient): The socket server listens to two types of events. Those with a `g` prefix indicate a get-request, those with a `u` indicate a parameter update. 
""" + def __init__(self, host='0.0.0.0', port=4000): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/elephas/parameter/server.py b/elephas/parameter/server.py index 31ae633..07c376f 100644 --- a/elephas/parameter/server.py +++ b/elephas/parameter/server.py @@ -12,6 +12,7 @@ from ..utils.rwlock import RWLock as Lock from ..utils.notebook_utils import is_running_in_notebook + class BaseParameterServer(object): """BaseParameterServer @@ -125,9 +126,10 @@ def handle_update_parameters(): if not self.master_network.built: self.master_network.build() - base_constraint = lambda a: a + def base_constraint(a): return a constraints = [base_constraint for _ in self.weights] - self.weights = self.optimizer.get_updates(self.weights, constraints, delta) + self.weights = self.optimizer.get_updates( + self.weights, constraints, delta) if self.mode == 'asynchronous': self.lock.release() return 'Update done' diff --git a/elephas/spark_model.py b/elephas/spark_model.py index 28cb2fd..90e5adf 100644 --- a/elephas/spark_model.py +++ b/elephas/spark_model.py @@ -37,7 +37,8 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser self._master_network = model if not hasattr(model, "loss"): - raise Exception("Compile your Keras model before initializing an Elephas model with it") + raise Exception( + "Compile your Keras model before initializing an Elephas model with it") metrics = model.metrics loss = model.loss optimizer = serialize_optimizer(model.optimizer) @@ -66,7 +67,8 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_ser self.serialized_model = model_to_dict(self.master_network) # TODO only set this for async/hogwild mode if self.parameter_server_mode == 'http': - self.parameter_server = HttpServer(self.serialized_model, self.optimizer, self.mode) + self.parameter_server = HttpServer( + self.serialized_model, self.optimizer, self.mode) self.client = HttpClient() elif self.parameter_server_mode == 'socket': self.parameter_server = SocketServer(self.serialized_model) @@ -150,7 +152,8 @@ def fit(self, rdd, epochs=10, batch_size=32, if self.mode in ['asynchronous', 'synchronous', 'hogwild']: self._fit(rdd, epochs, batch_size, verbose, validation_split) else: - raise ValueError("Choose from one of the modes: asynchronous, synchronous or hogwild") + raise ValueError( + "Choose from one of the modes: asynchronous, synchronous or hogwild") def _fit(self, rdd, epochs, batch_size, verbose, validation_split): """Protected train method to make wrapping of modes easier @@ -160,7 +163,8 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): metrics=self.master_metrics) if self.mode in ['asynchronous', 'hogwild']: self.start_server() - train_config = self.get_train_config(epochs, batch_size, verbose, validation_split) + train_config = self.get_train_config( + epochs, batch_size, verbose, validation_split) mode = self.parameter_server_mode freq = self.frequency optimizer = self.master_optimizer @@ -173,18 +177,21 @@ def _fit(self, rdd, epochs, batch_size, verbose, validation_split): parameters = rdd.context.broadcast(init) if self.mode in ['asynchronous', 'hogwild']: - worker = AsynchronousSparkWorker(yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom) + worker = AsynchronousSparkWorker( + yaml, parameters, mode, train_config, freq, optimizer, loss, metrics, custom) rdd.mapPartitions(worker.train).collect() new_parameters = self.client.get_parameters() elif self.mode == 'synchronous': - worker = 
SparkWorker(yaml, parameters, train_config, optimizer, loss, metrics, custom) + worker = SparkWorker(yaml, parameters, train_config, + optimizer, loss, metrics, custom) deltas = rdd.mapPartitions(worker.train).collect() new_parameters = self.master_network.get_weights() for delta in deltas: - base_constraint = lambda a: a + def base_constraint(a): return a constraints = [base_constraint for _ in self.weights] - new_parameters = self.optimizer.get_updates(self.weights, constraints, delta) + new_parameters = self.optimizer.get_updates( + self.weights, constraints, delta) else: raise ValueError("Unsupported mode {}".format(self.mode)) self.master_network.set_weights(new_parameters) @@ -227,12 +234,13 @@ def __init__(self, model, mode='asynchronous', frequency='epoch', parameter_serv batch_size=batch_size, *args, **kwargs) def fit(self, labeled_points, epochs=10, batch_size=32, verbose=0, validation_split=0.1, - categorical=False, nb_classes=None): + categorical=False, nb_classes=None): """Train an elephas model on an RDD of LabeledPoints """ rdd = lp_to_simple_rdd(labeled_points, categorical, nb_classes) rdd = rdd.repartition(self.num_workers) - self._fit(rdd=rdd, epochs=epochs, batch_size=batch_size, verbose=verbose, validation_split=validation_split) + self._fit(rdd=rdd, epochs=epochs, batch_size=batch_size, + verbose=verbose, validation_split=validation_split) def predict(self, mllib_data): """Predict probabilities for an RDD of features @@ -242,5 +250,5 @@ def predict(self, mllib_data): elif isinstance(mllib_data, pyspark.mllib.linalg.Vector): return to_vector(self.master_network.predict(from_vector(mllib_data))) else: - raise ValueError('Provide either an MLLib matrix or vector, got {}'.format(mllib_data.__name__)) - + raise ValueError( + 'Provide either an MLLib matrix or vector, got {}'.format(mllib_data.__name__)) diff --git a/elephas/utils/__init__.py b/elephas/utils/__init__.py index 0c657db..38d40d8 100644 --- a/elephas/utils/__init__.py +++ b/elephas/utils/__init__.py @@ -2,4 +2,4 @@ from .rdd_utils import * from .serialization import * from .sockets import * -from .rwlock import * \ No newline at end of file +from .rwlock import * diff --git a/elephas/utils/rdd_utils.py b/elephas/utils/rdd_utils.py index a2062d4..1765dbc 100644 --- a/elephas/utils/rdd_utils.py +++ b/elephas/utils/rdd_utils.py @@ -76,7 +76,8 @@ def from_labeled_point(rdd, categorical=False, nb_classes=None): :param nb_classes: optional int, indicating the number of class labels :return: pair of numpy arrays, features and labels """ - features = np.asarray(rdd.map(lambda lp: from_vector(lp.features)).collect()) + features = np.asarray( + rdd.map(lambda lp: from_vector(lp.features)).collect()) labels = np.asarray(rdd.map(lambda lp: lp.label).collect(), dtype='int32') if categorical: if not nb_classes: @@ -110,9 +111,11 @@ def lp_to_simple_rdd(lp_rdd, categorical=False, nb_classes=None): """ if categorical: if not nb_classes: - labels = np.asarray(lp_rdd.map(lambda lp: lp.label).collect(), dtype='int32') + labels = np.asarray(lp_rdd.map( + lambda lp: lp.label).collect(), dtype='int32') nb_classes = np.max(labels) + 1 - rdd = lp_rdd.map(lambda lp: (from_vector(lp.features), encode_label(lp.label, nb_classes))) + rdd = lp_rdd.map(lambda lp: (from_vector(lp.features), + encode_label(lp.label, nb_classes))) else: rdd = lp_rdd.map(lambda lp: (from_vector(lp.features), lp.label)) return rdd diff --git a/elephas/utils/rwlock.py b/elephas/utils/rwlock.py index 42fa069..f1e1e86 100644 --- a/elephas/utils/rwlock.py +++ 
b/elephas/utils/rwlock.py @@ -14,6 +14,7 @@ class RWLock: simultaneously, XOR one writer. Write locks have priority over reads to prevent write starvation. """ + def __init__(self): self.rwlock = 0 self.writers_waiting = 0 diff --git a/elephas/worker.py b/elephas/worker.py index 437d4b7..9b84a1e 100644 --- a/elephas/worker.py +++ b/elephas/worker.py @@ -11,6 +11,7 @@ class SparkWorker(object): """Synchronous Spark worker. This code will be executed on workers. """ + def __init__(self, yaml, parameters, train_config, master_optimizer, master_loss, master_metrics, custom_objects): self.yaml = yaml @@ -27,25 +28,29 @@ def train(self, data_iterator): """ optimizer = get_optimizer(self.master_optimizer) self.model = model_from_yaml(self.yaml, self.custom_objects) - self.model.compile(optimizer=optimizer, loss=self.master_loss, metrics=self.master_metrics) + self.model.compile(optimizer=optimizer, + loss=self.master_loss, metrics=self.master_metrics) self.model.set_weights(self.parameters.value) feature_iterator, label_iterator = tee(data_iterator, 2) x_train = np.asarray([x for x, y in feature_iterator]) y_train = np.asarray([y for x, y in label_iterator]) - self.model.compile(optimizer=self.master_optimizer, loss=self.master_loss, metrics=self.master_metrics) + self.model.compile(optimizer=self.master_optimizer, + loss=self.master_loss, metrics=self.master_metrics) weights_before_training = self.model.get_weights() if x_train.shape[0] > self.train_config.get('batch_size'): self.model.fit(x_train, y_train, **self.train_config) weights_after_training = self.model.get_weights() - deltas = subtract_params(weights_before_training, weights_after_training) + deltas = subtract_params( + weights_before_training, weights_after_training) yield deltas class AsynchronousSparkWorker(object): """Asynchronous Spark worker. This code will be executed on workers. """ + def __init__(self, yaml, parameters, parameter_server_mode, train_config, frequency, master_optimizer, master_loss, master_metrics, custom_objects): @@ -80,7 +85,8 @@ def train(self, data_iterator): optimizer = get_optimizer(self.master_optimizer) self.model = model_from_yaml(self.yaml, self.custom_objects) - self.model.compile(optimizer=optimizer, loss=self.master_loss, metrics=self.master_metrics) + self.model.compile(optimizer=optimizer, + loss=self.master_loss, metrics=self.master_metrics) self.model.set_weights(self.parameters.value) epochs = self.train_config['epochs'] @@ -102,7 +108,8 @@ def train(self, data_iterator): self.model.fit(x_train, y_train, **self.train_config) self.train_config['epochs'] = epochs weights_after_training = self.model.get_weights() - deltas = subtract_params(weights_before_training, weights_after_training) + deltas = subtract_params( + weights_before_training, weights_after_training) self.client.update_parameters(deltas) elif self.frequency == 'batch': for epoch in range(epochs): @@ -115,8 +122,10 @@ def train(self, data_iterator): y = slice_arrays(y_train, batch_ids) self.model.train_on_batch(x, y) weights_after_training = self.model.get_weights() - deltas = subtract_params(weights_before_training, weights_after_training) + deltas = subtract_params( + weights_before_training, weights_after_training) self.client.update_parameters(deltas) else: - raise ValueError('frequency parameter can be `epoch` or `batch, got {}'.format(self.frequency)) + raise ValueError( + 'frequency parameter can be `epoch` or `batch, got {}'.format(self.frequency)) yield []
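
Note on exercising the series locally (a minimal sketch, not part of the patches; it only re-uses the commands that patches 1-3 add to setup.py, .travis.yml and pytest.ini — the `[java]` line is the standard pip extras syntax for the new extras_require entry, shown here for illustration):

    # install the package plus the new 'tests' extra
    # (pulls pytest, pytest-pep8, pytest-cov, mock)
    pip install Cython --install-option="--no-cython-compile"
    pip install -e .[tests]

    # optional DL4J/Java support now lives behind the 'java' extra (pydl4j>=0.1.3)
    pip install -e .[java]

    # run the suite the same way the updated Travis config does;
    # pytest.ini ignores E501 (line too long) and W503 (line break before binary operator)
    py.test --pep8 -m pep8 tests/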