[Spark] Propagate catalog table through DeltaSink #2109

Closed
@@ -553,7 +553,18 @@ class DeltaAnalysis(session: SparkSession)
} else deltaMerge
d.copy(target = stripTempViewForMergeWrapper(d.target))

case streamWrite: WriteToStream =>
case origStreamWrite: WriteToStream =>
// The command could have Delta as source and/or sink. We need to look at both.
val streamWrite = origStreamWrite match {
case WriteToStream(_, _, sink @ DeltaSink(_, _, _, _, _, None), _, _, _, _, Some(ct)) =>
// The command has a catalog table, but the DeltaSink does not. This happens because
// DeltaDataSource.createSink (Spark API) didn't have access to the catalog table when it
// created the DeltaSink. Fortunately we can fix it up here.
origStreamWrite.copy(sink = sink.copy(catalogTable = Some(ct)))
case _ => origStreamWrite
}

// We also need to validate the source schema location, if the command has a Delta source.
verifyDeltaSourceSchemaLocation(
streamWrite.inputQuery, streamWrite.resolvedCheckpointLocation)
streamWrite
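
The fix-up above relies on two things the case-class machinery provides: a compiler-generated extractor, so the rule can match a WriteToStream whose DeltaSink still has catalogTable = None, and copy, which rebuilds both nodes with the missing field filled in. The following minimal, self-contained sketch illustrates just that pattern; CatalogTable, Sink and WriteToStream here are simplified stand-ins, not the real Spark classes.

// Simplified stand-ins, used only to illustrate the extractor + copy technique
// from the analyzer rule above.
object CatalogFixupSketch {
  case class CatalogTable(identifier: String)
  case class Sink(path: String, catalogTable: Option[CatalogTable] = None)
  case class WriteToStream(sink: Sink, catalogTable: Option[CatalogTable])

  // If the plan carries a catalog table but its sink does not, copy the table
  // into the sink; otherwise return the plan unchanged.
  def fixUp(write: WriteToStream): WriteToStream = write match {
    case WriteToStream(sink @ Sink(_, None), Some(ct)) =>
      write.copy(sink = sink.copy(catalogTable = Some(ct)))
    case _ => write
  }

  def main(args: Array[String]): Unit = {
    val before = WriteToStream(Sink("/tmp/delta"), Some(CatalogTable("default.tab")))
    val after = fixUp(before)
    assert(after.sink.catalogTable.contains(CatalogTable("default.tab")))
  }
}
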
@@ -169,6 +169,8 @@ class DeltaDataSource
throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString)
}
val deltaOptions = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
// NOTE: Spark API doesn't give access to the CatalogTable here, but DeltaAnalysis will pick
// that info out of the containing WriteToStream (if present), and update the sink there.
new DeltaSink(sqlContext, new Path(path), partitionColumns, outputMode, deltaOptions)
}

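
For context on the NOTE above: DeltaDataSource implements Spark's StreamSinkProvider, and that hook (as of Spark 3.x) simply has no parameter through which a CatalogTable could arrive, which is why the fix-up has to happen later in DeltaAnalysis. The trait is reproduced below for reference only; it lives in org.apache.spark.sql.sources and should not be redeclared in real code.

// Reference copy of Spark's sink-creation hook
// (org.apache.spark.sql.sources.StreamSinkProvider); nothing in this
// signature carries catalog information.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

trait StreamSinkProvider {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink
}
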
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -39,12 +40,13 @@ import org.apache.spark.util.Utils
/**
* A streaming sink that writes data into a Delta Table.
*/
class DeltaSink(
case class DeltaSink(
sqlContext: SQLContext,
path: Path,
partitionColumns: Seq[String],
outputMode: OutputMode,
options: DeltaOptions)
options: DeltaOptions,
catalogTable: Option[CatalogTable] = None)
extends Sink
with ImplicitMetadataOperation
with DeltaLogging {
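
Turning DeltaSink from a plain class into a case class is what enables both halves of the fix-up: the generated unapply backs the DeltaSink(_, _, _, _, _, None) pattern in DeltaAnalysis, and the generated copy backs sink.copy(catalogTable = Some(ct)); the defaulted last parameter keeps the existing five-argument construction in DeltaDataSource compiling unchanged. A tiny standalone illustration, with a hypothetical SimpleSink in place of the real DeltaSink:

// Hypothetical SimpleSink standing in for DeltaSink, showing what the
// case-class conversion provides: an extractor, copy, and a defaulted field.
case class SimpleSink(
    path: String,
    partitionColumns: Seq[String],
    catalogTable: Option[String] = None)

object SimpleSinkDemo {
  def main(args: Array[String]): Unit = {
    val sink = SimpleSink("/tmp/delta", Seq("date"))          // default: catalogTable = None
    val fixed = sink.copy(catalogTable = Some("default.tab")) // generated copy

    // Generated extractor: an analyzer-style rule can select only sinks
    // that still lack a catalog table.
    fixed match {
      case SimpleSink(_, _, None)    => println("no catalog table")
      case SimpleSink(_, _, Some(t)) => println(s"catalog table: $t")
    }
  }
}
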
@@ -21,7 +21,7 @@ import java.util.Locale

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaSink}
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
import org.apache.commons.io.FileUtils
import org.scalatest.time.SpanSugar._
@@ -31,7 +31,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.{MemoryStream, MicroBatchExecution, StreamingQueryWrapper}
import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
@@ -433,6 +434,47 @@ class DeltaSinkSuite
}
}

private def verifyDeltaSinkCatalog(f: DataStreamWriter[_] => StreamingQuery): Unit = {
// Create a Delta sink whose target table is defined by our caller.
val input = MemoryStream[Int]
val streamWriter = input.toDF
.writeStream
.format("delta")
.option(
"checkpointLocation",
Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath)
val q = f(streamWriter).asInstanceOf[StreamingQueryWrapper]

// WARNING: Only the query execution thread is allowed to initialize the logical plan (enforced
// by an assertion in MicroBatchExecution.scala). To avoid flaky failures, run the stream to
// completion, to guarantee the query execution thread ran before we try to access the plan.
try {
input.addData(1, 2, 3)
q.processAllAvailable()
} finally {
q.stop()
}

val plan = q.streamingQuery.logicalPlan
val WriteToMicroBatchDataSourceV1(catalogTable, sink: DeltaSink, _, _, _, _, _) = plan
assert(catalogTable === sink.catalogTable)
}

test("DeltaSink.catalogTable is correctly populated - catalog-based table") {
withTable("tab") {
verifyDeltaSinkCatalog(_.toTable("tab"))
}
}

test("DeltaSink.catalogTable is correctly populated - path-based table") {
withTempDir { tempDir =>
if (tempDir.exists()) {
assert(tempDir.delete())
}
verifyDeltaSinkCatalog(_.start(tempDir.getCanonicalPath))
}
}

test("can't write out with all columns being partition columns") {
withTempDirs { (outputDir, checkpointDir) =>
val inputData = MemoryStream[Int]
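
The two new tests exercise the two write shapes a user can choose, sketched below against a local SparkSession (the paths, table name and Delta session configuration here are assumptions of this example, not part of the patch). Writing with toTable resolves the target through the session catalog, so WriteToStream carries a CatalogTable for DeltaAnalysis to push into the sink; writing with start(path) is purely path-based and DeltaSink.catalogTable stays None.

// Hedged usage sketch; assumes delta-spark on the classpath and the usual
// Delta SQL extension/catalog configuration.
import org.apache.spark.sql.SparkSession

object DeltaStreamWriteShapes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-sink-catalog-demo")
      .master("local[2]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    val df = spark.readStream.format("rate").load()

    // Catalog-based target: the resolved sink ends up with catalogTable = Some(...).
    val byName = df.writeStream
      .format("delta")
      .option("checkpointLocation", "/tmp/chk-by-name") // made-up path
      .toTable("demo_tab")                              // made-up table name

    // Path-based target: no catalog entry, so catalogTable stays None.
    val byPath = df.writeStream
      .format("delta")
      .option("checkpointLocation", "/tmp/chk-by-path")
      .start("/tmp/demo_path_tab")

    byName.awaitTermination(5000)
    byPath.awaitTermination(5000)
    byName.stop()
    byPath.stop()
    spark.stop()
  }
}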