From 204eb0ce1ea2917d40644f393c658dae37cc722a Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 26 Sep 2023 14:35:02 -0700 Subject: [PATCH 1/4] [Spark] Propagate catalog table through DeltaSink --- .../spark/sql/delta/DeltaAnalysis.scala | 10 +++++++- .../sql/delta/sources/DeltaDataSource.scala | 2 ++ .../spark/sql/delta/sources/DeltaSink.scala | 14 +++++++++-- .../apache/spark/sql/delta/DeltaSuite.scala | 24 +++++++++++++++++++ 4 files changed, 47 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala index f6fdeb6672d..7ae539fde16 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala @@ -553,7 +553,15 @@ class DeltaAnalysis(session: SparkSession) } else deltaMerge d.copy(target = stripTempViewForMergeWrapper(d.target)) - case streamWrite: WriteToStream => + case origStreamWrite @ WriteToStream(_, _, sink: DeltaSink, _, _, _, _, _) => + val streamWrite = origStreamWrite.catalogTable match { + case Some(catalogTable) if sink.catalogTable.isEmpty => + // Hook up the missing catalog table, since we didn't have access to it when we first + // created the DeltaSink in DeltaDataSource.createSink (Spark API). + origStreamWrite.copy(sink = sink.copy(catalogTable = Some(catalogTable))) + case _ => origStreamWrite + } + verifyDeltaSourceSchemaLocation( streamWrite.inputQuery, streamWrite.resolvedCheckpointLocation) streamWrite diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala index 424197f8303..11571adea93 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala @@ -169,6 +169,8 @@ class DeltaDataSource throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString) } val deltaOptions = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf) + // NOTE: Spark API doesn't give access to the CatalogTable here, but DeltaAnalysis will pick + // that info out of the containing WriteToStream (if present), and update the sink there. new DeltaSink(sqlContext, new Path(path), partitionColumns, outputMode, deltaOptions) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala index ad4961033ed..d7b217f174a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -39,16 +40,25 @@ import org.apache.spark.util.Utils /** * A streaming sink that writes data into a Delta Table. 
*/ -class DeltaSink( +case class DeltaSink( sqlContext: SQLContext, path: Path, partitionColumns: Seq[String], outputMode: OutputMode, - options: DeltaOptions) + options: DeltaOptions, + catalogTable: Option[CatalogTable]) extends Sink with ImplicitMetadataOperation with DeltaLogging { + def this( + sqlContext: SQLContext, + path: Path, + partitionColumns: Seq[String], + outputMode: OutputMode, + options: DeltaOptions) = this( + sqlContext, path, partitionColumns, outputMode, options, catalogTable = None) + private val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) private val sqlConf = sqlContext.sparkSession.sessionState.conf diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index 17e6793c6b6..a99caf97f48 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -1184,6 +1184,30 @@ class DeltaSuite extends QueryTest } } + test("streaming write to catalog table via data source API") { + withTable("tab") { + Seq(1, 2, 3).toDF.write.format("delta").saveAsTable("tab") + checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3) + + // Streaming write should correctly update the Delta table. + val input = MemoryStream[Int] + val q = input.toDF + .writeStream + .format("delta") + .option( + "checkpointLocation", + Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath) + .toTable("tab") + try { + input.addData(10, 12, 13) + q.processAllAvailable() + checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3, 10, 12, 13) + } finally { + q.stop() + } + } + } + test("support partitioning with batch data source API - append") { withTempDir { tempDir => if (tempDir.exists()) { From 618b6eaf41b86b617c854e1a22337db0afa044c6 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Tue, 26 Sep 2023 21:08:42 -0700 Subject: [PATCH 2/4] fix delta source vs sink --- .../apache/spark/sql/delta/DeltaAnalysis.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala index 7ae539fde16..41e0911aff0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala @@ -553,15 +553,18 @@ class DeltaAnalysis(session: SparkSession) } else deltaMerge d.copy(target = stripTempViewForMergeWrapper(d.target)) - case origStreamWrite @ WriteToStream(_, _, sink: DeltaSink, _, _, _, _, _) => - val streamWrite = origStreamWrite.catalogTable match { - case Some(catalogTable) if sink.catalogTable.isEmpty => - // Hook up the missing catalog table, since we didn't have access to it when we first - // created the DeltaSink in DeltaDataSource.createSink (Spark API). - origStreamWrite.copy(sink = sink.copy(catalogTable = Some(catalogTable))) + case origStreamWrite: WriteToStream => + // The command could have Delta as source and/or sink. We need to look at both. + val streamWrite = origStreamWrite match { + case WriteToStream(_, _, sink @ DeltaSink(_, _, _, _, _, None), _, _, _, _, Some(ct)) => + // The command has a catalog table, but the DeltaSink does not. This happens because + // DeltaDataSource.createSink (Spark API) didn't have access to the catalog table when it + // created the DeltaSink. Fortunately we can fix it up here. 
+ origStreamWrite.copy(sink = sink.copy(catalogTable = Some(ct))) case _ => origStreamWrite } + // We also need to validate the source schema location, if the command has a Delta source. verifyDeltaSourceSchemaLocation( streamWrite.inputQuery, streamWrite.resolvedCheckpointLocation) streamWrite From a4574d59d017e6227e53b689384005bfa20494e3 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Wed, 27 Sep 2023 07:02:42 -0700 Subject: [PATCH 3/4] add a real test --- .../spark/sql/delta/sources/DeltaSink.scala | 10 +--- .../spark/sql/delta/DeltaSinkSuite.scala | 46 ++++++++++++++++++- .../apache/spark/sql/delta/DeltaSuite.scala | 28 +---------- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala index d7b217f174a..82e159c7b4a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala @@ -46,19 +46,11 @@ case class DeltaSink( partitionColumns: Seq[String], outputMode: OutputMode, options: DeltaOptions, - catalogTable: Option[CatalogTable]) + catalogTable: Option[CatalogTable] = None) extends Sink with ImplicitMetadataOperation with DeltaLogging { - def this( - sqlContext: SQLContext, - path: Path, - partitionColumns: Seq[String], - outputMode: OutputMode, - options: DeltaOptions) = this( - sqlContext, path, partitionColumns, outputMode, options, catalogTable = None) - private val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path) private val sqlConf = sqlContext.sparkSession.sessionState.conf diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala index 722f4de604c..8f298b2d1f5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala @@ -21,7 +21,7 @@ import java.util.Locale // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.actions.CommitInfo -import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaSink} import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest} import org.apache.commons.io.FileUtils import org.scalatest.time.SpanSugar._ @@ -31,7 +31,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.{MemoryStream, MicroBatchExecution, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1 import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types._ @@ -433,6 +434,47 @@ class DeltaSinkSuite } } + private def verifyDeltaSinkCatalog(f: DataStreamWriter[_] => StreamingQuery): Unit = { + // Create a Delta sink whose target table is defined by our caller. 
+ val input = MemoryStream[Int] + val streamWriter = input.toDF + .writeStream + .format("delta") + .option( + "checkpointLocation", + Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath) + val q = f(streamWriter).asInstanceOf[StreamingQueryWrapper] + + // WARNING: Only the query execution thread is allowed to initialize the logical plan (enforced + // by an assertion in MicroBatchExecution.scala). To avoid flaky failures, run the stream to + // completion, to guarantee the query execution thread ran before we try to access the plan. + try { + input.addData(1, 2, 3) + q.processAllAvailable() + } finally { + q.stop() + } + + val plan = q.streamingQuery.logicalPlan + val WriteToMicroBatchDataSourceV1(catalogTable, sink: DeltaSink, _, _, _, _, _) = plan + assert(catalogTable === sink.catalogTable) + } + + test("DeltaSink.catalogTable is correctly populated - catalog-based table") { + withTable("tab") { + verifyDeltaSinkCatalog(_.toTable("tab")) + } + } + + test("DeltaSink.catalogTable is correctly populated - path-based table") { + withTempDir { tempDir => + if (tempDir.exists()) { + assert(tempDir.delete()) + } + verifyDeltaSinkCatalog(_.start(tempDir.getCanonicalPath)) + } + } + test("can't write out with all columns being partition columns") { withTempDirs { (outputDir, checkpointDir) => val inputData = MemoryStream[Int] diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index a99caf97f48..07cdc418eb3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.delta.actions.{Action, TableFeatureProtocolUtils} import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files.TahoeLogFileIndex -import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaSink} import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames} @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} import org.apache.spark.sql.functions.{asc, col, expr, lit, map_values, struct} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery @@ -1184,30 +1184,6 @@ class DeltaSuite extends QueryTest } } - test("streaming write to catalog table via data source API") { - withTable("tab") { - Seq(1, 2, 3).toDF.write.format("delta").saveAsTable("tab") - checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3) - - // Streaming write should correctly update the Delta table. 
- val input = MemoryStream[Int] - val q = input.toDF - .writeStream - .format("delta") - .option( - "checkpointLocation", - Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath) - .toTable("tab") - try { - input.addData(10, 12, 13) - q.processAllAvailable() - checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3, 10, 12, 13) - } finally { - q.stop() - } - } - } - test("support partitioning with batch data source API - append") { withTempDir { tempDir => if (tempDir.exists()) { From ff4a55800caa22dfd0e2fabf68b2eaa5f6cb8403 Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Wed, 27 Sep 2023 15:16:47 -0700 Subject: [PATCH 4/4] Update DeltaSuite.scala revert unwanted changes --- .../test/scala/org/apache/spark/sql/delta/DeltaSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala index 07cdc418eb3..17e6793c6b6 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.delta.actions.{Action, TableFeatureProtocolUtils} import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files.TahoeLogFileIndex -import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaSink} +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames} @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.Filter import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} -import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.functions.{asc, col, expr, lit, map_values, struct} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery