From 00da7a013a94433646cbb98bc4ab9d9b7187fe07 Mon Sep 17 00:00:00 2001 From: Marc Lamy Date: Wed, 10 Apr 2024 20:03:50 +0200 Subject: [PATCH] add the dataio-test project * add the JavaImplicitConverters trait * add the SparkSpec trait * add the SparkStreamingSpec trait * add the FileSystemSpec trait * update the documentation --- build.sbt | 145 ++++++++++-------- docs/content/advanced/advanced.md | 2 +- docs/content/configuration/configuration.md | 2 +- docs/content/getting-started.md | 4 +- docs/content/tests.md | 107 +++++++++++++ .../amadeus/dataio/test/FileSystemSpec.scala | 36 +++++ .../dataio/test/JavaImplicitConverters.scala | 52 +++++++ .../com/amadeus/dataio/test/SparkSpec.scala | 70 +++++++++ .../dataio/test/SparkStreamingSpec.scala | 55 +++++++ test/src/test/resources/spark-spec/data.csv | 3 + test/src/test/resources/spark-spec/empty.csv | 0 .../test/JavaImplicitConvertersTest.scala | 68 ++++++++ .../amadeus/dataio/test/SparkSpecTest.scala | 41 +++++ .../dataio/test/SparkStreamingSpecTest.scala | 13 ++ 14 files changed, 526 insertions(+), 72 deletions(-) create mode 100644 docs/content/tests.md create mode 100644 test/src/main/scala/com/amadeus/dataio/test/FileSystemSpec.scala create mode 100644 test/src/main/scala/com/amadeus/dataio/test/JavaImplicitConverters.scala create mode 100644 test/src/main/scala/com/amadeus/dataio/test/SparkSpec.scala create mode 100644 test/src/main/scala/com/amadeus/dataio/test/SparkStreamingSpec.scala create mode 100644 test/src/test/resources/spark-spec/data.csv create mode 100644 test/src/test/resources/spark-spec/empty.csv create mode 100644 test/src/test/scala/com/amadeus/dataio/test/JavaImplicitConvertersTest.scala create mode 100644 test/src/test/scala/com/amadeus/dataio/test/SparkSpecTest.scala create mode 100644 test/src/test/scala/com/amadeus/dataio/test/SparkStreamingSpecTest.scala diff --git a/build.sbt b/build.sbt index 284f7bb..daabe0c 100644 --- a/build.sbt +++ b/build.sbt @@ -1,43 +1,40 @@ ThisBuild / versionScheme := Some("strict") // Build configuration - ThisBuild / scalaVersion := "2.12.12" - ThisBuild / organization := "com.amadeus.dataio" - val sparkVersion = settingKey[String]("The version of Spark used for building.") - ThisBuild / sparkVersion := "3.4.1" +ThisBuild / scalaVersion := "2.12.12" +ThisBuild / organization := "com.amadeus.dataio" +val sparkVersion = settingKey[String]("The version of Spark used for building.") +ThisBuild / sparkVersion := "3.4.1" // Common dependencies - ThisBuild / libraryDependencies ++= Seq( - // Core - "org.apache.logging.log4j" %% "log4j-api-scala" % "12.0", - "org.apache.logging.log4j" % "log4j-api" % "2.19.0", - "com.typesafe" % "config" % "1.4.0", - "commons-io" % "commons-io" % "2.9.0", - // Spark - "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value, - "org.apache.spark" %% "spark-sql" % sparkVersion.value, - "org.apache.spark" %% "spark-core" % sparkVersion.value, - // Tests - "org.scalatest" %% "scalatest" % "3.2.16" % Test, - "org.scalamock" %% "scalamock" % "5.2.0" % Test - ) +ThisBuild / libraryDependencies ++= Seq( + // Core + "org.apache.logging.log4j" %% "log4j-api-scala" % "12.0", + "org.apache.logging.log4j" % "log4j-api" % "2.19.0", + "com.typesafe" % "config" % "1.4.0", + "commons-io" % "commons-io" % "2.9.0", + // Spark + "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion.value, + "org.apache.spark" %% "spark-sql" % sparkVersion.value, + "org.apache.spark" %% "spark-core" % sparkVersion.value +) // Tests configuration - ThisBuild / Test / 
parallelExecution := false - ThisBuild / Test / publishArtifact := false +ThisBuild / Test / parallelExecution := false +ThisBuild / Test / publishArtifact := false // Publication configuration - ThisBuild / publishTo := Some("GitHub Packages" at "https://maven.pkg.github.com/AmadeusITGroup/dataio-framework") - ThisBuild / credentials += Credentials( - "GitHub Package Registry", - "maven.pkg.github.com", - "", - sys.env.getOrElse("GITHUB_REGISTRY_TOKEN", "") - ) - ThisBuild / publishMavenStyle := true - ThisBuild / pomIncludeRepository := { _ => true } - ThisBuild / pomExtra := - https://github.com/AmadeusITGroup/dataio-framework +ThisBuild / publishTo := Some("GitHub Packages" at "https://maven.pkg.github.com/AmadeusITGroup/dataio-framework") +ThisBuild / credentials += Credentials( + "GitHub Package Registry", + "maven.pkg.github.com", + "", + sys.env.getOrElse("GITHUB_REGISTRY_TOKEN", "") +) +ThisBuild / publishMavenStyle := true +ThisBuild / pomIncludeRepository := { _ => true } +ThisBuild / pomExtra := + https://github.com/AmadeusITGroup/dataio-framework Apache License 2.0 @@ -46,45 +43,57 @@ ThisBuild / versionScheme := Some("strict") // Release configuration - import ReleaseTransformations._ +import ReleaseTransformations._ - releaseVersionBump := sbtrelease.Version.Bump.Minor - releaseProcess := Seq[ReleaseStep]( - checkSnapshotDependencies, - inquireVersions, - runClean, - runTest, - setReleaseVersion, - commitReleaseVersion, - tagRelease, - publishArtifacts, - setNextVersion, - commitNextVersion, - pushChanges - ) +releaseVersionBump := sbtrelease.Version.Bump.Minor +releaseProcess := Seq[ReleaseStep]( + checkSnapshotDependencies, + inquireVersions, + runClean, + runTest, + setReleaseVersion, + commitReleaseVersion, + tagRelease, + publishArtifacts, + setNextVersion, + commitNextVersion, + pushChanges +) // Projects configuration - lazy val root = (project in file(".")) - .aggregate(core) - .settings( - publishArtifact := false +lazy val root = (project in file(".")) + .aggregate(core, test) + .settings( + publishArtifact := false + ) + +lazy val core = (project in file("core")) + .settings( + name := "dataio-core", + libraryDependencies ++= Seq( + // Distribution + "javax.mail" % "mail" % "1.4.7", + // Input / Output + "com.crealytics" %% "spark-excel" % s"${sparkVersion.value}_0.19.0", + "org.elasticsearch" %% "elasticsearch-spark-30" % "8.4.3" + exclude ("org.scala-lang", "scala-library") + exclude ("org.scala-lang", "scala-reflect") + exclude ("org.slf4j", "slf4j-api") + exclude ("org.apache.spark", "spark-core_" + scalaVersion.value.substring(0, 4)) + exclude ("org.apache.spark", "spark-sql_" + scalaVersion.value.substring(0, 4)) + exclude ("org.apache.spark", "spark-catalyst_" + scalaVersion.value.substring(0, 4)) + exclude ("org.apache.spark", "spark-streaming_" + scalaVersion.value.substring(0, 4)), + // Tests + "org.scalatest" %% "scalatest" % "3.2.16" % Test, + "org.scalamock" %% "scalamock" % "5.2.0" % Test ) + ) - lazy val core = (project in file("core")) - .settings( - name := "dataio-core", - libraryDependencies ++= Seq( - // Distribution - "javax.mail" % "mail" % "1.4.7", - // Input / Output - "com.crealytics" %% "spark-excel" % s"${sparkVersion.value}_0.19.0", - "org.elasticsearch" %% "elasticsearch-spark-30" % "8.4.3" - exclude ("org.scala-lang", "scala-library") - exclude ("org.scala-lang", "scala-reflect") - exclude ("org.slf4j", "slf4j-api") - exclude ("org.apache.spark", "spark-core_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", 
"spark-sql_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", "spark-catalyst_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", "spark-streaming_" + scalaVersion.value.substring(0, 4)) - ) - ) \ No newline at end of file +lazy val test = (project in file("test")) + .settings( + name := "dataio-test", + libraryDependencies ++= Seq( + "org.scalatest" %% "scalatest" % "3.2.16", + "org.scalamock" %% "scalamock" % "5.2.0" + ) + ) diff --git a/docs/content/advanced/advanced.md b/docs/content/advanced/advanced.md index 598d55e..0aebf24 100644 --- a/docs/content/advanced/advanced.md +++ b/docs/content/advanced/advanced.md @@ -2,6 +2,6 @@ title: Advanced layout: default has_children: true -nav_order: 6 +nav_order: 7 --- # Advanced diff --git a/docs/content/configuration/configuration.md b/docs/content/configuration/configuration.md index f95e33c..0dae97f 100644 --- a/docs/content/configuration/configuration.md +++ b/docs/content/configuration/configuration.md @@ -2,7 +2,7 @@ title: Configuration layout: default has_children: true -nav_order: 5 +nav_order: 6 --- # Configuration diff --git a/docs/content/getting-started.md b/docs/content/getting-started.md index 653a407..75bad1e 100644 --- a/docs/content/getting-started.md +++ b/docs/content/getting-started.md @@ -17,7 +17,7 @@ nav_order: 2 ## Installation -Data I/O was built with Spark 3.3.2 and Scala 2.12. Support for prior versions is not guaranteed. +Data I/O was built and tested with Spark 3.2.1/3.3.2/3.4.1 and Scala 2.12. Support for prior versions is not guaranteed. {: .warning} Published releases are available on GitHub Packages, in the AmadeusITGroup repository. @@ -27,7 +27,7 @@ Using Maven: ```xml com.amadeus.dataio - dataio-framework + dataio-core x.x.x ``` diff --git a/docs/content/tests.md b/docs/content/tests.md new file mode 100644 index 0000000..44ecf8c --- /dev/null +++ b/docs/content/tests.md @@ -0,0 +1,107 @@ +--- +title: Writing tests +layout: default +nav_order: 5 +--- +# Writing tests +
+<details open markdown="block">
+  <summary>
+    Table of contents
+  </summary>
+  {: .text-delta }
+1. TOC
+{:toc}
+</details>
+
+---
+
+Data I/O offers a separate library with utility traits and methods designed to facilitate testing Scala/Spark applications.
+
+## Installation
+
+Published releases are available on GitHub Packages, in the AmadeusITGroup repository.
+
+Using Maven:
+
+```xml
+<dependency>
+    <groupId>com.amadeus.dataio</groupId>
+    <artifactId>dataio-test</artifactId>
+    <version>x.x.x</version>
+</dependency>
+```
+
+## Overview
+
+### Interacting with the file system
+The `FileSystemSpec` trait provides the Hadoop `LocalFileSystem` for tests needing direct access to an instance of `FileSystem`.
+
+Example:
+
+```scala
+import com.amadeus.dataio.test._
+import org.apache.hadoop.fs.Path
+import org.scalatest.flatspec.AnyFlatSpec
+
+class MyAppTest extends AnyFlatSpec with FileSystemSpec {
+  "MyAppTest" should "do something" in {
+    assert(fs.exists(new Path("file:///my_file.txt")))
+  }
+}
+```
+
+### Interacting with a SparkSession
+The `SparkSpec` trait provides a local Spark session and helper functions for Spark tests:
+- `getTestName: String`: Returns the test suite's name.
+- `collectData(path: String, format: String, schema: Option[String] = None): Array[String]`: Collects data from the file system.
+
+Note that when extending this trait, you will have to override the `getTestName: String` function.
+
+Example:
+
+```scala
+import com.amadeus.dataio.test._
+import org.scalatest.flatspec.AnyFlatSpec
+
+class MyAppTest extends AnyFlatSpec with SparkSpec {
+  override def getTestName = "MyAppTest"
+
+  "MyAppTest" should "do something" in {
+    spark.read.format("csv").load("my_data.csv")
+    val rows = collectData("my_data.csv", "csv")
+  }
+}
+```
+
+### Interacting with a Streaming context
+The `SparkStreamingSpec` trait provides a local Spark session and helper functions for Spark Streaming tests:
+- `enableSparkStreamingSchemaInference(): Unit`: Enables Spark streaming schema inference.
+- `collectDataStream(dataFrame: DataFrame): Array[String]`: Collects data from a DataFrame read from a stream, using an in-memory sink.
+
+### Implicitly converting Scala Maps and Lists to their Java equivalents
+It is sometimes necessary to build complex map structures when creating Typesafe `Config` objects, which requires redundant Scala-to-Java conversions.
+
+To simplify this, you may extend the `JavaImplicitConverters` trait.
+
+Example:
+
+```scala
+import com.amadeus.dataio.test._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.flatspec.AnyFlatSpec
+
+class MyAppTest extends AnyFlatSpec with JavaImplicitConverters {
+  "MyAppTest" should "do something" in {
+    ConfigFactory.parseMap(
+      Map("NodeName" -> Seq(Map("Type" -> "com.Entity"), Map("Type" -> "com.Entity")))
+    )
+  }
+}
+```
+
diff --git a/test/src/main/scala/com/amadeus/dataio/test/FileSystemSpec.scala b/test/src/main/scala/com/amadeus/dataio/test/FileSystemSpec.scala
new file mode 100644
index 0000000..b27e52d
--- /dev/null
+++ b/test/src/main/scala/com/amadeus/dataio/test/FileSystemSpec.scala
@@ -0,0 +1,36 @@
+package com.amadeus.dataio.test
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.scalatest.{BeforeAndAfter, TestSuite}
+
+/**
+ * Provides the Hadoop LocalFileSystem for tests needing direct access to an instance of FileSystem.
+ *
+ * Provides a dedicated instance initialized before each test and automatically closed after each test, providing as
+ * much isolation as possible between tests. It also deletes the dataio-test temporary directory (/tmp/dataio-test/) and
+ * its sub-directories before closing the FileSystem.
+ *
+ * e.g.
+ * {{{
+ * class MyClassTest extends WordSpec with FileSystemSpec {
+ *   // provided by FileSystemSpec:
+ *   // fs: FileSystem
+ *   // val tmpPath: String = "file:///tmp/dataio-test/"
+ * }
+ * }}}
+ */
+trait FileSystemSpec extends TestSuite with BeforeAndAfter {
+  val tmpPath = "file:///tmp/dataio-test/"
+
+  var fs: FileSystem = _
+
+  before {
+    fs = FileSystem.newInstance(new Configuration())
+  }
+
+  after {
+    fs.delete(new Path(tmpPath), true)
+    fs.close()
+  }
+}
diff --git a/test/src/main/scala/com/amadeus/dataio/test/JavaImplicitConverters.scala b/test/src/main/scala/com/amadeus/dataio/test/JavaImplicitConverters.scala
new file mode 100644
index 0000000..5019050
--- /dev/null
+++ b/test/src/main/scala/com/amadeus/dataio/test/JavaImplicitConverters.scala
@@ -0,0 +1,52 @@
+package com.amadeus.dataio.test
+
+import scala.language.implicitConversions
+import scala.collection.JavaConverters._
+
+/**
+ *
+ * Contains helper implicit conversions to make working with functions expecting Java maps/lists easier.
+ *
+ * These conversions are meant for tests only, for instance to create typesafe Config objects.
+ *
+ * e.g.:
+ * {{{
+ * import com.amadeus.dataio.test.JavaImplicitConverters
+ * import com.typesafe.config.ConfigFactory
+ * import org.scalatest.flatspec.AnyFlatSpec
+ *
+ * class MyAppSpec extends AnyFlatSpec with JavaImplicitConverters {
+ *   "MyApp" should "do something" in {
+ *     val config = ConfigFactory.parseMap(
+ *       Map(
+ *         "MyField" -> Seq("val1", "val2", "val3"),
+ *         "MyOtherField" -> 5
+ *       )
+ *     )
+ *   }
+ * }
+ * }}}
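+ *
+ * Once the trait is mixed in, the Map and Seq literals above are converted implicitly, so no explicit
+ * .asJava calls are needed when calling ConfigFactory.parseMap.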
+ */ +trait JavaImplicitConverters { + import scala.language.implicitConversions + + implicit def scalaMap2Java(m: Map[String, _]): java.util.Map[String, _] = { + toJava(m).asInstanceOf[java.util.Map[String, _]] + } + + implicit def scalaSeq2Java(s: Seq[_]): java.util.List[_] = { + val list = new java.util.ArrayList[Any] + s.foreach(item => list.add(toJava(item))) + list + } + + private def toJava(obj: Any): Any = { + obj match { + case m: Map[_, _] => + m.map { case (key, value) => + (key, toJava(value)) + }.asJava + case i: Iterable[_] => + i.map(toJava).asJava + case _ => obj + } + } +} diff --git a/test/src/main/scala/com/amadeus/dataio/test/SparkSpec.scala b/test/src/main/scala/com/amadeus/dataio/test/SparkSpec.scala new file mode 100644 index 0000000..db5a626 --- /dev/null +++ b/test/src/main/scala/com/amadeus/dataio/test/SparkSpec.scala @@ -0,0 +1,70 @@ +package com.amadeus.dataio.test + +import org.apache.spark.sql.{SQLContext, SQLImplicits, SparkSession} +import org.scalatest.{BeforeAndAfter, TestSuite} + +/** + * Provides functions for Spark tests: + * + * + * e.g. + * {{{ + * class MyClassTest extends AnyFlatSpec with SparkSpec { + * // provided by SparkSpec: + * // sparkSession: SparkSession + * // sparkTestImplicits + * } + * }}} + */ +trait SparkSpec extends TestSuite with BeforeAndAfter { + + implicit var spark: SparkSession = _ + + object sparkTestImplicits extends SQLImplicits with Serializable { + protected override def _sqlContext: SQLContext = spark.sqlContext + } + + before { + spark = SparkSession + .builder() + .master("local[1]") + .config("spark.ui.enabled", "false") + .appName(getTestName) + .getOrCreate() + } + + after { + spark.close() + } + + /** + * @return the test suite's name + */ + def getTestName: String + + /** + * Collects data from file system. + * + * @param path the data's path + * @param format the data's format + * @param schema the schema of dataframe to be read. Infer schema by default + * @return the data read as a Array[String] using org.apache.spark.sql.Row toString + */ + def collectData(path: String, format: String, schema: Option[String] = None): Array[String] = { + var dataFrameReader = spark.read + + dataFrameReader = schema match { + case Some(definedSchema) => dataFrameReader.schema(definedSchema) + case None => dataFrameReader.option("inferSchema", "true") + } + + val dataFrame = dataFrameReader.format(format).load(path).collect() + + dataFrame.map(row => row.toString()) + } +} diff --git a/test/src/main/scala/com/amadeus/dataio/test/SparkStreamingSpec.scala b/test/src/main/scala/com/amadeus/dataio/test/SparkStreamingSpec.scala new file mode 100644 index 0000000..df81e1a --- /dev/null +++ b/test/src/main/scala/com/amadeus/dataio/test/SparkStreamingSpec.scala @@ -0,0 +1,55 @@ +package com.amadeus.dataio.test + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.streaming.Trigger + +/** + * Provides functions for Spark Streaming tests: + * + * + * e.g. + * {{{ + * class MyClassTest extends AnyFlatSpec with SparkStreamingSpec { + * // provided by SparkStreamingSpec: + * // sparkSession: SparkSession + * // sparkTestImplicits + * } + * }}} + */ +trait SparkStreamingSpec extends SparkSpec { + + /** + * enable spark streaming schema inference + */ + def enableSparkStreamingSchemaInference(): Unit = spark.sql("set spark.sql.streaming.schemaInference=true") + + /** + * Collect data from dataframe read from stream + * + * Use an in memory sink to gather the data + * + * @param dataFrame the dataFrame to collect. 
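+ *                  It must be a streaming DataFrame (for example, one created with spark.readStream).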
+ * @return the String representation of each rows. + */ + def collectDataStream(dataFrame: DataFrame): Array[String] = { + + val tmpTableName = "result_" + System.currentTimeMillis() + val streamWriter = dataFrame.writeStream + .format("memory") + .queryName(tmpTableName) + .trigger(Trigger.Once()) + .start() + + streamWriter.awaitTermination(2000) + + val resultData = spark.sql("select * from " + tmpTableName).collect() + + resultData.map(row => row.toString()) + } + +} diff --git a/test/src/test/resources/spark-spec/data.csv b/test/src/test/resources/spark-spec/data.csv new file mode 100644 index 0000000..055d7a1 --- /dev/null +++ b/test/src/test/resources/spark-spec/data.csv @@ -0,0 +1,3 @@ +1,John,30 +2,Alice,25 +3,Bob,35 \ No newline at end of file diff --git a/test/src/test/resources/spark-spec/empty.csv b/test/src/test/resources/spark-spec/empty.csv new file mode 100644 index 0000000..e69de29 diff --git a/test/src/test/scala/com/amadeus/dataio/test/JavaImplicitConvertersTest.scala b/test/src/test/scala/com/amadeus/dataio/test/JavaImplicitConvertersTest.scala new file mode 100644 index 0000000..94408fc --- /dev/null +++ b/test/src/test/scala/com/amadeus/dataio/test/JavaImplicitConvertersTest.scala @@ -0,0 +1,68 @@ +package com.amadeus.dataio.test + +import org.scalatest.flatspec.AnyFlatSpec + +class JavaImplicitConvertersTest extends AnyFlatSpec with JavaImplicitConverters { + + "JavaImplicitConverters" should "convert Scala map to Java map" in { + val scalaMap = Map( + "key1" -> "value1", + "key2" -> "value2" + ) + + val javaMap: java.util.Map[String, _] = scalaMap2Java(scalaMap) + + assert(javaMap.containsKey("key1")) + assert(javaMap.containsKey("key2")) + assert(javaMap.get("key1") == "value1") + assert(javaMap.get("key2") == "value2") + } + + it should "convert Scala sequence to Java list" in { + val scalaSeq = Seq("value1", "value2", "value3") + + val javaList: java.util.List[_] = scalaSeq2Java(scalaSeq) + + assert(javaList.contains("value1")) + assert(javaList.contains("value2")) + assert(javaList.contains("value3")) + } + + it should "convert nested Scala map to nested Java map" in { + val scalaNestedMap = Map( + "nestedMap" -> Map( + "key1" -> "value1", + "key2" -> "value2" + ) + ) + + val javaNestedMap: java.util.Map[String, _] = scalaMap2Java(scalaNestedMap("nestedMap").asInstanceOf[Map[String, _]]) + + assert(javaNestedMap.containsKey("key1")) + assert(javaNestedMap.containsKey("key2")) + assert(javaNestedMap.get("key1") == "value1") + assert(javaNestedMap.get("key2") == "value2") + } + + it should "convert nested Scala sequence to nested Java list" in { + val scalaNestedSeq = Seq(Seq("value1", "value2"), Seq("value3", "value4")) + + val javaNestedList: java.util.List[_] = scalaSeq2Java(scalaNestedSeq.flatMap(identity)) + + assert(javaNestedList.contains("value1")) + assert(javaNestedList.contains("value2")) + assert(javaNestedList.contains("value3")) + assert(javaNestedList.contains("value4")) + } + + it should "handle empty input gracefully" in { + val emptyMap = Map.empty[String, String] + val emptySeq = Seq.empty[String] + + val javaEmptyMap: java.util.Map[String, _] = scalaMap2Java(emptyMap) + val javaEmptyList: java.util.List[_] = scalaSeq2Java(emptySeq) + + assert(javaEmptyMap.isEmpty) + assert(javaEmptyList.isEmpty) + } +} diff --git a/test/src/test/scala/com/amadeus/dataio/test/SparkSpecTest.scala b/test/src/test/scala/com/amadeus/dataio/test/SparkSpecTest.scala new file mode 100644 index 0000000..ea13cd5 --- /dev/null +++ 
b/test/src/test/scala/com/amadeus/dataio/test/SparkSpecTest.scala @@ -0,0 +1,41 @@ +package com.amadeus.dataio.test + +import org.scalatest.flatspec.AnyFlatSpec + +class SparkSpecTest extends AnyFlatSpec with SparkSpec { + override def getTestName: String = "SparkSpecTest" + + val dataPath = getClass.getResource("/spark-spec/data.csv").getPath + val emptyPath = getClass.getResource("/spark-spec/empty.csv").getPath + + "SparkSpec" should "collect data from file system" in { + // Collect data from CSV file + val collectedData1 = collectData(dataPath, "csv") + assert(collectedData1.length == 3) // Three lines excluding header + assert(collectedData1.contains("[1,John,30]")) + assert(collectedData1.contains("[2,Alice,25]")) + assert(collectedData1.contains("[3,Bob,35]")) + + // Attempt to collect data from empty CSV file + val collectedData2 = collectData(emptyPath, "csv") + assert(collectedData2.isEmpty) + } + + it should "collect data with defined schema from file system" in { + // Assuming data is present at "path" in "format" with a defined schema + val schema = "id INT, name STRING, age INT" + + // Collect data from CSV file with defined schema + val collectedData1 = collectData(dataPath, "csv", Some(schema)) + // Verify collected data + assert(collectedData1.length == 3) // Three lines excluding header + assert(collectedData1.contains("[1,John,30]")) + assert(collectedData1.contains("[2,Alice,25]")) + assert(collectedData1.contains("[3,Bob,35]")) + + // Collect data from empty CSV file with defined schema + val collectedData2 = collectData(emptyPath, "csv", Some(schema)) + // Verify collected data + assert(collectedData2.isEmpty) + } +} diff --git a/test/src/test/scala/com/amadeus/dataio/test/SparkStreamingSpecTest.scala b/test/src/test/scala/com/amadeus/dataio/test/SparkStreamingSpecTest.scala new file mode 100644 index 0000000..324e372 --- /dev/null +++ b/test/src/test/scala/com/amadeus/dataio/test/SparkStreamingSpecTest.scala @@ -0,0 +1,13 @@ +package com.amadeus.dataio.test + +import org.scalatest.flatspec.AnyFlatSpec + +class SparkStreamingSpecTest extends AnyFlatSpec with SparkStreamingSpec { + override def getTestName: String = "SparkStreamingSpecTest" + + "SparkStreamingSpec" should "enable Spark streaming schema inference" in { + enableSparkStreamingSchemaInference() + val schemaInferenceEnabled = spark.conf.get("spark.sql.streaming.schemaInference") + assert(schemaInferenceEnabled == "true") + } +}
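
// Minimal usage sketch (illustrative only, not part of the patch above): one way collectDataStream
// could be exercised against the bundled spark-spec/data.csv resource. The suite name, resource
// path and schema string below are assumptions made for the example.

package com.amadeus.dataio.test

import org.scalatest.flatspec.AnyFlatSpec

class SparkStreamingSpecCollectTest extends AnyFlatSpec with SparkStreamingSpec {
  override def getTestName: String = "SparkStreamingSpecCollectTest"

  "SparkStreamingSpec" should "collect rows from a streaming DataFrame" in {
    // Stream the CSV files placed under the test resources directory.
    val sourceDir = getClass.getResource("/spark-spec/").getPath

    val streamingDf = spark.readStream
      .format("csv")
      .schema("id INT, name STRING, age INT")
      .load(sourceDir)

    // collectDataStream drains the stream into an in-memory sink and returns Row.toString values.
    val rows = collectDataStream(streamingDf)

    assert(rows.length == 3)
    assert(rows.contains("[1,John,30]"))
  }
}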