Skip to content

Commit

Permalink
[Kernel] Add monotonic inCommitTimestamp and read support for inCommi…
Browse files Browse the repository at this point in the history
…tTimestamp (#3282)

## Description
Add read support for inCommitTimestamp and ensure that inCommitTimestamp
is monotonically increasing, assuming that there are no conflicting
commits. This prepares for complete inCommitTimestamp support with
conflict resolution in Kernel.

## How was this patch tested?
Added unit tests to verify that inCommitTimestamp is read correctly
and that inCommitTimestamp is monotonic.

## Does this PR introduce _any_ user-facing changes?
Yes, users can enable monotonic inCommitTimestamp (assuming that there
are no conflicts) by enabling the corresponding table property.
  • Loading branch information
EstherBear authored Jun 28, 2024
1 parent c8e87b4 commit 87f0685
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.delta.kernel.internal;

import java.io.IOException;
import java.util.Optional;

import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -41,6 +43,7 @@ public class SnapshotImpl implements Snapshot {
private final Protocol protocol;
private final Metadata metadata;
private final LogSegment logSegment;
private Optional<Long> inCommitTimestampOpt;

public SnapshotImpl(
Path dataPath,
Expand All @@ -55,6 +58,7 @@ public SnapshotImpl(
this.logReplay = logReplay;
this.protocol = protocol;
this.metadata = metadata;
this.inCommitTimestampOpt = Optional.empty();
}

@Override
Expand Down Expand Up @@ -121,4 +125,38 @@ public Path getLogPath() {
/** Returns the value of this snapshot's {@code dataPath} field. */
public Path getDataPath() {
    return this.dataPath;
}

/**
 * Reports the timestamp of the newest commit contained in this snapshot.
 * <p>
 * If in-commit timestamps (ICT) are enabled in the table metadata, the value comes from the
 * {@link CommitInfo} of the commit at {@code logSegment.version}. The first such call reads
 * the commit info through the engine, which can result in an IO operation; the value is then
 * cached in {@code inCommitTimestampOpt} and reused by later calls.
 * <p>
 * For non-ICT tables the value is {@code logSegment.lastCommitTimestamp}, i.e. the file
 * modification time of the latest commit in the snapshot.
 * <p>
 * Snapshot implementations that represent an uninitialized snapshot are expected to
 * return -1 instead.
 *
 * @param engine the engine to use for IO operations
 * @return the timestamp of the latest commit
 * @throws RuntimeException if reading the commit info fails with an {@link IOException}
 */
public long getTimestamp(Engine engine) {
    // Non-ICT tables: no IO needed, the log segment already carries the timestamp.
    if (!TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
        return logSegment.lastCommitTimestamp;
    }

    // ICT tables: reuse the cached value when a previous call already resolved it.
    if (inCommitTimestampOpt.isPresent()) {
        return inCommitTimestampOpt.get();
    }

    final long resolvedTimestamp;
    try {
        final Optional<CommitInfo> latestCommitInfo =
            CommitInfo.getCommitInfoOpt(engine, logPath, logSegment.version);
        resolvedTimestamp = CommitInfo.getRequiredInCommitTimestamp(
            latestCommitInfo, String.valueOf(logSegment.version), dataPath);
    } catch (IOException e) {
        throw new RuntimeException("Failed to get inCommitTimestamp with IO", e);
    }
    inCommitTimestampOpt = Optional.of(resolvedTimestamp);
    return resolvedTimestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,43 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Clock;

public class TableImpl implements Table {
/**
 * Instantiate a table object for the table at the given path, using a clock backed by
 * {@link System#currentTimeMillis()}.
 *
 * @param engine {@link Engine} instance to use in Delta Kernel.
 * @param path location of the table.
 * @return the table produced by the three-argument {@code forPath} overload
 */
public static Table forPath(Engine engine, String path) {
    final Clock systemClock = System::currentTimeMillis;
    return forPath(engine, path, systemClock);
}

/**
 * Instantiate a table object for the Delta Lake table at the given path. It takes an additional
 * parameter called {@link Clock} which helps in testing.
 * <p>
 * The given path is first resolved through the engine's file system client; the table is
 * created from the resolved path.
 *
 * @param engine {@link Engine} instance to use in Delta Kernel.
 * @param path location of the table.
 * @param clock {@link Clock} instance to use for time-related operations.
 *
 * @return an instance of {@link Table} representing the Delta table at the given path
 * @throws RuntimeException wrapping the {@link IOException} if the path cannot be resolved
 */
public static Table forPath(Engine engine, String path, Clock clock) {
String resolvedPath;
try {
resolvedPath = engine.getFileSystemClient().resolvePath(path);
} catch (IOException io) {
// Rethrown unchecked, keeping the original IOException as the cause.
throw new RuntimeException(io);
}
// NOTE(review): two consecutive return statements appear here. This looks like the
// removed (one-argument) and added (two-argument) lines of a diff hunk shown together;
// presumably only the second one is live -- confirm against the actual file.
return new TableImpl(resolvedPath);
return new TableImpl(resolvedPath, clock);
}

private final SnapshotManager snapshotManager;
private final String tablePath;
private final Clock clock;

/**
 * Creates a table handle for the given table path. The log path is the {@code _delta_log}
 * child of the table path; both paths are passed to a new {@code SnapshotManager}.
 *
 * @param tablePath location of the table, stored as given.
 * @param clock clock stored on this table and returned by {@code getClock()}.
 */
// NOTE(review): two constructor signature lines appear below. This looks like the removed
// and added lines of a diff hunk shown together; presumably only the (tablePath, clock)
// signature is live -- confirm against the actual file.
public TableImpl(String tablePath) {
public TableImpl(String tablePath, Clock clock) {
this.tablePath = tablePath;
final Path dataPath = new Path(tablePath);
final Path logPath = new Path(dataPath, "_delta_log");
this.snapshotManager = new SnapshotManager(logPath, dataPath);
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -81,6 +98,10 @@ public TransactionBuilder createTransactionBuilder(
return new TransactionBuilderImpl(this, engineInfo, operation);
}

/** Returns the {@link Clock} stored in this table's {@code clock} field. */
public Clock getClock() {
    return this.clock;
}

/** Builds a new {@link Path} from this table's {@code tablePath} string. */
protected Path getDataPath() {
    final Path dataPath = new Path(tablePath);
    return dataPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ public Transaction build(Engine engine) {
metadata,
setTxnOpt,
shouldUpdateMetadata,
shouldUpdateProtocol);
shouldUpdateProtocol,
table.getClock());
}

/**
Expand Down Expand Up @@ -206,6 +207,11 @@ private class InitialSnapshot extends SnapshotImpl {
/**
 * Builds a snapshot over an empty log segment created for the enclosing builder's table
 * log path; all other arguments are forwarded to the superclass constructor.
 */
InitialSnapshot(
        Path dataPath,
        LogReplay logReplay,
        Metadata metadata,
        Protocol protocol) {
    super(
        dataPath,
        LogSegment.empty(table.getLogPath()),
        logReplay,
        protocol,
        metadata);
}

/**
 * Always reports {@code -1}; the engine argument is not used.
 *
 * @param engine ignored by this override
 * @return {@code -1}
 */
@Override
public long getTimestamp(Engine engine) {
    final long noCommitTimestamp = -1L;
    return noCommitTimestamp;
}
}

private LogReplay getEmptyLogReplay(Engine engine, Metadata metadata, Protocol protocol) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.util.Clock;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.VectorUtils;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class TransactionImpl
private final SnapshotImpl readSnapshot;
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private Metadata metadata;
private boolean shouldUpdateMetadata;

Expand All @@ -88,7 +90,8 @@ public TransactionImpl(
Metadata metadata,
Optional<SetTransaction> setTxnOpt,
boolean shouldUpdateMetadata,
boolean shouldUpdateProtocol) {
boolean shouldUpdateProtocol,
Clock clock) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
Expand All @@ -100,6 +103,7 @@ public TransactionImpl(
this.setTxnOpt = setTxnOpt;
this.shouldUpdateMetadata = shouldUpdateMetadata;
this.shouldUpdateProtocol = shouldUpdateProtocol;
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -239,14 +243,22 @@ public Optional<SetTransaction> getSetTxnOpt() {
return setTxnOpt;
}

/**
 * Generates the in-commit timestamp (ICT) to use for the first commit attempt.
 * <p>
 * When ICT is enabled in this transaction's metadata, the result is the larger of
 * {@code currentTimestamp} and the readSnapshot's timestamp plus one, so it is always greater
 * than the commit timestamp of the readSnapshot. Fetching the readSnapshot's timestamp can
 * result in an additional file read; this only happens if ICT is enabled. When ICT is not
 * enabled, an empty Optional is returned.
 *
 * @param engine engine passed to the readSnapshot when fetching its timestamp
 * @param currentTimestamp candidate timestamp for this commit attempt
 * @return the timestamp to record if ICT is enabled, otherwise {@link Optional#empty()}
 */
private Optional<Long> generateInCommitTimestampForFirstCommitAttempt(
Engine engine, long currentTimestamp) {
// NOTE(review): the next two statements look like the removed lines of this diff hunk,
// superseded by the if/else that follows -- confirm against the actual file.
boolean ictEnabled = IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata);
return ictEnabled ? Optional.of(currentTimestamp) : Optional.empty();
if (IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
long lastCommitTimestamp = readSnapshot.getTimestamp(engine);
// +1 keeps the new timestamp strictly greater than the read snapshot's timestamp.
return Optional.of(Math.max(currentTimestamp, lastCommitTimestamp + 1));
} else {
return Optional.empty();
}
}

private CommitInfo generateCommitAction(Engine engine) {
long commitAttemptStartTime = System.currentTimeMillis();
long commitAttemptStartTime = clock.getTimeMillis();
return new CommitInfo(
generateInCommitTimestampForFirstCommitAttempt(
engine, commitAttemptStartTime),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,27 @@
*/
package io.delta.kernel.internal.actions;

import java.io.IOException;
import java.util.*;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.types.*;

import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.VectorUtils;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;

/**
Expand Down Expand Up @@ -84,6 +95,8 @@ public static CommitInfo fromColumnVector(ColumnVector vector, int rowId) {
.boxed()
.collect(toMap(i -> FULL_SCHEMA.at(i).getName(), i -> i));

private static final Logger logger = LoggerFactory.getLogger(CommitInfo.class);

private final long timestamp;
private final String engineInfo;
private final String operation;
Expand Down Expand Up @@ -148,4 +161,64 @@ public Row toRow() {

return new GenericRow(CommitInfo.FULL_SCHEMA, commitInfo);
}

/**
 * Extracts the mandatory {@code inCommitTimestamp} from an optional {@link CommitInfo}.
 *
 * @param commitInfoOpt the commit info of the commit, if one was found
 * @param version the commit version, used only in the error messages
 * @param dataPath the table path reported in the thrown exception
 * @return the {@code inCommitTimestamp} stored in the commit info
 * @throws InvalidTableException if {@code commitInfoOpt} is empty, or if the commit info
 *         it holds has an empty {@code inCommitTimestamp}
 */
public static long getRequiredInCommitTimestamp(
        Optional<CommitInfo> commitInfoOpt, String version, Path dataPath) {
    // Case 1: the commit carries no CommitInfo action at all.
    if (!commitInfoOpt.isPresent()) {
        throw new InvalidTableException(
            dataPath.toString(),
            String.format("This table has the feature inCommitTimestamp-preview " +
                "enabled which requires the presence of the CommitInfo action " +
                "in every commit. However, the CommitInfo action is " +
                "missing from commit version %s.", version));
    }

    // Case 2: the CommitInfo action exists but its inCommitTimestamp was never set.
    final CommitInfo presentCommitInfo = commitInfoOpt.get();
    if (!presentCommitInfo.inCommitTimestamp.isPresent()) {
        throw new InvalidTableException(
            dataPath.toString(),
            String.format("This table has the feature inCommitTimestamp-preview " +
                "enabled which requires the presence of inCommitTimestamp in the " +
                "CommitInfo action. However, this field has not " +
                "been set in commit version %s.", version));
    }

    return presentCommitInfo.inCommitTimestamp.get();
}

/**
 * Get the persisted commit info (if available) for the given delta file.
 * <p>
 * Reads the delta file of the given version from {@code logPath} with the engine's JSON
 * handler, projecting only the {@code commitInfo} column, and returns the first non-null
 * commit info encountered.
 *
 * @param engine engine whose JSON handler is used to read the delta file
 * @param logPath log directory that contains the delta file
 * @param version version of the commit whose delta file is read
 * @return the first commit info found, or an empty Optional if the file contains none
 * @throws IOException declared by the signature
 * @throws RuntimeException wrapping any {@link IOException} raised inside the
 *         try-with-resources statement
 */
public static Optional<CommitInfo> getCommitInfoOpt(
        Engine engine,
        Path logPath,
        long version) throws IOException {
    // Size and modification time are passed as 0 when building the file status.
    final FileStatus deltaFile =
        FileStatus.of(FileNames.deltaFile(logPath, version), 0, 0);
    final StructType commitInfoReadSchema =
        new StructType().add("commitInfo", CommitInfo.FULL_SCHEMA);

    try (CloseableIterator<ColumnarBatch> batches = engine.getJsonHandler().readJsonFiles(
            singletonCloseableIterator(deltaFile),
            commitInfoReadSchema,
            Optional.empty())) {
        while (batches.hasNext()) {
            final ColumnarBatch batch = batches.next();
            assert(batch.getSchema().equals(commitInfoReadSchema));
            final ColumnVector commitInfos = batch.getColumnVector(0);
            for (int rowId = 0; rowId < commitInfos.getSize(); rowId++) {
                if (commitInfos.isNullAt(rowId)) {
                    // This row holds some other action; keep scanning.
                    continue;
                }
                final CommitInfo parsed = CommitInfo.fromColumnVector(commitInfos, rowId);
                if (parsed != null) {
                    return Optional.of(parsed);
                }
            }
        }
    } catch (IOException ex) {
        // NOTE(review): this handler covers every IOException raised by the
        // try-with-resources statement, presumably including read failures and not only
        // close() -- confirm whether the message should be broadened.
        throw new RuntimeException("Could not close iterator", ex);
    }

    logger.info("No commit info found for commit of version {}", version);
    return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

/**
 * An interface to represent clocks, so that they can be mocked out in unit tests.
 * <p>
 * It declares a single abstract method, so it can be implemented with a lambda or a method
 * reference such as {@code System::currentTimeMillis}. The {@code @FunctionalInterface}
 * annotation makes the compiler reject any change that would break such usages.
 */
@FunctionalInterface
public interface Clock {
    /** @return Current system time, in ms. */
    long getTimeMillis();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.util;

/**
 * A clock whose time can be manually set and modified.
 * <p>
 * Both the setter and the getter synchronize on this instance, so a time set by one thread
 * is visible to a subsequent read from another thread.
 */
public class ManualClock implements Clock {
    private long timeMillis;

    /**
     * @param timeMillis initial time (in milliseconds) that the clock should represent
     */
    public ManualClock(long timeMillis) {
        this.timeMillis = timeMillis;
    }

    /**
     * @param timeToSet new time (in milliseconds) that the clock should represent
     */
    public synchronized void setTime(long timeToSet) {
        this.timeMillis = timeToSet;
        // Wake up any threads waiting on this clock's monitor (no waiter is defined in
        // this class itself).
        this.notifyAll();
    }

    /**
     * Synchronized to pair with {@link #setTime(long)}: without it, the read of the
     * non-volatile {@code long} field has no visibility guarantee across threads.
     *
     * @return the time (in milliseconds) most recently set on this clock
     */
    @Override
    public synchronized long getTimeMillis() {
        return timeMillis;
    }
}
Loading

0 comments on commit 87f0685

Please sign in to comment.