From 7ef7715c778c7faf51ab9c8f79029d39583fedbc Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Tue, 4 Jun 2024 18:22:34 +0200 Subject: [PATCH] Allow passing custom sink totestStream() --- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index d7401897ff6a4..7439c7ab6d6e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -346,7 +346,8 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with def testStream( _stream: Dataset[_], outputMode: OutputMode = OutputMode.Append, - extraOptions: Map[String, String] = Map.empty)(actions: StreamAction*): Unit = synchronized { + extraOptions: Map[String, String] = Map.empty, + sink: MemorySink = new MemorySink())(actions: StreamAction*): Unit = synchronized { import org.apache.spark.sql.streaming.util.StreamManualClock // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently @@ -359,7 +360,6 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with var currentStream: StreamExecution = null var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for - val sink = new MemorySink val resetConfValues = mutable.Map[String, Option[String]]() val defaultCheckpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath