run_on_Azure.md
1) To deploy a Kafka-Spark Structured Streaming cluster on Azure, follow the guide "Use Apache Spark Structured Streaming with Apache Kafka on HDInsight": SparkStruturedStreaming-kafka-Azure

2) Install jq on your local machine

sudo apt update
sudo apt install jq
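
As a quick sanity check (the JSON sample below is made up for illustration; it simply mirrors the way jq is used with the Ambari REST API in the curl commands later on), you can verify jq works:

jq --version
echo '{"host":"wn0-kafka","port":9092}' | jq -r '"\(.host):\(.port)"'

The second command should print wn0-kafka:9092.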

3) Deploy the Spark and Kafka clusters using a template that is available on the Microsoft website:

Cluster size - Spark

| Node type | Node size | Number of nodes |
| --- | --- | --- |
| Head | D12 v2 (4 Cores, 28 GB RAM) | 2 |
| Worker | D13 v2 (8 Cores, 56 GB RAM) | 4 |
| Zookeeper | A2 v2 (2 Cores, 4 GB RAM) | 3 |

Cluster size - Kafka

| Node type | Node size | Number of nodes |
| --- | --- | --- |
| Head | D12 v2 (4 Cores, 28 GB RAM) | 2 |
| Worker | D13 v2 (8 Cores, 56 GB RAM) | 4 |
| Zookeeper | A2 v2 (2 Cores, 4 GB RAM) | 3 |
  • check that you have enough resources before deploying
  • Homepage --> Subscriptions --> your subscription title (e.g. microsoft azure sponsorship 2) --> Usage + quotas --> you need more than 30 vCPUs available under "Total Regional vCPUs"
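
If you prefer the command line, a hedged alternative is to check the quota with the Azure CLI (assuming az is installed and you are logged in with az login; YOUR_REGION is a placeholder for the region you deploy to):

az vm list-usage --location YOUR_REGION --output table

Look at the "Total Regional vCPUs" row and make sure Limit minus CurrentValue leaves more than 30 vCPUs.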

4) Start using the cluster

  1. Gather host information
export password='YOUR_KAFKA_CLUSTER_PASSWORD'
export CLUSTERNAME=YOUR_KAFKA_CLUSTER_NAME


curl -sS -u admin:$password -G "https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
  • Replace YOUR_KAFKA_CLUSTER_NAME with the name of your Kafka cluster (also replace skafka in the API path with your Kafka cluster name), and YOUR_KAFKA_CLUSTER_PASSWORD with the cluster login password.
  2. From a web browser, navigate to https://CLUSTERNAME.azurehdinsight.net/jupyter, where CLUSTERNAME is the name of your Spark cluster. When prompted, enter the cluster login (admin is the default) and the Spark cluster password used when you created the cluster.
  3. Use the curl and jq commands to obtain broker hosts information.
curl -sS -u admin:$password -G https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
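
It can be convenient to store both host lists in environment variables for the later steps (these are the same commands as above, just captured into variables; KAFKABROKERS is reused in step 7, KAFKAZKHOSTS is a suggested name for the Zookeeper list):

export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G "https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2)
export KAFKABROKERS=$(curl -sS -u admin:$password -G "https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2)
echo "Zookeeper hosts: $KAFKAZKHOSTS"
echo "Kafka brokers:   $KAFKABROKERS"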

5) Copy the required jars

This project depends on some spatial processing libraries on top of Apache Spark; you need to load them into the project in order to be able to call them from Jupyter.

  • find the jars here
  • copy the file titled magellan-1.0.5-s_2.11.jar to the storage account of your Spark cluster
  • this .jar library will be used as a dependency, added to the Spark session using the Jupyter Spark magic command
  • specifically, it will be loaded in the first cell of the notebook:
%%configure -f
{
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,com.esri.geometry:esri-geometry-api:1.2.1,commons-io:commons-io:2.6,org.apache.spark:spark-streaming_2.11:2.2.0",
        "spark.jars":"wasbs://[email protected]/jars/magellan-1.0.5-s_2.11.jar",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11",
        "spark.dynamicAllocation.enabled": false
    }
}
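
One possible way to copy magellan-1.0.5-s_2.11.jar to the Spark cluster's storage account from the command line is the Azure CLI (a sketch, not part of the original instructions; STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY and CONTAINER_NAME are placeholders for your own storage account, key and container; the portal's Storage Explorer works just as well):

az storage blob upload \
    --account-name STORAGE_ACCOUNT_NAME \
    --account-key STORAGE_ACCOUNT_KEY \
    --container-name CONTAINER_NAME \
    --name jars/magellan-1.0.5-s_2.11.jar \
    --file ./magellan-1.0.5-s_2.11.jar

The blob path (jars/magellan-1.0.5-s_2.11.jar) must match the wasbs:// path used for spark.jars in the %%configure cell above.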

6) Run the Java Kafka producer and send data to the Azure Kafka cluster

Copying files

  • copy the Kafka Java producer fat .jar file saosKafkaProducer-1.0-SNAPSHOT.jar (find it here) from this folder to a folder in your Kafka cluster on Azure
scp ./target/saosKafkaProducer-1.0-SNAPSHOT.jar USER_NAME@YOUR_KAFKA_CLUSTER_NAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar

Replace USER_NAME with the user name you chose when you created the cluster (sshuser is the default).

  • copy the input data file guang.csv to the same working directory on the Kafka cluster
scp guang.csv USER_NAME@YOUR_KAFKA_CLUSTER_NAME-ssh.azurehdinsight.net:guang.csv
In order to be able to access the neighborhoods .geojson file,
we need to store it in blob storage:
- go to "HDInsight clusters" --> Spark cluster name --> search for "storage accounts"
- select the "Azure Storage" name
- Storage Explorer --> Blob Containers --> sspark --> create a new folder "datasets"
- upload shenzhen_converted.geojson (find it here)

- then you can access it in your notebook using:
- "wasb://CONTAINER_NAME@STORAGE_ACCOUNT_NAME.blob.core.windows.net/datasets/shenzhen_converted.geojson"
- where sspark (the Spark cluster name) is used as the container name in this setup

Replace CONTAINER_NAME with the name of the container in your Spark storage account where you hosted the shenzhen_converted.geojson regions file. Also, replace STORAGE_ACCOUNT_NAME with the name of your Spark storage account.
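
As with the Magellan jar in step 5, you could also upload the geojson from the command line instead of the portal (same Azure CLI assumption and placeholders as before):

az storage blob upload \
    --account-name STORAGE_ACCOUNT_NAME \
    --account-key STORAGE_ACCOUNT_KEY \
    --container-name CONTAINER_NAME \
    --name datasets/shenzhen_converted.geojson \
    --file ./shenzhen_converted.geojson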

7) Run the Kafka producer

  1. Create the topic (on the Kafka headnode as sketched below, or in Jupyter with the %%bash magic command)
  2. Log in to the headnode of the Kafka cluster
  • navigate to the Kafka cluster 'skafka' | SSH + Cluster login
  • copy the login command and use it in your local machine's terminal

ssh USER_NAME@YOUR_KAFKA_CLUSTER_NAME-ssh.azurehdinsight.net
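
Before starting the producer, create the topic. A minimal sketch run on the Kafka headnode, assuming the topic name spatial1 used below; ZOOKEEPER_HOSTS is a placeholder for the Zookeeper host list gathered in step 4, and the partition/replication numbers are just example values:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
    --zookeeper 'ZOOKEEPER_HOSTS' \
    --replication-factor 3 \
    --partitions 8 \
    --topic spatial1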

  3. Get the kafkaBrokers list by running the following commands on the Kafka headnode (the same shell in which you will start the producer, so that $KAFKABROKERS is set there)
sudo apt -y install jq
export password='cluster_pass'

export KAFKABROKERS=$(curl -sS -u admin:$password -G https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
  4. Run the following command to start the Kafka producer on the Kafka cluster headnode.
**You need to create the topic first (see the kafka-topics.sh sketch above, or use the %%bash magic command in the Jupyter notebook).**
java -jar kafka-producer-consumer.jar shenzhen spatial1 $KAFKABROKERS /home/isam/guang.csv 1
  • The Kafka Java producer takes the following parameters:

 args[0] --> data      : (String) dataset name, either shenzhen or nyc
 args[1] --> topicName : (String)
 args[2] --> brokers   : (String)
 args[3] --> path      : (String) absolute path to the input CSV on the Kafka headnode (run pwd there to find it)
 args[4] --> time      : (int)    delay between generated tuples
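
To verify that records are actually arriving on the topic, one option (not part of the original instructions) is the Kafka console consumer shipped with HDInsight, run on the headnode with $KAFKABROKERS set as above; stop it with Ctrl+C:

/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
    --bootstrap-server $KAFKABROKERS \
    --topic spatial1 \
    --from-beginning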

8) Run the Jupyter notebook (find it here)

N.B. If you are using this code, please cite our works:

"...first ..." [1]. "...second ..." [2]. "...third ..." [3].

References

  • [1] Al Jawarneh, Isam M., Paolo Bellavista, Antonio Corradi, Luca Foschini, and Rebecca Montanari. (2021) "QoS-Aware Approximate Query Processing for Smart Cities Spatial Data Streams". Sensors 21, no. 12: 4160. https://doi.org/10.3390/s21124160
  • [2] Al Jawarneh, I. M., Bellavista, P., Corradi, A., Foschini, L., & Montanari, R. (2020, September). "Spatially Representative Online Big Data Sampling for Smart Cities". In 2020 IEEE 25th International Workshop on Computer Aided Modeling and Design of Communication Links and Networks (CAMAD) (pp. 1-6). IEEE.
  • [3] Al Jawarneh, I. M., Bellavista, P., Foschini, L., & Montanari, R. (2019, December). "Spatial-aware approximate big data stream processing". In 2019 IEEE global communications conference (GLOBECOM) (pp. 1-6). IEEE.