[Spark] Add time tracking for file changes and getting dataframe for Delta source with/without CDC (#3090)

## Which Delta project/connector is this regarding?
Spark

## Description
Add time tracking (elapsed-time logging) around retrieving file changes and building the DataFrame for the Delta streaming source, both with and without CDC.
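
The change follows a wrap-and-log pattern: the existing block is evaluated inside a timer that returns its result together with the elapsed milliseconds, and the duration is then logged next to the identifying parameters. Below is a minimal, self-contained sketch of that pattern; `TimingSketch`, the table path, and the sample values are illustrative stand-ins rather than code from this PR, which uses Spark's internal `org.apache.spark.util.Utils.timeTakenMs` instead of a local helper.

```scala
object TimingSketch {
  // Local stand-in for Utils.timeTakenMs: run the block, return (result, elapsedMillis).
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical values standing in for what the source would log.
    val deltaLogPath = "s3://bucket/table/_delta_log"
    val startVersion = 10L
    val isInitialSnapshot = false

    val (result, duration) = timeTakenMs {
      Thread.sleep(25) // simulate the work done by createDataFrame(...) / file-change listing
      "dataframe"
    }
    println(s"Getting dataFrame for delta_log_path=$deltaLogPath with " +
      s"startVersion=$startVersion, isInitialSnapshot=$isInitialSnapshot took timeMs=$duration ms")
  }
}
```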

## How was this patch tested?
Existing unit tests

## Does this PR introduce _any_ user-facing changes?
No
anishshri-db authored May 15, 2024
1 parent b9fe0e1 commit 69fd7e4
Showing 2 changed files with 107 additions and 79 deletions.
@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

/**
* A case class to help with `Dataset` operations regarding Offset indexing, representing AddFile
@@ -322,7 +323,13 @@ trait DeltaSourceBase extends Source
          excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
      }

      val (result, duration) = Utils.timeTakenMs {
        createDataFrame(filteredIndexedFiles)
      }
      logInfo(s"Getting dataFrame for delta_log_path=${deltaLog.logPath} with " +
        s"startVersion=$startVersion, startIndex=$startIndex, " +
        s"isInitialSnapshot=$isInitialSnapshot took timeMs=$duration ms")
      result
    } finally {
      fileActionsIter.close()
    }
@@ -772,36 +779,42 @@ case class DeltaSource(
      }
    }

    val (result, duration) = Utils.timeTakenMs {
      var iter = if (isInitialSnapshot) {
        Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy
          case 1 => getSnapshotAt(fromVersion).toClosable
          case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
        }
      } else {
        filterAndIndexDeltaLogs(fromVersion)
      }

      iter = iter.withClose { it =>
        it.filter { file =>
          file.version > fromVersion || file.index > fromIndex
        }
      }

      // If endOffset is provided, we are getting a batch on a constructed range so we should use
      // the endOffset as the limit.
      // Otherwise, we are looking for a new offset, so we try to use the latestOffset we found for
      // Trigger.availableNow() as limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
      val lastOffsetForThisScan = endOffset.orElse(lastOffsetForTriggerAvailableNow)

      lastOffsetForThisScan.foreach { bound =>
        iter = iter.withClose { it =>
          it.takeWhile { file =>
            file.version < bound.reservoirVersion ||
              (file.version == bound.reservoirVersion && file.index <= bound.index)
          }
        }
      }
      iter
    }
    logInfo(s"Getting file changes for delta_log_path=${deltaLog.logPath} with " +
      s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " +
      s"took timeMs=$duration ms")
    result
  }
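
For readers unfamiliar with the bounding logic above: a file identified by `(version, index)` is admitted while it sorts at or before the chosen bound, ordering by version first and by index within a version. The sketch below demonstrates just that predicate on toy data; `FileStub` and `Bound` are made-up names for the example, not the source's real types.

```scala
object OffsetBoundSketch {
  final case class FileStub(version: Long, index: Long)
  final case class Bound(reservoirVersion: Long, index: Long)

  def main(args: Array[String]): Unit = {
    val files = Iterator(
      FileStub(5, 0), FileStub(5, 1), FileStub(6, 0), FileStub(6, 3), FileStub(7, 0))
    val bound = Bound(reservoirVersion = 6, index = 0)

    // Same shape as the takeWhile above: keep files until one is past the bound.
    val admitted = files.takeWhile { file =>
      file.version < bound.reservoirVersion ||
        (file.version == bound.reservoirVersion && file.index <= bound.index)
    }.toList

    println(admitted) // List(FileStub(5,0), FileStub(5,1), FileStub(6,0))
  }
}
```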

/**
@@ -21,6 +21,7 @@ import org.apache.spark.sql.delta.actions.DomainMetadata
import org.apache.spark.sql.delta.commands.cdc.CDCReader

import org.apache.spark.sql.DataFrame
import org.apache.spark.util.Utils

/**
* Helper functions for CDC-specific handling for DeltaSource.
@@ -202,16 +203,22 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
        (v, indexFiles.filter(_.hasFileAction).map { _.getFileAction }.toSeq)
      }

    val (result, duration) = Utils.timeTakenMs {
      val cdcInfo = CDCReader.changesToDF(
        readSnapshotDescriptor,
        startVersion,
        endOffset.reservoirVersion,
        groupedFileActions,
        spark,
        isStreaming = true
      )

      cdcInfo.fileChangeDf
    }
    logInfo(s"Getting CDC dataFrame for delta_log_path=${deltaLog.logPath} with " +
      s"startVersion=$startVersion, startIndex=$startIndex, isInitialSnapshot=$isInitialSnapshot " +
      s"took timeMs=$duration ms")
    result
  }

/**
@@ -243,60 +250,68 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
            actions, version, fromVersion, endOffset.map(_.reservoirVersion),
            verifyMetadataAction && !trackingMetadataChange)
        val itr = addBeginAndEndIndexOffsetsForVersion(version,
          getMetadataOrProtocolChangeIndexedFileIterator(metadataOpt, protocolOpt, version) ++
            fileActions.zipWithIndex.map {
              case (action: AddFile, index) =>
                IndexedFile(
                  version,
                  index.toLong,
                  action,
                  shouldSkip = skipIndexedFile)
              case (cdcFile: AddCDCFile, index) =>
                IndexedFile(
                  version,
                  index.toLong,
                  add = null,
                  cdc = cdcFile,
                  shouldSkip = skipIndexedFile)
              case (remove: RemoveFile, index) =>
                IndexedFile(
                  version,
                  index.toLong,
                  add = null,
                  remove = remove,
                  shouldSkip = skipIndexedFile)
            })
        (version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false))
      }
    }

    val (result, duration) = Utils.timeTakenMs {
      val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
        // If we are reading change data from the start of the table we need to
        // get the latest snapshot of the table as well.
        val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m =>
          // When we get the snapshot the dataChange is false for the AddFile actions
          // We need to set it to true for it to be considered by the CDCReader.
          if (m.add != null) {
            m.copy(add = m.add.copy(dataChange = true))
          } else {
            m
          }
        }
        val snapshotItr: Iterator[(Long, IndexedChangeFileSeq)] = Iterator((
          fromVersion,
          new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true)
        ))

        snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1)
      } else {
        filterAndIndexDeltaLogs(fromVersion)
      }
      // In this case, filterFiles will consume the available capacity. We use takeWhile
      // to stop the iteration when we reach the limit which will save us from reading
      // unnecessary log files.
      iter.takeWhile(_ => limits.forall(_.hasCapacity)).map { case (version, indexItr) =>
        (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
      }
    }

    logInfo(s"Getting CDC file changes for delta_log_path=${deltaLog.logPath} with " +
      s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " +
      s"took timeMs=$duration ms")
    result
  }

/////////////////////
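
The capacity comment in the last hunk ("filterFiles will consume the available capacity...") can be illustrated with a simplified model: each version's files consume capacity from a mutable limit, and `takeWhile` re-checks that limit before the next version is touched, so later log files are never processed. `MaxFileLimit` and the sample versions below are invented for the illustration and do not match the source's real admission-control types.

```scala
object CapacitySketch {
  // Toy admission limit: a mutable budget of files that may still be admitted.
  final class MaxFileLimit(var remaining: Int) {
    def hasCapacity: Boolean = remaining > 0
    def admit(files: Int): Int = { remaining -= files; files } // consume capacity, like filterFiles
  }

  def main(args: Array[String]): Unit = {
    val limit = new MaxFileLimit(remaining = 3)
    val versionsWithFileCounts = Iterator(10L -> 2, 11L -> 2, 12L -> 5, 13L -> 1)

    val admitted = versionsWithFileCounts
      .takeWhile(_ => limit.hasCapacity) // stop before processing an unnecessary version
      .map { case (version, files) => version -> limit.admit(files) }
      .toList

    println(admitted) // List((10,2), (11,2)); versions 12 and 13 are never processed
  }
}
```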
