Use trigger.AvailableNow in ConvertToDelta suites
mingdai-db committed Jun 27, 2024
1 parent eb26989 commit fb419c0
Showing 1 changed file with 39 additions and 34 deletions.
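
In the old pattern, each test started an open-ended streaming query, pushed data into the MemoryStream, blocked on processAllAvailable(), and stopped the query by hand. Trigger.AvailableNow() (available since Spark 3.3) replaces that dance with a one-shot run: the query processes everything available when it starts and then terminates on its own, so a single awaitTermination() suffices. A minimal, self-contained sketch of the new pattern follows; the object name and the /tmp paths are illustrative stand-ins, not part of the commit, which derives its locations from the suite's temp directories.

import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.Trigger

object AvailableNowSketch extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("available-now-sketch")
    .getOrCreate()
  import spark.implicits._
  implicit val sqlContext: SQLContext = spark.sqlContext

  // Illustrative paths; the suite uses per-test temp directories instead.
  val dataLocation = "/tmp/available-now-sketch/data"
  val checkpoint = "/tmp/available-now-sketch/checkpoint"

  val stream = MemoryStream[Int]
  val df = stream.toDF()

  // Enqueue the data first: an AvailableNow query only commits to
  // processing what is available at the moment it starts.
  stream.addData(1, 2, 3)

  df.writeStream
    .option("checkpointLocation", checkpoint)
    .format("parquet")
    .trigger(Trigger.AvailableNow())
    .start(dataLocation)
    .awaitTermination() // returns once the queued data has been written

  spark.stop()
}

Because the query shuts itself down, there is no lingering handle to stop in a finally block, which is what lets the diff below delete the try/finally scaffolding.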
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -368,30 +369,32 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
     val options = Map("checkpointLocation" -> checkpoint)

     // Add initial data to parquet file sink
-    val q = df.writeStream.options(options).format("parquet").start(dataLocation)
     stream.addData(1, 2, 3)
-    q.processAllAvailable()
-    q.stop()
+    df.writeStream
+      .options(options)
+      .format("parquet")
+      .trigger(Trigger.AvailableNow())
+      .start(dataLocation)
+      .awaitTermination()

     // Add non-streaming data: this should be ignored in conversion.
     spark.range(10, 20).write.mode("append").parquet(dataLocation)
     sql(s"CONVERT TO DELTA parquet.`$dataLocation`")

     // Write data to delta
-    val q2 = df.writeStream.options(options).format("delta").start(dataLocation)
-
-    try {
-      stream.addData(4, 5, 6)
-      q2.processAllAvailable()
-
-      // Should only read streaming data.
-      checkAnswer(
-        spark.read.format("delta").load(dataLocation),
-        (1 to 6).map { Row(_) }
-      )
-    } finally {
-      q2.stop()
-    }
+    stream.addData(4, 5, 6)
+    df.writeStream
+      .options(options)
+      .format("delta")
+      .trigger(Trigger.AvailableNow())
+      .start(dataLocation)
+      .awaitTermination()
+
+    // Should only read streaming data.
+    checkAnswer(
+      spark.read.format("delta").load(dataLocation),
+      (1 to 6).map { Row(_) }
+    )
   }
 }

@@ -407,10 +410,13 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
     )

     // Add initial data to parquet file sink
-    val q = df.writeStream.options(options).format("parquet").start(dataLocation)
     stream.addData(1 to 5)
-    q.processAllAvailable()
-    q.stop()
+    df.writeStream
+      .options(options)
+      .format("parquet")
+      .trigger(Trigger.AvailableNow())
+      .start(dataLocation)
+      .awaitTermination()

     // Add non-streaming data: this should not be ignored in conversion.
     spark.range(11, 21).select('id.cast("int") as "col1")
@@ -421,20 +427,19 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
     }

     // Write data to delta
-    val q2 = df.writeStream.options(options).format("delta").start(dataLocation)
-
-    try {
-      stream.addData(6 to 10)
-      q2.processAllAvailable()
-
-      // Should read all data not just streaming data
-      checkAnswer(
-        spark.read.format("delta").load(dataLocation),
-        (1 to 20).map { Row(_) }
-      )
-    } finally {
-      q2.stop()
-    }
+    stream.addData(6 to 10)
+    df.writeStream
+      .options(options)
+      .format("delta")
+      .trigger(Trigger.AvailableNow())
+      .start(dataLocation)
+      .awaitTermination()
+
+    // Should read all data not just streaming data
+    checkAnswer(
+      spark.read.format("delta").load(dataLocation),
+      (1 to 20).map { Row(_) }
+    )
   }
 }
