diff --git a/README.md b/README.md
index 3363b64f..ba1d452d 100644
--- a/README.md
+++ b/README.md
@@ -46,6 +46,14 @@ The `options` field configures how Spark will read the input files. Both Json an
 configurable options, details of which can be found [in the
 documentation](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html)
 
+#### Additional output formats
+
+Typically all outputs should use `common.output-format` to specify a Spark-supported write format ("json", "parquet",
+etc.). Relying on this value allows us to read in pre-computed steps easily.
+
+If you require outputs in additional formats, complete the `common.additional-outputs` section. Metadata is not
+generated for these additional outputs.
+
 #### Configuring Spark
 
 If you want to use a local installation of Spark customise the `application.conf` with the following spark-uri field and
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f01c7596..1d60b101 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -37,13 +37,13 @@ object Dependencies {
     "org.apache.spark" %% "spark-graphx" % sparkVersion % "provided",
     "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
   )
-  lazy val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.3"
 
   lazy val testVersion = "3.2.2"
   lazy val testingDeps = Seq(
     "org.scalactic" %% "scalactic" % testVersion,
-    "org.scalatest" %% "scalatest" % testVersion % "test"
-  ) :+ scalaCheck
+    "org.scalatest" %% "scalatest" % testVersion % "test",
+    "org.scalamock" %% "scalamock" % "5.1.0" % "test"
+  )
 
   lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"
 
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index f8f54412..40ee7b24 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -53,6 +53,7 @@ common {
 
   input = "gs://open-targets-pre-data-releases/21.09.4/input"
   error = ${common.output}"/errors/"
+  additional-outputs = [ "json" ]
 }
 
 
diff --git a/src/main/scala/io/opentargets/etl/backend/Configuration.scala b/src/main/scala/io/opentargets/etl/backend/Configuration.scala
index 1741bf6b..e85207ae 100644
--- a/src/main/scala/io/opentargets/etl/backend/Configuration.scala
+++ b/src/main/scala/io/opentargets/etl/backend/Configuration.scala
@@ -157,6 +157,7 @@ object Configuration extends LazyLogging {
       output: String,
       error: String,
       outputFormat: String,
+      additionalOutputs: List[String],
       metadata: IOResourceConfig
   )
 
diff --git a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
index 8645c22d..488e1264 100644
--- a/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
+++ b/src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
@@ -48,20 +48,6 @@ object IoHelpers extends LazyLogging {
   type IOResourceConfigurations = Map[String, IOResourceConfig]
   type IOResources = Map[String, IOResource]
 
-  /** Create an IOResourceConf Map for each of the given files, where the file is a key and the value is the output
-    * configuration
-    * @param files will be the names out the output files
-    * @param configuration to provide access to the program's configuration
-    * @return a map of file -> IOResourceConfig
-    */
-  def generateDefaultIoOutputConfiguration(
-      files: String*
-  )(configuration: OTConfig): IOResourceConfigurations = {
-    files.map { n =>
-      n -> IOResourceConfig(configuration.common.outputFormat, configuration.common.output + s"/$n")
-    } toMap
-  }
-
   /** It creates an hashmap of dataframes.
     *   Es. inputsDataFrame {"disease", Dataframe} , {"target", Dataframe}
     *   Reading is the first step in the pipeline
@@ -94,7 +80,6 @@
   def seqToIOResourceConfigMap(resourceConfigs: Seq[IOResourceConfig]): IOResourceConfigurations = {
     (for (rc <- resourceConfigs) yield Random.alphanumeric.take(6).toString -> rc).toMap
   }
-
   private def writeTo(output: IOResource)(implicit context: ETLSessionContext): IOResource = {
     implicit val spark: SparkSession = context.sparkSession
 
@@ -124,6 +109,45 @@
     output
   }
 
+  /** Add additional output formats to prepared IOResources. Each dataframe will be cached to prevent recalculation on a
+    * subsequent call to write.
+    *
+    * Additional formats are set in the configuration under `common.additional-outputs`. When there are entries here,
+    * each output is given an additional configuration to facilitate writing in multiple output formats (e.g. json and parquet).
+    * @param resources standard collection of resources to save
+    * @param additionalFormats additional output formats
+    * @param defaultFormat default format for writing outputs
+    * @return IOResources of outputs to save. This includes all the entries in `resources` and an additional entry for
+    *         each item in `additionalFormats`.
+    */
+  private def addAdditionalOutputFormats(
+      resources: IOResources,
+      additionalFormats: List[String],
+      defaultFormat: String
+  ): IOResources = {
+
+    val cachedResources: IOResources =
+      resources.mapValues(r => IOResource(r.data.cache(), r.configuration))
+
+    cachedResources ++ cachedResources.flatMap(kv => {
+      val (name, resource) = kv
+      additionalFormats.map(additionalFormat => {
+        val resourceName = resource.configuration.path.replace(defaultFormat, additionalFormat)
+        val key = s"${name}_$additionalFormat"
+        val value = IOResource(
+          resource.data,
+          resource.configuration
+            .copy(
+              format = additionalFormat,
+              path = resourceName,
+              generateMetadata = false
+            )
+        )
+        key -> value
+      })
+    })
+  }
+
   /** writeTo save all datasets in the Map outputs. It does write per IOResource
     * its companion metadata dataset
     *
@@ -134,10 +158,20 @@
   def writeTo(outputs: IOResources)(implicit context: ETLSessionContext): IOResources = {
     implicit val spark: SparkSession = context.sparkSession
 
+    // add in additional output formats
+    val resourcesToWrite =
+      if (context.configuration.common.additionalOutputs.isEmpty) outputs
+      else
+        addAdditionalOutputFormats(
+          outputs,
+          context.configuration.common.additionalOutputs,
+          context.configuration.common.outputFormat
+        )
+
     val datasetNamesStr = outputs.keys.mkString("(", ", ", ")")
     logger.info(s"write datasets $datasetNamesStr")
 
-    outputs foreach { out =>
+    resourcesToWrite foreach { out =>
       logger.info(s"save dataset ${out._1}")
       writeTo(out._2)
 
diff --git a/src/test/scala/io/opentargets/etl/backend/HelpersTest.scala b/src/test/scala/io/opentargets/etl/backend/HelpersTest.scala
index 755d4822..689e0497 100644
--- a/src/test/scala/io/opentargets/etl/backend/HelpersTest.scala
+++ b/src/test/scala/io/opentargets/etl/backend/HelpersTest.scala
@@ -25,29 +25,7 @@ class HelpersTest extends EtlSparkUnitTest with TableDrivenPropertyChecks with L
   lazy val testDf: DataFrame =
     sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(testData), testStruct)
 
-  "generateDefaultIoOutputConfiguration" should "generate a valid configuration for each of its input files" in {
-    // given
-    Configuration.config match {
-      case Right(config) =>
-        val inputFileNames = Seq("a", "b", "c")
-        // when
-        val results = IoHelpers.generateDefaultIoOutputConfiguration(inputFileNames: _*)(config)
-        // then
-        assert(results.keys.size == inputFileNames.size)
-        assert(
-          results.values.forall(ioResConf =>
-            ioResConf.format == config.common.outputFormat &&
-              inputFileNames.contains(ioResConf.path.split("/").last)
-          )
-        )
-
-      case Left(ex) =>
-        logger.error(ex.prettyPrint())
-        assertDoesNotCompile("OT config loading problem")
-    }
-  }
-
-  they should "load correctly when header and separator as specified" in {
+  "Files" should "load correctly when header and separator are specified" in {
     // given
     val path: String = this.getClass.getResource("/drugbank_v.csv").getPath
     val input = IOResourceConfig(
diff --git a/src/test/scala/io/opentargets/etl/backend/target/IoHelpersTest.scala b/src/test/scala/io/opentargets/etl/backend/target/IoHelpersTest.scala
new file mode 100644
index 00000000..50335c7a
--- /dev/null
+++ b/src/test/scala/io/opentargets/etl/backend/target/IoHelpersTest.scala
@@ -0,0 +1,26 @@
+package io.opentargets.etl.backend.target
+import io.opentargets.etl.backend.EtlSparkUnitTest
+import io.opentargets.etl.backend.spark.{IOResource, IOResourceConfig, IoHelpers}
+import io.opentargets.etl.backend.spark.IoHelpers.IOResources
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+class IoHelpersTest extends EtlSparkUnitTest {
+
+  "Additional formats" should "be added to IOResources outputs" in {
+    // given
+    import sparkSession.implicits._
+    val addFormats = PrivateMethod[IOResources]('addAdditionalOutputFormats)
+    val df = Seq(1).toDF
+    val resources: IOResources = Map("one" -> IOResource(df, IOResourceConfig("f1", "p1/f1/out")))
+    val configs: List[String] = List("f2")
+
+    // when
+    val result = IoHelpers invokePrivate addFormats(resources, configs, "parquet")
+
+    // then
+    // an entry is created for each additional format
+    result should have size (2)
+    // the dataframe is cached
+    result.head._2.data.storageLevel.useMemory should be(true)
+  }
+}
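
For illustration, a minimal sketch of how the two settings might sit together in an `application.conf`. The bucket and release path are hypothetical, and the assumption that the default format appears in the output path follows from the `path.replace(defaultFormat, additionalFormat)` call in `addAdditionalOutputFormats`:

```hocon
common {
  # default Spark write format, also used when reading pre-computed steps
  output-format = "parquet"
  # hypothetical output location; note the default format appears in the path
  output = "gs://my-bucket/21.09.4/output/parquet"
  # extra copies of every output; no metadata is generated for these
  additional-outputs = [ "json" ]
}
```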
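A self-contained sketch of the key and path derivation that `addAdditionalOutputFormats` performs, without the Spark machinery. The resource name, bucket and paths below are invented for the example:

```scala
// Illustration only: mirrors the key/path derivation used by addAdditionalOutputFormats,
// assuming each output path contains the default format name (e.g. ".../parquet/targets").
object AdditionalOutputSketch extends App {
  val defaultFormat     = "parquet"
  val additionalFormats = List("json")
  val outputs           = Map("targets" -> "gs://bucket/output/parquet/targets") // hypothetical path

  // one extra entry per (resource, additional format) pair, keyed "<name>_<format>"
  val extras = for {
    (name, path) <- outputs
    fmt          <- additionalFormats
  } yield s"${name}_$fmt" -> path.replace(defaultFormat, fmt)

  (outputs ++ extras).foreach(println)
  // (targets,gs://bucket/output/parquet/targets)
  // (targets_json,gs://bucket/output/json/targets)
}
```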
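Finally, a sketch of why each dataframe is cached before the extra writes: without the cache, every additional format would re-trigger the upstream Spark plan. The session settings and output paths are again hypothetical:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: writes one dataset in a default and an additional format; cache() keeps the
// data in memory between the two writes so the plan is not recomputed per format.
object MultiFormatWriteSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a"), (2, "b")).toDF("id", "value").cache()

  df.write.mode("overwrite").format("parquet").save("/tmp/sketch/parquet/example")
  df.write.mode("overwrite").format("json").save("/tmp/sketch/json/example")

  spark.stop()
}
```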