Skip to content

Commit

Permalink
Add test support for SMB transform
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Aug 29, 2023
1 parent a961ea0 commit e051fc5
Show file tree
Hide file tree
Showing 9 changed files with 1,650 additions and 5,538 deletions.
14 changes: 7 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,15 @@ def previousVersion(currentVersion: String): Option[String] = {
}

lazy val mimaSettings = Def.settings(
// format: off
mimaBinaryIssueFilters := Seq(
// scio-smb testing required to declare new abstract methods in parent SortedBucketIO
ProblemFilters.exclude[ReversedMissingMethodProblem](
"org.apache.beam.sdk.extensions.smb.SortedBucketIO#Read.getInputDirectories"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"org.apache.beam.sdk.extensions.smb.SortedBucketIO#Read.getFilenameSuffix"
)
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("com.spotify.scio.smb.syntax.SortedBucketScioContext$SortMergeTransformReadBuilder"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("com.spotify.scio.smb.syntax.SortedBucketScioContext$SortMergeTransformWithSideInputsWriteBuilder"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("com.spotify.scio.smb.syntax.SortedBucketScioContext$SortMergeTransformWriteBuilder"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.beam.sdk.extensions.smb.SortedBucketIO#Read.getInputDirectories"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.beam.sdk.extensions.smb.SortedBucketIO#Read.getFilenameSuffix")
),
// format: on
mimaPreviousArtifacts := previousVersion(version.value)
.filter(_ => publishArtifact.value)
.map(organization.value % s"${normalizedName.value}_${scalaBinaryVersion.value}" % _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ private[scio] class TestInput(val m: Map[String, JobInputSource[_]]) {
private[scio] class TestOutput(val m: Map[String, SCollection[_] => Any]) {
val s: MSet[String] =
java.util.concurrent.ConcurrentHashMap.newKeySet[String]().asScala

def apply[T](io: ScioIO[T]): SCollection[T] => Any = {
def apply[T](io: ScioIO[T]): SCollection[T] => Any = apply(io.testId)
def apply[T](key: String): SCollection[T] => Any = {
// TODO: support Materialize outputs, maybe Materialized[T]?
val key = io.testId
require(
m.contains(key),
s"Missing test output: $key, available: ${m.keys.mkString("[", ", ", "]")}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,9 @@ public abstract static class TransformOutput<K1, K2, V> implements Serializable
public abstract static class Read<V> implements Serializable {

@Nullable
public abstract ImmutableList<String> getInputDirectories();
abstract ImmutableList<String> getInputDirectories();

public abstract String getFilenameSuffix();
abstract String getFilenameSuffix();

public abstract TupleTag<V> getTupleTag();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@ package com.spotify.scio.smb

import com.spotify.scio.coders.Coder
import com.spotify.scio.io.{KeyedIO, TapOf, TapT, TestIO}
import org.apache.beam.sdk.extensions.smb.{SortedBucketIO => BIO}
import com.spotify.scio.util.ScioUtil

import scala.jdk.CollectionConverters._

final class SortedBucketIO[K, T](id: String, override val keyBy: T => K)(implicit
final class SortedBucketIO[K, T](path: String, override val keyBy: T => K)(implicit
override val keyCoder: Coder[K]
) extends TestIO[T]
with KeyedIO[T] {
override type KeyT = K
override val tapT: TapT.Aux[T, T] = TapOf[T]
override def testId: String = SortedBucketIO.testId(id)
override def testId: String = SortedBucketIO.testId(path)
}

object SortedBucketIO {
def apply[K: Coder, T](id: String, keyBy: T => K): SortedBucketIO[K, T] =
new SortedBucketIO[K, T](id, keyBy)
def apply[K: Coder, T](path: String, keyBy: T => K): SortedBucketIO[K, T] =
new SortedBucketIO[K, T](path, keyBy)

private[smb] def testId(read: BIO.Read[_]): String =
testId(read.getInputDirectories.asScala.mkString(","))
private def testId(id: String): String = s"SortedBucketIO($id)"
def testId(paths: String*): String = {
val normalizedPaths = paths.map(p => ScioUtil.strippedPath(p) + "/").mkString(",")
s"SortedBucketIO($normalizedPaths)"
}
}
Loading

0 comments on commit e051fc5

Please sign in to comment.