Skip to content

Commit

Permalink
(fix #5290) Support empty input in TrasnformOverride.ofSource (#5293)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty authored Mar 7, 2024
1 parent 86432c8 commit 40c900b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 56 deletions.
7 changes: 6 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Keys.*
import explicitdeps.ExplicitDepsPlugin.autoImport.moduleFilterRemoveValue
import sbtassembly.AssemblyPlugin.autoImport.*
import com.github.sbt.git.SbtGit.GitKeys.gitRemoteRepo
import com.typesafe.tools.mima.core.*
import de.heikoseeberger.sbtheader.CommentCreator
import org.typelevel.scalacoptions.JavaMajorVersion.javaMajorVersion

Expand Down Expand Up @@ -372,7 +373,11 @@ ThisBuild / githubWorkflowAddedJobs ++= Seq(
)

// mima
ThisBuild / mimaBinaryIssueFilters ++= Seq()
ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.spotify.scio.testing.TransformOverride.ofSource"
)
)

// headers
lazy val currentYear = java.time.LocalDate.now().getYear
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.spotify.scio.testing

import com.spotify.scio.coders.{Coder, CoderMaterializer}

import java.lang.{Iterable => JIterable}
import java.util

import com.spotify.scio.transforms.BaseAsyncLookupDoFn
import com.spotify.scio.util.Functions
import org.apache.beam.runners.core.construction.{PTransformReplacements, ReplacementOutputs}
Expand Down Expand Up @@ -95,14 +96,16 @@ object TransformOverride {
* A [[PTransformOverride]] which when applied will override a source with name `name` with a
* source producing `values`.
*/
def ofSource[U](name: String, values: Seq[U]): PTransformOverride =
def ofSource[U: Coder](name: String, values: Seq[U]): PTransformOverride = {
val bCoder = CoderMaterializer.beamWithDefault(Coder[U])
PTransformOverride.of(
new EqualNamePTransformMatcher(name),
factory[PBegin, PCollection[U], PTransform[PBegin, PCollection[U]]](
inFn = t => t.getPipeline.begin(),
replacement = Create.of(values.asJava)
replacement = Create.of(values.asJava).withCoder(bCoder)
)
)
}

/**
* @return
Expand Down
115 changes: 63 additions & 52 deletions scio-test/src/test/scala/com/spotify/scio/testing/JobTestTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ class JobTestTest extends PipelineSpec {
e.getMessage should endWith(" was not greater than or equal to 100")
}

"transformOverride" should "pass with a source override" in {
"transformOverride.ofSource" should "support non-empty input" in {
JobTest[SourceTransformOverrideJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq.empty[String]) // still required for pipeline construction
Expand All @@ -962,21 +962,18 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with an override" in {
JobTest[TransformOverrideJob.type]
it should "support empty input" in {
JobTest[SourceTransformOverrideJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
.input(TextIO("in.txt"), Seq.empty[String])
.transformOverride(
TransformOverride.of[Int, String](
"myTransform",
Map(1 -> "10", 2 -> "20", 3 -> "30")
)
TransformOverride.ofSource[String]("ReadInput", List())
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.output(TextIO("out.txt"))(_ should beEmpty)
.run()
}

it should "pass with a 1-to-n override" in {
"TransformOverride.ofIter" should "support 1-to-n inputs" in {
JobTest[TransformOverrideIterJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
Expand All @@ -992,34 +989,48 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with a function override" in {
it should "support 1-to-n function inputs" in {
JobTest[TransformOverrideIterJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2", "3"))
.transformOverride(
// #JobTestTest_example_mock_iter_fun
TransformOverride.ofIter[Int, String](
"myTransform",
// map fn equal to: Map(1 -> Seq(), 2 -> Seq("1"), 3 -> Seq("1", "2")}
(i: Int) => { (1 until i).map(String.valueOf(_)) }
)
// #JobTestTest_example_mock_iter_fun
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("1", "1", "2")))
.run()
}

"TransformOverride.of" should "support non-empty input" in {
JobTest[TransformOverrideJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
.transformOverride(
TransformOverride.of[Int, String](
"myTransform",
(i: Int) => s"${i * 10}"
Map(1 -> "10", 2 -> "20", 3 -> "30")
)
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.run()
}

it should "pass with a 1-to-n function override" in {
JobTest[TransformOverrideIterJob.type]
it should "support function input" in {
JobTest[TransformOverrideJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2", "3"))
.input(TextIO("in.txt"), Seq("1", "2"))
.transformOverride(
// #JobTestTest_example_mock_iter_fun
TransformOverride.ofIter[Int, String](
TransformOverride.of[Int, String](
"myTransform",
// map fn equal to: Map(1 -> Seq(), 2 -> Seq("1"), 3 -> Seq("1", "2")}
(i: Int) => { (1 until i).map(String.valueOf(_)) }
(i: Int) => s"${i * 10}"
)
// #JobTestTest_example_mock_iter_fun
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("1", "1", "2")))
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.run()
}

Expand Down Expand Up @@ -1105,7 +1116,7 @@ class JobTestTest extends PipelineSpec {
}
}

it should "pass with a KV override" in {
"TransformOverride.ofKV" should "support non-empty input" in {
JobTest[TransformOverrideKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
Expand All @@ -1120,36 +1131,36 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with a 1-to-n KV override" in {
JobTest[TransformOverrideIterKVJob.type]
it should "suppport a KV function override" in {
JobTest[TransformOverrideKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2", "3"))
.input(TextIO("in.txt"), Seq("1", "2"))
.transformOverride(
TransformOverride.ofIterKV[Int, BaseAsyncLookupDoFn.Try[String]](
TransformOverride.ofKV[Int, BaseAsyncLookupDoFn.Try[String]](
"myTransform",
Map(1 -> Seq(), 2 -> Seq("20", "21"), 3 -> Seq("30"), 4 -> Seq("40"))
.map { case (k, v) => k -> v.map(new BaseAsyncLookupDoFn.Try(_)) }
(i: Int) => new BaseAsyncLookupDoFn.Try(s"${i * 10}")
)
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("20", "21", "30")))
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.run()
}

it should "pass with a KV function override" in {
JobTest[TransformOverrideKVJob.type]
"TransformOverride.ofIterKV" should "support non-empty input" in {
JobTest[TransformOverrideIterKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
.input(TextIO("in.txt"), Seq("1", "2", "3"))
.transformOverride(
TransformOverride.ofKV[Int, BaseAsyncLookupDoFn.Try[String]](
TransformOverride.ofIterKV[Int, BaseAsyncLookupDoFn.Try[String]](
"myTransform",
(i: Int) => new BaseAsyncLookupDoFn.Try(s"${i * 10}")
Map(1 -> Seq(), 2 -> Seq("20", "21"), 3 -> Seq("30"), 4 -> Seq("40"))
.map { case (k, v) => k -> v.map(new BaseAsyncLookupDoFn.Try(_)) }
)
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("20", "21", "30")))
.run()
}

it should "pass with a 1-to-n KV function override" in {
it should "support a 1-to-n KV function input" in {
JobTest[TransformOverrideIterKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2", "3"))
Expand All @@ -1164,7 +1175,7 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with an AsyncLookup override" in {
"TransformOverride.ofAsyncLookup" should "support non-empty input" in {
JobTest[TransformOverrideKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
Expand All @@ -1180,20 +1191,6 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with a 1-to-n AsyncLookup override" in {
JobTest[TransformOverrideIterKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
.transformOverride(
TransformOverride.ofIterAsyncLookup[Int, String](
"myTransform",
Map(1 -> Seq(), 2 -> Seq("10", "20"), 3 -> Seq("30"))
)
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.run()
}

it should "pass with an AsyncLookup function override" in {
JobTest[TransformOverrideKVJob.type]
.args("--input=in.txt", "--output=out.txt")
Expand All @@ -1210,7 +1207,7 @@ class JobTestTest extends PipelineSpec {
.run()
}

it should "pass with a 1-to-n AsyncLookup function override" in {
"TransformOverride.ofIterAsyncLookup" should "support a 1-to-n AsyncLookup function override" in {
JobTest[TransformOverrideIterKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2", "3"))
Expand All @@ -1224,4 +1221,18 @@ class JobTestTest extends PipelineSpec {
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("1", "1", "2")))
.run()
}

it should "support a 1-to-n AsyncLookup override" in {
JobTest[TransformOverrideIterKVJob.type]
.args("--input=in.txt", "--output=out.txt")
.input(TextIO("in.txt"), Seq("1", "2"))
.transformOverride(
TransformOverride.ofIterAsyncLookup[Int, String](
"myTransform",
Map(1 -> Seq(), 2 -> Seq("10", "20"), 3 -> Seq("30"))
)
)
.output(TextIO("out.txt"))(_ should containInAnyOrder(List("10", "20")))
.run()
}
}

0 comments on commit 40c900b

Please sign in to comment.