From fb419c097ce491376bf77c3038a5e5660a56d8c4 Mon Sep 17 00:00:00 2001
From: ming dai
Date: Wed, 26 Jun 2024 17:42:57 -0700
Subject: [PATCH] Use trigger.AvailableNow in ConvertToDelta suites

---
 .../sql/delta/ConvertToDeltaSuiteBase.scala   | 73 ++++++++++---------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
index b374eede5f7..ac25bb75798 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ConvertToDeltaSuiteBase.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -368,30 +369,32 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
       val options = Map("checkpointLocation" -> checkpoint)

       // Add initial data to parquet file sink
-      val q = df.writeStream.options(options).format("parquet").start(dataLocation)
       stream.addData(1, 2, 3)
-      q.processAllAvailable()
-      q.stop()
+      df.writeStream
+        .options(options)
+        .format("parquet")
+        .trigger(Trigger.AvailableNow())
+        .start(dataLocation)
+        .awaitTermination()

       // Add non-streaming data: this should be ignored in conversion.
       spark.range(10, 20).write.mode("append").parquet(dataLocation)
       sql(s"CONVERT TO DELTA parquet.`$dataLocation`")

       // Write data to delta
-      val q2 = df.writeStream.options(options).format("delta").start(dataLocation)
-
-      try {
-        stream.addData(4, 5, 6)
-        q2.processAllAvailable()
-
-        // Should only read streaming data.
-        checkAnswer(
-          spark.read.format("delta").load(dataLocation),
-          (1 to 6).map { Row(_) }
-        )
-      } finally {
-        q2.stop()
-      }
+      stream.addData(4, 5, 6)
+      df.writeStream
+        .options(options)
+        .format("delta")
+        .trigger(Trigger.AvailableNow())
+        .start(dataLocation)
+        .awaitTermination()
+
+      // Should only read streaming data.
+      checkAnswer(
+        spark.read.format("delta").load(dataLocation),
+        (1 to 6).map { Row(_) }
+      )
     }
   }

@@ -407,10 +410,13 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
       )

       // Add initial data to parquet file sink
-      val q = df.writeStream.options(options).format("parquet").start(dataLocation)
       stream.addData(1 to 5)
-      q.processAllAvailable()
-      q.stop()
+      df.writeStream
+        .options(options)
+        .format("parquet")
+        .trigger(Trigger.AvailableNow())
+        .start(dataLocation)
+        .awaitTermination()

       // Add non-streaming data: this should not be ignored in conversion.
       spark.range(11, 21).select('id.cast("int") as "col1")
@@ -421,20 +427,19 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
       }

       // Write data to delta
-      val q2 = df.writeStream.options(options).format("delta").start(dataLocation)
-
-      try {
-        stream.addData(6 to 10)
-        q2.processAllAvailable()
-
-        // Should read all data not just streaming data
-        checkAnswer(
-          spark.read.format("delta").load(dataLocation),
-          (1 to 20).map { Row(_) }
-        )
-      } finally {
-        q2.stop()
-      }
+      stream.addData(6 to 10)
+      df.writeStream
+        .options(options)
+        .format("delta")
+        .trigger(Trigger.AvailableNow())
+        .start(dataLocation)
+        .awaitTermination()
+
+      // Should read all data not just streaming data
+      checkAnswer(
+        spark.read.format("delta").load(dataLocation),
+        (1 to 20).map { Row(_) }
+      )
     }
   }
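
For context, a minimal self-contained sketch (not part of the patch) of the pattern the suites adopt: a Trigger.AvailableNow() streaming write processes whatever data is currently available and then terminates on its own, so awaitTermination() replaces the manual processAllAvailable()/stop() pair. The object name and paths below are hypothetical placeholders.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

object AvailableNowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("AvailableNowSketch")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // In-memory source, as used by the test suite.
    val stream = MemoryStream[Int]
    stream.addData(1, 2, 3)

    // AvailableNow drains everything added so far and then stops by itself,
    // so the query can simply be awaited instead of stopped explicitly.
    stream.toDF()
      .writeStream
      .option("checkpointLocation", "/tmp/sketch-checkpoint") // hypothetical path
      .format("parquet")
      .trigger(Trigger.AvailableNow())
      .start("/tmp/sketch-sink")                              // hypothetical path
      .awaitTermination()

    spark.stop()
  }
}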