diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 5dd39007389..47dd0e712f7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -136,8 +136,6 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data do { logger.info("Committing transaction as version = {}.", commitAsVersion); try { - // TODO Update the attemptCommitInfo and metadata based on the conflict - // resolution. return doCommit(engine, commitAsVersion, attemptCommitInfo, dataActions); } catch (FileAlreadyExistsException fnfe) { logger.info("Concurrent write detected when committing as version = {}. " + @@ -149,6 +147,20 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable data "New commit version %d should be greater than the previous commit " + "attempt version %d.", newCommitAsVersion, commitAsVersion); commitAsVersion = newCommitAsVersion; + Optional updatedInCommitTimestamp = + getUpdatedInCommitTimestampAfterConflict( + rebaseState.getLatestCommitTimestamp(), + attemptCommitInfo.getInCommitTimestamp()); + if (updatedInCommitTimestamp.isPresent()) { + Optional metadataWithICTInfo = getUpdatedMetadataAfterConflict( + engine, + updatedInCommitTimestamp.get(), + metadata, + rebaseState.getLatestVersion() + ); + metadataWithICTInfo.ifPresent(this::updateMetadata); + } + attemptCommitInfo.setInCommitTimestamp(updatedInCommitTimestamp); } numRetries++; } while (numRetries < NUM_TXN_RETRIES); @@ -186,6 +198,30 @@ private void updateMetadataWithICTIfRequired(Engine engine, CommitInfo attemptCo ); } + private Optional getUpdatedInCommitTimestampAfterConflict( + long winningCommitTimestamp, Optional attemptInCommitTimestamp) { + if (attemptInCommitTimestamp.isPresent()) { + long updatedInCommitTimestamp = Math.max( + attemptInCommitTimestamp.get(), winningCommitTimestamp + 1); + return Optional.of(updatedInCommitTimestamp); + } + return attemptInCommitTimestamp; + } + + private Optional getUpdatedMetadataAfterConflict( + Engine engine, + Long updatedInCommitTimestamp, + Metadata attemptMetadata, + Long lastWinningVersion) { + long nextAvailableVersion = lastWinningVersion + 1L; + return InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo( + engine, + updatedInCommitTimestamp, + readSnapshot, + attemptMetadata, + nextAvailableVersion); + } + private TransactionCommitResult doCommit( Engine engine, long commitAsVersion, diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java index 798de2a472f..0c8f07d1015 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/SingleAction.java @@ -47,7 +47,8 @@ public class SingleAction { // .add("add", AddFile.FULL_SCHEMA) // not needed for blind appends // .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends .add("metaData", Metadata.FULL_SCHEMA) - .add("protocol", Protocol.FULL_SCHEMA); + .add("protocol", Protocol.FULL_SCHEMA) + .add("commitInfo", CommitInfo.FULL_SCHEMA); // Once we start supporting domain metadata/row tracking enabled tables, we should add the // schema for domain metadata fields here. diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java index fb20156d031..b7c0b378539 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java @@ -27,8 +27,10 @@ import io.delta.kernel.utils.FileStatus; import io.delta.kernel.internal.*; +import io.delta.kernel.internal.actions.CommitInfo; import io.delta.kernel.internal.actions.SetTransaction; import io.delta.kernel.internal.util.FileNames; +import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA; import static io.delta.kernel.internal.util.FileNames.deltaFile; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -44,6 +46,7 @@ public class ConflictChecker { private static final int PROTOCOL_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("protocol"); private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData"); private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn"); + private static final int COMMITINFO_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo"); // Snapshot of the table read by the transaction that encountered the conflict // (a.k.a the losing transaction) @@ -88,6 +91,7 @@ public static TransactionRebaseState resolveConflicts( public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentWriteException { List winningCommits = getWinningCommitFiles(engine); + Optional winningCommitInfoOpt = Optional.empty(); // no winning commits. why did we get the transaction conflict? checkState(!winningCommits.isEmpty(), "No winning commits found."); @@ -99,21 +103,33 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW CONFLICT_RESOLUTION_SCHEMA, Optional.empty())) { - actionsIterator.forEachRemaining(actionBatch -> { + List actionBatchList = new ArrayList<>(); + actionsIterator.forEachRemaining(actionBatchList::add); + for (int i = 0; i < actionBatchList.size(); i++) { + ActionWrapper actionBatch = actionBatchList.get(i); checkArgument(!actionBatch.isFromCheckpoint()); // no checkpoints should be read ColumnarBatch batch = actionBatch.getColumnarBatch(); + if (i == actionBatchList.size() - 1) { + CommitInfo commitInfo = + getCommitInfo(batch.getColumnVector(COMMITINFO_ORDINAL)); + winningCommitInfoOpt = Optional.ofNullable(commitInfo); + } handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL)); handleMetadata(batch.getColumnVector(METADATA_ORDINAL)); handleTxn(batch.getColumnVector(TXN_ORDINAL)); - }); + } } catch (IOException ioe) { throw new UncheckedIOException("Error reading actions from winning commits.", ioe); } // if we get here, we have successfully rebased (i.e no logical conflicts) // against the winning transactions - return new TransactionRebaseState(getLastWinningTxnVersion(winningCommits)); + long lastWinningVersion = getLastWinningTxnVersion(winningCommits); + return new TransactionRebaseState( + lastWinningVersion, + getLastCommitTimestamp( + engine, lastWinningVersion, winningCommits, winningCommitInfoOpt)); } /** @@ -128,9 +144,11 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW */ public static class TransactionRebaseState { private final long latestVersion; + private final long latestCommitTimestamp; - public TransactionRebaseState(long latestVersion) { + public TransactionRebaseState(long latestVersion, long latestCommitTimestamp) { this.latestVersion = latestVersion; + this.latestCommitTimestamp = latestCommitTimestamp; } /** @@ -141,6 +159,15 @@ public TransactionRebaseState(long latestVersion) { public long getLatestVersion() { return latestVersion; } + + /** + * Return the latest CommitTimestamp of the table. + * + * @return latest CommitTimestamp of the table. + */ + public long getLatestCommitTimestamp() { + return latestCommitTimestamp; + } } /** @@ -173,6 +200,21 @@ private void handleMetadata(ColumnVector metadataVector) { } } + /** + * Get the commit info from the winning transactions. + * + * @param commitInfoVector commit info rows from the winning transactions + * @return the commit info + */ + private CommitInfo getCommitInfo(ColumnVector commitInfoVector) { + for (int rowId = 0; rowId < commitInfoVector.getSize(); rowId++) { + if (!commitInfoVector.isNullAt(rowId)) { + return CommitInfo.fromColumnVector(commitInfoVector, rowId); + } + } + return null; + } + private void handleTxn(ColumnVector txnVector) { // Check if the losing transaction has any txn identifier. If it does, go through the // winning transactions and make sure that the losing transaction is valid from a @@ -218,6 +260,23 @@ private List getWinningCommitFiles(Engine engine) { } } + private long getLastCommitTimestamp( + Engine engine, + long lastWinningVersion, + List winningCommits, + Optional winningCommitInfoOpt) { + FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1); + long winningCommitTimestamp = -1L; + if (snapshot.getVersion(engine) == -1 || + !IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(snapshot.getMetadata())) { + winningCommitTimestamp = lastWinningTxn.getModificationTime(); + } else { + winningCommitTimestamp = CommitInfo.getRequiredInCommitTimestamp( + winningCommitInfoOpt, String.valueOf(lastWinningVersion)); + } + return winningCommitTimestamp; + } + private long getLastWinningTxnVersion(List winningCommits) { FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1); return FileNames.deltaVersion(lastWinningTxn.getPath()); diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index eb570c2bf89..e6659d7ddf3 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -298,6 +298,21 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { txnBuilder.build(engine) } + def commitAppendData( + engine: Engine = defaultEngine, + txn: Transaction, + data: Seq[(Map[String, Literal], Seq[FilteredColumnarBatch])]): TransactionCommitResult = { + + val txnState = txn.getTransactionState(engine) + + val actions = data.map { case (partValues, partData) => + stageData(txnState, partValues, partData) + } + + val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _)) + txn.commit(engine, combineActions) + } + def appendData( engine: Engine = defaultEngine, tablePath: String, @@ -309,14 +324,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { tableProperties: Map[String, String] = null): TransactionCommitResult = { val txn = createTxn(engine, tablePath, isNewTable, schema, partCols, tableProperties, clock) - val txnState = txn.getTransactionState(engine) - - val actions = data.map { case (partValues, partData) => - stageData(txnState, partValues, partData) - } - - val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _)) - txn.commit(engine, combineActions) + commitAppendData(engine, txn, data) } def assertMetadataProp( diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala index ff2773e80fd..57fa1018c6a 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/InCommitTimestampSuite.scala @@ -39,6 +39,24 @@ import io.delta.kernel.utils.FileStatus import io.delta.kernel.internal.actions.SingleAction.createCommitInfoSingleAction class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { + + private def removeCommitInfoFromCommit(engine: Engine, version: Long, logPath: Path): Unit = { + val file = FileStatus.of(FileNames.deltaFile(logPath, version), 0, 0) + val columnarBatches = + engine.getJsonHandler.readJsonFiles( + singletonCloseableIterator(file), + SingleAction.FULL_SCHEMA, + Optional.empty()) + assert(columnarBatches.hasNext) + val rows = columnarBatches.next().getRows + val rowsWithoutCommitInfo = + rows.filter(row => row.isNullAt(row.getSchema.indexOf("commitInfo"))) + engine + .getJsonHandler + .writeJsonFileAtomically( + FileNames.deltaFile(logPath, version), rowsWithoutCommitInfo, true /* overwrite */) + } + test("Enable ICT on commit 0") { withTempDirAndEngine { (tablePath, engine) => val beforeCommitAttemptStartTime = System.currentTimeMillis @@ -140,20 +158,7 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { expectedValue = true) // Remove CommitInfo from the commit. val logPath = new Path(table.getPath(engine), "_delta_log") - val file = FileStatus.of(FileNames.deltaFile(logPath, 0), 0, 0) - val columnarBatches = - engine.getJsonHandler.readJsonFiles( - singletonCloseableIterator(file), - SingleAction.FULL_SCHEMA, - Optional.empty()) - assert(columnarBatches.hasNext) - val rows = columnarBatches.next().getRows - val rowsWithoutCommitInfo = - rows.filter(row => row.isNullAt(row.getSchema.indexOf("commitInfo"))) - engine - .getJsonHandler - .writeJsonFileAtomically( - FileNames.deltaFile(logPath, 0), rowsWithoutCommitInfo, true /* overwrite */) + removeCommitInfoFromCommit(engine, 0, logPath) val ex = intercept[InvalidTableException] { table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl].getTimestamp(engine) @@ -406,4 +411,124 @@ class InCommitTimestampSuite extends DeltaTableWriteSuiteBase { Option.empty } } + + test("Conflict resolution of timestamps") { + withTempDirAndEngine { (tablePath, engine) => + val table = TableImpl.forPath(engine, tablePath, () => System.currentTimeMillis) + setTablePropAndVerify( + engine, tablePath, isNewTable = true, IN_COMMIT_TIMESTAMPS_ENABLED, "true", true) + + val startTime = System.currentTimeMillis() + val clock = new ManualClock(startTime) + val txn1 = createTxn( + engine, + tablePath, + schema = testSchema, + partCols = Seq.empty, + clock = clock + ) + clock.setTime(startTime) + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> dataBatches2), + clock = clock + ) + clock.setTime(startTime - 1000) + commitAppendData(engine, txn1, Seq(Map.empty[String, Literal] -> dataBatches1)) + assert( + getInCommitTimestamp( + engine, table, 2).get == getInCommitTimestamp(engine, table, 1).get + 1) + } + } + + test("Conflict resolution of enablement version") { + withTempDirAndEngine { (tablePath, engine) => + val table = TableImpl.forPath(engine, tablePath, () => System.currentTimeMillis) + val txnBuilder = table.createTransactionBuilder(engine, testEngineInfo, CREATE_TABLE) + + val txn = txnBuilder + .withSchema(engine, testSchema) + .build(engine) + + txn.commit(engine, emptyIterable()) + + val startTime = System.currentTimeMillis() + val clock = new ManualClock(startTime) + + val txn1 = createTxn( + engine, + tablePath, + schema = testSchema, + partCols = Seq.empty, + tableProperties = Map(IN_COMMIT_TIMESTAMPS_ENABLED.getKey -> "true"), + clock = clock) + + clock.setTime(startTime) + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> dataBatches2), + clock = clock + ) + + commitAppendData(engine, txn1, Seq(Map.empty[String, Literal] -> dataBatches1)) + + val ver1Snapshot = table.getSnapshotAsOfVersion(engine, 1).asInstanceOf[SnapshotImpl] + val ver2Snapshot = table.getLatestSnapshot(engine).asInstanceOf[SnapshotImpl] + val observedEnablementTimestamp = + IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.fromMetadata(ver2Snapshot.getMetadata) + val observedEnablementVersion = + IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetadata(ver2Snapshot.getMetadata) + assert(observedEnablementTimestamp.get == ver1Snapshot.getTimestamp(engine) + 1) + assert( + observedEnablementTimestamp.get == getInCommitTimestamp(engine, table, version = 2).get) + assert(observedEnablementVersion.get == 2) + } + } + + test("Missing CommitInfo in last winning commit in conflict resolution should result in a " + + "DELTA_MISSING_COMMIT_INFO exception") { + withTempDirAndEngine { (tablePath, engine) => + val table = TableImpl.forPath(engine, tablePath, () => System.currentTimeMillis) + setTablePropAndVerify( + engine, tablePath, isNewTable = true, IN_COMMIT_TIMESTAMPS_ENABLED, "true", true) + + val startTime = System.currentTimeMillis() + val clock = new ManualClock(startTime) + val txn1 = createTxn( + engine, + tablePath, + schema = testSchema, + partCols = Seq.empty, + clock = clock + ) + clock.setTime(startTime) + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> dataBatches2), + clock = clock + ) + appendData( + engine, + tablePath, + data = Seq(Map.empty[String, Literal] -> dataBatches2), + clock = clock + ) + + // Remove CommitInfo from the commit. + val logPath = new Path(table.getPath(engine), "_delta_log") + removeCommitInfoFromCommit(engine, 2, logPath) + + clock.setTime(startTime - 1000) + val ex = intercept[InvalidTableException] { + commitAppendData(engine, txn1, Seq(Map.empty[String, Literal] -> dataBatches1)) + } + assert(ex.getMessage.contains(String.format( + "This table has the feature %s enabled which requires the presence of the " + + "CommitInfo action in every commit. However, the CommitInfo action is " + + "missing from commit version %s.", "inCommitTimestamp-preview", "2"))) + } + } }