This post follows on from the setup of the master-slave cluster using Apache Spark (here).
Assuming your Spark environment is ready, we can use the Elephas library to develop deep learning models in a distributed manner. Make sure that every node in the cluster is configured properly and has all the libraries needed to run the program.
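One quick way to confirm that a node has the required libraries is to run a small check on it before submitting any job. The sketch below is only an illustration: the helper name missing_packages and the package list are my own assumptions, so adjust the list to whatever your program actually imports.

```python
import importlib.util


def missing_packages(names):
    """Return the package names that cannot be imported on this node."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Assumed package list for this post; extend it to match your own code.
    required = ["pyspark", "keras", "elephas"]
    missing = missing_packages(required)
    if missing:
        print("Missing on this node:", ", ".join(missing))
    else:
        print("All required packages are importable.")
```

Running this script on the master and on each worker makes a missing library obvious before a job fails halfway through.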
About Elephas
Elephas is a Python library for building distributed versions of Keras deep learning models in an Apache Spark environment. Elephas offers the following features:
- Data-parallel training of deep learning models
- Distributed hyper-parameter optimization
- Distributed training of ensemble models
Built on Keras, Elephas implements data-parallel algorithms using Spark RDDs. First, the Keras model is serialized and shipped to the workers of the cluster, along with the data and training parameters. During the training phase, each worker node deserializes the model and trains it on its own block of data, then sends the resulting gradients back to the driver. The model on the master node is updated by an optimizer that receives these gradients either synchronously or asynchronously. More details can be found here.
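To make the data-parallel idea concrete, here is a toy simulation of the synchronous case on a single machine. It is not Elephas code: it uses a small linear model in NumPy instead of a Keras network, the "workers" are just blocks of an array, and the function names worker_gradient and synchronous_step are hypothetical.

```python
import numpy as np


def worker_gradient(weights, x_block, y_block):
    """Mean-squared-error gradient of a linear model on one block of data."""
    residual = x_block @ weights - y_block
    return 2.0 * x_block.T @ residual / len(y_block)


def synchronous_step(weights, blocks, learning_rate):
    """One driver update: average the workers' gradients, then take a step."""
    grads = [worker_gradient(weights, x, y) for x, y in blocks]
    return weights - learning_rate * np.mean(grads, axis=0)


# Noise-free synthetic data generated from known weights
rng = np.random.default_rng(0)
true_w = np.array([2.0, -1.0])
x = rng.normal(size=(200, 2))
y = x @ true_w

# Split the data into four equally sized blocks, one per simulated worker
blocks = list(zip(np.array_split(x, 4), np.array_split(y, 4)))

weights = np.zeros(2)
for _ in range(200):
    weights = synchronous_step(weights, blocks, learning_rate=0.1)

print(weights)
```

In the synchronous case the driver waits for every worker before updating. In the asynchronous case it would apply each worker's update as soon as it arrives, which removes the waiting but means some updates are computed from slightly stale weights.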
Distributed modelling
This task is relatively easy compared with setting up the Apache Spark environment. You can take an existing Keras model and adapt it to run in the distributed environment.
In the following code I use a plain U-Net model, a popular choice for biomedical image segmentation (more details can be found here). The model is configured to train and evaluate on multiple nodes using Elephas as follows:
from keras.models import Model, load_model
from keras.layers import Input
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras import backend as K
from keras.optimizers import Adam
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from elephas.spark_model import SparkModel
from elephas.utils.rdd_utils import to_simple_rdd
# Creating Spark context
master_url = "spark://<MASTER_IP>:7077"
conf = SparkConf().setAppName("UNet")
conf.setMaster(master_url)
sc = SparkContext(conf = conf)
# Some other code
# ------
# U-Net model creation
class UNet:
    @staticmethod
    def build(width, height, depth):
        inputs = Input((height, width, depth))
        # Scale pixel values from [0, 255] to [0, 1]
        s = Lambda(lambda x: x / 255) (inputs)

        # Contracting path
        c1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (s)
        c1 = Dropout(0.1) (c1)
        c1 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c1)
        p1 = MaxPooling2D((2, 2)) (c1)

        c2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p1)
        c2 = Dropout(0.1) (c2)
        c2 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c2)
        p2 = MaxPooling2D((2, 2)) (c2)

        c3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p2)
        c3 = Dropout(0.2) (c3)
        c3 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c3)
        p3 = MaxPooling2D((2, 2)) (c3)

        c4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p3)
        c4 = Dropout(0.2) (c4)
        c4 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c4)
        p4 = MaxPooling2D(pool_size=(2, 2)) (c4)

        # Bottleneck
        c5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (p4)
        c5 = Dropout(0.3) (c5)
        c5 = Conv2D(256, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c5)

        # Expanding path with skip connections
        u6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same') (c5)
        u6 = concatenate([u6, c4])
        c6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u6)
        c6 = Dropout(0.2) (c6)
        c6 = Conv2D(128, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c6)

        u7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same') (c6)
        u7 = concatenate([u7, c3])
        c7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u7)
        c7 = Dropout(0.2) (c7)
        c7 = Conv2D(64, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c7)

        u8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same') (c7)
        u8 = concatenate([u8, c2])
        c8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u8)
        c8 = Dropout(0.1) (c8)
        c8 = Conv2D(32, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c8)

        u9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same') (c8)
        u9 = concatenate([u9, c1], axis=3)
        c9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (u9)
        c9 = Dropout(0.1) (c9)
        c9 = Conv2D(16, (3, 3), activation='elu', kernel_initializer='he_normal', padding='same') (c9)

        # Single-channel sigmoid output: one mask probability per pixel
        outputs = Conv2D(1, (1, 1), activation='sigmoid') (c9)
        model = Model(inputs=[inputs], outputs=[outputs])
        # return the constructed network architecture
        return model
# Build and compile U-Net model
# (IMAGE_DIMS, INIT_LR, EPOCHS, BS, trainX, trainY, testX and testY
# are expected to be defined in the omitted code above)
model = UNet.build(width=IMAGE_DIMS[1], height=IMAGE_DIMS[0],
                   depth=IMAGE_DIMS[2])
opt = Adam(lr=INIT_LR, decay=INIT_LR / EPOCHS)
model.compile(loss="binary_crossentropy", optimizer=opt,
              metrics=["accuracy"])
# Convert trainX and trainY to an RDD of (features, label) pairs
rdd = to_simple_rdd(sc, trainX, trainY)
# Create Spark model
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous',
                         num_workers=1)
spark_model.fit(rdd, epochs=EPOCHS, batch_size=BS, verbose=1, validation_split=0.1)
# Evaluate Spark model by evaluating the underlying model
score = spark_model.master_network.evaluate(testX, testY, verbose=1)
print('Test accuracy:', score[1])
print(spark_model.get_results())
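One detail worth checking before training is the input size. The network above halves the spatial size four times with MaxPooling2D and doubles it four times with Conv2DTranspose, so each skip connection only lines up if the height and width are divisible by 16. The helpers below are a small sketch of that check; the names pooled_sizes and is_valid_unet_size are my own and are not part of Keras or Elephas.

```python
def pooled_sizes(size, num_pools=4):
    """Spatial size after each 2x2 max-pooling step (floor division)."""
    sizes = [size]
    for _ in range(num_pools):
        sizes.append(sizes[-1] // 2)
    return sizes


def is_valid_unet_size(height, width, num_pools=4):
    """True if every upsampled map matches the map it is concatenated with."""
    factor = 2 ** num_pools
    return height % factor == 0 and width % factor == 0


print(pooled_sizes(128))             # [128, 64, 32, 16, 8]
print(is_valid_unet_size(128, 128))  # True
print(is_valid_unet_size(100, 100))  # False
```

With a 100-pixel side, for example, the pooled sizes are 100, 50, 25, 12 and 6. Upsampling 12 gives 24, which no longer matches the 25-pixel feature map it has to be concatenated with.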
Running the code
sudo bash spark-submit --master spark://<MASTER_IP>:7077 unet.py
For a detailed explanation of how to run the code, see here.