[Spark] Propagate catalog table through DeltaSink
ryan-johnson-databricks committed Sep 26, 2023
1 parent 8df0ff0 commit 204eb0c
Showing 4 changed files with 47 additions and 3 deletions.
@@ -553,7 +553,15 @@ class DeltaAnalysis(session: SparkSession)
       } else deltaMerge
       d.copy(target = stripTempViewForMergeWrapper(d.target))

-    case streamWrite: WriteToStream =>
+    case origStreamWrite @ WriteToStream(_, _, sink: DeltaSink, _, _, _, _, _) =>
+      val streamWrite = origStreamWrite.catalogTable match {
+        case Some(catalogTable) if sink.catalogTable.isEmpty =>
+          // Hook up the missing catalog table, since we didn't have access to it when we first
+          // created the DeltaSink in DeltaDataSource.createSink (Spark API).
+          origStreamWrite.copy(sink = sink.copy(catalogTable = Some(catalogTable)))
+        case _ => origStreamWrite
+      }
+
       verifyDeltaSourceSchemaLocation(
         streamWrite.inputQuery, streamWrite.resolvedCheckpointLocation)
       streamWrite
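For orientation, here is a minimal, self-contained sketch of what this analyzer fix-up does. The case classes below are hypothetical toy stand-ins for Spark's WriteToStream, DeltaSink, and CatalogTable, not the real APIs; only the copy-based rewrite mirrors the change above.

// Toy stand-ins for the real Spark/Delta types (illustration only).
case class CatalogTable(identifier: String)
case class DeltaSink(path: String, catalogTable: Option[CatalogTable] = None)
case class WriteToStream(sink: DeltaSink, catalogTable: Option[CatalogTable])

object PropagateCatalogTableSketch extends App {
  // Mirrors the analyzer rule: if the plan node knows the catalog table but the
  // sink was created without one, rebuild the sink with the table filled in.
  def propagateCatalogTable(write: WriteToStream): WriteToStream = write.catalogTable match {
    case Some(table) if write.sink.catalogTable.isEmpty =>
      write.copy(sink = write.sink.copy(catalogTable = Some(table)))
    case _ => write
  }

  // A sink built through the streaming DataSource API starts without a catalog table.
  val original = WriteToStream(DeltaSink("/tmp/delta/tab"), Some(CatalogTable("default.tab")))
  val rewritten = propagateCatalogTable(original)
  assert(rewritten.sink.catalogTable.contains(CatalogTable("default.tab")))
  println(rewritten)
}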
@@ -169,6 +169,8 @@ class DeltaDataSource
       throw DeltaErrors.outputModeNotSupportedException(getClass.getName, outputMode.toString)
     }
     val deltaOptions = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
+    // NOTE: Spark API doesn't give access to the CatalogTable here, but DeltaAnalysis will pick
+    // that info out of the containing WriteToStream (if present), and update the sink there.
     new DeltaSink(sqlContext, new Path(path), partitionColumns, outputMode, deltaOptions)
   }
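The NOTE above is the heart of the change: the DataSource V1 streaming interface that createSink implements carries no catalog information. For reference, the shape of that Spark interface is roughly the following (a paraphrase of org.apache.spark.sql.sources.StreamSinkProvider as of Spark 3.x, shown only to illustrate why the catalog table has to be patched in later by DeltaAnalysis, and not part of this diff):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

// Paraphrased shape of the provider interface DeltaDataSource implements:
// nothing here identifies the target catalog table, so the DeltaSink created
// in createSink cannot know which table it writes to at construction time.
trait StreamSinkProviderShape {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink
}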

@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path

 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -39,16 +40,25 @@ import org.apache.spark.util.Utils
 /**
  * A streaming sink that writes data into a Delta Table.
  */
-class DeltaSink(
+case class DeltaSink(
     sqlContext: SQLContext,
     path: Path,
     partitionColumns: Seq[String],
     outputMode: OutputMode,
-    options: DeltaOptions)
+    options: DeltaOptions,
+    catalogTable: Option[CatalogTable])
   extends Sink
     with ImplicitMetadataOperation
     with DeltaLogging {

+  def this(
+      sqlContext: SQLContext,
+      path: Path,
+      partitionColumns: Seq[String],
+      outputMode: OutputMode,
+      options: DeltaOptions) = this(
+    sqlContext, path, partitionColumns, outputMode, options, catalogTable = None)
+
   private val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)

   private val sqlConf = sqlContext.sparkSession.sessionState.conf
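The DeltaSink change follows a common Scala pattern: promote the class to a case class, append an optional field, and keep an auxiliary constructor so existing new DeltaSink(...) call sites such as DeltaDataSource.createSink keep compiling while DeltaAnalysis fills the field in later via copy. A toy sketch of the pattern, with hypothetical names:

// Hypothetical toy version of the DeltaSink change: the new optional field is
// appended, and the auxiliary constructor preserves the old arity for callers
// that cannot supply a catalog table at construction time.
case class ToySink(path: String, catalogTable: Option[String]) {
  def this(path: String) = this(path, catalogTable = None)
}

object ToySinkSketch extends App {
  val fromDataSourceApi = new ToySink("/tmp/delta/tab") // old-style construction, no table info
  val patchedByAnalyzer = fromDataSourceApi.copy(catalogTable = Some("default.tab")) // later fix-up via copy
  println((fromDataSourceApi, patchedByAnalyzer))
}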
24 changes: 24 additions & 0 deletions spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -1184,6 +1184,30 @@ class DeltaSuite extends QueryTest
     }
   }

+  test("streaming write to catalog table via data source API") {
+    withTable("tab") {
+      Seq(1, 2, 3).toDF.write.format("delta").saveAsTable("tab")
+      checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3)
+
+      // Streaming write should correctly update the Delta table.
+      val input = MemoryStream[Int]
+      val q = input.toDF
+        .writeStream
+        .format("delta")
+        .option(
+          "checkpointLocation",
+          Utils.createTempDir(namePrefix = "tahoe-test").getCanonicalPath)
+        .toTable("tab")
+      try {
+        input.addData(10, 12, 13)
+        q.processAllAvailable()
+        checkDatasetUnorderly(spark.read.table("tab").as[Int], 1, 2, 3, 10, 12, 13)
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
   test("support partitioning with batch data source API - append") {
     withTempDir { tempDir =>
       if (tempDir.exists()) {