Skip to content

Commit

Permalink
leverage jobTest in internall tests
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Oct 11, 2023
1 parent a53adfc commit ad0e31d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import com.spotify.scio.io.{KeyedIO, ScioIO}
import com.spotify.scio.util.ScioUtil
import com.spotify.scio.values.SCollection
import com.spotify.scio.coders.{Coder, CoderMaterializer}
import org.apache.beam.sdk.options.{ApplicationNameOptions, PipelineOptions, PipelineOptionsFactory}
import org.apache.beam.sdk.runners.PTransformOverride
import org.apache.beam.sdk.testing.TestStream
import org.apache.beam.sdk.testing.TestStream.{ElementEvent, Event}
Expand Down
134 changes: 37 additions & 97 deletions scio-test/src/main/scala/com/spotify/scio/testing/ScioIOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,127 +112,67 @@ trait ScioIOSpec extends PipelineSpec {

def testJobTestInput[T: ClassTag: Coder](xs: Seq[T], in: String = "in")(
ioFn: String => ScioIO[T]
)(readFn: (ScioContext, String) => SCollection[T]): Unit = {
def runMain(args: Array[String]): Unit = {
val (sc, argz) = ContextAndArgs(args)
readFn(sc, argz("input")).saveAsTextFile("out")
sc.run()
()
}
)(
readFn: (ScioContext, String) => SCollection[T]
): Unit = {
val testJob = (sc: ScioContext) => readFn(sc, in).saveAsTextFile("out")

val builder = com.spotify.scio.testing
.JobTest("null")
jobTest(testJob)
.input(ioFn(in), xs)
.output(TextIO("out")) { coll =>
coll should containInAnyOrder(xs.map(_.toString))
()
}
builder.setUp()
runMain(Array(s"--input=$in") :+ s"--appName=${builder.testId}")
builder.tearDown()
.output(TextIO("out"))(_ should containInAnyOrder(xs.map(_.toString)))
.run()

the[IllegalArgumentException] thrownBy {
val builder = com.spotify.scio.testing
.JobTest("null")
jobTest(testJob)
.input(CustomIO[T](in), xs)
.output(TextIO("out")) { coll =>
coll should containInAnyOrder(xs.map(_.toString))
()
}
builder.setUp()
runMain(Array(s"--input=$in") :+ s"--appName=${builder.testId}")
builder.tearDown()
} should have message s"requirement failed: Missing test input: ${ioFn(in).testId}, " +
s"available: [CustomIO($in)]"
()
.output(TextIO("out"))(_ should containInAnyOrder(xs.map(_.toString)))
.run()
} should have message s"requirement failed: Missing test input: ${ioFn(in).testId}, available: [CustomIO($in)]"
}

def testJobTestOutput[T: Coder, WT](xs: Seq[T], out: String = "out")(
ioFn: String => ScioIO[T]
)(writeFn: (SCollection[T], String) => ClosedTap[WT]): Unit = {
def runMain(args: Array[String]): Unit = {
val (sc, argz) = ContextAndArgs(args)
writeFn(sc.parallelize(xs), argz("output"))
sc.run()
()
}
)(
writeFn: (SCollection[T], String) => ClosedTap[WT]
): Unit = {
val testJob = (sc: ScioContext) => writeFn(sc.parallelize(xs), out)

val builder = com.spotify.scio.testing
.JobTest("null")
.output(ioFn(out)) { coll =>
coll should containInAnyOrder(xs)
()
}
builder.setUp()
runMain(Array(s"--output=$out") :+ s"--appName=${builder.testId}")
builder.tearDown()
jobTest(testJob)
.output(ioFn(out))(_ should containInAnyOrder(xs))
.run()

the[IllegalArgumentException] thrownBy {
val builder = com.spotify.scio.testing
.JobTest("null")
.output(CustomIO[T](out)) { coll =>
coll should containInAnyOrder(xs)
()
}
builder.setUp()
runMain(Array(s"--output=$out") :+ s"--appName=${builder.testId}")
builder.tearDown()
} should have message s"requirement failed: Missing test output: ${ioFn(out).testId}, " +
s"available: [CustomIO($out)]"
()
jobTest(testJob)
.output(CustomIO[T](out))(_ should containInAnyOrder(xs))
.run()
} should have message s"requirement failed: Missing test output: ${ioFn(out).testId}, available: [CustomIO($out)]"
}

def testJobTest[T: Coder](xs: Seq[T], in: String = "in", out: String = "out")(
ioFn: String => ScioIO[T]
)(
readFn: (ScioContext, String) => SCollection[T]
)(writeFn: (SCollection[T], String) => ClosedTap[_]): Unit = {
def runMain(args: Array[String]): Unit = {
val (sc, argz) = ContextAndArgs(args)
val data = readFn(sc, argz("input"))
writeFn(data, argz("output"))
sc.run()
()
}

val builder = com.spotify.scio.testing
.JobTest("null")
)(
writeFn: (SCollection[T], String) => ClosedTap[_]
): Unit = {
val testJob = (sc: ScioContext) => writeFn(readFn(sc, in), out)
jobTest(testJob)
.input(ioFn(in), xs)
.output(ioFn(out)) { coll =>
coll should containInAnyOrder(xs)
()
}
builder.setUp()
runMain(Array(s"--input=$in", s"--output=$out") :+ s"--appName=${builder.testId}")
builder.tearDown()
.output(ioFn(out))(_ should containInAnyOrder(xs))
.run()

the[IllegalArgumentException] thrownBy {
val builder = com.spotify.scio.testing
.JobTest("null")
jobTest(testJob)
.input(CustomIO[T](in), xs)
.output(ioFn(out)) { coll =>
coll should containInAnyOrder(xs)
()
}
builder.setUp()
runMain(Array(s"--input=$in", s"--output=$out") :+ s"--appName=${builder.testId}")
builder.tearDown()
} should have message s"requirement failed: Missing test input: ${ioFn(in).testId}, " +
s"available: [CustomIO($in)]"
.output(ioFn(out))(_ should containInAnyOrder(xs))
.run()
} should have message s"requirement failed: Missing test input: ${ioFn(in).testId}, available: [CustomIO($in)]"

the[IllegalArgumentException] thrownBy {
val builder = com.spotify.scio.testing
.JobTest("null")
jobTest(testJob)
.input(ioFn(in), xs)
.output(CustomIO[T](out)) { coll =>
coll should containInAnyOrder(xs)
()
}
builder.setUp()
runMain(Array(s"--input=$in", s"--output=$out") :+ s"--appName=${builder.testId}")
builder.tearDown()
} should have message s"requirement failed: Missing test output: ${ioFn(out).testId}, " +
s"available: [CustomIO($out)]"
()
.output(CustomIO[T](out))(_ should containInAnyOrder(xs))
.run()
} should have message s"requirement failed: Missing test output: ${ioFn(out).testId}, available: [CustomIO($out)]"
}
}

0 comments on commit ad0e31d

Please sign in to comment.