diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index a37f8b71b35..118f1dc20da 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -15,6 +15,7 @@ */ package io.delta.kernel.internal; +import java.io.IOException; import java.util.Optional; import io.delta.kernel.ScanBuilder; @@ -22,6 +23,7 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.types.StructType; +import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; @@ -41,6 +43,7 @@ public class SnapshotImpl implements Snapshot { private final Protocol protocol; private final Metadata metadata; private final LogSegment logSegment; + private Optional<Long> inCommitTimestampOpt; public SnapshotImpl( Path dataPath, @@ -55,6 +58,7 @@ public SnapshotImpl( this.logReplay = logReplay; this.protocol = protocol; this.metadata = metadata; + this.inCommitTimestampOpt = Optional.empty(); } @Override @@ -121,4 +125,38 @@ public Path getLogPath() { public Path getDataPath() { return dataPath; } + + /** + * Returns the timestamp of the latest commit of this snapshot. + * For an uninitialized snapshot, this returns -1. + * <p> + * When InCommitTimestampTableFeature is enabled, the timestamp + * is retrieved from the CommitInfo of the latest commit which + * can result in an IO operation. + * <p> + * For non-ICT tables, this is the same as the file modification time of the latest commit in + * the snapshot. + * + * @param engine the engine to use for IO operations + * @return the timestamp of the latest commit + */ + public long getTimestamp(Engine engine) { + if (TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) { + if (!inCommitTimestampOpt.isPresent()) { + try { + Optional<CommitInfo> commitInfoOpt = CommitInfo.getCommitInfoOpt( + engine, logPath, logSegment.version); + inCommitTimestampOpt = Optional.of(CommitInfo.getRequiredInCommitTimestamp( + commitInfoOpt, + String.valueOf(logSegment.version), + dataPath)); + } catch (IOException e) { + throw new RuntimeException("Failed to get inCommitTimestamp with IO", e); + } + } + return inCommitTimestampOpt.get(); + } else { + return logSegment.lastCommitTimestamp; + } + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 235f48ee47a..00ef10e3c18 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -23,26 +23,43 @@ import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.snapshot.SnapshotManager; +import io.delta.kernel.internal.util.Clock; public class TableImpl implements Table { public static Table forPath(Engine engine, String path) { + return forPath(engine, path, System::currentTimeMillis); + } + + /** + * Instantiate a table object for the Delta Lake table at the given path. It takes an additional + * parameter called {@link Clock} which helps in testing. + * + * @param engine {@link Engine} instance to use in Delta Kernel. + * @param path location of the table. + * @param clock {@link Clock} instance to use for time-related operations. 
+ * + * @return an instance of {@link Table} representing the Delta table at the given path + */ + public static Table forPath(Engine engine, String path, Clock clock) { String resolvedPath; try { resolvedPath = engine.getFileSystemClient().resolvePath(path); } catch (IOException io) { throw new RuntimeException(io); } - return new TableImpl(resolvedPath); + return new TableImpl(resolvedPath, clock); } private final SnapshotManager snapshotManager; private final String tablePath; + private final Clock clock; - public TableImpl(String tablePath) { + public TableImpl(String tablePath, Clock clock) { this.tablePath = tablePath; final Path dataPath = new Path(tablePath); final Path logPath = new Path(dataPath, "_delta_log"); this.snapshotManager = new SnapshotManager(logPath, dataPath); + this.clock = clock; } @Override @@ -81,6 +98,10 @@ public TransactionBuilder createTransactionBuilder( return new TransactionBuilderImpl(this, engineInfo, operation); } + public Clock getClock() { + return clock; + } + protected Path getDataPath() { return new Path(tablePath); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java index 4a11a19b67b..898f428e108 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionBuilderImpl.java @@ -155,7 +155,8 @@ public Transaction build(Engine engine) { metadata, setTxnOpt, shouldUpdateMetadata, - shouldUpdateProtocol); + shouldUpdateProtocol, + table.getClock()); } /** @@ -206,6 +207,11 @@ private class InitialSnapshot extends SnapshotImpl { InitialSnapshot(Path dataPath, LogReplay logReplay, Metadata metadata, Protocol protocol) { super(dataPath, LogSegment.empty(table.getLogPath()), logReplay, protocol, metadata); } + + @Override + public long getTimestamp(Engine engine) { + return -1L; + } } 
private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 7375b14b3fb..5dd39007389 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -37,6 +37,7 @@ import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.ConflictChecker; import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState; +import io.delta.kernel.internal.util.Clock; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.InCommitTimestampUtils; import io.delta.kernel.internal.util.VectorUtils; @@ -72,6 +73,7 @@ public class TransactionImpl private final SnapshotImpl readSnapshot; private final Optional setTxnOpt; private final boolean shouldUpdateProtocol; + private final Clock clock; private Metadata metadata; private boolean shouldUpdateMetadata; @@ -88,7 +90,8 @@ public TransactionImpl( Metadata metadata, Optional setTxnOpt, boolean shouldUpdateMetadata, - boolean shouldUpdateProtocol) { + boolean shouldUpdateProtocol, + Clock clock) { this.isNewTable = isNewTable; this.dataPath = dataPath; this.logPath = logPath; @@ -100,6 +103,7 @@ public TransactionImpl( this.setTxnOpt = setTxnOpt; this.shouldUpdateMetadata = shouldUpdateMetadata; this.shouldUpdateProtocol = shouldUpdateProtocol; + this.clock = clock; } @Override @@ -239,14 +243,22 @@ public Optional getSetTxnOpt() { return setTxnOpt; } + /** + * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This + * can result in an additional file read, which will only happen if ICT is enabled. 
+ */ private Optional generateInCommitTimestampForFirstCommitAttempt( Engine engine, long currentTimestamp) { - boolean ictEnabled = IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); - return ictEnabled ? Optional.of(currentTimestamp) : Optional.empty(); + if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) { + long lastCommitTimestamp = readSnapshot.getTimestamp(engine); + return Optional.of(Math.max(currentTimestamp, lastCommitTimestamp + 1)); + } else { + return Optional.empty(); + } } private CommitInfo generateCommitAction(Engine engine) { - long commitAttemptStartTime = System.currentTimeMillis(); + long commitAttemptStartTime = clock.getTimeMillis(); return new CommitInfo( generateInCommitTimestampForFirstCommitAttempt( engine, commitAttemptStartTime), diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java index f45737bac69..c229682b117 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/CommitInfo.java @@ -15,16 +15,27 @@ */ package io.delta.kernel.internal.actions; +import java.io.IOException; import java.util.*; import java.util.stream.IntStream; import static java.util.stream.Collectors.toMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.InvalidTableException; import io.delta.kernel.types.*; - +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; import io.delta.kernel.internal.data.GenericRow; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.VectorUtils; +import static 
io.delta.kernel.internal.util.Utils.singletonCloseableIterator; import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue; /** @@ -84,6 +95,8 @@ public static CommitInfo fromColumnVector(ColumnVector vector, int rowId) { .boxed() .collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i)); + private static final Logger logger = LoggerFactory.getLogger(CommitInfo.class); + private final long timestamp; private final String engineInfo; private final String operation; @@ -148,4 +161,64 @@ public Row toRow() { return new GenericRow(CommitInfo.FULL_SCHEMA, commitInfo); } + + /** + * Returns the `inCommitTimestamp` of the given `commitInfoOpt` if it is defined. + * Throws an exception if `commitInfoOpt` is empty or contains an empty `inCommitTimestamp`. + */ + public static long getRequiredInCommitTimestamp( + Optional<CommitInfo> commitInfoOpt, String version, Path dataPath) { + CommitInfo commitInfo = commitInfoOpt + .orElseThrow(() -> new InvalidTableException( + dataPath.toString(), + String.format("This table has the feature inCommitTimestamp-preview " + + "enabled which requires the presence of the CommitInfo action " + + "in every commit. However, the CommitInfo action is " + + "missing from commit version %s.", version))); + return commitInfo + .inCommitTimestamp + .orElseThrow(() -> new InvalidTableException( + dataPath.toString(), + String.format("This table has the feature inCommitTimestamp-preview " + + "enabled which requires the presence of inCommitTimestamp in the " + + "CommitInfo action. However, this field has not " + + "been set in commit version %s.", version))); + } + + /** Get the persisted commit info (if available) for the given delta file. 
+ */ + public static Optional<CommitInfo> getCommitInfoOpt( + Engine engine, + Path logPath, + long version) throws IOException { + final FileStatus file = FileStatus.of( + FileNames.deltaFile(logPath, version), /* path */ + 0, /* size */ + 0 /* modification time */); + final StructType COMMITINFO_READ_SCHEMA = new StructType() + .add("commitInfo", CommitInfo.FULL_SCHEMA); + try (CloseableIterator<ColumnarBatch> columnarBatchIter = engine.getJsonHandler() + .readJsonFiles( + singletonCloseableIterator(file), + COMMITINFO_READ_SCHEMA, + Optional.empty())) { + while (columnarBatchIter.hasNext()) { + final ColumnarBatch columnarBatch = columnarBatchIter.next(); + assert(columnarBatch.getSchema().equals(COMMITINFO_READ_SCHEMA)); + final ColumnVector commitInfoVector = columnarBatch.getColumnVector(0); + for (int i = 0; i < commitInfoVector.getSize(); i++) { + if (!commitInfoVector.isNullAt(i)) { + CommitInfo commitInfo = CommitInfo.fromColumnVector(commitInfoVector, i); + if (commitInfo != null) { + return Optional.of(commitInfo); + } + } + } + } + } catch (IOException ex) { + throw new RuntimeException("Failed to read commit info for version " + version, ex); + } + + logger.info("No commit info found for commit of version {}", version); + return Optional.empty(); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/Clock.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/Clock.java new file mode 100644 index 00000000000..9095b8d0947 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/Clock.java @@ -0,0 +1,24 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.util; + +/** + * An interface to represent clocks, so that they can be mocked out in unit tests. + */ +public interface Clock { + /** @return Current system time, in ms. */ + long getTimeMillis(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ManualClock.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ManualClock.java new file mode 100644 index 00000000000..0a6fd324e1c --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/ManualClock.java @@ -0,0 +1,39 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.util; + +/** + * A clock whose time can be manually set and modified. 
+ */ +public class ManualClock implements Clock { + private long timeMillis; + public ManualClock(long timeMillis) { + this.timeMillis = timeMillis; + } + + /** + * @param timeToSet new time (in milliseconds) that the clock should represent + */ + public synchronized void setTime(long timeToSet) { + this.timeMillis = timeToSet; + this.notifyAll(); + } + + @Override + public synchronized long getTimeMillis() { + return timeMillis; + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 4f0736743c8..eb570c2bf89 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -24,13 +24,14 @@ import io.delta.kernel.internal.actions.{Metadata, Protocol, SingleAction} import io.delta.kernel.internal.fs.{Path => DeltaPath} import io.delta.kernel.internal.util.FileNames import io.delta.kernel.internal.util.Utils.singletonCloseableIterator -import io.delta.kernel.internal.{SnapshotImpl, TableConfig} +import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl} import io.delta.kernel.utils.FileStatus import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult} import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row} import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal.ofInt +import io.delta.kernel.internal.util.Clock import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.types.IntegerType.INTEGER @@ -279,10 +280,11 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { isNewTable: 
Boolean = false, schema: StructType = null, partCols: Seq[String] = null, - tableProperties: Map[String, String] = null): Transaction = { + tableProperties: Map[String, String] = null, + clock: Clock = () => System.currentTimeMillis): Transaction = { var txnBuilder = createWriteTxnBuilder( - Table.forPath(engine, tablePath)) + TableImpl.forPath(engine, tablePath, clock)) if (isNewTable) { txnBuilder = txnBuilder.withSchema(engine, schema) @@ -303,9 +305,10 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { schema: StructType = null, partCols: Seq[String] = null, data: Seq[(Map[String, Literal], Seq[FilteredColumnarBatch])], + clock: Clock = () => System.currentTimeMillis, tableProperties: Map[String, String] = null): TransactionCommitResult = { - val txn = createTxn(engine, tablePath, isNewTable, schema, partCols, tableProperties) + val txn = createTxn(engine, tablePath, isNewTable, schema, partCols, tableProperties, clock) val txnState = txn.getTransactionState(engine) val actions = data.map { case (partValues, partData) => @@ -338,13 +341,15 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { engine: Engine, tablePath: String, isNewTable: Boolean = true, - key: TableConfig[_ <: Any], value: String, expectedValue: Any): Unit = { + key: TableConfig[_ <: Any], value: String, expectedValue: Any, + clock: Clock = () => System.currentTimeMillis): Unit = { val table = Table.forPath(engine, tablePath) createTxn( engine, - tablePath, isNewTable, testSchema, Seq.empty, tableProperties = Map(key.getKey -> value)) + tablePath, + isNewTable, testSchema, Seq.empty, tableProperties = Map(key.getKey -> value), clock) .commit(engine, emptyIterable()) val snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala index 
bec4cbc9fa6..ff2773e80fd 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala @@ -18,11 +18,13 @@ package io.delta.kernel.defaults import io.delta.kernel.Operation.{CREATE_TABLE, WRITE} import io.delta.kernel._ import io.delta.kernel.engine.Engine +import io.delta.kernel.exceptions.InvalidTableException import io.delta.kernel.expressions.Literal import io.delta.kernel.internal.actions.{CommitInfo, SingleAction} import io.delta.kernel.internal.fs.Path import io.delta.kernel.internal.util.{FileNames, VectorUtils} -import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl} +import io.delta.kernel.internal.{DeltaHistoryManager, SnapshotImpl, TableImpl} +import io.delta.kernel.internal.util.ManualClock import io.delta.kernel.internal.util.Utils.singletonCloseableIterator import io.delta.kernel.types.IntegerType.INTEGER import io.delta.kernel.types._ @@ -33,23 +35,29 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.{ListMap, Seq} import scala.collection.mutable import io.delta.kernel.internal.TableConfig._ -import io.delta.kernel.utils.FileStatus; +import io.delta.kernel.utils.FileStatus +import io.delta.kernel.internal.actions.SingleAction.createCommitInfoSingleAction class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { test("Enable ICT on commit 0") { withTempDirAndEngine { (tablePath, engine) => val beforeCommitAttemptStartTime = System.currentTimeMillis + val clock = new ManualClock(beforeCommitAttemptStartTime) val table = Table.forPath(engine, tablePath) + clock.setTime(beforeCommitAttemptStartTime + 1) setTablePropAndVerify( engine = engine, tablePath = tablePath, key = IN_COMMIT_TIMESTAMPS_ENABLED, value = "true", - expectedValue = true) + expectedValue = true, + clock = clock) val ver0Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] - 
assert(getInCommitTimestamp(engine, table, 0).get >= beforeCommitAttemptStartTime) + + assert(ver0Snapshot.getTimestamp(engine) == beforeCommitAttemptStartTime + 1) + assert(getInCommitTimestamp(engine, table, 0).get == ver0Snapshot.getTimestamp(engine)) assertHasWriterFeature(ver0Snapshot, "inCommitTimestamp-preview") } } @@ -70,7 +78,6 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { assertHasNoWriterFeature(ver0Snapshot, "inCommitTimestamp-preview") assert(getInCommitTimestamp(engine, table, 0).isEmpty) - val beforeCommitAttemptStartTime = System.currentTimeMillis setTablePropAndVerify( engine = engine, tablePath = tablePath, @@ -81,8 +88,126 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { val ver1Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] assertHasWriterFeature(ver1Snapshot, "inCommitTimestamp-preview") - assert( - getInCommitTimestamp(engine, table, 1).get >= beforeCommitAttemptStartTime) + assert(ver1Snapshot.getTimestamp(engine) > ver0Snapshot.getTimestamp(engine)) + assert(getInCommitTimestamp(engine, table, 1).get == ver1Snapshot.getTimestamp(engine)) + } + } + + test("InCommitTimestamps are monotonic even when the clock is skewed") { + withTempDirAndEngine { (tablePath, engine) => + val startTime = System.currentTimeMillis() + val clock = new ManualClock(startTime) + val table = Table.forPath(engine, tablePath) + + appendData( + engine, + tablePath, + isNewTable = true, + testSchema, + partCols = Seq.empty, + data = Seq(Map.empty[String, Literal] -> dataBatches1), + clock = clock, + tableProperties = Map(IN_COMMIT_TIMESTAMPS_ENABLED.getKey -> "true") + ) + + val ver1Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val ver1Timestamp = ver1Snapshot.getTimestamp(engine) + assert(IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(ver1Snapshot.getMetadata)) + + clock.setTime(startTime - 10000) + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> 
dataBatches2), + clock = clock + ) + + val ver2Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val ver2Timestamp = ver2Snapshot.getTimestamp(engine) + assert(ver2Timestamp == ver1Timestamp + 1) + } + } + + test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + + setTablePropAndVerify( + engine = engine, + tablePath = tablePath, + key = IN_COMMIT_TIMESTAMPS_ENABLED, + value = "true", + expectedValue = true) + // Remove CommitInfo from the commit. + val logPath = new Path(table.getPath(engine), "_delta_log") + val file = FileStatus.of(FileNames.deltaFile(logPath, 0), 0, 0) + val columnarBatches = + engine.getJsonHandler.readJsonFiles( + singletonCloseableIterator(file), + SingleAction.FULL_SCHEMA, + Optional.empty()) + assert(columnarBatches.hasNext) + val rows = columnarBatches.next().getRows + val rowsWithoutCommitInfo = + rows.filter(row => row.isNullAt(row.getSchema.indexOf("commitInfo"))) + engine + .getJsonHandler + .writeJsonFileAtomically( + FileNames.deltaFile(logPath, 0), rowsWithoutCommitInfo, true /* overwrite */) + + val ex = intercept[InvalidTableException] { + table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl].getTimestamp(engine) + } + assert(ex.getMessage.contains(String.format( + "This table has the feature %s enabled which requires the presence of the " + + "CommitInfo action in every commit. However, the CommitInfo action is " + + "missing from commit version %s.", "inCommitTimestamp-preview", "0"))) + } + } + + test("Missing CommitInfo.inCommitTimestamp should result in a " + + "DELTA_MISSING_COMMIT_TIMESTAMP exception") { + withTempDirAndEngine { (tablePath, engine) => + val table = Table.forPath(engine, tablePath) + + setTablePropAndVerify( + engine, tablePath, isNewTable = true, IN_COMMIT_TIMESTAMPS_ENABLED, "true", true) + // Remove CommitInfo.inCommitTimestamp from the commit. 
+ val logPath = new Path(table.getPath(engine), "_delta_log") + val file = FileStatus.of(FileNames.deltaFile(logPath, 0), 0, 0) + val columnarBatches = + engine.getJsonHandler.readJsonFiles( + singletonCloseableIterator(file), + SingleAction.FULL_SCHEMA, + Optional.empty()) + assert(columnarBatches.hasNext) + val rows = columnarBatches.next().getRows + val commitInfoOpt = CommitInfo.getCommitInfoOpt(engine, logPath, 0) + assert(commitInfoOpt.isPresent) + val commitInfo = commitInfoOpt.get + commitInfo.setInCommitTimestamp(Optional.empty()) + val rowsWithoutCommitInfoInCommitTimestamp = + rows.map(row => { + val commitInfoOrd = row.getSchema.indexOf("commitInfo") + if (row.isNullAt(commitInfoOrd)) { + row + } else { + createCommitInfoSingleAction(commitInfo.toRow) + } + }) + engine + .getJsonHandler + .writeJsonFileAtomically( + FileNames.deltaFile(logPath, 0), + rowsWithoutCommitInfoInCommitTimestamp, true /* overwrite */) + + val ex = intercept[InvalidTableException] { + table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl].getTimestamp(engine) + } + assert(ex.getMessage.contains(String.format( + "This table has the feature %s enabled which requires the presence of " + + "inCommitTimestamp in the CommitInfo action. 
However, this field has not " + + "been set in commit version %s.", "inCommitTimestamp-preview", "0"))) } } @@ -126,7 +251,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { assertMetadataProp( ver1Snapshot, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP, - Optional.of(getInCommitTimestamp(engine, table, version = 1).get)) + Optional.of(ver1Snapshot.getTimestamp(engine))) assertMetadataProp( ver1Snapshot, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION, @@ -144,7 +269,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { assertMetadataProp( ver2Snapshot, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP, - Optional.of(getInCommitTimestamp(engine, table, version = 1).get)) + Optional.of(ver1Snapshot.getTimestamp(engine))) assertMetadataProp( ver2Snapshot, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION, @@ -273,16 +398,12 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { * is not null, otherwise return null. */ private def getInCommitTimestamp(engine: Engine, table: Table, version: Long): Option[Long] = { - readCommitFile(engine, table.getPath(engine), version, row => { - val commitInfoOrd = row.getSchema.indexOf("commitInfo") - if (!row.isNullAt(commitInfoOrd)) { - val commitInfo = row.getStruct(commitInfoOrd) - val inCommitTimestampOrd = commitInfo.getSchema.indexOf("inCommitTimestamp") - if (!commitInfo.isNullAt(inCommitTimestampOrd)) { - return Some(commitInfo.getLong(inCommitTimestampOrd)) - } - } + val logPath = new Path(table.getPath(engine), "_delta_log") + val commitInfoOpt = CommitInfo.getCommitInfoOpt(engine, logPath, version) + if (commitInfoOpt.isPresent && commitInfoOpt.get.getInCommitTimestamp.isPresent) { + Some(commitInfoOpt.get.getInCommitTimestamp.get) + } else { Option.empty - }).map{ case inCommitTimestamp: Long => Some(inCommitTimestamp)}.getOrElse(Option.empty) + } } }