[Spark] Propagate catalog table through DeltaSink #2109

Closed
Changes from 2 commits
@@ -553,7 +553,18 @@ class DeltaAnalysis(session: SparkSession)
       } else deltaMerge
       d.copy(target = stripTempViewForMergeWrapper(d.target))
 
-    case streamWrite: WriteToStream =>
+    case origStreamWrite: WriteToStream =>
+      // The command could have Delta as source and/or sink. We need to look at both.
+      val streamWrite = origStreamWrite match {
+        case WriteToStream(_, _, sink @ DeltaSink(_, _, _, _, _, None), _, _, _, _, Some(ct)) =>
+          // The command has a catalog table, but the DeltaSink does not. This happens because
+          // DeltaDataSource.createSink (Spark API) didn't have access to the catalog table when it
+          // created the DeltaSink. Fortunately we can fix it up here.
+          origStreamWrite.copy(sink = sink.copy(catalogTable = Some(ct)))
+        case _ => origStreamWrite
+      }
+
+      // We also need to validate the source schema location, if the command has a Delta source.
       verifyDeltaSourceSchemaLocation(
         streamWrite.inputQuery, streamWrite.resolvedCheckpointLocation)
       streamWrite
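
As a quick illustration of the streaming path this new case targets (a hedged sketch; the source, table name, and checkpoint path below are made up), a DataStreamWriter.toTable() call resolves the target to a CatalogTable on the WriteToStream node, while the DeltaSink created underneath it starts out with catalogTable = None:

// Sketch only (assumes an active SparkSession `spark`): stream into a cataloged Delta table
// through the DataSource API. The planner attaches the resolved CatalogTable to WriteToStream,
// but DeltaDataSource.createSink cannot see it, so the analysis rule above copies it into the sink.
val query = spark.readStream
  .format("rate")                                            // built-in test source
  .load()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/events")   // made-up path
  .toTable("events")                                         // made-up catalog table name
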
@@ -169,6 +169,8 @@ class DeltaDataSource
       throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString)
     }
     val deltaOptions = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
+    // NOTE: Spark API doesn't give access to the CatalogTable here, but DeltaAnalysis will pick
+    // that info out of the containing WriteToStream (if present), and update the sink there.
     new DeltaSink(sqlContext, new Path(path), partitionColumns, outputMode, deltaOptions)
   }
 
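For context on why createSink never sees the catalog table: DeltaDataSource builds the sink through Spark's StreamSinkProvider hook, whose parameters are roughly the following (a from-memory sketch of the Spark interface, not an authoritative copy), none of which carries the resolved CatalogTable:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

// Rough shape of the hook DeltaDataSource implements; nothing here exposes the
// CatalogTable, which is why DeltaAnalysis patches the sink after the fact.
trait StreamSinkProvider {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink
}
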
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -39,16 +40,25 @@ import org.apache.spark.util.Utils
 /**
  * A streaming sink that writes data into a Delta Table.
  */
-class DeltaSink(
+case class DeltaSink(
     sqlContext: SQLContext,
     path: Path,
     partitionColumns: Seq[String],
     outputMode: OutputMode,
-    options: DeltaOptions)
+    options: DeltaOptions,
+    catalogTable: Option[CatalogTable])
   extends Sink
     with ImplicitMetadataOperation
     with DeltaLogging {
 
+  def this(
+      sqlContext: SQLContext,
+      path: Path,
+      partitionColumns: Seq[String],
+      outputMode: OutputMode,
+      options: DeltaOptions) = this(
+    sqlContext, path, partitionColumns, outputMode, options, catalogTable = None)
+
   private val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
 
   private val sqlConf = sqlContext.sparkSession.sessionState.conf
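
A hedged sketch of how the two construction paths are meant to fit together (variable names are illustrative): existing five-argument callers such as DeltaDataSource.createSink keep using the auxiliary constructor and get catalogTable = None, while the case-class copy is what lets DeltaAnalysis attach the resolved table afterwards:

// Illustrative only; assumes sqlContext, tablePath, deltaOptions, and resolvedCatalogTable
// are already in scope.
val sink = new DeltaSink(                       // what DeltaDataSource.createSink does today
  sqlContext, tablePath, Seq.empty[String], OutputMode.Append(), deltaOptions)

val sinkWithCatalog =                           // what the new DeltaAnalysis case does
  sink.copy(catalogTable = Some(resolvedCatalogTable))
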
24 changes: 24 additions & 0 deletions spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -1184,6 +1184,30 @@ class DeltaSuite extends QueryTest
     }
   }
 
+  test("streaming write to catalog table via data source API") {
withTable("tab") {
Seq(1, 2, 3).toDF.write.format("delta").saveAsTable("tab")
checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3)

// Streaming write should correctly update the Delta table.
val input = MemoryStream[Int]
val q = input.toDF
.writeStream
.format("delta")
.option(
"checkpointLocation",
Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath)
.toTable("tab")
+      try {
+        input.addData(10, 12, 13)
+        q.processAllAvailable()
+        checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3, 10, 12, 13)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
   test("support partitioning with batch data source API - append") {
     withTempDir { tempDir =>
       if (tempDir.exists()) {