diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index d7401897ff6a4..7439c7ab6d6e1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -346,7 +346,8 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with def testStream( _stream: Dataset[_], outputMode: OutputMode = OutputMode.Append, - extraOptions: Map[String, String] = Map.empty)(actions: StreamAction*): Unit = synchronized { + extraOptions: Map[String, String] = Map.empty, + sink: MemorySink = new MemorySink())(actions: StreamAction*): Unit = synchronized { import org.apache.spark.sql.streaming.util.StreamManualClock // `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently @@ -359,7 +360,6 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with var currentStream: StreamExecution = null var lastStream: StreamExecution = null val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for - val sink = new MemorySink val resetConfValues = mutable.Map[String, Option[String]]() val defaultCheckpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath