Skip to content

Commit 9135d40

Browse files
committed
[spark] Add currentSnapshotId to the description in paimon scan
1 parent 0446894 commit 9135d40

File tree

1 file changed

+41
-3
lines changed

1 file changed

+41
-3
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import org.apache.paimon.spark.sources.PaimonMicroBatchStream
2626
import org.apache.paimon.spark.statistics.StatisticsHelper
2727
import org.apache.paimon.table.{DataTable, InnerTable}
2828
import org.apache.paimon.table.source.{InnerTableScan, Split}
29+
import org.apache.paimon.table.source.snapshot.TimeTravelUtil
30+
import org.apache.paimon.table.system.FilesTable
31+
import org.apache.paimon.utils.{SnapshotManager, TagManager}
2932

3033
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
3134
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
@@ -132,12 +135,47 @@ abstract class PaimonBaseScan(
132135
} else {
133136
""
134137
}
135-
val latestSnapshotIdStr = if (table.latestSnapshot().isPresent) {
136-
s", LatestSnapshotId: [${table.latestSnapshot().get.id}],"
138+
139+
val latestSnapshotId = if (table.latestSnapshot().isPresent) {
140+
Some(table.latestSnapshot().get.id)
141+
} else {
142+
None
143+
}
144+
145+
val latestSnapshotIdStr = if (latestSnapshotId.isDefined) {
146+
s", LatestSnapshotId: [${latestSnapshotId.get}]"
137147
} else {
138148
""
139149
}
140-
s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + pushedFiltersStr + pushedTopNFilterStr +
150+
151+
val currentSnapshot =
152+
try {
153+
val dataTable = table match {
154+
case dt: DataTable => dt
155+
case _ => null
156+
}
157+
158+
if (dataTable != null) {
159+
TimeTravelUtil.tryTravelToSnapshot(
160+
coreOptions.toConfiguration,
161+
dataTable.snapshotManager(),
162+
dataTable.tagManager())
163+
} else {
164+
Optional.empty()
165+
}
166+
} catch {
167+
case _: Exception => Optional.empty()
168+
}
169+
170+
val currentSnapshotIdStr = if (currentSnapshot.isPresent) {
171+
s", currentSnapshotId: [${currentSnapshot.get().id}]"
172+
} else if (latestSnapshotId.isDefined) {
173+
s", currentSnapshotId: [${latestSnapshotId.get}]"
174+
} else {
175+
""
176+
}
177+
178+
s"PaimonScan: [${table.name}]" + latestSnapshotIdStr + currentSnapshotIdStr + pushedFiltersStr + pushedTopNFilterStr +
141179
pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
142180
}
143181
}

0 commit comments

Comments
 (0)