Skip to content

Commit

Permalink
[Kernel] Update ConflictChecker to perform conflict resolution of ICT
Browse files Browse the repository at this point in the history
  • Loading branch information
EstherBear committed Jun 27, 2024
1 parent 7aba5e9 commit 2d30ab8
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
try {
// TODO Update the attemptCommitInfo and metadata based on the conflict
// resolution.
return doCommit(engine, commitAsVersion, attemptCommitInfo, dataActions);
} catch (FileAlreadyExistsException fnfe) {
logger.info("Concurrent write detected when committing as version = {}. " +
Expand All @@ -149,6 +147,20 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
"New commit version %d should be greater than the previous commit " +
"attempt version %d.", newCommitAsVersion, commitAsVersion);
commitAsVersion = newCommitAsVersion;
Optional<Long> updatedInCommitTimestamp =
getUpdatedInCommitTimestampAfterConflict(
rebaseState.getLatestCommitTimestamp(),
attemptCommitInfo.getInCommitTimestamp());
if (updatedInCommitTimestamp.isPresent()) {
Optional<Metadata> metadataWithICTInfo = getUpdatedMetadataAfterConflict(
engine,
updatedInCommitTimestamp.get(),
metadata,
rebaseState.getLatestVersion()
);
metadataWithICTInfo.ifPresent(this::updateMetadata);
}
attemptCommitInfo.setInCommitTimestamp(updatedInCommitTimestamp);
}
numRetries++;
} while (numRetries < NUM_TXN_RETRIES);
Expand Down Expand Up @@ -186,6 +198,30 @@ private void updateMetadataWithICTIfRequired(Engine engine, CommitInfo attemptCo
);
}

/**
 * Recomputes this attempt's in-commit timestamp after a conflicting commit has won.
 *
 * <p>If the attempt carries a timestamp, the result is the larger of that timestamp and
 * {@code winningCommitTimestamp + 1}, so the retried commit is never stamped at or before
 * the winning commit. If the attempt carries no timestamp, an empty Optional is returned.
 *
 * @param winningCommitTimestamp   timestamp of the latest winning commit
 * @param attemptInCommitTimestamp in-commit timestamp of the current attempt, if any
 * @return the adjusted in-commit timestamp, or empty when the attempt had none
 */
private Optional<Long> getUpdatedInCommitTimestampAfterConflict(
        long winningCommitTimestamp, Optional<Long> attemptInCommitTimestamp) {
    // The retried commit must be stamped strictly after the winning commit.
    final long earliestAllowedTimestamp = winningCommitTimestamp + 1;
    return attemptInCommitTimestamp.map(
        attemptTimestamp -> Math.max(attemptTimestamp, earliestAllowedTimestamp));
}

/**
 * Asks {@code InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo} for the
 * metadata to use when retrying the commit after a conflict.
 *
 * <p>The helper is given the transaction's read snapshot and the version immediately
 * following the last winning version, i.e. the version the retry will try to commit as.
 *
 * @param engine                   engine forwarded to the helper
 * @param updatedInCommitTimestamp in-commit timestamp chosen for the retried commit
 * @param attemptMetadata          metadata of the current commit attempt
 * @param lastWinningVersion       version of the last winning commit
 * @return the helper's result: updated metadata, if it produced any
 */
private Optional<Metadata> getUpdatedMetadataAfterConflict(
        Engine engine,
        Long updatedInCommitTimestamp,
        Metadata attemptMetadata,
        Long lastWinningVersion) {
    return InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
        engine,
        updatedInCommitTimestamp,
        readSnapshot,
        attemptMetadata,
        lastWinningVersion + 1L /* next available version */);
}

private TransactionCommitResult doCommit(
Engine engine,
long commitAsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class SingleAction {
// .add("add", AddFile.FULL_SCHEMA) // not needed for blind appends
// .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends
.add("metaData", Metadata.FULL_SCHEMA)
.add("protocol", Protocol.FULL_SCHEMA);
.add("protocol", Protocol.FULL_SCHEMA)
.add("commitInfo", CommitInfo.FULL_SCHEMA);
// Once we start supporting domain metadata/row tracking enabled tables, we should add the
// schema for domain metadata fields here.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import io.delta.kernel.utils.FileStatus;

import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.util.FileNames;
import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED;
import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA;
import static io.delta.kernel.internal.util.FileNames.deltaFile;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
Expand All @@ -44,6 +46,7 @@ public class ConflictChecker {
// Positions of the action columns within CONFLICT_RESOLUTION_SCHEMA. They are used to pick
// the matching column vector out of each batch of actions read from the winning commits.
private static final int PROTOCOL_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("protocol");
private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData");
private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn");
private static final int COMMITINFO_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo");

// Snapshot of the table read by the transaction that encountered the conflict
// (a.k.a the losing transaction)
Expand Down Expand Up @@ -88,6 +91,7 @@ public static TransactionRebaseState resolveConflicts(

public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentWriteException {
List<FileStatus> winningCommits = getWinningCommitFiles(engine);
Optional<CommitInfo> winningCommitInfoOpt = Optional.empty();

// no winning commits. why did we get the transaction conflict?
checkState(!winningCommits.isEmpty(), "No winning commits found.");
Expand All @@ -99,21 +103,33 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
CONFLICT_RESOLUTION_SCHEMA,
Optional.empty())) {

actionsIterator.forEachRemaining(actionBatch -> {
List<ActionWrapper> actionBatchList = new ArrayList<>();
actionsIterator.forEachRemaining(actionBatchList::add);
for (int i = 0; i < actionBatchList.size(); i++) {
ActionWrapper actionBatch = actionBatchList.get(i);
checkArgument(!actionBatch.isFromCheckpoint()); // no checkpoints should be read
ColumnarBatch batch = actionBatch.getColumnarBatch();
if (i == actionBatchList.size() - 1) {
CommitInfo commitInfo =
getCommitInfo(batch.getColumnVector(COMMITINFO_ORDINAL));
winningCommitInfoOpt = Optional.ofNullable(commitInfo);
}

handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL));
handleMetadata(batch.getColumnVector(METADATA_ORDINAL));
handleTxn(batch.getColumnVector(TXN_ORDINAL));
});
}
} catch (IOException ioe) {
throw new UncheckedIOException("Error reading actions from winning commits.", ioe);
}

// if we get here, we have successfully rebased (i.e no logical conflicts)
// against the winning transactions
return new TransactionRebaseState(getLastWinningTxnVersion(winningCommits));
long lastWinningVersion = getLastWinningTxnVersion(winningCommits);
return new TransactionRebaseState(
lastWinningVersion,
getLastCommitTimestamp(
engine, lastWinningVersion, winningCommits, winningCommitInfoOpt));
}

/**
Expand All @@ -128,9 +144,11 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
*/
public static class TransactionRebaseState {
private final long latestVersion;
private final long latestCommitTimestamp;

public TransactionRebaseState(long latestVersion) {
public TransactionRebaseState(long latestVersion, long latestCommitTimestamp) {
this.latestVersion = latestVersion;
this.latestCommitTimestamp = latestCommitTimestamp;
}

/**
Expand All @@ -141,6 +159,15 @@ public TransactionRebaseState(long latestVersion) {
public long getLatestVersion() {
return latestVersion;
}

/**
* Return the latest CommitTimestamp of the table.
*
* @return latest CommitTimestamp of the table.
*/
public long getLatestCommitTimestamp() {
return latestCommitTimestamp;
}
}

/**
Expand Down Expand Up @@ -173,6 +200,21 @@ private void handleMetadata(ColumnVector metadataVector) {
}
}

/**
 * Extracts the first non-null commit info entry from a column vector of commit info rows.
 *
 * @param commitInfoVector commit info rows from the winning transactions
 * @return the commit info parsed from the first non-null row, or {@code null} when every
 *         row in the vector is null (or the vector is empty)
 */
private CommitInfo getCommitInfo(ColumnVector commitInfoVector) {
    int row = 0;
    while (row < commitInfoVector.getSize()) {
        if (commitInfoVector.isNullAt(row)) {
            // Row does not hold a commitInfo action; keep scanning.
            row++;
            continue;
        }
        return CommitInfo.fromColumnVector(commitInfoVector, row);
    }
    return null;
}

private void handleTxn(ColumnVector txnVector) {
// Check if the losing transaction has any txn identifier. If it does, go through the
// winning transactions and make sure that the losing transaction is valid from a
Expand Down Expand Up @@ -218,6 +260,23 @@ private List<FileStatus> getWinningCommitFiles(Engine engine) {
}
}

/**
 * Determines the commit timestamp of the last winning commit.
 *
 * <p>When the snapshot read by this transaction has version {@code -1}, or its metadata
 * does not have in-commit timestamps enabled, the modification time of the last winning
 * commit file is used. Otherwise the timestamp is obtained through
 * {@code CommitInfo.getRequiredInCommitTimestamp} from the supplied commit info.
 *
 * @param engine               engine used to query the snapshot version
 * @param lastWinningVersion   version of the last winning commit, passed to the
 *                             in-commit timestamp lookup as a string
 * @param winningCommits       winning commit files; the last element is treated as the
 *                             latest winning commit
 * @param winningCommitInfoOpt commit info read from the last winning commit, if any
 * @return the timestamp of the last winning commit
 */
private long getLastCommitTimestamp(
        Engine engine,
        long lastWinningVersion,
        List<FileStatus> winningCommits,
        Optional<CommitInfo> winningCommitInfoOpt) {
    final FileStatus newestWinningCommit = winningCommits.get(winningCommits.size() - 1);

    // In-commit timestamps apply only when a table version was actually read and that
    // snapshot's metadata has the feature switched on.
    final boolean readSnapshotUsesInCommitTimestamps =
        snapshot.getVersion(engine) != -1
            && IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(snapshot.getMetadata());

    if (readSnapshotUsesInCommitTimestamps) {
        return CommitInfo.getRequiredInCommitTimestamp(
            winningCommitInfoOpt, String.valueOf(lastWinningVersion));
    }
    return newestWinningCommit.getModificationTime();
}

private long getLastWinningTxnVersion(List<FileStatus> winningCommits) {
FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1);
return FileNames.deltaVersion(lastWinningTxn.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,21 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
txnBuilder.build(engine)
}

/**
 * Stages the given data against an already-built transaction and commits it.
 *
 * Each `(partitionValues, batches)` entry is staged with `stageData` using the
 * transaction's state; the resulting action iterators are combined left to right and
 * committed as a single in-memory iterable.
 *
 * @param engine engine used to read the transaction state and to commit
 * @param txn    transaction to commit the data with
 * @param data   partition values paired with the batches to append for them; must be
 *               non-empty, because `reduceLeft` throws on an empty sequence
 * @return the result of committing the transaction
 */
def commitAppendData(
    engine: Engine = defaultEngine,
    txn: Transaction,
    data: Seq[(Map[String, Literal], Seq[FilteredColumnarBatch])]): TransactionCommitResult = {
  val state = txn.getTransactionState(engine)

  val stagedActions = data.map { case (partitionValues, batches) =>
    stageData(state, partitionValues, batches)
  }
  val allActions = stagedActions.reduceLeft((combined, next) => combined combine next)

  txn.commit(engine, inMemoryIterable(allActions))
}

def appendData(
engine: Engine = defaultEngine,
tablePath: String,
Expand All @@ -309,14 +324,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
tableProperties: Map[String, String] = null): TransactionCommitResult = {

val txn = createTxn(engine, tablePath, isNewTable, schema, partCols, tableProperties, clock)
val txnState = txn.getTransactionState(engine)

val actions = data.map { case (partValues, partData) =>
stageData(txnState, partValues, partData)
}

val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _))
txn.commit(engine, combineActions)
commitAppendData(engine, txn, data)
}

def assertMetadataProp(
Expand Down
Loading

0 comments on commit 2d30ab8

Please sign in to comment.