Skip to content

Commit 4b7e2af

Browse files
authored
[Kernel] Add CatalogCommitterUtils::extractProtocolProperties utility (#5243)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta/pull/5243/files) to review incremental changes. - [**stack/kernel_catalog_utils_protocol_properties**](#5243) [[Files changed](https://github.com/delta-io/delta/pull/5243/files)] --------- #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Today, delta-spark includes various Protocol-derived properties as its official Snapshot properties. This PR adds an API to extract this information for commiters, to, for example, send to their catalog server. ## How was this patch tested? New UTs. ## Does this PR introduce _any_ user-facing changes? No.
1 parent 2396042 commit 4b7e2af

File tree

4 files changed

+146
-1
lines changed

4 files changed

+146
-1
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/commit/CatalogCommitterUtils.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
package io.delta.kernel.commit;
1818

19+
import io.delta.kernel.internal.TableConfig;
20+
import io.delta.kernel.internal.actions.Protocol;
1921
import io.delta.kernel.internal.tablefeatures.TableFeatures;
22+
import java.util.HashMap;
23+
import java.util.Map;
2024

2125
public class CatalogCommitterUtils {
22-
2326
private CatalogCommitterUtils() {}
2427

2528
/**
@@ -30,4 +33,48 @@ private CatalogCommitterUtils() {}
3033
public static final String CATALOG_MANAGED_ENABLEMENT_KEY =
3134
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX
3235
+ TableFeatures.CATALOG_MANAGED_R_W_FEATURE_PREVIEW.featureName();
36+
37+
/** Property key that specifies which version last updated the catalog entry. */
38+
public static final String METASTORE_LAST_UPDATE_VERSION = "delta.lastUpdateVersion";
39+
40+
/**
41+
* Property key that specifies the timestamp (in milliseconds since the Unix epoch) of the last
42+
* commit that updated the catalog entry.
43+
*/
44+
public static final String METASTORE_LAST_COMMIT_TIMESTAMP = "delta.lastCommitTimestamp";
45+
46+
/**
47+
* Extract protocol-related properties from the given protocol.
48+
*
49+
* <p>For a Protocol(3, 7) with reader features ["columnMapping", "deletionVectors"] and writer
50+
* features ["appendOnly", "columnMapping"], this would return properties like:
51+
*
52+
* <ul>
53+
* <li>delta.minReaderVersion: 3
54+
* <li>delta.minWriterVersion: 7
55+
* <li>delta.feature.columnMapping: supported
56+
* <li>delta.feature.deletionVectors: supported
57+
* <li>delta.feature.appendOnly: supported
58+
* </ul>
59+
*/
60+
public static Map<String, String> extractProtocolProperties(Protocol protocol) {
61+
final Map<String, String> properties = new HashMap<>();
62+
63+
properties.put(
64+
TableConfig.MIN_PROTOCOL_READER_VERSION_KEY,
65+
String.valueOf(protocol.getMinReaderVersion()));
66+
properties.put(
67+
TableConfig.MIN_PROTOCOL_WRITER_VERSION_KEY,
68+
String.valueOf(protocol.getMinWriterVersion()));
69+
70+
if (protocol.supportsReaderFeatures() || protocol.supportsWriterFeatures()) {
71+
for (String featureName : protocol.getReaderAndWriterFeatures()) {
72+
properties.put(
73+
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_PREFIX + featureName,
74+
TableFeatures.SET_TABLE_FEATURE_SUPPORTED_VALUE);
75+
}
76+
}
77+
78+
return properties;
79+
}
3380
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
*/
3232
public class TableConfig<T> {
3333

34+
public static final String MIN_PROTOCOL_READER_VERSION_KEY = "delta.minReaderVersion";
35+
36+
public static final String MIN_PROTOCOL_WRITER_VERSION_KEY = "delta.minWriterVersion";
37+
3438
//////////////////
3539
// TableConfigs //
3640
//////////////////

kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Protocol.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,59 @@ public Protocol(
115115
this.supportsWriterFeatures = TableFeatures.supportsWriterFeatures(minWriterVersion);
116116
}
117117

118+
/** @return The minimum reader version required for this protocol */
118119
public int getMinReaderVersion() {
119120
return minReaderVersion;
120121
}
121122

123+
/** @return The minimum writer version required for this protocol */
122124
public int getMinWriterVersion() {
123125
return minWriterVersion;
124126
}
125127

128+
/**
129+
* @return The set of explicitly specified reader features for this protocol. Will be empty if
130+
* this protocol does not support reader features.
131+
*/
126132
public Set<String> getReaderFeatures() {
127133
return readerFeatures;
128134
}
129135

136+
/**
137+
* @return The set of explicitly specified writer features for this protocol. Will be empty if
138+
* this protocol does not support writer features.
139+
*/
130140
public Set<String> getWriterFeatures() {
131141
return writerFeatures;
132142
}
133143

144+
/**
145+
* @return The combined set of all reader and writer features for this protocol. Will be empty if
146+
* this protocol does not support reader or writer features.
147+
*/
148+
public Set<String> getReaderAndWriterFeatures() {
149+
final Set<String> allFeatureNames = new HashSet<>();
150+
allFeatureNames.addAll(readerFeatures);
151+
allFeatureNames.addAll(writerFeatures);
152+
return allFeatureNames;
153+
}
154+
155+
/**
156+
* @return Whether this protocol supports explicitly specifying reader features, which occurs when
157+
* the minReaderVersion is greater than or equal to 3.
158+
*/
159+
public boolean supportsReaderFeatures() {
160+
return supportsReaderFeatures;
161+
}
162+
163+
/**
164+
* @return Whether this protocol supports explicitly specifying writer features, which occurs when
165+
* the minWriterVersion is greater than or equal to 7.
166+
*/
167+
public boolean supportsWriterFeatures() {
168+
return supportsWriterFeatures;
169+
}
170+
134171
@Override
135172
public String toString() {
136173
final StringBuilder sb = new StringBuilder("Protocol{");
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.commit
18+
19+
import scala.collection.JavaConverters._
20+
21+
import io.delta.kernel.internal.actions.Protocol
22+
23+
import org.scalatest.funsuite.AnyFunSuite
24+
25+
class CatalogCommitterUtilsSuite extends AnyFunSuite {
26+
27+
test("extractProtocolProperties - legacy protocol (1, 2)") {
28+
// ===== GIVEN =====
29+
val protocol = new Protocol(1, 2)
30+
31+
// ===== WHEN =====
32+
val properties = CatalogCommitterUtils.extractProtocolProperties(protocol).asScala
33+
34+
// ===== THEN =====
35+
assert(properties.size === 2)
36+
assert(properties("delta.minReaderVersion") === "1")
37+
assert(properties("delta.minWriterVersion") === "2")
38+
}
39+
40+
test("extractProtocolProperties - protocol with overlapping reader and writer features") {
41+
// ===== GIVEN =====
42+
val readerFeatures = Set("columnMapping", "deletionVectors")
43+
val writerFeatures = Set("columnMapping", "appendOnly") // Note: columnMapping overlaps
44+
val protocol = new Protocol(3, 7, readerFeatures.asJava, writerFeatures.asJava)
45+
46+
// ===== WHEN =====
47+
val properties = CatalogCommitterUtils.extractProtocolProperties(protocol).asScala
48+
49+
// ===== THEN =====
50+
assert(properties.size === 2 + 3) // minReader + minWriter + 3 unique features
51+
assert(properties("delta.minReaderVersion") === "3")
52+
assert(properties("delta.minWriterVersion") === "7")
53+
assert(properties("delta.feature.columnMapping") === "supported")
54+
assert(properties("delta.feature.deletionVectors") === "supported")
55+
assert(properties("delta.feature.appendOnly") === "supported")
56+
}
57+
}

0 commit comments

Comments
 (0)