Merge pull request #254 from opentargets/1767-multiple-output-formats
1767 multiple output formats
JarrodBaker authored Jun 22, 2022
2 parents 9c70656 + 9c728f8 commit 9698b22
Showing 7 changed files with 90 additions and 42 deletions.
8 changes: 8 additions & 0 deletions README.md
@@ -46,6 +46,14 @@ The `options` field configures how Spark will read the input files. Both Json and
configurable options, details of which can be
found [in the documentation](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html)

#### Additional output formats

Typically all outputs should use `common.output-format` to specify a Spark-supported write format ("json", "parquet",
etc.). Relying on this single value makes it easy to read pre-computed steps back in.

If you require outputs in additional formats, complete the `common.additional-outputs` section. Metadata is not
generated for these additional outputs.
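
For example, a minimal configuration sketch (values are illustrative; the `reference.conf` in this repository sets `additional-outputs = [ "json" ]`):

```hocon
common {
  # primary format, used for every step's outputs and for reading pre-computed steps
  output-format = "parquet"
  # extra formats written alongside the primary one; no metadata is generated for these
  additional-outputs = [ "json" ]
}
```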

#### Configuring Spark

If you want to use a local installation of Spark customise the `application.conf` with the following spark-uri field and
6 changes: 3 additions & 3 deletions project/Dependencies.scala
@@ -37,13 +37,13 @@ object Dependencies {
"org.apache.spark" %% "spark-graphx" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided"
)
lazy val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.3"

lazy val testVersion = "3.2.2"
lazy val testingDeps = Seq(
"org.scalactic" %% "scalactic" % testVersion,
"org.scalatest" %% "scalatest" % testVersion % "test"
) :+ scalaCheck
"org.scalatest" %% "scalatest" % testVersion % "test",
"org.scalamock" %% "scalamock" % "5.1.0" % "test"
)

lazy val typeSafeConfig = "com.typesafe" % "config" % "1.4.1"

1 change: 1 addition & 0 deletions src/main/resources/reference.conf
@@ -53,6 +53,7 @@ common {

input = "gs://open-targets-pre-data-releases/21.09.4/input"
error = ${common.output}"/errors/"
additional-outputs = [ "json" ]

}

1 change: 1 addition & 0 deletions src/main/scala/io/opentargets/etl/backend/Configuration.scala
@@ -157,6 +157,7 @@ object Configuration extends LazyLogging {
output: String,
error: String,
outputFormat: String,
additionalOutputs: List[String],
metadata: IOResourceConfig
)

66 changes: 50 additions & 16 deletions src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
@@ -48,20 +48,6 @@ object IoHelpers extends LazyLogging {
type IOResourceConfigurations = Map[String, IOResourceConfig]
type IOResources = Map[String, IOResource]

/** Create an IOResourceConf Map for each of the given files, where the file is a key and the value is the output
* configuration
* @param files will be the names out the output files
* @param configuration to provide access to the program's configuration
* @return a map of file -> IOResourceConfig
*/
def generateDefaultIoOutputConfiguration(
files: String*
)(configuration: OTConfig): IOResourceConfigurations = {
files.map { n =>
n -> IOResourceConfig(configuration.common.outputFormat, configuration.common.output + s"/$n")
} toMap
}

/** It creates a hashmap of dataframes,
* e.g. inputsDataFrame {"disease", Dataframe}, {"target", Dataframe}.
* Reading is the first step in the pipeline
@@ -94,7 +80,6 @@
def seqToIOResourceConfigMap(resourceConfigs: Seq[IOResourceConfig]): IOResourceConfigurations = {
(for (rc <- resourceConfigs) yield Random.alphanumeric.take(6).toString -> rc).toMap
}

private def writeTo(output: IOResource)(implicit context: ETLSessionContext): IOResource = {
implicit val spark: SparkSession = context.sparkSession

@@ -124,6 +109,45 @@
output
}

/** Add additional output formats to prepared IOResources. Each dataframe will be cached to prevent recalculation on a
* subsequent call to write.
*
* Additional formats are set in the configuration under `common.additional-outputs`. When there are entries here,
* each output is given an additional configuration to facilitate writing in multiple output formats (e.g. json and parquet).
* @param resources standard collection of resources to save
* @param additionalFormats additional output configurations
* @param defaultFormat default format for writing outputs
* @return IOResources of outputs to save. This includes all the entries in `resources` and an additional entry for
* each item in `additionalFormats`.
*/
private def addAdditionalOutputFormats(
resources: IOResources,
additionalFormats: List[String],
defaultFormat: String
): IOResources = {

val cachedResources: IOResources =
resources.mapValues(r => IOResource(r.data.cache(), r.configuration))

cachedResources ++ cachedResources.flatMap(kv => {
val (name, resource) = kv
additionalFormats.map(additionalFormat => {
val resourceName = resource.configuration.path.replace(defaultFormat, additionalFormat)
val key = s"${name}_$additionalFormat"
val value = IOResource(
resource.data,
resource.configuration
.copy(
format = additionalFormat,
path = resourceName,
generateMetadata = false
)
)
key -> value
})
})
}

/** writeTo saves all datasets in the outputs Map. For each IOResource it also writes
* its companion metadata dataset.
*
@@ -134,10 +158,20 @@
def writeTo(outputs: IOResources)(implicit context: ETLSessionContext): IOResources = {
implicit val spark: SparkSession = context.sparkSession

// add in additional output types
val resourcesToWrite =
if (context.configuration.common.additionalOutputs.isEmpty) outputs
else
addAdditionalOutputFormats(
outputs,
context.configuration.common.additionalOutputs,
context.configuration.common.outputFormat
)

val datasetNamesStr = outputs.keys.mkString("(", ", ", ")")
logger.info(s"write datasets $datasetNamesStr")

outputs foreach { out =>
resourcesToWrite foreach { out =>
logger.info(s"save dataset ${out._1}")
writeTo(out._2)

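For illustration, a minimal sketch of the fan-out that `addAdditionalOutputFormats` performs for a single resource (hypothetical paths; `IOResourceConfig` fields as introduced in this change):

```scala
import io.opentargets.etl.backend.spark.IOResourceConfig

// Hypothetical example: a step whose primary output format is parquet,
// run with common.additional-outputs = ["json"].
val base = IOResourceConfig(format = "parquet", path = "out/parquet/disease")

// For each additional format the helper copies the configuration,
// substitutes the new format into the path, and disables metadata generation:
val extra = base.copy(
  format = "json",
  path = base.path.replace("parquet", "json"), // "out/json/disease"
  generateMetadata = false
)
```

The source dataframe is cached before the fan-out, so each additional format is written without recomputing the dataset.
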
24 changes: 1 addition & 23 deletions src/test/scala/io/opentargets/etl/backend/HelpersTest.scala
@@ -25,29 +25,7 @@ class HelpersTest extends EtlSparkUnitTest with TableDrivenPropertyChecks with L
lazy val testDf: DataFrame =
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(testData), testStruct)

"generateDefaultIoOutputConfiguration" should "generate a valid configuration for each of its input files" in {
// given
Configuration.config match {
case Right(config) =>
val inputFileNames = Seq("a", "b", "c")
// when
val results = IoHelpers.generateDefaultIoOutputConfiguration(inputFileNames: _*)(config)
// then
assert(results.keys.size == inputFileNames.size)
assert(
results.values.forall(ioResConf =>
ioResConf.format == config.common.outputFormat &&
inputFileNames.contains(ioResConf.path.split("/").last)
)
)

case Left(ex) =>
logger.error(ex.prettyPrint())
assertDoesNotCompile("OT config loading problem")
}
}

they should "load correctly when header and separator as specified" in {
"Files" should "load correctly when header and separator as specified" in {
// given
val path: String = this.getClass.getResource("/drugbank_v.csv").getPath
val input = IOResourceConfig(
26 changes: 26 additions & 0 deletions src/test/scala/io/opentargets/etl/backend/target/IoHelpersTest.scala
@@ -0,0 +1,26 @@
package io.opentargets.etl.backend.target
import io.opentargets.etl.backend.EtlSparkUnitTest
import io.opentargets.etl.backend.spark.{IOResource, IOResourceConfig, IoHelpers}
import io.opentargets.etl.backend.spark.IoHelpers.IOResources
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

class IoHelpersTest extends EtlSparkUnitTest {

"Additional formats" should "be added to IOResources outputs" in {
// given
import sparkSession.implicits._
val addFormats = PrivateMethod[IOResources]('addAdditionalOutputFormats)
val df = Seq(1).toDF
val resources: IOResources = Map("one" -> IOResource(df, IOResourceConfig("f1", "p1/f1/out")))
val configs: List[String] = List("f2")

// when
val result = IoHelpers invokePrivate addFormats(resources, configs, "parquet")

// then
// an entry created for each format
result should have size (2)
// the dataframe is cached
result.head._2.data.storageLevel.useMemory should be(true)
}
}
