Skip to content
Draft

WIP #5293

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,19 @@

package io.delta.kernel.commit;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.TableConfig;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.files.ParsedDeltaData;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -77,4 +87,53 @@ public static Map<String, String> extractProtocolProperties(Protocol protocol) {

return properties;
}

/**
 * Computes the location in the Delta log at which a catalog commit is published.
 *
 * <p>The result is whatever {@code FileNames.deltaFile} produces for the given log directory and
 * the version carried by the catalog commit.
 *
 * @param logPath the path to the Delta log directory
 * @param catalogCommit the catalog commit whose version selects the published file
 * @return the path where the catalog commit should be published (e.g.,
 *     _delta_log/00000000000000000001.json)
 */
public static String getPublishedDeltaFilePath(String logPath, ParsedDeltaData catalogCommit) {
  final long commitVersion = catalogCommit.getVersion();
  return FileNames.deltaFile(logPath, commitVersion);
}

/**
 * Publishes a single file-backed catalog commit into the Delta log.
 *
 * <p>The staged commit file is read through the engine's JSON handler, and the resulting rows are
 * written to the published path (see {@code getPublishedDeltaFilePath}) using {@code
 * writeJsonFileAtomically} with overwrite disabled, i.e. PUT-if-absent. If a {@link
 * FileAlreadyExistsException} is raised, the commit is treated as already published and the
 * method returns normally, which makes the operation idempotent.
 *
 * @param engine the engine to use for reading and writing files
 * @param logPath the path to the Delta log directory
 * @param catalogCommit the catalog commit to publish
 * @throws UnsupportedOperationException if the catalog commit is not file-backed (inline data)
 * @throws IOException if an I/O error occurs during publishing
 */
public static void publishCatalogCommit(
    Engine engine, String logPath, ParsedDeltaData catalogCommit) throws IOException {
  // Only staged files can be published; inline commits have no file to read from.
  if (!catalogCommit.isFile()) {
    throw new UnsupportedOperationException("Publishing inline catalog commits is not supported");
  }

  final FileStatus stagedCommit = catalogCommit.getFileStatus();
  final String destination = getPublishedDeltaFilePath(logPath, catalogCommit);

  // NOTE(review): a null physical schema and a null predicate are passed to readJsonFiles --
  // confirm that the engine's JSON handler accepts both.
  try (CloseableIterator<FileStatus> stagedFiles = Utils.singletonCloseableIterator(stagedCommit);
      CloseableIterator<ColumnarBatch> batches =
          engine
              .getJsonHandler()
              .readJsonFiles(stagedFiles, null /* physicalSchema */, null /* predicate */);
      CloseableIterator<Row> actionRows = batches.flatMap(batch -> batch.getRows())) {

    // overwrite=false: the write must fail rather than replace an existing published file.
    engine.getJsonHandler().writeJsonFileAtomically(destination, actionRows, false /* overwrite */);
  } catch (FileAlreadyExistsException ignored) {
    // The destination already exists, so this version is already published. Nothing to do.
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterator;

/**
* Interface for committing changes to Delta tables, supporting both filesystem-managed and
* catalog-managed tables.
*/
/** Interface for commits and publishing operations for Delta tables. */
@Experimental
public interface Committer {

Expand Down Expand Up @@ -64,5 +61,41 @@ CommitResponse commit(
Engine engine, CloseableIterator<Row> finalizedActions, CommitMetadata commitMetadata)
throws CommitFailedException;

// TODO: API to get the required table properties
/**
 * Publishes catalog commits to the Delta log. Applicable only to catalog-managed tables. For
 * filesystem-managed tables, this method is a no-op.
 *
 * <p>Publishing means copying ratified catalog commits into the Delta log as published Delta
 * files (e.g., {@code _delta_log/00000000000000000001.json}).
 *
 * <p>Why publish:
 *
 * <ul>
 *   <li>It reduces the number of commits the catalog needs to store internally and serve to
 *       readers
 *   <li>It enables table maintenance operations that must operate on published versions only,
 *       such as checkpointing and log compaction
 * </ul>
 *
 * <p>Requirements on implementations:
 *
 * <ul>
 *   <li>All catalog commits must be published to the Delta log up to and including the snapshot
 *       version specified in {@code publishMetadata}
 *   <li>Commits must be published in order: version V-1 must be published before version V
 * </ul>
 *
 * <p>Catalog-specific semantics: each catalog implementation may specify its own rules and
 * semantics for publishing, including whether it expects to be notified immediately upon
 * publishing success, whether published deltas must appear with PUT-if-absent semantics in the
 * Delta log, and whether publishing happens client-side or server-side.
 *
 * <p>Default behavior: returns without doing anything when {@code publishMetadata} carries no
 * catalog commits, and otherwise rejects the request.
 *
 * @param engine the {@link Engine} instance used for publishing commits
 * @param publishMetadata the {@link PublishMetadata} containing the snapshot version up to which
 *     all catalog commits must be published, the log path, and list of catalog commits
 * @throws UnsupportedOperationException in the default implementation, if {@code
 *     publishMetadata} contains at least one catalog commit
 */
default void publish(Engine engine, PublishMetadata publishMetadata) {
  final boolean nothingToPublish = publishMetadata.getAscendingCatalogCommits().isEmpty();
  if (nothingToPublish) {
    return;
  }
  throw new UnsupportedOperationException("Publishing not supported by this committer");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.commit;

import io.delta.kernel.internal.files.ParsedDeltaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Metadata required for publishing catalog commits to the Delta log.
 *
 * <p>Instances are immutable: the list of catalog commits is copied at construction time and
 * exposed as an unmodifiable view, so later changes to the caller's list cannot alter it.
 */
public class PublishMetadata {

  private final long snapshotVersion;
  private final String logPath;
  private final List<ParsedDeltaData> ascendingCatalogCommits;

  /**
   * @param snapshotVersion the snapshot version up to which all catalog commits must be published
   * @param logPath the path to the Delta log directory; must not be null
   * @param ascendingCatalogCommits the catalog commits to publish, in ascending order of version
   *     number; must not be null (the ordering itself is not validated here)
   * @throws NullPointerException if {@code logPath} or {@code ascendingCatalogCommits} is null
   */
  public PublishMetadata(
      long snapshotVersion, String logPath, List<ParsedDeltaData> ascendingCatalogCommits) {
    this.snapshotVersion = snapshotVersion;
    this.logPath = Objects.requireNonNull(logPath, "logPath is null");
    Objects.requireNonNull(ascendingCatalogCommits, "ascendingCatalogCommits is null");
    // Defensive copy: do not let the caller's list (or holders of the getter's result) change
    // which commits get published after this object has been built.
    this.ascendingCatalogCommits =
        Collections.unmodifiableList(new ArrayList<>(ascendingCatalogCommits));
  }

  /** @return the snapshot version up to which all catalog commits must be published */
  public long getSnapshotVersion() {
    return snapshotVersion;
  }

  /** @return the path to the Delta log directory */
  public String getLogPath() {
    return logPath;
  }

  /**
   * @return an unmodifiable list of the catalog commits to be published, in ascending order of
   *     version number
   */
  public List<ParsedDeltaData> getAscendingCatalogCommits() {
    return ascendingCatalogCommits;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,42 @@ public void close() throws IOException {
};
}

/**
 * Maps every element of this iterator to a {@link CloseableIterator} and exposes the elements of
 * those iterators one after another, in order.
 *
 * <p>Inner iterators are created lazily, one at a time. When the current inner iterator runs out
 * and this iterator still has elements, the exhausted inner iterator is closed silently before
 * the next one is requested from {@code mapper}. Closing the returned iterator closes the current
 * inner iterator (if any) and this iterator.
 *
 * @param mapper A function that maps each element to a {@link CloseableIterator}
 * @param <U> The type of elements in the resulting iterator
 * @return A {@link CloseableIterator} containing all elements from the mapped iterators
 */
default <U> CloseableIterator<U> flatMap(Function<T, CloseableIterator<U>> mapper) {
  final CloseableIterator<T> outer = this;
  return new CloseableIterator<U>() {
    // Iterator produced by the most recent mapper call; null until the first call.
    private CloseableIterator<U> inner = null;

    private boolean innerHasNext() {
      return inner != null && inner.hasNext();
    }

    @Override
    public boolean hasNext() {
      // Advance past missing or exhausted inner iterators (including empty ones) until an
      // element is available or the outer iterator is drained.
      while (!innerHasNext() && outer.hasNext()) {
        Utils.closeCloseablesSilently(inner);
        inner = mapper.apply(outer.next());
      }
      return innerHasNext();
    }

    @Override
    public U next() {
      if (hasNext()) {
        return inner.next();
      }
      throw new NoSuchElementException();
    }

    @Override
    public void close() throws IOException {
      Utils.closeCloseables(inner, outer);
    }
  };
}

/**
* Returns a new {@link CloseableIterator} that includes only the elements of this iterator for
* which the given {@code mapper} function returns {@code true}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,60 @@ class CloseableIteratorSuite extends AnyFunSuite {
assert(toList(result) === List(1, 3))
}

// Flattening an iterator of iterators with an identity mapper must yield every inner element,
// preserving both the order of the inner iterators and the order within each of them.
test("CloseableIterator::flatMap -- simple case 1") {
val innerIter1 = toCloseableIter(Seq("a", "b"))
val innerIter2 = toCloseableIter(Seq("c"))
val innerIter3 = toCloseableIter(Seq("d", "e", "f"))
val iterOfIters = toCloseableIter(Seq(innerIter1, innerIter2, innerIter3))

// The mapper returns each element unchanged, since the elements are already iterators.
val flattened = iterOfIters.flatMap(x => x)

assert(toList(flattened) === List("a", "b", "c", "d", "e", "f"))
}

// Each input element expands to two outputs (x and x * 10), emitted back to back per element.
// The expected list implies normalDataIter (defined elsewhere in this suite) yields 1 through 5.
test("CloseableIterator::flatMap -- simple case 2") {
val result = normalDataIter.flatMap { x => toCloseableIter(Seq(x, x * 10)) }
assert(toList(result) === List(1, 10, 2, 20, 3, 30, 4, 40, 5, 50))
}

// Odd elements map to an empty inner iterator; flatMap must skip those and still reach the
// elements produced for the even inputs.
test("CloseableIterator::flatMap -- handles empty inner iterators") {
val result = normalDataIter.flatMap { x =>
if (x % 2 == 0) {
toCloseableIter(Seq(x))
} else {
toCloseableIter(Seq.empty[Int])
}
}
assert(toList(result) === List(2, 4))
}

// Wraps every inner iterator in a close-counting decorator and checks that each of the three
// inner iterators ends up closed once the flattened iterator has been consumed.
test("CloseableIterator::flatMap -- properly closes inner iterators") {
// Incremented each time an inner iterator's close() is invoked.
var closedCount = 0

val trackingIter = toCloseableIter(Seq(1, 2, 3)).flatMap { x =>
new CloseableIterator[Int] {
private val inner = toCloseableIter(Seq(x, x * 10))

override def hasNext: Boolean = inner.hasNext
override def next(): Int = inner.next()
override def close(): Unit = {
closedCount += 1
inner.close()
}
}
}

// NOTE(review): the last inner iterator is only closed when the flattened iterator itself is
// closed, so this presumably relies on toList (defined elsewhere) closing it -- confirm.
assert(toList(trackingIter) === List(1, 10, 2, 20, 3, 30)) // Consume all elements
assert(closedCount === 3) // Verify that all 3 inner iterators were closed
}

// flatMap must compose with filter (before) and map (after); the trailing comments show the
// expected contents after each stage.
test("CloseableIterator::flatMap -- chains with other operations") {
val result = normalDataIter
.filter(x => x <= 3) // [1, 2, 3]
.flatMap(x => toCloseableIter(Seq(x, x * 10))) // [1, 10, 2, 20, 3, 30]
.map(_ + 1) // [2, 11, 3, 21, 4, 31]

assert(toList(result) === List(2, 11, 3, 21, 4, 31))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ package io.delta.kernel.commit
import scala.collection.JavaConverters._

import io.delta.kernel.internal.actions.Protocol
import io.delta.kernel.internal.files.ParsedDeltaData
import io.delta.kernel.test.{MockFileSystemClientUtils, VectorTestUtils}

import org.scalatest.funsuite.AnyFunSuite

class CatalogCommitterUtilsSuite extends AnyFunSuite {
class CatalogCommitterUtilsSuite
extends AnyFunSuite
with MockFileSystemClientUtils
with VectorTestUtils {

test("extractProtocolProperties - legacy protocol (1, 2)") {
// ===== GIVEN =====
Expand Down Expand Up @@ -54,4 +59,29 @@ class CatalogCommitterUtilsSuite extends AnyFunSuite {
assert(properties("delta.feature.deletionVectors") === "supported")
assert(properties("delta.feature.appendOnly") === "supported")
}

// The published path is the log directory plus the commit version zero-padded to 20 digits
// with a .json suffix.
test("getPublishedDeltaFilePath - generates correct path format") {
// ===== GIVEN =====
val logPath = "/path/to/table/_delta_log"
// stagedCommitFile is a test fixture defined elsewhere; here it supplies a commit at version 42.
val catalogCommit = ParsedDeltaData.forFileStatus(stagedCommitFile(42))

// ===== WHEN =====
val publishedPath = CatalogCommitterUtils.getPublishedDeltaFilePath(logPath, catalogCommit)

// ===== THEN =====
assert(publishedPath === s"$logPath/00000000000000000042.json")
}

// Inline (non-file) catalog commits cannot be published: publishCatalogCommit must reject them
// with UnsupportedOperationException and the exact message asserted below.
test("publishCatalogCommit - throws exception for inline catalog commit") {
// ===== GIVEN =====
val logPath = "/path/to/table/_delta_log"
val inlineCatalogCommit = ParsedDeltaData.forInlineData(10L, emptyColumnarBatch)

// ===== WHEN/THEN =====
// The isFile check is the first statement of publishCatalogCommit, so the mock engine is
// never touched on this path.
val exception = intercept[UnsupportedOperationException] {
CatalogCommitterUtils.publishCatalogCommit(mockEngine(), logPath, inlineCatalogCommit)
}
assert(exception.getMessage === "Publishing inline catalog commits is not supported")
}

}
Loading
Loading