diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitActions.java b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitActions.java new file mode 100644 index 00000000000..a48841adce4 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitActions.java @@ -0,0 +1,88 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.utils.CloseableIterator; + +/** + * Represents all actions from a single commit version in a table. + * + *

Resource Management: + * + *

+ * + *

Example usage: + * + *

{@code
+ * try (CommitActions commitActions = ...) {
+ *   long version = commitActions.getVersion();
+ *   long timestamp = commitActions.getTimestamp();
+ *
+ *   try (CloseableIterator<ColumnarBatch> actions = commitActions.getActions()) {
+ *     while (actions.hasNext()) {
+ *       ColumnarBatch batch = actions.next();
+ *       // process batch
+ *     }
+ *   }
+ * }
+ * }
+ * + * @since 4.1.0 + */ +@Evolving +public interface CommitActions extends AutoCloseable { + + /** + * Returns the commit version number. + * + * @return the version number of this commit + */ + long getVersion(); + + /** + * Returns the commit timestamp in milliseconds since Unix epoch. + * + * @return the timestamp of this commit + */ + long getTimestamp(); + + /** + * Returns an iterator over the action batches for this commit. + * + *

<p>Each {@link ColumnarBatch} contains actions from this commit only. + * + *

<p>Note: All rows within all batches have the same version (returned by {@link #getVersion()}). + * + *

<p>This method can be called multiple times, and each call returns a new iterator over the same + * set of batches. This supports use cases like two-pass processing (e.g., validation pass + * followed by processing pass). + * + *

Callers are responsible for closing each iterator returned by this method. Each + * iterator must be closed after use to release underlying resources. + * + * @return a {@link CloseableIterator} over columnar batches containing this commit's actions + */ + CloseableIterator getActions(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java index 4d50e18efd8..f7df04d5d36 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/CommitRange.java @@ -105,4 +105,22 @@ public interface CommitRange { */ CloseableIterator getActions( Engine engine, Snapshot startSnapshot, Set actionSet); + + /** + * Returns an iterator of commits in this commit range, where each commit is represented as a + * {@link CommitActions} object. + * + * @param engine the {@link Engine} to use for reading the Delta log files + * @param startSnapshot the snapshot for startVersion, required to ensure the table is readable by + * Kernel at startVersion + * @param actionSet the set of action types to include in the results. 
Only actions of these types + * will be returned in each commit's actions iterator + * @return a {@link CloseableIterator} over {@link CommitActions}, one per commit version in this + * range + * @throws IllegalArgumentException if startSnapshot.getVersion() != startVersion + * @throws KernelException if the version range contains a version with reader protocol that is + * unsupported by Kernel + */ + CloseableIterator getCommitActions( + Engine engine, Snapshot startSnapshot, Set actionSet); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/CommitActionsImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/CommitActionsImpl.java new file mode 100644 index 00000000000..1242e27b322 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/CommitActionsImpl.java @@ -0,0 +1,226 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.internal; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.CommitActions; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.replay.ActionWrapper; +import io.delta.kernel.internal.replay.ActionsIterator; +import io.delta.kernel.internal.tablefeatures.TableFeatures; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.internal.util.Preconditions; +import io.delta.kernel.internal.util.Tuple2; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Implementation of {@link CommitActions}. + * + *

<p>This implementation owns the commit file and supports multiple calls to {@link #getActions()}. + * The first call reuses initially-read data to avoid double I/O, while subsequent calls re-read the + * commit file for memory efficiency. + * + *

Resource Management: + * + *

+ */ +public class CommitActionsImpl implements CommitActions, AutoCloseable { + + private final Engine engine; + private final FileStatus commitFile; + private final StructType readSchema; + private final String tablePath; + private final boolean shouldDropProtocolColumn; + private final boolean shouldDropCommitInfoColumn; + private final long version; + private final long timestamp; + + /** + * Iterator over ActionWrappers. The first call to {@link #getActions()} uses this iterator which + * was created during construction (to extract metadata). Subsequent calls lazily create new + * iterators, by constructing an ActionsIterator which does not open the file. + */ + private CloseableIterator iterator; + + /** + * Creates a CommitActions from a commit file. + * + * @param engine the engine for file I/O + * @param commitFile the commit file to read + * @param tablePath the table path for error messages + * @param actionSet the set of actions to read from the commit file + */ + public CommitActionsImpl( + Engine engine, + FileStatus commitFile, + String tablePath, + Set actionSet) { + requireNonNull(engine, "engine cannot be null"); + this.commitFile = requireNonNull(commitFile, "commitFile cannot be null"); + this.tablePath = requireNonNull(tablePath, "tablePath cannot be null"); + + // Create a new action set which is a super set of the requested actions. + // The extra actions are needed either for checks or to extract + // extra information. We will strip out the extra actions before + // returning the result. + Set copySet = new HashSet<>(actionSet); + copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL); + // commitInfo is needed to extract the inCommitTimestamp of delta files, this is used in + // ActionsIterator to resolve the timestamp when available + copySet.add(DeltaLogActionUtils.DeltaAction.COMMITINFO); + // Determine whether the additional actions were in the original set. 
+ this.shouldDropProtocolColumn = !actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL); + this.shouldDropCommitInfoColumn = + !actionSet.contains(DeltaLogActionUtils.DeltaAction.COMMITINFO); + + this.readSchema = + new StructType( + copySet.stream() + .map(action -> new StructField(action.colName, action.schema, true)) + .collect(Collectors.toList())); + this.engine = engine; + + // Create initial iterator and peek at the first element to extract metadata + CloseableIterator actionsIter = + new ActionsIterator( + engine, Collections.singletonList(commitFile), readSchema, Optional.empty()); + + Tuple2, CloseableIterator> headAndIter = + peekHeadAndGetFullIterator(actionsIter); + this.iterator = headAndIter._2; + + // Extract version and timestamp from first action (or use reading file if not exists) + if (headAndIter._1.isPresent()) { + ActionWrapper firstWrapper = headAndIter._1.get(); + this.version = firstWrapper.getVersion(); + this.timestamp = + firstWrapper + .getTimestamp() + .orElseThrow( + () -> new RuntimeException("timestamp should always exist for Delta File")); + } else { + // Empty commit file - extract from file metadata + this.version = FileNames.deltaVersion(new Path(commitFile.getPath())); + this.timestamp = commitFile.getModificationTime(); + } + } + + /** + * Helper to peek at the first element and return both the head and a full iterator (head + rest). + * + * @return Tuple2 where _1 is the head element (Optional) and _2 is the full iterator + */ + private static Tuple2, CloseableIterator> + peekHeadAndGetFullIterator(CloseableIterator iter) { + Optional head = iter.hasNext() ? Optional.of(iter.next()) : Optional.empty(); + CloseableIterator fullIterator = + head.isPresent() ? 
Utils.singletonCloseableIterator(head.get()).combine(iter) : iter; + return new Tuple2<>(head, fullIterator); + } + + @Override + public long getVersion() { + return version; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public synchronized CloseableIterator getActions() { + CloseableIterator result = + iterator.map( + wrapper -> + validateProtocolAndDropInternalColumns( + wrapper.getColumnarBatch(), + tablePath, + shouldDropProtocolColumn, + shouldDropCommitInfoColumn)); + // Constructing an ActionsIterator does not open the file. + iterator = + new ActionsIterator( + engine, Collections.singletonList(commitFile), readSchema, Optional.empty()); + + return result; + } + + /** Validates protocol and drops protocol/commitInfo columns if not requested. */ + private static ColumnarBatch validateProtocolAndDropInternalColumns( + ColumnarBatch batch, + String tablePath, + boolean shouldDropProtocolColumn, + boolean shouldDropCommitInfoColumn) { + + // Validate protocol if present in the batch. + int protocolIdx = batch.getSchema().indexOf("protocol"); + Preconditions.checkState(protocolIdx >= 0, "protocol column must be present in readSchema"); + ColumnVector protocolVector = batch.getColumnVector(protocolIdx); + for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) { + if (!protocolVector.isNullAt(rowId)) { + Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId); + TableFeatures.validateKernelCanReadTheTable(protocol, tablePath); + } + } + + // Drop columns if not requested + ColumnarBatch result = batch; + if (shouldDropProtocolColumn && protocolIdx >= 0) { + result = result.withDeletedColumnAt(protocolIdx); + } + + int commitInfoIdx = result.getSchema().indexOf("commitInfo"); + if (shouldDropCommitInfoColumn && commitInfoIdx >= 0) { + result = result.withDeletedColumnAt(commitInfoIdx); + } + + return result; + } + + /** + * Closes this CommitActionsImpl and releases any underlying resources. 
+ * + * @throws IOException if an I/O error occurs while closing resources + */ + @Override + public synchronized void close() throws IOException { + Utils.closeCloseables(iterator); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java index e785731cf1a..465f8330546 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java @@ -18,20 +18,16 @@ import static io.delta.kernel.internal.DeltaErrors.*; import static io.delta.kernel.internal.fs.Path.getName; import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.CommitActions; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.InvalidTableException; import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.exceptions.TableNotFoundException; -import io.delta.kernel.expressions.ExpressionEvaluator; -import io.delta.kernel.expressions.Literal; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; -import io.delta.kernel.internal.replay.ActionsIterator; -import io.delta.kernel.internal.tablefeatures.TableFeatures; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.FileNames.DeltaLogFileType; import io.delta.kernel.internal.util.Tuple2; @@ -139,60 +135,6 @@ public static List getCommitFilesForVersionRange( return commitFiles; } - /** - * Read the given commitFiles and return the contents as an iterator of batches. 
Also adds two - * columns "version" and "timestamp" that store the commit version and timestamp for the commit - * file that the batch was read from. The "version" and "timestamp" columns are the first and - * second columns in the returned schema respectively and both of {@link LongType} - * - * @param commitFiles list of delta commit files to read - * @param readSchema JSON schema to read - * @return an iterator over the contents of the files in the same order as the provided files - */ - public static CloseableIterator readCommitFiles( - Engine engine, List commitFiles, StructType readSchema) { - return new ActionsIterator(engine, commitFiles, readSchema, Optional.empty()) - .map( - actionWrapper -> { - long timestamp = - actionWrapper - .getTimestamp() - .orElseThrow( - () -> - new RuntimeException("Commit files should always have a timestamp")); - ExpressionEvaluator commitVersionGenerator = - wrapEngineException( - () -> - engine - .getExpressionHandler() - .getEvaluator( - readSchema, - Literal.ofLong(actionWrapper.getVersion()), - LongType.LONG), - "Get the expression evaluator for the commit version"); - ExpressionEvaluator commitTimestampGenerator = - wrapEngineException( - () -> - engine - .getExpressionHandler() - .getEvaluator(readSchema, Literal.ofLong(timestamp), LongType.LONG), - "Get the expression evaluator for the commit timestamp"); - ColumnVector commitVersionVector = - wrapEngineException( - () -> commitVersionGenerator.eval(actionWrapper.getColumnarBatch()), - "Evaluating the commit version expression"); - ColumnVector commitTimestampVector = - wrapEngineException( - () -> commitTimestampGenerator.eval(actionWrapper.getColumnarBatch()), - "Evaluating the commit timestamp expression"); - - return actionWrapper - .getColumnarBatch() - .withNewColumn(0, COMMIT_VERSION_STRUCT_FIELD, commitVersionVector) - .withNewColumn(1, COMMIT_TIMESTAMP_STRUCT_FIELD, commitTimestampVector); - }); - } - /** * Returns a {@link CloseableIterator} of files of type 
$fileTypes in the _delta_log directory of * the given $tablePath, in increasing order from $startVersion to the optional $endVersion. @@ -318,80 +260,26 @@ public static CloseableIterator listDeltaLogFilesAsIter( } /** - * Returns the delta actions from the delta files provided in commitFiles. Reads and returns the - * actions requested in actionSet. In addition, this function does a few key things: + * Returns CommitActions for each commit file. CommitActions are ordered by increasing version. * - *
    - *
  • Performs protocol validations: we always read the protocol action. If we see a protocol - * action, we validate that it is compatible with Kernel. If the protocol action was not - * requested in actionSet, we remove it from the returned columnar batches. - *
  • Adds commit version column: the first column in the returned batches will be the commit - * version - *
  • Add commit timestamp column: the second column in the returned batches will be the - * timestamp column. This timestamp is the inCommitTimestamp if it is available, otherwise - * it is the file modification time for the commit file. - *
- * - *

For the returned columnar batches: + *

This function automatically: * *

    - *
  • Each row within the same batch is guaranteed to have the same commit version - *
  • The batch commit versions are monotonically increasing - *
  • The top-level columns include "version", "timestamp", and the actions requested in - * actionSet. "version" and "timestamp" are the first and second columns in the schema, - * respectively. The remaining columns are based on the actions requested and each have the - * schema found in {@code DeltaAction.schema}. - *
  • It is possible for a row to be all null + *
  • Performs protocol validation by reading and validating the protocol action + *
  • Extracts commit timestamp using inCommitTimestamp if available, otherwise file + * modification time + *
  • Filters out protocol and commitInfo actions if not requested in actionSet *
*/ - public static CloseableIterator getActionsFromCommitFilesWithProtocolValidation( + public static CloseableIterator getActionsFromCommitFilesWithProtocolValidation( Engine engine, String tablePath, List commitFiles, Set actionSet) { - // Create a new action set which is a super set of the requested actions. - // The extra actions are needed either for checks or to extract - // extra information. We will strip out the extra actions before - // returning the result. - Set copySet = new HashSet<>(actionSet); - copySet.add(DeltaLogActionUtils.DeltaAction.PROTOCOL); - // commitInfo is needed to extract the inCommitTimestamp of delta files, this is used in - // ActionsIterator to resolve the timestamp when available - copySet.add(DeltaLogActionUtils.DeltaAction.COMMITINFO); - // Determine whether the additional actions were in the original set. - boolean shouldDropProtocolColumn = - !actionSet.contains(DeltaLogActionUtils.DeltaAction.PROTOCOL); - boolean shouldDropCommitInfoColumn = - !actionSet.contains(DeltaLogActionUtils.DeltaAction.COMMITINFO); - - StructType readSchema = - new StructType( - copySet.stream() - .map(action -> new StructField(action.colName, action.schema, true)) - .collect(Collectors.toList())); - logger.info("{}: Reading the commit files with readSchema {}", tablePath, readSchema); - - return DeltaLogActionUtils.readCommitFiles(engine, commitFiles, readSchema) - .map( - batch -> { - int protocolIdx = batch.getSchema().indexOf("protocol"); // must exist - ColumnVector protocolVector = batch.getColumnVector(protocolIdx); - for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) { - if (!protocolVector.isNullAt(rowId)) { - Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId); - TableFeatures.validateKernelCanReadTheTable(protocol, tablePath); - } - } - ColumnarBatch batchToReturn = batch; - if (shouldDropProtocolColumn) { - batchToReturn = batchToReturn.withDeletedColumnAt(protocolIdx); - } - int commitInfoIdx = 
batchToReturn.getSchema().indexOf("commitInfo"); - if (shouldDropCommitInfoColumn) { - batchToReturn = batchToReturn.withDeletedColumnAt(commitInfoIdx); - } - return batchToReturn; - }); + + // For each commit file, create a CommitActions + return toCloseableIterator(commitFiles.iterator()) + .map(commitFile -> new CommitActionsImpl(engine, commitFile, tablePath, actionSet)); } ////////////////////// diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableChangesUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableChangesUtils.java new file mode 100644 index 00000000000..7f64ab2ed83 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableChangesUtils.java @@ -0,0 +1,121 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.kernel.internal; + +import static io.delta.kernel.internal.DeltaErrors.wrapEngineException; + +import io.delta.kernel.CommitActions; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.expressions.ExpressionEvaluator; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; + +/** Utility class for table changes operations. */ +public class TableChangesUtils { + + /** Column name for the version metadata column added to getActions results. */ + public static final String VERSION_COLUMN_NAME = "version"; + + /** Column name for the timestamp metadata column added to getActions results. */ + public static final String TIMESTAMP_COLUMN_NAME = "timestamp"; + + /** StructField for the version metadata column. */ + private static final StructField VERSION_STRUCT_FIELD = + new StructField(VERSION_COLUMN_NAME, LongType.LONG, false); + + /** StructField for the timestamp metadata column. */ + private static final StructField TIMESTAMP_STRUCT_FIELD = + new StructField(TIMESTAMP_COLUMN_NAME, LongType.LONG, false); + + private TableChangesUtils() {} + + /** + * Adds version and timestamp columns to a columnar batch. + * + *

The version and timestamp columns are added as the first two columns in the batch. + * + * @param engine the engine for expression evaluation + * @param batch the original batch + * @param version the version value to add + * @param timestamp the timestamp value to add + * @return a new batch with version and timestamp columns prepended + */ + public static ColumnarBatch addVersionAndTimestampColumns( + Engine engine, ColumnarBatch batch, long version, long timestamp) { + StructType schemaForEval = batch.getSchema(); + + ExpressionEvaluator commitVersionGenerator = + wrapEngineException( + () -> + engine + .getExpressionHandler() + .getEvaluator(schemaForEval, Literal.ofLong(version), LongType.LONG), + "Get the expression evaluator for the commit version"); + + ExpressionEvaluator commitTimestampGenerator = + wrapEngineException( + () -> + engine + .getExpressionHandler() + .getEvaluator(schemaForEval, Literal.ofLong(timestamp), LongType.LONG), + "Get the expression evaluator for the commit timestamp"); + + ColumnVector commitVersionVector = + wrapEngineException( + () -> commitVersionGenerator.eval(batch), "Evaluating the commit version expression"); + + ColumnVector commitTimestampVector = + wrapEngineException( + () -> commitTimestampGenerator.eval(batch), + "Evaluating the commit timestamp expression"); + + return batch + .withNewColumn(0, VERSION_STRUCT_FIELD, commitVersionVector) + .withNewColumn(1, TIMESTAMP_STRUCT_FIELD, commitTimestampVector); + } + + /** + * Flattens an iterator of CommitActions into an iterator of ColumnarBatch, adding version and + * timestamp columns to each batch. 
+ * + * @param engine the engine for expression evaluation + * @param commits the iterator of CommitActions to flatten + * @return an iterator of ColumnarBatch with version and timestamp columns added + */ + public static CloseableIterator flattenCommitsAndAddMetadata( + Engine engine, CloseableIterator commits) { + CloseableIterator> nestedIterator = + commits.map( + commit -> { + long version = commit.getVersion(); + long timestamp = commit.getTimestamp(); + CloseableIterator actions = commit.getActions(); + + // Map each batch to add version and timestamp columns + return actions.map( + batch -> addVersionAndTimestampColumns(engine, batch, version, timestamp)); + }); + + return Utils.flatten(nestedIterator); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 1ec67564b34..a5f449995c7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -193,8 +193,13 @@ public CloseableIterator getChanges( DeltaLogActionUtils.getCommitFilesForVersionRange( engine, new Path(tablePath), startVersion, Optional.of(endVersion)); - return DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation( - engine, tablePath, commitFiles, actionSet); + // Get CommitActions for each file + CloseableIterator commits = + DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation( + engine, tablePath, commitFiles, actionSet); + + // Flatten and add version/timestamp columns + return TableChangesUtils.flattenCommitsAndAddMetadata(engine, commits); } protected Path getDataPath() { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumUtils.java index b1250256463..10899b4cae5 100644 --- 
a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumUtils.java @@ -23,13 +23,15 @@ import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.engine.Engine; -import io.delta.kernel.internal.DeltaLogActionUtils; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.replay.ActionWrapper; +import io.delta.kernel.internal.replay.ActionsIterator; import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.stats.FileSizeHistogram; import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; import java.io.IOException; @@ -55,20 +57,8 @@ private ChecksumUtils() {} private static final int DOMAIN_METADATA_INDEX = CHECKPOINT_SCHEMA.indexOf("domainMetadata"); private static final int ADD_SIZE_INDEX = AddFile.FULL_SCHEMA.indexOf("size"); private static final int REMOVE_SIZE_INDEX = RemoveFile.FULL_SCHEMA.indexOf("size"); - - // DeltaLogActionUtils.readCommitFiles will add first two columns: version and commit timestamp. 
- private static final int INCR_READ_COLUMN_INDEX_OFFSET = 2; - private static final int INCR_READ_VERSION_INDEX = 0; - private static final int INCR_READ_ADD_INDEX = ADD_INDEX + INCR_READ_COLUMN_INDEX_OFFSET; - private static final int INCR_READ_REMOVE_INDEX = REMOVE_INDEX + INCR_READ_COLUMN_INDEX_OFFSET; - private static final int INCR_READ_METADATA_INDEX = - METADATA_INDEX + INCR_READ_COLUMN_INDEX_OFFSET; - private static final int INCR_READ_PROTOCOL_INDEX = - PROTOCOL_INDEX + INCR_READ_COLUMN_INDEX_OFFSET; - private static final int INCR_READ_DOMAIN_METADATA_INDEX = - DOMAIN_METADATA_INDEX + INCR_READ_COLUMN_INDEX_OFFSET; - private static final int INCR_READ_COMMIT_INFO_INDEX = - CHECKPOINT_SCHEMA.length() + INCR_READ_COLUMN_INDEX_OFFSET; + // commitInfo is appended after CHECKPOINT_SCHEMA in incremental read + private static final int COMMIT_INFO_INDEX = CHECKPOINT_SCHEMA.length(); private static final Set INCREMENTAL_SUPPORTED_OPS = Collections.unmodifiableSet( @@ -262,28 +252,26 @@ private static Optional buildCrcInfoIncrementally( validateDeltaContinuity(deltaFiles, lastSeenCrcInfo.getVersion()); Collections.reverse(deltaFiles); // Create iterator for delta files newer than last CRC - try (CloseableIterator iterator = - DeltaLogActionUtils.readCommitFiles( - engine, deltaFiles, CHECKPOINT_SCHEMA.add("commitInfo", CommitInfo.FULL_SCHEMA))) { + StructType readSchema = CHECKPOINT_SCHEMA.add("commitInfo", CommitInfo.FULL_SCHEMA); + try (CloseableIterator iterator = + new ActionsIterator(engine, deltaFiles, readSchema, java.util.Optional.empty())) { Optional lastSeenVersion = Optional.empty(); - while (iterator.hasNext()) { - ColumnarBatch batch = iterator.next(); + ActionWrapper currentAction = iterator.next(); + ColumnarBatch batch = currentAction.getColumnarBatch(); final int rowCount = batch.getSize(); if (rowCount == 0) { continue; } - - ColumnVector versionVector = batch.getColumnVector(INCR_READ_VERSION_INDEX); - ColumnVector addVector = 
batch.getColumnVector(INCR_READ_ADD_INDEX); - ColumnVector removeVector = batch.getColumnVector(INCR_READ_REMOVE_INDEX); - ColumnVector metadataVector = batch.getColumnVector(INCR_READ_METADATA_INDEX); - ColumnVector protocolVector = batch.getColumnVector(INCR_READ_PROTOCOL_INDEX); - ColumnVector domainMetadataVector = batch.getColumnVector(INCR_READ_DOMAIN_METADATA_INDEX); - ColumnVector commitInfoVector = batch.getColumnVector(INCR_READ_COMMIT_INFO_INDEX); + ColumnVector addVector = batch.getColumnVector(ADD_INDEX); + ColumnVector removeVector = batch.getColumnVector(REMOVE_INDEX); + ColumnVector metadataVector = batch.getColumnVector(METADATA_INDEX); + ColumnVector protocolVector = batch.getColumnVector(PROTOCOL_INDEX); + ColumnVector domainMetadataVector = batch.getColumnVector(DOMAIN_METADATA_INDEX); + ColumnVector commitInfoVector = batch.getColumnVector(COMMIT_INFO_INDEX); for (int i = 0; i < rowCount; i++) { - long newVersion = versionVector.getLong(i); + long newVersion = currentAction.getVersion(); // Detect version change if (!lastSeenVersion.isPresent() || newVersion != lastSeenVersion.get()) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java index ad62e23232a..fbeed3ccc43 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/commitrange/CommitRangeImpl.java @@ -19,12 +19,14 @@ import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import io.delta.kernel.CommitActions; import io.delta.kernel.CommitRange; import io.delta.kernel.CommitRangeBuilder; import io.delta.kernel.Snapshot; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.DeltaLogActionUtils; +import 
io.delta.kernel.internal.TableChangesUtils; import io.delta.kernel.internal.annotation.VisibleForTesting; import io.delta.kernel.internal.files.ParsedDeltaData; import io.delta.kernel.internal.fs.Path; @@ -96,13 +98,32 @@ public List getDeltaFiles() { @Override public CloseableIterator getActions( Engine engine, Snapshot startSnapshot, Set actionSet) { + validateParameters(engine, startSnapshot, actionSet); + // Build on top of getCommitActions() by flattening and adding version/timestamp columns + CloseableIterator commits = getCommitActions(engine, startSnapshot, actionSet); + + return TableChangesUtils.flattenCommitsAndAddMetadata(engine, commits); + } + + @Override + public CloseableIterator getCommitActions( + Engine engine, Snapshot startSnapshot, Set actionSet) { + validateParameters(engine, startSnapshot, actionSet); + return DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation( + engine, dataPath.toString(), getDeltaFiles(), actionSet); + } + + ////////////////////// + // Private helpers // + ////////////////////// + + private void validateParameters( + Engine engine, Snapshot startSnapshot, Set actionSet) { requireNonNull(engine, "engine cannot be null"); requireNonNull(startSnapshot, "startSnapshot cannot be null"); requireNonNull(actionSet, "actionSet cannot be null"); checkArgument( startSnapshot.getVersion() == startVersion, "startSnapshot must have version = startVersion"); - return DeltaLogActionUtils.getActionsFromCommitFilesWithProtocolValidation( - engine, dataPath.toString(), getDeltaFiles(), actionSet); } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala index 5543397b1af..4f4384f043f 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/TableChangesSuite.scala @@ -23,7 +23,7 @@ 
import scala.collection.immutable
 import io.delta.golden.GoldenTableUtils.goldenTablePath
 import io.delta.kernel.{Table, TableManager}
 import io.delta.kernel.CommitRangeBuilder.CommitBoundary
-import io.delta.kernel.data.ColumnarBatch
+import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
 import io.delta.kernel.data.Row
 import io.delta.kernel.defaults.utils.{TestUtils, WriteUtils}
 import io.delta.kernel.engine.Engine
@@ -34,6 +34,7 @@ import io.delta.kernel.internal.TableImpl
 import io.delta.kernel.internal.actions.{AddCDCFile, AddFile, CommitInfo, Metadata, Protocol, RemoveFile}
 import io.delta.kernel.internal.fs.Path
 import io.delta.kernel.internal.util.{FileNames, ManualClock, VectorUtils}
+import io.delta.kernel.types.{DataType, LongType, StructField}
 
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.delta.actions.{Action => SparkAction, AddCDCFile => SparkAddCDCFile, AddFile => SparkAddFile, CommitInfo => SparkCommitInfo, Metadata => SparkMetadata, Protocol => SparkProtocol, RemoveFile => SparkRemoveFile, SetTransaction => SparkSetTransaction}
@@ -263,6 +264,260 @@ class CommitRangeTableChangesSuite extends TableChangesSuite {
       testGetChangesVsSpark(tablePath, 0, 2, FULL_ACTION_SET)
     }
   }
+
+  /** Action types requested by every getCommitActions test below. */
+  private val COMMIT_ACTIONS_TEST_ACTION_SET = Set(
+    DeltaAction.ADD,
+    DeltaAction.REMOVE,
+    DeltaAction.METADATA,
+    DeltaAction.PROTOCOL,
+    DeltaAction.CDC)
+
+  test("getCommitActions returns CommitActions with correct version and timestamp") {
+    withTempDir { dir =>
+      val tablePath = dir.getCanonicalPath
+
+      // Create commits with known modification times
+      (0 to 2).foreach { _ =>
+        spark.range(10).write.format("delta").mode("append").save(tablePath)
+      }
+
+      // Set custom modification times on delta files. Fail right here if the file system
+      // refuses the update, rather than later on a confusing timestamp mismatch.
+      val logPath = new Path(dir.getCanonicalPath, "_delta_log")
+      Seq(0L -> 1000L, 1L -> 2000L, 2L -> 3000L).foreach { case (version, modTime) =>
+        val deltaFile = new File(FileNames.deltaFile(logPath, version))
+        assert(
+          deltaFile.setLastModified(modTime),
+          s"Could not set modification time of $deltaFile")
+      }
+
+      val commitRange = TableManager.loadCommitRange(tablePath)
+        .withStartBoundary(CommitBoundary.atVersion(0))
+        .withEndBoundary(CommitBoundary.atVersion(2))
+        .build(defaultEngine)
+
+      val commitsIter = commitRange.getCommitActions(
+        defaultEngine,
+        getTableManagerAdapter.getSnapshotAtVersion(defaultEngine, tablePath, 0),
+        COMMIT_ACTIONS_TEST_ACTION_SET.asJava)
+
+      // Close the iterator even when one of the assertions below throws.
+      try {
+        val commits = commitsIter.toSeq
+        assert(commits.size == 3)
+
+        // Verify versions
+        assert(commits(0).getVersion == 0)
+        assert(commits(1).getVersion == 1)
+        assert(commits(2).getVersion == 2)
+
+        // Verify timestamps match file modification times (no ICT in this table)
+        assert(commits(0).getTimestamp == 1000)
+        assert(commits(1).getTimestamp == 2000)
+        assert(commits(2).getTimestamp == 3000)
+
+        // Get Spark's results for comparison
+        val sparkChanges = DeltaLog.forTable(spark, tablePath)
+          .getChanges(0)
+
+        // Compare actions with Spark using the new compareCommitActions method
+        compareCommitActions(
+          commits,
+          pruneSparkActionsByActionSet(sparkChanges, COMMIT_ACTIONS_TEST_ACTION_SET))
+      } finally {
+        commitsIter.close()
+      }
+    }
+  }
+
+  test("getCommitActions with ICT returns timestamp from inCommitTimestamp") {
+    withTempDirAndEngine { (tablePath, engine) =>
+      val startTime = 5000L
+      val clock = new ManualClock(startTime)
+
+      // Version 0 with ICT=5000L
+      appendData(
+        engine,
+        tablePath,
+        isNewTable = true,
+        testSchema,
+        data = immutable.Seq(Map.empty[String, Literal] -> dataBatches1),
+        clock = clock,
+        tableProperties = Map("delta.enableInCommitTimestamps" -> "true"))
+
+      // Version 1 with ICT=6000L
+      clock.setTime(startTime + 1000)
+      appendData(
+        engine,
+        tablePath,
+        data = immutable.Seq(Map.empty[String, Literal] -> (dataBatches1 ++ dataBatches2)),
+        clock = clock)
+
+      // Version 2 with ICT=7000L
+      clock.setTime(startTime + 2000)
+      appendData(
+        engine,
+        tablePath,
+        data = immutable.Seq(Map.empty[String, Literal] -> dataBatches1),
+        clock = clock)
+
+      // Use the engine handed to this test throughout, including for building the range.
+      val commitRange = TableManager.loadCommitRange(tablePath)
+        .withStartBoundary(CommitBoundary.atVersion(0))
+        .withEndBoundary(CommitBoundary.atVersion(2))
+        .build(engine)
+
+      val commitsIter = commitRange.getCommitActions(
+        engine,
+        getTableManagerAdapter.getSnapshotAtVersion(engine, tablePath, 0),
+        COMMIT_ACTIONS_TEST_ACTION_SET.asJava)
+
+      // Close the iterator even when one of the assertions below throws.
+      try {
+        val commits = commitsIter.toSeq
+        assert(commits.size == 3)
+
+        // Verify versions
+        assert(commits(0).getVersion == 0)
+        assert(commits(1).getVersion == 1)
+        assert(commits(2).getVersion == 2)
+
+        // Verify timestamps come from ICT, not file modification times
+        // The file modification times would be much larger (current epoch time)
+        // but our ICT values are in the 5000-7000 range
+        assert(commits(0).getTimestamp == 5000)
+        assert(commits(1).getTimestamp == 6000)
+        assert(commits(2).getTimestamp == 7000)
+
+        // Get Spark's results for comparison
+        val sparkChanges = DeltaLog.forTable(spark, tablePath)
+          .getChanges(0)
+        compareCommitActions(
+          commits,
+          pruneSparkActionsByActionSet(sparkChanges, COMMIT_ACTIONS_TEST_ACTION_SET))
+      } finally {
+        commitsIter.close()
+      }
+    }
+  }
+
+  test("getCommitActions can be called multiple times and returns same results") {
+    withTempDir { dir =>
+      val tablePath = dir.getCanonicalPath
+
+      // Create some commits
+      (0 to 2).foreach { _ =>
+        spark.range(10).write.format("delta").mode("append").save(tablePath)
+      }
+
+      val commitRange = TableManager.loadCommitRange(tablePath)
+        .withStartBoundary(CommitBoundary.atVersion(0))
+        .withEndBoundary(CommitBoundary.atVersion(2))
+        .build(defaultEngine)
+
+      // Every call takes a fresh snapshot but goes through the single commitRange under test.
+      def readCommits() = commitRange.getCommitActions(
+        defaultEngine,
+        getTableManagerAdapter.getSnapshotAtVersion(defaultEngine, tablePath, 0),
+        COMMIT_ACTIONS_TEST_ACTION_SET.asJava).toSeq
+
+      // Each comparison needs its own fresh Spark iterator.
+      def sparkChanges() = DeltaLog.forTable(spark, tablePath)
+        .getChanges(0)
+        .filter(_._1 <= 2)
+
+      // Call getCommitActions multiple times
+      val reads = Seq.fill(3)(readCommits())
+
+      // Verify all calls return the same number of commits
+      reads.foreach(commits => assert(commits.size == 3))
+
+      // Verify each call returns the same actions by comparing with Spark
+      reads.foreach { commits =>
+        compareCommitActions(
+          commits,
+          pruneSparkActionsByActionSet(sparkChanges(), COMMIT_ACTIONS_TEST_ACTION_SET))
+      }
+
+      // Also verify that calling getActions on each CommitActions multiple times works
+      reads.head.foreach { commit =>
+        val firstPass = commit.getActions.toSeq
+        val secondPass = commit.getActions.toSeq
+        assert(firstPass.size == secondPass.size)
+        // Verify the schemas are the same
+        firstPass.zip(secondPass).foreach { case (batch1, batch2) =>
+          assert(batch1.getSchema.equals(batch2.getSchema))
+          assert(batch1.getSize == batch2.getSize)
+        }
+      }
+    }
+  }
+
+  test("getCommitActions with empty commit file") {
+    withTempDirAndEngine { (tablePath, engine) =>
+      // Create a table with an initial commit
+      appendData(
+        engine,
+        tablePath,
+        isNewTable = true,
+        testSchema,
+        data = immutable.Seq(Map.empty[String, Literal] -> dataBatches1))
+
+      // Create an empty commit file at version 1 using commitUnsafe with no actions
+      // (DeltaLog is already imported at the top of this file)
+      DeltaLog.forTable(spark, tablePath).startTransaction().commitUnsafe(tablePath, 1)
+
+      // Use the engine handed to this test throughout, including for building the range.
+      val commitRange = TableManager.loadCommitRange(tablePath)
+        .withStartBoundary(CommitBoundary.atVersion(0))
+        .withEndBoundary(CommitBoundary.atVersion(1))
+        .build(engine)
+
+      val commits = commitRange.getCommitActions(
+        engine,
+        getTableManagerAdapter.getSnapshotAtVersion(engine, tablePath, 0),
+        COMMIT_ACTIONS_TEST_ACTION_SET.asJava).toSeq
+      assert(commits.size == 2) // Version 0 and version 1
+
+      // Version 0 should have actions (normal commit)
+      val v0Actions = commits(0).getActions.toSeq
+      assert(v0Actions.nonEmpty)
+
+      // Version 1 (empty commit) should return empty actions
+      val totalRows = commits(1).getActions.toSeq.map(_.getSize).sum
+      assert(totalRows == 0, s"Empty commit file should have no actions, but got $totalRows rows")
+
+      // Can call getActions multiple times on empty commit
+      val v1ActionsTwice = commits(1).getActions.toSeq
+      assert(v1ActionsTwice.map(_.getSize).sum == 0)
+    }
+  }
 }
 
 abstract class TableChangesSuite extends AnyFunSuite with TestUtils with WriteUtils {
@@ -676,8 +931,8 @@ abstract class TableChangesSuite extends AnyFunSuite with TestUtils with WriteUt
       size: Long,
       tags: Map[String, String]) extends StandardAction
 
-  def standardizeKernelAction(row: Row): Option[StandardAction] = {
-    val actionIdx = (2 until row.getSchema.length()).find(!row.isNullAt(_)).getOrElse(
+  /**
+   * Finds the first non-null column at or after `startIdx` and standardizes the action stored
+   * there. Returns None when all of those columns are null.
+   *
+   * @param startIdx first column index to inspect. The default of 2 keeps existing callers
+   *                 unchanged; pass 0 for rows read straight from `CommitActions.getActions`.
+   *                 NOTE(review): presumably the two skipped columns are the version and
+   *                 timestamp columns; confirm against the flattened batch schema.
+   */
+  def standardizeKernelAction(row: Row, startIdx: Int = 2): Option[StandardAction] = {
+    val actionIdx = (startIdx until row.getSchema.length()).find(!row.isNullAt(_)).getOrElse(
       return None)
 
     row.getSchema.at(actionIdx).getName match {
@@ -829,6 +1084,36 @@ abstract class TableChangesSuite extends AnyFunSuite with TestUtils with
WriteUt
     }
   }
 
+  /**
+   * Compare actions from CommitActions objects directly with Spark actions.
+   *
+   * The version is taken from each commit itself, and every row is standardized starting at
+   * column 0 (no extra columns are added to the batches first).
+   *
+   * @param commits kernel commits, in the order they should be compared
+   * @param sparkActions Spark's (version, actions) pairs; consumed by this call
+   */
+  def compareCommitActions(
+      commits: Seq[io.delta.kernel.CommitActions],
+      sparkActions: Iterator[(Long, Seq[SparkAction])]): Unit = {
+    // Directly convert CommitActions to StandardActions without adding columns
+    val standardKernelActions: Seq[(Long, StandardAction)] = commits.flatMap { commit =>
+      val version = commit.getVersion
+      commit.getActions.toSeq.flatMap { batch =>
+        batch.getRows.toSeq.flatMap { row =>
+          // Rows that standardize to None are dropped by the flatMap.
+          standardizeKernelAction(row, startIdx = 0).map(action => (version, action))
+        }
+      }
+    }
+
+    val standardSparkActions: Seq[(Long, StandardAction)] =
+      sparkActions.flatMap { case (version, actions) =>
+        actions.flatMap(standardizeSparkAction(_)).map((version, _))
+      }.toSeq
+
+    assert(
+      standardKernelActions.sameElements(standardSparkActions),
+      s"Kernel actions did not match Spark actions.\n" +
+        s"Kernel actions: ${standardKernelActions.take(5)}\n" +
+        s"Spark actions: ${standardSparkActions.take(5)}")
+  }
+
   def compareActions(
       kernelActions: Seq[ColumnarBatch],
       sparkActions: Iterator[(Long, Seq[SparkAction])]): Unit = {