diff --git a/spark301job-mod/src/main/java/dk/ignalina/lab/spark301/KafkaEventDrivenSparkJob.java b/spark301job-mod/src/main/java/dk/ignalina/lab/spark301/KafkaEventDrivenSparkJob.java
index 9063db4..b23b7cc 100644
--- a/spark301job-mod/src/main/java/dk/ignalina/lab/spark301/KafkaEventDrivenSparkJob.java
+++ b/spark301job-mod/src/main/java/dk/ignalina/lab/spark301/KafkaEventDrivenSparkJob.java
@@ -54,7 +54,7 @@ public static String extractFileName(ConsumerRecord recor
         String message = ""+record.value();
         JsonObject jo = null;
         try {
-            jo = new JsonParser().parse(message).getAsJsonObject();
+            jo = JsonParser.parseString(message).getAsJsonObject();
         } catch (IllegalStateException ei) {
             String res="JSON CONTAINED NO PARSABLE FILENAME";
             System.out.println(res);
diff --git a/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/AppDeltaStreamingKafka.java b/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/KafkaEventDrivenSparkJob.java
similarity index 59%
rename from spark320job-mod/src/main/java/dk/ignalina/lab/spark320/AppDeltaStreamingKafka.java
rename to spark320job-mod/src/main/java/dk/ignalina/lab/spark320/KafkaEventDrivenSparkJob.java
index 6e8ef37..90cbc25 100644
--- a/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/AppDeltaStreamingKafka.java
+++ b/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/KafkaEventDrivenSparkJob.java
@@ -19,7 +19,12 @@
 package dk.ignalina.lab.spark320;
 
+import org.apache.hadoop.shaded.org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.spark.*;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.*;
@@ -28,7 +33,7 @@
 
 
 
-public class AppDeltaStreamingKafka {
+public class KafkaEventDrivenSparkJob {
 
     private static String authType;
 
@@ -36,16 +41,13 @@ public class AppDeltaStreamingKafka {
     public static void main(String[] args) {
         Utils.Config config = new Utils.Config(args);
 
-//        JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("v20220612 spark 3.0.1 Streaming /event driven spark job"), new Duration(config.msIntervall));
-//        SparkSession spark=Utils.createS3SparkSession(config);
-//        JavaInputDStream<ConsumerRecord<String, String>> stream = Utils.createStream(config,ssc);
-        Utils.decorateS3SparkSession(config);
+        SparkSession.Builder sparkBuilder= Utils.decorateWithS3(config);
         //for a local spark instance
         SparkConf conf = new SparkConf()
                 .setAppName("appName")
-                .setMaster("spark://10.1.1.193:7077")
+                .setMaster(config.master)
                 .set("spark.submit.deployMode", "cluster")
-                .set("spark.jars.packages", "org.projectnessie:nessie-deltalake:0.30.0,org.projectnessie:nessie-spark-3.2-extensions:0.30.0")
+                .set("spark.jars.packages", config.packages)
                 .set("spark.hadoop.nessie.url", config.url)
                 .set("spark.hadoop.nessie.ref", config.branch)
                 .set("spark.hadoop.nessie.authentication.type", config.authType)
@@ -54,18 +56,34 @@ public static void main(String[] args) {
                 .set("spark.delta.logStore.class", "org.projectnessie.deltalake.NessieLogStore")
                 .set("spark.delta.logFileHandler.class", "org.projectnessie.deltalake.NessieLogFileMetaParser");
 
-
-        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
-
-//        spark = SparkSession.builder()
-//                .master("local[2]")
-//                .config(conf)
-//                .getOrCreate();
-
-        SparkSession spark = SparkSession.builder()
+        JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("v20220620 spark 3.2.0 Streaming /event driven spark job"), new Duration(config.msIntervall));
+        SparkSession spark = sparkBuilder
                 .master(config.master)
                 .config(conf)
                 .getOrCreate();
+
+        JavaInputDStream<ConsumerRecord<String, String>> stream = Utils.createStream(config,ssc);
+        stream.foreachRDD(rdd -> {
+            JavaRDD<String> filenames = rdd.map(record -> Utils.extractFileName(record)); // WARNING/TODO: List needs to be sorted on date for correct Delta ingest order.
+            filenames.collect().forEach(fileName -> {
+                System.out.println("Filename from JSON=" + fileName);
+                Dataset<Row> df = spark.read().parquet(fileName);
+                df.printSchema();
+
+                // That's all folks, do something useful with the dataset
+
+            });
+        });
+
+
+        ssc.start();
+        try {
+            ssc.awaitTermination();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            // TODO Make sure the Kafka read offset is NOT updated, OR add to a dead letter queue!
+        }
+    }
 
 
 
diff --git a/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/base/Utils.java b/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/base/Utils.java
index 9697e1b..db615fc 100644
--- a/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/base/Utils.java
+++ b/spark320job-mod/src/main/java/dk/ignalina/lab/spark320/base/Utils.java
@@ -19,7 +19,9 @@
 package dk.ignalina.lab.spark320.base;
 
-import org.apache.avro.generic.GenericRecord;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.apache.hadoop.shaded.org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.spark.SparkConf;
@@ -59,6 +61,7 @@ public static class Config {
         public String branch;
         public String url;
         public String authType;
+        public String packages="org.projectnessie:nessie-deltalake:0.30.0,org.projectnessie:nessie-spark-3.2-extensions:0.30.0";
 
         public Config(String[] args) {
             topic = args[0];
@@ -108,16 +111,14 @@ public static SparkConf CreateSparkConf(String appName) {
         return conf;
     }
 
-    static public SparkSession decorateS3SparkSession(Utils.Config config) {
-        SparkSession spark = SparkSession.builder().master(config.master).
+    static public SparkSession.Builder decorateWithS3(Utils.Config config) {
+        return SparkSession.builder().master(config.master).
                 config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem").
                 config("fs.s3a.access.key",config.s3AccessKey).
                 config("fs.s3a.secret.key",config.s3SecretKey).
                 config("fs.s3a.endpoint", config.s3EndPoint).
                 config("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem").
-                config("fs.s3a.path.style.access","true").
-                getOrCreate();
-        return spark;
+                config("fs.s3a.path.style.access","true");
     }
 
     static public JavaInputDStream<ConsumerRecord<String, String>> createStream(Utils.Config config,JavaStreamingContext ssc) {
@@ -142,5 +143,22 @@ static public JavaInputDStream> createStre
         return stream;
     }
 
 
+    public static String extractFileName(ConsumerRecord record) {
+
+        String message = ""+record.value();
+        JsonObject jo = null;
+        try {
+            jo = JsonParser.parseString(message).getAsJsonObject();
+        } catch (IllegalStateException ei) {
+            String res="JSON CONTAINED NO PARSABLE FILENAME";
+            System.out.println(res);
+            return res;
+        }
+        String filename=jo.get("body").getAsJsonObject().get("name").getAsString();
+        System.out.println("Filename extracted from JSON=" + filename);
+
+        return filename;
+    }
+
 }
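
Note on the payload shape: extractFileName assumes each Kafka record value is a JSON document of the form {"body": {"name": "<file path>"}} and returns the "name" field, which the streaming job then reads as a parquet file. The snippet below is a minimal sketch of how the helper could be exercised in isolation; it is not part of the diff, the class name, topic name, and s3a path are made up, and it only assumes kafka-clients and gson on the classpath.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import dk.ignalina.lab.spark320.base.Utils;

public class ExtractFileNameSketch {
    public static void main(String[] args) {
        // Made-up payload in the shape the job expects: {"body": {"name": "..."}}
        String payload = "{\"body\": {\"name\": \"s3a://landing/2022/06/20/events.parquet\"}}";
        // ConsumerRecord(topic, partition, offset, key, value); topic and offsets are arbitrary here
        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("file-events", 0, 0L, null, payload);
        // Prints the extracted path; a malformed payload would instead yield the
        // "JSON CONTAINED NO PARSABLE FILENAME" marker string
        System.out.println(Utils.extractFileName(record));
    }
}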