onwards
Ignalina committed Jun 20, 2022
1 parent 9b6475d commit d5a2317
Showing 3 changed files with 59 additions and 23 deletions.
@@ -54,7 +54,7 @@ public static String extractFileName(ConsumerRecord<String, GenericRecord> record) {
         String message = ""+record.value();
         JsonObject jo = null;
         try {
-            jo = new JsonParser().parse(message).getAsJsonObject();
+            jo = JsonParser.parseString(message).getAsJsonObject();
         } catch (IllegalStateException ei) {
             String res="JSON CONTAINED NO PARSABLE FILENAME";
             System.out.println(res);
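Side note on the one-line change above: Gson deprecated the JsonParser constructor in favor of the static JsonParser.parseString entry point (available since Gson 2.8.6), which is what this commit switches to. A minimal standalone sketch of the same parse-and-fallback pattern; the sample payload here is hypothetical, in the job it comes from ConsumerRecord.value():

    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;

    public class ParseSketch {
        public static void main(String[] args) {
            // Hypothetical payload shaped like the one extractFileName expects.
            String message = "{\"body\":{\"name\":\"s3a://landing/example.parquet\"}}";
            try {
                // Static API, replacing the deprecated new JsonParser().parse(message).
                JsonObject jo = JsonParser.parseString(message).getAsJsonObject();
                System.out.println(jo.get("body").getAsJsonObject().get("name").getAsString());
            } catch (IllegalStateException e) {
                // Same fallback as the commit: the payload was not a JSON object.
                System.out.println("JSON CONTAINED NO PARSABLE FILENAME");
            }
        }
    }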
@@ -19,7 +19,12 @@
 
 package dk.ignalina.lab.spark320;
 
+import org.apache.hadoop.shaded.org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.spark.*;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.*;
@@ -28,24 +33,21 @@
 
 
 
-public class AppDeltaStreamingKafka {
+public class KafkaEventDrivenSparkJob {
 
 
     private static String authType;
 
     public static void main(String[] args) {
 
         Utils.Config config = new Utils.Config(args);
-//        JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("v20220612 spark 3.0.1 Streaming /event driven spark job"), new Duration(config.msIntervall));
-//        SparkSession spark=Utils.createS3SparkSession(config);
-//        JavaInputDStream<ConsumerRecord<String, GenericRecord>> stream = Utils.createStream(config,ssc);
-        Utils.decorateS3SparkSession(config);
+        SparkSession.Builder sparkBuilder= Utils.decorateWithS3(config);
         // for a local spark instance
         SparkConf conf = new SparkConf()
                 .setAppName("appName")
-                .setMaster("spark://10.1.1.193:7077")
+                .setMaster(config.master)
                 .set("spark.submit.deployMode", "cluster")
-                .set("spark.jars.packages", "org.projectnessie:nessie-deltalake:0.30.0,org.projectnessie:nessie-spark-3.2-extensions:0.30.0")
+                .set("spark.jars.packages", config.packages)
                 .set("spark.hadoop.nessie.url", config.url)
                 .set("spark.hadoop.nessie.ref", config.branch)
                 .set("spark.hadoop.nessie.authentication.type", config.authType)
@@ -54,18 +56,34 @@ public static void main(String[] args) {
             .set("spark.delta.logStore.class", "org.projectnessie.deltalake.NessieLogStore")
             .set("spark.delta.logFileHandler.class", "org.projectnessie.deltalake.NessieLogFileMetaParser");
 
-
-        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
-
-//        spark = SparkSession.builder()
-//                .master("local[2]")
-//                .config(conf)
-//                .getOrCreate();
-
-        SparkSession spark = SparkSession.builder()
+        JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("v20220620 spark 3.2.0 Streaming /event driven spark job"), new Duration(config.msIntervall));
+        SparkSession spark = sparkBuilder
                 .master(config.master)
                 .config(conf)
                 .getOrCreate();
 
+        JavaInputDStream<ConsumerRecord<String, GenericRecord>> stream = Utils.createStream(config,ssc);
+        stream.foreachRDD(rdd -> {
+            JavaRDD<String> filenames = rdd.map(record -> Utils.extractFileName(record)); // WARNING/TODO: list needs to be sorted by date for correct Delta ingest order.
+            filenames.collect().forEach(fileName -> {
+                System.out.println("Filename from JSON=" + fileName);
+                Dataset<Row> df = spark.read().parquet(fileName);
+                df.printSchema();
+
+                // That's all, folks: do something useful with the dataset here.
+
+            });
+        });
+
+
+        ssc.start();
+        try {
+            ssc.awaitTermination();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            // TODO: Make sure the Kafka read offset is NOT updated, or add the record to a dead-letter queue!
+        }
+
+    }
 
 
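On the WARNING/TODO above: the commit collects the file names in whatever order the RDD delivers them, while Delta ingest needs them date-ordered. A hedged sketch of one way to close that gap, assuming (the commit does not establish this) that the file names compare chronologically as plain strings:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SortedIngestSketch {
        // Assumption: names like "s3a://landing/2022-06-20-data.parquet" sort
        // chronologically under natural string order. If the date lives elsewhere
        // in the payload, parse it out and sort on that key instead.
        static void ingestInOrder(SparkSession spark, JavaRDD<String> filenames) {
            List<String> sorted = new ArrayList<>(filenames.collect());
            sorted.sort(Comparator.naturalOrder());
            for (String fileName : sorted) {
                Dataset<Row> df = spark.read().parquet(fileName);
                df.printSchema(); // placeholder, as in the commit's loop body
            }
        }
    }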
@@ -19,7 +19,9 @@
 
 package dk.ignalina.lab.spark320.base;
 
-import org.apache.avro.generic.GenericRecord;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.hadoop.shaded.org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.SparkConf;
@@ -59,6 +61,7 @@ public static class Config {
     public String branch;
     public String url;
     public String authType;
+    public String packages="org.projectnessie:nessie-deltalake:0.30.0,org.projectnessie:nessie-spark-3.2-extensions:0.30.0";
 
     public Config(String[] args) {
         topic = args[0];
@@ -108,16 +111,14 @@ public static SparkConf CreateSparkConf(String appName) {
         return conf;
     }
 
-    static public SparkSession decorateS3SparkSession(Utils.Config config) {
-        SparkSession spark = SparkSession.builder().master(config.master).
+    static public SparkSession.Builder decorateWithS3(Utils.Config config) {
+        return SparkSession.builder().master(config.master).
                 config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem").
                 config("fs.s3a.access.key",config.s3AccessKey).
                 config("fs.s3a.secret.key",config.s3SecretKey).
                 config("fs.s3a.endpoint", config.s3EndPoint).
                 config("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem").
-                config("fs.s3a.path.style.access","true").
-                getOrCreate();
-        return spark;
+                config("fs.s3a.path.style.access","true");
     }
     static public JavaInputDStream<ConsumerRecord<String, GenericRecord>> createStream(Utils.Config config,JavaStreamingContext ssc) {
 
@@ -142,5 +143,22 @@ static public JavaInputDStream<ConsumerRecord<String, GenericRecord>> createStream(Utils.Config config,JavaStreamingContext ssc) {
         return stream;
     }
 
+    public static String extractFileName(ConsumerRecord<String, GenericRecord> record) {
+
+        String message = ""+record.value();
+        JsonObject jo = null;
+        try {
+            jo = JsonParser.parseString(message).getAsJsonObject();
+        } catch (IllegalStateException ei) {
+            String res="JSON CONTAINED NO PARSABLE FILENAME";
+            System.out.println(res);
+            return res;
+        }
+        String filename=jo.get("body").getAsJsonObject().get("name").getAsString();
+        System.out.println("Filename extracted from JSON=" + filename);
+
+        return filename;
+    }
+
 
 }
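Why the decorateS3SparkSession to decorateWithS3 change matters: the old helper called getOrCreate() itself, so the session was finalized before main() could layer on its Nessie/Delta SparkConf; the new helper hands back the still-open SparkSession.Builder and leaves session creation to the caller. A minimal sketch of the intended call site, assuming this Utils class is on the classpath (the SparkConf values are placeholders):

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class DecorateWithS3Sketch {
        public static void main(String[] args) {
            Utils.Config config = new Utils.Config(args);
            SparkConf conf = new SparkConf().setAppName("appName"); // placeholder settings

            // The builder still accepts options after the S3A decoration;
            // getOrCreate() now happens exactly once, at the call site.
            SparkSession spark = Utils.decorateWithS3(config)
                    .config(conf)
                    .getOrCreate();

            System.out.println("Spark version: " + spark.version());
        }
    }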
