Can Maggy be used with a Spark Cluster that uses YARN #72

Open

crakama opened this issue Dec 12, 2020 · 0 comments

crakama commented Dec 12, 2020

I was wondering how Maggy handles the following:

Question 1: How does Maggy contact Spark's driver to make the RPC calls in a Spark infrastructure other than Hopsworks? Does having a resource manager such as YARN on top of the Spark cluster affect how Maggy should make RPC requests to the Spark driver, or should it work as normal?

Question 2: If I opt to go for "you can deploy an entire Hopsworks instance to your own AWS account", as also explained here (https://hopsworks.readthedocs.io/en/stable/getting_started/installation_guide/platforms/aws-image.html), a t2.2xlarge instance with 8 vCPUs and 32 GB RAM is a single host and not a cluster. Do the 8 available vCPUs equate to the executors that Spark will use? Meaning that if I run my trials and would like to compare the results from different executors, I will only have a maximum of 8 available executors, unless I increase the instance type?
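
To make the sizing concern behind Question 2 concrete, here is a minimal sketch of the kind of executor configuration I have in mind (plain PySpark on YARN, nothing Maggy-specific; the app name and numbers are placeholders, not taken from the Hopsworks docs):

from pyspark.sql import SparkSession

# Illustrative sketch only: on a single 8-vCPU host, YARN can hand out at most
# ~8 cores in total (minus what the driver and other services use), so the
# number of concurrent executors is bounded by the available cores divided by
# spark.executor.cores, regardless of how high maxExecutors is set.
spark = SparkSession \
    .builder \
    .appName('maggy-sizing-sketch') \
    .config('spark.executor.cores', '1') \
    .config('spark.executor.memory', '2g') \
    .config('spark.dynamicAllocation.enabled', 'true') \
    .config('spark.dynamicAllocation.minExecutors', '0') \
    .config('spark.dynamicAllocation.maxExecutors', '8') \
    .getOrCreate()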

The reason behind Question 1 is that in this example here (https://github.com/logicalclocks/maggy/blob/master/examples/maggy-ablation-titanic-example.ipynb), a Spark session is created, but I cannot explicitly see the point where Maggy hands over/submits jobs to Spark. For instance, in some industrial setups, one would interact with Spark by creating a Spark session and then submitting the job like so:

Creating a Spark session

spark = SparkSession \
    .builder \
    .appName('spark-ipython') \
    .config('spark.shuffle.service.enabled', 'true') \
    .config('spark.executor.memory', '2844M') \
    .config('spark.dynamicAllocation.enabled', 'true') \
    .config('spark.dynamicAllocation.minExecutors', '0') \
    .config('spark.dynamicAllocation.maxExecutors', '100') \
    .getOrCreate()

Submitting the job to the Spark cluster

spark-submit --master yarn --deploy-mode cluster \
    --archives hdfs:///somelocation/Python.zip#Python \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./Python/bin/python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./Python/bin/python3 \
    main.py

whereby main.py contains the Maggy code.

Example of main.py file

from maggy import Searchspace

### The searchspace can be instantiated with parameters
sp = Searchspace(kernel=('INTEGER', [2, 8]), pool=('INTEGER', [2, 8]))

# Or additional parameters can be added one by one
sp.add('dropout', ('DOUBLE', [0.01, 0.99]))

from maggy import experiment
from maggy.callbacks import KerasBatchEnd

#########
### maggy: hyperparameters as arguments and including the reporter
#########
def keras(kernel, pool, dropout, reporter):
    from tensorflow.python import keras
    import tensorflow as tf
    from tensorflow.python.keras.datasets import mnist
    from tensorflow.python.keras.models import Sequential
    from tensorflow.python.keras.layers import Dense, Dropout, Flatten
    from tensorflow.python.keras.layers import Conv2D, MaxPooling2D
    from tensorflow.python.keras.callbacks import TensorBoard
    from tensorflow.python.keras import backend as K
    import math

    batch_size = 512
    num_classes = 10
    epochs = 1

    # Input image dimensions
    img_rows, img_cols = 28, 28

    # The data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    if K.image_data_format() == 'channels_first':
        x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
        x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # Convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)

    model = Sequential()
    model.add(Conv2D(32, kernel_size=(kernel, kernel),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, (kernel, kernel), activation='relu'))
    model.add(MaxPooling2D(pool_size=(pool, pool)))
    model.add(Dropout(dropout))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(dropout))
    model.add(Dense(num_classes, activation='softmax'))

    opt = keras.optimizers.Adadelta(1.0)

    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy'])

    #########
    ### maggy: REPORTER API through keras callback
    #########
    callbacks = [KerasBatchEnd(reporter, metric='acc')]

    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,  # add callback
              epochs=epochs,
              verbose=1,
              validation_data=(x_test, y_test))
    score = model.evaluate(x_test, y_test, verbose=0)
    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    #########
    ### maggy: return the metric to be optimized, test accuracy in this case
    #########
    return score[1]
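
For reference, the Maggy example notebooks launch the search from the driver program by handing the training function to experiment.lagom(). Below is a minimal sketch of what I assume the corresponding call at the end of main.py would look like; the argument names (map_fun, num_trials, etc.) follow the example notebooks and may differ between Maggy versions:

#########
### maggy: launch the experiment from the driver (sketch; argument names
### follow the Maggy example notebooks and may differ between versions)
#########
result = experiment.lagom(map_fun=keras,
                          searchspace=sp,
                          optimizer='randomsearch',
                          direction='max',
                          num_trials=15,
                          name='MNIST')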

crakama changed the title from "Can Maggy be used in a Spark Cluster that uses YARN" to "Can Maggy be used with a Spark Cluster that uses YARN" on Dec 12, 2020