From 2cf9e0669cac466466f2b1f7707e032806316e72 Mon Sep 17 00:00:00 2001
From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com>
Date: Fri, 12 Jul 2024 08:21:38 +0100
Subject: [PATCH] Spark/Scala: Load Spark data frame into CrateDB using Scala
 and HTTP (#520)

* New example with Scala Spark and the CrateDB HTTP endpoint

* Typo

* Update by-language/scala-spark-http/README.md

Co-authored-by: Andreas Motl

* Update by-language/scala-spark-http/README.md

Co-authored-by: Andreas Motl

* Update by-language/scala-spark-http/README.md

Co-authored-by: Andreas Motl

* Update by-language/scala-spark-http/README.md

Co-authored-by: Andreas Motl

* Moving example to by-dataframe folder as suggested by amotl

---------

Co-authored-by: Andreas Motl
---
 by-dataframe/spark/scala-http/README.md      | 31 +++++++++
 .../scala-http/SparkCrateDBhttpExample.scala | 64 +++++++++++++++++++
 by-dataframe/spark/scala-http/build.sbt      | 11 ++++
 3 files changed, 106 insertions(+)
 create mode 100644 by-dataframe/spark/scala-http/README.md
 create mode 100644 by-dataframe/spark/scala-http/SparkCrateDBhttpExample.scala
 create mode 100644 by-dataframe/spark/scala-http/build.sbt

diff --git a/by-dataframe/spark/scala-http/README.md b/by-dataframe/spark/scala-http/README.md
new file mode 100644
index 00000000..6a87116f
--- /dev/null
+++ b/by-dataframe/spark/scala-http/README.md
@@ -0,0 +1,31 @@
+# Load Spark data frame into CrateDB using Scala and HTTP
+
+This example demonstrates how a Spark data frame can be loaded into CrateDB
+using the CrateDB HTTP endpoint.
+
+It assumes there is a CrateDB instance running on localhost accepting
+connections with the default `crate` superuser, and it relies on the
+following table having been created:
+
+```sql
+CREATE TABLE myschema.mytable (
+    examplefield1 TIMESTAMP,
+    anotherfield GEO_POINT,
+    jsonpayload OBJECT
+);
+```
+
+Where applicable in your environment, you may want to consider replacing
+`scalaj.http` with an asynchronous HTTP client such as `akka.http` or
+`AsyncHttpClient`. You may also want to explore whether connection pooling
+is useful in your environment, and whether JDBC calls leveraging the
+PostgreSQL wire protocol are more convenient for your particular case;
+see the sketch below for the latter.
+
+That said, note that this example uses [CrateDB's HTTP bulk operations]
+to ingest data, which is currently the most efficient way to do so.
+
+[CrateDB's HTTP bulk operations]: https://cratedb.com/docs/guide/performance/inserts/bulk.html
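+
+For illustration, here is a minimal, hypothetical sketch of the JDBC
+alternative mentioned above. It assumes CrateDB is listening for PostgreSQL
+wire protocol connections on the default port 5432 and that the standard
+PostgreSQL JDBC driver (`org.postgresql:postgresql`) is on the classpath;
+the object name is illustrative, and the `GEO_POINT` and `OBJECT` columns
+are omitted to keep the sketch small:
+
+```scala
+import java.sql.{DriverManager, Timestamp}
+
+object JdbcAlternativeSketch {
+  def main(args: Array[String]): Unit = {
+    // CrateDB speaks the PostgreSQL wire protocol, so the stock PostgreSQL
+    // JDBC driver can connect; "crate" is the default superuser.
+    val connection = DriverManager.getConnection(
+      "jdbc:postgresql://localhost:5432/doc", "crate", "")
+    try {
+      val statement = connection.prepareStatement(
+        "INSERT INTO myschema.mytable (examplefield1) VALUES (?)")
+      // JDBC batching plays a role similar to bulk_args on the HTTP endpoint.
+      statement.setTimestamp(1, Timestamp.valueOf("2024-07-10 10:00:00"))
+      statement.addBatch()
+      statement.setTimestamp(1, Timestamp.valueOf("2024-07-10 11:00:00"))
+      statement.addBatch()
+      statement.executeBatch()
+    } finally {
+      connection.close()
+    }
+  }
+}
+```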
+
+You can run this example with [sbt]:
+
+```shell
+sbt run
+```
+
+[sbt]: https://www.scala-sbt.org/download/

diff --git a/by-dataframe/spark/scala-http/SparkCrateDBhttpExample.scala b/by-dataframe/spark/scala-http/SparkCrateDBhttpExample.scala
new file mode 100644
index 00000000..0c801194
--- /dev/null
+++ b/by-dataframe/spark/scala-http/SparkCrateDBhttpExample.scala
@@ -0,0 +1,64 @@
+import scalaj.http.Http
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
+import org.json4s.jackson.Serialization
+
+import java.sql.Timestamp
+import java.time.LocalDateTime
+import java.time.format.DateTimeFormatter
+
+object SparkCrateDBhttpExample {
+  def main(args: Array[String]): Unit = {
+    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
+
+    val data = Seq(
+      Row(Timestamp.valueOf(LocalDateTime.parse("2024-07-10 10:00", formatter)), Array(9.744417, 47.413417), """{"example_quantity_field": 30, "example_string_field": "abc"}"""),
+      Row(Timestamp.valueOf(LocalDateTime.parse("2024-07-10 11:00", formatter)), Array(13.46738, 52.50463), """{"example_quantity_field": 40, "example_string_field": "def"}""")
+    )
+
+    val spark = SparkSession.builder
+      .appName("test")
+      .master("local[*]")
+      .getOrCreate()
+
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(data),
+      StructType(
+        List(
+          StructField("examplefield1", TimestampType, true),
+          StructField("anotherfield", ArrayType(DoubleType), true),
+          StructField("jsonpayload", StringType, true)
+        )
+      )
+    )
+
+    val url = "http://localhost:4200/_sql"
+
+    // Build a parameterized INSERT; the row values are sent separately as bulk arguments.
+    val columns = df.columns.mkString(", ")
+    val placeholders = df.columns.map(_ => "?").mkString(", ")
+    val stmt = s"INSERT INTO myschema.mytable ($columns) VALUES ($placeholders)"
+
+    val columnNames = df.columns
+    // Issue one bulk request per partition, directly from the executors.
+    df.foreachPartition { partition =>
+      // Turn each row into a list of positional arguments for the statement.
+      val bulkArgs: List[List[Any]] = partition.map { row =>
+        columnNames.indices.map(i => row.get(i)).toList
+      }.toList
+
+      if (bulkArgs.nonEmpty) {
+        // CrateDB's HTTP endpoint accepts bulk inserts as {"stmt": ..., "bulk_args": ...}.
+        val payload = Map(
+          "stmt" -> stmt,
+          "bulk_args" -> bulkArgs
+        )
+
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val jsonString = Serialization.write(payload)
+
+        val response = Http(url)
+          .postData(jsonString)
+          .header("Content-Type", "application/json")
+          .asString
+
+        println(response)
+      }
+    }
+  }
+}

diff --git a/by-dataframe/spark/scala-http/build.sbt b/by-dataframe/spark/scala-http/build.sbt
new file mode 100644
index 00000000..731233a4
--- /dev/null
+++ b/by-dataframe/spark/scala-http/build.sbt
@@ -0,0 +1,11 @@
+name := "SparkCrateDBhttpExample"
+
+version := "0.1"
+
+scalaVersion := "2.11.12"
+
+libraryDependencies ++= Seq(
+  "org.scala-lang.modules" %% "scala-xml" % "1.0.6",
+  "org.scalaj" %% "scalaj-http" % "2.4.2",
+  "org.apache.spark" %% "spark-sql" % "2.4.8"
+)
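As a quick check that the rows arrived, the same HTTP endpoint can be queried
back. This is a minimal sketch, not part of the example itself: it assumes the
same local CrateDB instance as above and reuses the `scalaj.http` dependency
from `build.sbt`; the object name is illustrative only.

```scala
import scalaj.http.Http

object VerifyIngestSketch {
  def main(args: Array[String]): Unit = {
    // The same /_sql endpoint also serves queries; the response body is JSON.
    val response = Http("http://localhost:4200/_sql")
      .postData("""{"stmt": "SELECT COUNT(*) FROM myschema.mytable"}""")
      .header("Content-Type", "application/json")
      .asString
    println(response.body)
  }
}
```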