Skip to content

Commit

Permalink
Allow passing custom sink totestStream()
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Jun 4, 2024
1 parent 8b88f5a commit 7ef7715
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
def testStream(
_stream: Dataset[_],
outputMode: OutputMode = OutputMode.Append,
extraOptions: Map[String, String] = Map.empty)(actions: StreamAction*): Unit = synchronized {
extraOptions: Map[String, String] = Map.empty,
sink: MemorySink = new MemorySink())(actions: StreamAction*): Unit = synchronized {
import org.apache.spark.sql.streaming.util.StreamManualClock

// `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
Expand All @@ -359,7 +360,6 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
var currentStream: StreamExecution = null
var lastStream: StreamExecution = null
val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for
val sink = new MemorySink
val resetConfValues = mutable.Map[String, Option[String]]()
val defaultCheckpointLocation =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
Expand Down

0 comments on commit 7ef7715

Please sign in to comment.