Skip to content

Commit 1a1945b

Browse files
authored
REST: Reconcile on CommitStateUnknown for simple update (#14320)
* Reconcile on CommitStateUnknown for simple update * fix check style error * fix checking style error * address comments * address comments * add reconcileOnSimpleUpdate method * remove config * remove empty line * remove un-needed test * add comment * spotlessApply * add TestCommitStateUnknownNotReconciled
1 parent b987e60 commit 1a1945b

File tree

2 files changed

+170
-2
lines changed

2 files changed

+170
-2
lines changed

core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,14 @@
2626
import java.util.function.Supplier;
2727
import org.apache.iceberg.LocationProviders;
2828
import org.apache.iceberg.MetadataUpdate;
29+
import org.apache.iceberg.SnapshotRef;
2930
import org.apache.iceberg.TableMetadata;
3031
import org.apache.iceberg.TableOperations;
3132
import org.apache.iceberg.TableProperties;
3233
import org.apache.iceberg.UpdateRequirement;
3334
import org.apache.iceberg.UpdateRequirements;
3435
import org.apache.iceberg.encryption.EncryptionManager;
36+
import org.apache.iceberg.exceptions.CommitStateUnknownException;
3537
import org.apache.iceberg.io.FileIO;
3638
import org.apache.iceberg.io.LocationProvider;
3739
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -155,20 +157,90 @@ public void commit(TableMetadata base, TableMetadata metadata) {
155157
// the error handler will throw necessary exceptions like CommitFailedException and
156158
// CommitStateUnknownException
157159
// TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler
158-
LoadTableResponse response =
159-
client.post(path, request, LoadTableResponse.class, headers, errorHandler);
160+
LoadTableResponse response;
161+
try {
162+
response = client.post(path, request, LoadTableResponse.class, headers, errorHandler);
163+
} catch (CommitStateUnknownException e) {
164+
// Lightweight reconciliation for snapshot-add-only updates on transient unknown commit state
165+
if (updateType == UpdateType.SIMPLE && reconcileOnSimpleUpdate(updates, e)) {
166+
return;
167+
}
168+
169+
throw e;
170+
}
160171

161172
// all future commits should be simple commits
162173
this.updateType = UpdateType.SIMPLE;
163174

164175
updateCurrentMetadata(response);
165176
}
166177

178+
/**
179+
* Attempt best-effort reconciliation for SIMPLE updates that only add a snapshot.
180+
*
181+
* <p>Returns true if the expected snapshot is observed in the refreshed table state. Returns
182+
* false if the expected snapshot cannot be determined, is not present after refresh, or if the
183+
* refresh fails. In case of refresh failure, the failure is recorded as suppressed on the
184+
* provided {@code original} exception to aid diagnostics.
185+
*/
186+
private boolean reconcileOnSimpleUpdate(
187+
List<MetadataUpdate> updates, CommitStateUnknownException original) {
188+
Long expectedSnapshotId = expectedSnapshotIdIfSnapshotAddOnly(updates);
189+
if (expectedSnapshotId == null) {
190+
return false;
191+
}
192+
193+
try {
194+
TableMetadata refreshed = refresh();
195+
return refreshed != null && refreshed.snapshot(expectedSnapshotId) != null;
196+
} catch (RuntimeException reconEx) {
197+
original.addSuppressed(reconEx);
198+
return false;
199+
}
200+
}
201+
167202
@Override
168203
public FileIO io() {
169204
return io;
170205
}
171206

207+
private static Long expectedSnapshotIdIfSnapshotAddOnly(List<MetadataUpdate> updates) {
208+
Long addedSnapshotId = null;
209+
Long mainRefSnapshotId = null;
210+
211+
for (MetadataUpdate update : updates) {
212+
if (update instanceof MetadataUpdate.AddSnapshot) {
213+
if (addedSnapshotId != null) {
214+
return null; // multiple snapshot adds -> not safe
215+
}
216+
addedSnapshotId = ((MetadataUpdate.AddSnapshot) update).snapshot().snapshotId();
217+
} else if (update instanceof MetadataUpdate.SetSnapshotRef) {
218+
MetadataUpdate.SetSnapshotRef setRef = (MetadataUpdate.SetSnapshotRef) update;
219+
if (!SnapshotRef.MAIN_BRANCH.equals(setRef.name())) {
220+
return null; // only allow main ref update
221+
}
222+
mainRefSnapshotId = setRef.snapshotId();
223+
} else {
224+
// any other update type makes this not a pure snapshot-add
225+
return null;
226+
}
227+
}
228+
229+
if (addedSnapshotId == null) {
230+
return null;
231+
}
232+
233+
if (mainRefSnapshotId != null && !addedSnapshotId.equals(mainRefSnapshotId)) {
234+
// Only handle "append to main" here. In this request, main is being set to a snapshot ID
235+
// that is different from the snapshot we just added (e.g., rollback or move main elsewhere).
236+
// In that case, finding the added snapshot in history doesn't tell us whether main moved to
237+
// it, so skip reconciliation.
238+
return null;
239+
}
240+
241+
return addedSnapshotId;
242+
}
243+
172244
private TableMetadata updateCurrentMetadata(LoadTableResponse response) {
173245
// LoadTableResponse is used to deserialize the response, but config is not allowed by the REST
174246
// spec so it can be

core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.iceberg.catalog.TableCommit;
6666
import org.apache.iceberg.catalog.TableIdentifier;
6767
import org.apache.iceberg.exceptions.CommitFailedException;
68+
import org.apache.iceberg.exceptions.CommitStateUnknownException;
6869
import org.apache.iceberg.exceptions.NotAuthorizedException;
6970
import org.apache.iceberg.exceptions.NotFoundException;
7071
import org.apache.iceberg.exceptions.ServiceFailureException;
@@ -2900,6 +2901,101 @@ public <T extends RESTResponse> T execute(
29002901
return catalog(adapter);
29012902
}
29022903

2904+
@Test
2905+
public void testReconcileOnUnknownSnapshotAddMatchesSnapshotId() {
2906+
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
2907+
2908+
RESTCatalog catalog =
2909+
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
2910+
catalog.initialize(
2911+
"test",
2912+
ImmutableMap.of(
2913+
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
2914+
2915+
if (requiresNamespaceCreate()) {
2916+
catalog.createNamespace(TABLE.namespace());
2917+
}
2918+
2919+
catalog.createTable(TABLE, SCHEMA);
2920+
2921+
// Simulate: server commits, but client receives CommitStateUnknown (transient 5xx)
2922+
Mockito.doAnswer(
2923+
invocation -> {
2924+
invocation.callRealMethod();
2925+
throw new CommitStateUnknownException(
2926+
new ServiceFailureException("Service failed: 503"));
2927+
})
2928+
.when(adapter)
2929+
.execute(
2930+
reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)),
2931+
eq(LoadTableResponse.class),
2932+
any(),
2933+
any());
2934+
2935+
Table table = catalog.loadTable(TABLE);
2936+
2937+
// Perform a snapshot-adding commit; should reconcile instead of failing
2938+
table.newFastAppend().appendFile(FILE_A).commit();
2939+
2940+
// Extract the snapshot id we attempted to commit from the request body
2941+
long expectedSnapshotId =
2942+
allRequests(adapter).stream()
2943+
.filter(
2944+
r -> r.method() == HTTPMethod.POST && r.path().equals(RESOURCE_PATHS.table(TABLE)))
2945+
.map(HTTPRequest::body)
2946+
.filter(UpdateTableRequest.class::isInstance)
2947+
.map(UpdateTableRequest.class::cast)
2948+
.map(
2949+
req ->
2950+
(MetadataUpdate.AddSnapshot)
2951+
req.updates().stream()
2952+
.filter(u -> u instanceof MetadataUpdate.AddSnapshot)
2953+
.findFirst()
2954+
.orElseThrow())
2955+
.map(add -> add.snapshot().snapshotId())
2956+
.findFirst()
2957+
.orElseThrow();
2958+
2959+
Table reloaded = catalog.loadTable(TABLE);
2960+
assertThat(reloaded.currentSnapshot()).isNotNull();
2961+
assertThat(reloaded.snapshot(expectedSnapshotId)).isNotNull();
2962+
}
2963+
2964+
@Test
2965+
public void testCommitStateUnknownNotReconciled() {
2966+
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
2967+
2968+
RESTCatalog catalog =
2969+
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
2970+
catalog.initialize(
2971+
"test",
2972+
ImmutableMap.of(
2973+
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
2974+
2975+
if (requiresNamespaceCreate()) {
2976+
catalog.createNamespace(TABLE.namespace());
2977+
}
2978+
2979+
catalog.createTable(TABLE, SCHEMA);
2980+
2981+
// Simulate: server returns CommitStateUnknown and does NOT apply the commit
2982+
Mockito.doThrow(
2983+
new CommitStateUnknownException(new ServiceFailureException("Service failed: 503")))
2984+
.when(adapter)
2985+
.execute(
2986+
reqMatcher(HTTPMethod.POST, RESOURCE_PATHS.table(TABLE)),
2987+
eq(LoadTableResponse.class),
2988+
any(),
2989+
any());
2990+
2991+
Table table = catalog.loadTable(TABLE);
2992+
2993+
assertThatThrownBy(() -> table.newFastAppend().appendFile(FILE_A).commit())
2994+
.isInstanceOf(CommitStateUnknownException.class)
2995+
.hasMessageContaining("Cannot determine whether the commit was successful")
2996+
.satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty());
2997+
}
2998+
29032999
private RESTCatalog catalog(RESTCatalogAdapter adapter) {
29043000
RESTCatalog catalog =
29053001
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);

0 commit comments

Comments
 (0)