From 69fd7e4ecbff136ec77a6c37fc76fada6dadfc41 Mon Sep 17 00:00:00 2001
From: Anish <100322362+anishshri-db@users.noreply.github.com>
Date: Wed, 15 May 2024 11:35:16 -0700
Subject: [PATCH] [Spark] Add time tracking for file changes and getting
 dataframe for Delta source with/without CDC (#3090)

#### Which Delta project/connector is this regarding?

Spark

## Description

Add time tracking for computing file changes and for getting the DataFrame
in the Delta source, both with and without CDC.

## How was this patch tested?

Existing unit tests

## Does this PR introduce _any_ user-facing changes?

No
---
 .../spark/sql/delta/sources/DeltaSource.scala |  63 +++++----
 .../delta/sources/DeltaSourceCDCSupport.scala | 123 ++++++++++--------
 2 files changed, 107 insertions(+), 79 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 160cf1440e0..bddd5a7703e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * A case class to help with `Dataset` operations regarding Offset indexing, representing AddFile
@@ -322,7 +323,13 @@ trait DeltaSourceBase extends Source
           excludeRegex.forall(_.findFirstIn(indexedFile.getFileAction.path).isEmpty)
         }
 
-        createDataFrame(filteredIndexedFiles)
+        val (result, duration) = Utils.timeTakenMs {
+          createDataFrame(filteredIndexedFiles)
+        }
+        logInfo(s"Getting dataFrame for delta_log_path=${deltaLog.logPath} with " +
+          s"startVersion=$startVersion, startIndex=$startIndex, " +
+          s"isInitialSnapshot=$isInitialSnapshot took timeMs=$duration ms")
+        result
       } finally {
         fileActionsIter.close()
       }
@@ -772,36 +779,42 @@ case class DeltaSource(
       }
     }
 
-    var iter = if (isInitialSnapshot) {
-      Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy
-        case 1 => getSnapshotAt(fromVersion).toClosable
-        case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
+    val (result, duration) = Utils.timeTakenMs {
+      var iter = if (isInitialSnapshot) {
+        Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy
+          case 1 => getSnapshotAt(fromVersion).toClosable
+          case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
+        }
+      } else {
+        filterAndIndexDeltaLogs(fromVersion)
       }
-    } else {
-      filterAndIndexDeltaLogs(fromVersion)
-    }
-    iter = iter.withClose { it =>
-      it.filter { file =>
-        file.version > fromVersion || file.index > fromIndex
-      }
-    }
-
-    // If endOffset is provided, we are getting a batch on a constructed range so we should use
-    // the endOffset as the limit.
-    // Otherwise, we are looking for a new offset, so we try to use the latestOffset we found for
-    // Trigger.availableNow() as limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
-    val lastOffsetForThisScan = endOffset.orElse(lastOffsetForTriggerAvailableNow)
-
-    lastOffsetForThisScan.foreach { bound =>
       iter = iter.withClose { it =>
-        it.takeWhile { file =>
-          file.version < bound.reservoirVersion ||
-            (file.version == bound.reservoirVersion && file.index <= bound.index)
+        it.filter { file =>
+          file.version > fromVersion || file.index > fromIndex
         }
       }
+
+      // If endOffset is provided, we are getting a batch on a constructed range so we should use
+      // the endOffset as the limit.
+      // Otherwise, we are looking for a new offset, so we try to use the latestOffset we found for
+      // Trigger.availableNow() as limit. We know endOffset <= lastOffsetForTriggerAvailableNow.
+      val lastOffsetForThisScan = endOffset.orElse(lastOffsetForTriggerAvailableNow)
+
+      lastOffsetForThisScan.foreach { bound =>
+        iter = iter.withClose { it =>
+          it.takeWhile { file =>
+            file.version < bound.reservoirVersion ||
+              (file.version == bound.reservoirVersion && file.index <= bound.index)
+          }
+        }
+      }
+      iter
     }
-    iter
+    logInfo(s"Getting file changes for delta_log_path=${deltaLog.logPath} with " +
+      s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " +
+      s"took timeMs=$duration ms")
+    result
   }
 
   /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
index 85bed2020fc..295bc74ebe4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.delta.actions.DomainMetadata
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 
 import org.apache.spark.sql.DataFrame
+import org.apache.spark.util.Utils
 
 /**
  * Helper functions for CDC-specific handling for DeltaSource.
@@ -202,16 +203,22 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
         (v, indexFiles.filter(_.hasFileAction).map { _.getFileAction }.toSeq)
       }
 
-    val cdcInfo = CDCReader.changesToDF(
-      readSnapshotDescriptor,
-      startVersion,
-      endOffset.reservoirVersion,
-      groupedFileActions,
-      spark,
-      isStreaming = true
-    )
+    val (result, duration) = Utils.timeTakenMs {
+      val cdcInfo = CDCReader.changesToDF(
+        readSnapshotDescriptor,
+        startVersion,
+        endOffset.reservoirVersion,
+        groupedFileActions,
+        spark,
+        isStreaming = true
+      )
 
-    cdcInfo.fileChangeDf
+      cdcInfo.fileChangeDf
+    }
+    logInfo(s"Getting CDC dataFrame for delta_log_path=${deltaLog.logPath} with " +
+      s"startVersion=$startVersion, startIndex=$startIndex, isInitialSnapshot=$isInitialSnapshot " +
+      s"took timeMs=$duration ms")
+    result
   }
 
   /**
@@ -243,60 +250,68 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
             actions, version, fromVersion, endOffset.map(_.reservoirVersion),
             verifyMetadataAction && !trackingMetadataChange)
         val itr = addBeginAndEndIndexOffsetsForVersion(version,
-          getMetadataOrProtocolChangeIndexedFileIterator(metadataOpt, protocolOpt, version) ++
-          fileActions.zipWithIndex.map {
-            case (action: AddFile, index) =>
-              IndexedFile(
-                version,
-                index.toLong,
-                action,
-                shouldSkip = skipIndexedFile)
-            case (cdcFile: AddCDCFile, index) =>
-              IndexedFile(
-                version,
-                index.toLong,
-                add = null,
-                cdc = cdcFile,
-                shouldSkip = skipIndexedFile)
-            case (remove: RemoveFile, index) =>
-              IndexedFile(
-                version,
-                index.toLong,
-                add = null,
-                remove = remove,
-                shouldSkip = skipIndexedFile)
+            getMetadataOrProtocolChangeIndexedFileIterator(metadataOpt, protocolOpt, version) ++
+              fileActions.zipWithIndex.map {
+                case (action: AddFile, index) =>
+                  IndexedFile(
+                    version,
+                    index.toLong,
+                    action,
+                    shouldSkip = skipIndexedFile)
+                case (cdcFile: AddCDCFile, index) =>
+                  IndexedFile(
+                    version,
+                    index.toLong,
+                    add = null,
+                    cdc = cdcFile,
+                    shouldSkip = skipIndexedFile)
+                case (remove: RemoveFile, index) =>
+                  IndexedFile(
+                    version,
+                    index.toLong,
+                    add = null,
+                    remove = remove,
+                    shouldSkip = skipIndexedFile)
           })
         (version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false))
       }
     }
 
-    val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
-      // If we are reading change data from the start of the table we need to
-      // get the latest snapshot of the table as well.
-      val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m =>
-        // When we get the snapshot the dataChange is false for the AddFile actions
-        // We need to set it to true for it to be considered by the CDCReader.
-        if (m.add != null) {
-          m.copy(add = m.add.copy(dataChange = true))
-        } else {
-          m
+    val (result, duration) = Utils.timeTakenMs {
+      val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
+        // If we are reading change data from the start of the table we need to
+        // get the latest snapshot of the table as well.
+        val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m =>
+          // When we get the snapshot the dataChange is false for the AddFile actions
+          // We need to set it to true for it to be considered by the CDCReader.
+          if (m.add != null) {
+            m.copy(add = m.add.copy(dataChange = true))
+          } else {
+            m
+          }
         }
+        val snapshotItr: Iterator[(Long, IndexedChangeFileSeq)] = Iterator((
+          fromVersion,
+          new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true)
+        ))
+
+        snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1)
+      } else {
+        filterAndIndexDeltaLogs(fromVersion)
       }
-      val snapshotItr: Iterator[(Long, IndexedChangeFileSeq)] = Iterator((
-        fromVersion,
-        new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true)
-      ))
-      snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1)
-    } else {
-      filterAndIndexDeltaLogs(fromVersion)
-    }
-    // In this case, filterFiles will consume the available capacity. We use takeWhile
-    // to stop the iteration when we reach the limit which will save us from reading
-    // unnecessary log files.
-    iter.takeWhile(_ => limits.forall(_.hasCapacity)).map { case (version, indexItr) =>
-      (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
+      // In this case, filterFiles will consume the available capacity. We use takeWhile
+      // to stop the iteration when we reach the limit which will save us from reading
+      // unnecessary log files.
+      iter.takeWhile(_ => limits.forall(_.hasCapacity)).map { case (version, indexItr) =>
+        (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
+      }
     }
+
+    logInfo(s"Getting CDC file changes for delta_log_path=${deltaLog.logPath} with " +
+      s"fromVersion=$fromVersion, fromIndex=$fromIndex, isInitialSnapshot=$isInitialSnapshot " +
+      s"took timeMs=$duration ms")
+    result
   }
 
   ///////////////////
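For readers unfamiliar with the `Utils.timeTakenMs` pattern used throughout this patch, below is a minimal, self-contained sketch of the same time-then-log shape. Spark's `org.apache.spark.util.Utils.timeTakenMs` is internal (`private[spark]`, reachable from the Delta sources only because they live under `org.apache.spark.sql.delta`), so the sketch defines a local equivalent; the workload, path, and log values are illustrative stand-ins, not part of the patch.

```scala
object TimeTrackingSketch {

  // Local stand-in for Spark's internal Utils.timeTakenMs: runs `body` once
  // and returns its result together with the elapsed wall-clock time in ms.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - startNs) / 1000000)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical workload standing in for createDataFrame /
    // filterAndIndexDeltaLogs / CDCReader.changesToDF in the patch above.
    val (result, duration) = timeTakenMs {
      (1L to 5000000L).sum
    }
    // Same key=value log shape the patch emits via logInfo.
    println(s"Getting file changes for delta_log_path=<path> with " +
      s"fromVersion=0, fromIndex=-1, isInitialSnapshot=true took timeMs=$duration ms")
    println(s"result=$result")
  }
}
```

Because `body` is a by-name parameter, the timed block is evaluated exactly once inside the helper, and returning the `(result, duration)` pair is what lets each timed region in the patch both log the duration and still end with `result`.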