Skip to content

Commit

Permalink
Update multi-format write to take only formats rather than paths.
Browse files Browse the repository at this point in the history
  • Loading branch information
JarrodBaker committed Jun 22, 2022
1 parent ee20380 commit 9c728f8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 21 deletions.
7 changes: 1 addition & 6 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ common {

input = "gs://open-targets-pre-data-releases/21.09.4/input"
error = ${common.output}"/errors/"
additional-outputs = [
{
format = "json"
path = "gs://open-targets-pre-data-releases/21.09.4/output/etl/json"
}
]
additional-outputs = [ "json" ]

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object Configuration extends LazyLogging {
output: String,
error: String,
outputFormat: String,
additionalOutputs: List[IOResourceConfig],
additionalOutputs: List[String],
metadata: IOResourceConfig
)

Expand Down
26 changes: 15 additions & 11 deletions src/main/scala/io/opentargets/etl/backend/spark/IoHelpers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,32 +115,31 @@ object IoHelpers extends LazyLogging {
* Additional formats are set in the configuration under `common.additional-outputs`. When there are entries here,
* each output is given an additional configuration to facilitate writing in multiple output formats (eg, json and parquet).
* @param resources standard collection of resources to save
* @param configs additional output configurations
* @param additionalFormats additional output configurations
* @param defaultFormat default format for writing outputs
* @return IOResources of outputs to save. This includes all the entries in `resources` and an additional entry for
* each item in `config`.
*/
private def addAdditionalOutputFormats(
resources: IOResources,
configs: List[IOResourceConfig]
additionalFormats: List[String],
defaultFormat: String
): IOResources = {

val cachedResources: IOResources =
resources.mapValues(r => IOResource(r.data.cache(), r.configuration))

cachedResources ++ cachedResources.flatMap(kv => {
val (name, resource) = kv
configs.map(additionalFormat => {
val resourceName = {
val pathComponents = resource.configuration.path.split("/")
if (pathComponents.nonEmpty) pathComponents.last else ""
}
val key = s"${name}_${additionalFormat.format}"
additionalFormats.map(additionalFormat => {
val resourceName = resource.configuration.path.replace(defaultFormat, additionalFormat)
val key = s"${name}_$additionalFormat"
val value = IOResource(
resource.data,
resource.configuration
.copy(
format = additionalFormat.format,
path = s"${additionalFormat.path}/${resourceName}",
format = additionalFormat,
path = resourceName,
generateMetadata = false
)
)
Expand All @@ -162,7 +161,12 @@ object IoHelpers extends LazyLogging {
// add in additional output types
val resourcesToWrite =
if (context.configuration.common.additionalOutputs.isEmpty) outputs
else addAdditionalOutputFormats(outputs, context.configuration.common.additionalOutputs)
else
addAdditionalOutputFormats(
outputs,
context.configuration.common.additionalOutputs,
context.configuration.common.outputFormat
)

val datasetNamesStr = outputs.keys.mkString("(", ", ", ")")
logger.info(s"write datasets $datasetNamesStr")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ class IoHelpersTest extends EtlSparkUnitTest {
import sparkSession.implicits._
val addFormats = PrivateMethod[IOResources]('addAdditionalOutputFormats)
val df = Seq(1).toDF
val resources: IOResources = Map("one" -> IOResource(df, IOResourceConfig("f1", "p1")))
val configs: List[IOResourceConfig] = List(IOResourceConfig("f2", "p2"))
val resources: IOResources = Map("one" -> IOResource(df, IOResourceConfig("f1", "p1/f1/out")))
val configs: List[String] = List("f2")

// when
val result = IoHelpers invokePrivate addFormats(resources, configs)
val result = IoHelpers invokePrivate addFormats(resources, configs, "parquet")

// then
// an entry created for each format
Expand Down

0 comments on commit 9c728f8

Please sign in to comment.