Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Update ConflictChecker to perform conflict resolution of ICT #3283

Merged
merged 7 commits into from
Jul 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
try {
// TODO Update the attemptCommitInfo and metadata based on the conflict
// resolution.
return doCommit(engine, commitAsVersion, attemptCommitInfo, dataActions);
} catch (FileAlreadyExistsException fnfe) {
logger.info("Concurrent write detected when committing as version = {}. " +
Expand All @@ -149,6 +147,20 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
"New commit version %d should be greater than the previous commit " +
"attempt version %d.", newCommitAsVersion, commitAsVersion);
commitAsVersion = newCommitAsVersion;
Optional<Long> updatedInCommitTimestamp =
getUpdatedInCommitTimestampAfterConflict(
rebaseState.getLatestCommitTimestamp(),
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
attemptCommitInfo.getInCommitTimestamp());
if (updatedInCommitTimestamp.isPresent()) {
Optional<Metadata> metadataWithICTInfo = getUpdatedMetadataAfterConflict(
engine,
updatedInCommitTimestamp.get(),
metadata,
rebaseState.getLatestVersion()
);
metadataWithICTInfo.ifPresent(this::updateMetadata);
}
attemptCommitInfo.setInCommitTimestamp(updatedInCommitTimestamp);
}
numRetries++;
} while (numRetries < NUM_TXN_RETRIES);
Expand Down Expand Up @@ -186,6 +198,30 @@ private void updateMetadataWithICTIfRequired(Engine engine, CommitInfo attemptCo
);
}

private Optional<Long> getUpdatedInCommitTimestampAfterConflict(
long winningCommitTimestamp, Optional<Long> attemptInCommitTimestamp) {
if (attemptInCommitTimestamp.isPresent()) {
long updatedInCommitTimestamp = Math.max(
attemptInCommitTimestamp.get(), winningCommitTimestamp + 1);
return Optional.of(updatedInCommitTimestamp);
}
return attemptInCommitTimestamp;
}

private Optional<Metadata> getUpdatedMetadataAfterConflict(
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
Engine engine,
Long updatedInCommitTimestamp,
Metadata attemptMetadata,
Long lastWinningVersion) {
long nextAvailableVersion = lastWinningVersion + 1L;
return InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
engine,
updatedInCommitTimestamp,
readSnapshot,
attemptMetadata,
nextAvailableVersion);
}

private TransactionCommitResult doCommit(
Engine engine,
long commitAsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class SingleAction {
// .add("add", AddFile.FULL_SCHEMA) // not needed for blind appends
// .add("remove", RemoveFile.FULL_SCHEMA) // not needed for blind appends
.add("metaData", Metadata.FULL_SCHEMA)
.add("protocol", Protocol.FULL_SCHEMA);
.add("protocol", Protocol.FULL_SCHEMA)
.add("commitInfo", CommitInfo.FULL_SCHEMA);
// Once we start supporting domain metadata/row tracking enabled tables, we should add the
// schema for domain metadata fields here.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.String.format;

import io.delta.kernel.data.ColumnVector;
Expand All @@ -27,8 +28,10 @@
import io.delta.kernel.utils.FileStatus;

import io.delta.kernel.internal.*;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.SetTransaction;
import io.delta.kernel.internal.util.FileNames;
import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED;
import static io.delta.kernel.internal.actions.SingleAction.CONFLICT_RESOLUTION_SCHEMA;
import static io.delta.kernel.internal.util.FileNames.deltaFile;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
Expand All @@ -44,6 +47,7 @@ public class ConflictChecker {
private static final int PROTOCOL_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("protocol");
private static final int METADATA_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("metaData");
private static final int TXN_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("txn");
private static final int COMMITINFO_ORDINAL = CONFLICT_RESOLUTION_SCHEMA.indexOf("commitInfo");

// Snapshot of the table read by the transaction that encountered the conflict
// (a.k.a the losing transaction)
Expand Down Expand Up @@ -88,10 +92,13 @@ public static TransactionRebaseState resolveConflicts(

public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentWriteException {
List<FileStatus> winningCommits = getWinningCommitFiles(engine);
AtomicReference<Optional<CommitInfo>> winningCommitInfoOpt =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this a atmoic reference? Why can't we just use the Optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of this error:
#3283 (comment)

new AtomicReference<>(Optional.empty());

// no winning commits. why did we get the transaction conflict?
checkState(!winningCommits.isEmpty(), "No winning commits found.");

long lastWinningVersion = getLastWinningTxnVersion(winningCommits);
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
// Read the actions from the winning commits
try (ActionsIterator actionsIterator = new ActionsIterator(
engine,
Expand All @@ -102,6 +109,11 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
actionsIterator.forEachRemaining(actionBatch -> {
checkArgument(!actionBatch.isFromCheckpoint()); // no checkpoints should be read
ColumnarBatch batch = actionBatch.getColumnarBatch();
if (actionBatch.getVersion() == lastWinningVersion) {
CommitInfo commitInfo =
getCommitInfo(batch.getColumnVector(COMMITINFO_ORDINAL));
winningCommitInfoOpt.set(Optional.ofNullable(commitInfo));
}

handleProtocol(batch.getColumnVector(PROTOCOL_ORDINAL));
handleMetadata(batch.getColumnVector(METADATA_ORDINAL));
Expand All @@ -113,7 +125,10 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW

// if we get here, we have successfully rebased (i.e no logical conflicts)
// against the winning transactions
return new TransactionRebaseState(getLastWinningTxnVersion(winningCommits));
return new TransactionRebaseState(
lastWinningVersion,
getLastCommitTimestamp(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to fetch the ICT again? aren't we reading already above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just read the CommitInfo above but not the ICT? We still need to extract the ICT in CommitInfo?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkorukanti From what I understand, this will not perform an additional IO. This simply extracts the timestamp from the CommitInfo action (or the file modification timestamp if ICT is not enabled.)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks for clarifying.

engine, lastWinningVersion, winningCommits, winningCommitInfoOpt.get()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to check that winningCommitInfoOpt is not empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This get is for the atomic reference but not for the optional.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, how do verify it is actually set? do you want to start with null and then verify it is not null?

Copy link
Contributor Author

@EstherBear EstherBear Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It starts with optional.empty. And in the getLastCommitTimestamp function it will check if it's ict enabled. And if it's ict enabled and the winningCommitInfoOpt is empty, the CommitInfo.getRequiredInCommitTimestamp will raise an error. It has the same logic with Delta Spark.

}

/**
Expand All @@ -128,9 +143,11 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
*/
public static class TransactionRebaseState {
private final long latestVersion;
private final long latestCommitTimestamp;

public TransactionRebaseState(long latestVersion) {
public TransactionRebaseState(long latestVersion, long latestCommitTimestamp) {
this.latestVersion = latestVersion;
this.latestCommitTimestamp = latestCommitTimestamp;
}

/**
Expand All @@ -141,6 +158,15 @@ public TransactionRebaseState(long latestVersion) {
public long getLatestVersion() {
return latestVersion;
}

/**
* Return the latest CommitTimestamp of the table.
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
*
* @return latest CommitTimestamp of the table.
*/
public long getLatestCommitTimestamp() {
return latestCommitTimestamp;
}
}

/**
Expand Down Expand Up @@ -173,6 +199,21 @@ private void handleMetadata(ColumnVector metadataVector) {
}
}

/**
* Get the commit info from the winning transactions.
*
* @param commitInfoVector commit info rows from the winning transactions
* @return the commit info
*/
private CommitInfo getCommitInfo(ColumnVector commitInfoVector) {
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
for (int rowId = 0; rowId < commitInfoVector.getSize(); rowId++) {
if (!commitInfoVector.isNullAt(rowId)) {
return CommitInfo.fromColumnVector(commitInfoVector, rowId);
}
}
return null;
}

private void handleTxn(ColumnVector txnVector) {
// Check if the losing transaction has any txn identifier. If it does, go through the
// winning transactions and make sure that the losing transaction is valid from a
Expand Down Expand Up @@ -218,6 +259,25 @@ private List<FileStatus> getWinningCommitFiles(Engine engine) {
}
}

private long getLastCommitTimestamp(
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
Engine engine,
long lastWinningVersion,
List<FileStatus> winningCommits,
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
Optional<CommitInfo> winningCommitInfoOpt) {
FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1);
long winningCommitTimestamp = -1L;
if (snapshot.getVersion(engine) == -1 ||
!IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(snapshot.getMetadata())) {
EstherBear marked this conversation as resolved.
Show resolved Hide resolved
winningCommitTimestamp = lastWinningTxn.getModificationTime();
} else {
winningCommitTimestamp = CommitInfo.getRequiredInCommitTimestamp(
winningCommitInfoOpt,
String.valueOf(lastWinningVersion),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why convert to string?

Copy link
Contributor Author

@EstherBear EstherBear Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the error message print which is consistent with Delta-Spark.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than passing these as string, may be just prepare a string which contains the lastWinningVersion and tablePath and pass it as a context to getRequiredInCommitTimestamp?

CommitInfo.getRequiredInCommitTimestamp(winningCommitInfoOpt, String.format("error...", ...))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getRequiredInCommitTimestamp has some logic to check if winningCommitInfoOpt is empty and if it contains ict and raises different errors accordingly. So I think it's better to leave this function handle with the error messages?

snapshot.getDataPath());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need the datapath here?

Copy link
Contributor Author

@EstherBear EstherBear Jul 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for the error message print which is consistent with Delta-Spark.

}
return winningCommitTimestamp;
}

private long getLastWinningTxnVersion(List<FileStatus> winningCommits) {
FileStatus lastWinningTxn = winningCommits.get(winningCommits.size() - 1);
return FileNames.deltaVersion(lastWinningTxn.getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,21 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
txnBuilder.build(engine)
}

def commitAppendData(
engine: Engine = defaultEngine,
txn: Transaction,
data: Seq[(Map[String, Literal], Seq[FilteredColumnarBatch])]): TransactionCommitResult = {

val txnState = txn.getTransactionState(engine)

val actions = data.map { case (partValues, partData) =>
stageData(txnState, partValues, partData)
}

val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _))
txn.commit(engine, combineActions)
}

def appendData(
engine: Engine = defaultEngine,
tablePath: String,
Expand All @@ -309,14 +324,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
tableProperties: Map[String, String] = null): TransactionCommitResult = {

val txn = createTxn(engine, tablePath, isNewTable, schema, partCols, tableProperties, clock)
val txnState = txn.getTransactionState(engine)

val actions = data.map { case (partValues, partData) =>
stageData(txnState, partValues, partData)
}

val combineActions = inMemoryIterable(actions.reduceLeft(_ combine _))
txn.commit(engine, combineActions)
commitAppendData(engine, txn, data)
}

def assertMetadataProp(
Expand Down
Loading