diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 0176128ed576..7578628fd3a2 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; @@ -39,6 +40,7 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.view.View; @@ -69,7 +71,22 @@ public RESTCatalog(Function, RESTClient> clientBuilder) { public RESTCatalog( SessionCatalog.SessionContext context, Function, RESTClient> clientBuilder) { - this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null); + this(context, clientBuilder, null, null); + } + + public RESTCatalog( + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder, + RESTOperationsFactory operationsFactory) { + this(SessionCatalog.SessionContext.createEmpty(), clientBuilder, ioBuilder, operationsFactory); + } + + public RESTCatalog( + SessionCatalog.SessionContext context, + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder, + RESTOperationsFactory operationsFactory) { + this.sessionCatalog = new RESTSessionCatalog(clientBuilder, ioBuilder, operationsFactory); this.delegate = sessionCatalog.asCatalog(context); this.nsDelegate = (SupportsNamespaces) delegate; this.context = context; diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTOperationsFactory.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTOperationsFactory.java new file mode 100644 index 000000000000..d8806c909d30 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/rest/RESTOperationsFactory.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.view.ViewMetadata; + +/** + * A factory interface for creating {@link RESTTableOperations} and {@link RESTViewOperations} + * instances for REST catalogs. + * + *

<p>This interface allows custom implementations of table and view operations to be injected into + * {@link RESTSessionCatalog} and {@link RESTCatalog}, enabling extensibility for specialized use + * cases. + * + *

<p>Example usage: + * + * <pre>
+ * RESTOperationsFactory customFactory = new RESTOperationsFactory() {
+ *   {@literal @}Override
+ *   public RESTTableOperations createTableOperations(
+ *       RESTClient client,
+ *       String path,
+ *       Supplier&lt;Map&lt;String, String&gt;&gt; headers,
+ *       FileIO io,
+ *       TableMetadata current,
+ *       Set&lt;Endpoint&gt; endpoints) {
+ *     return new CustomRESTTableOperations(client, path, headers, io, current, endpoints);
+ *   }
+ *
+ *   {@literal @}Override
+ *   public RESTViewOperations createViewOperations(
+ *       RESTClient client,
+ *       String path,
+ *       Supplier&lt;Map&lt;String, String&gt;&gt; headers,
+ *       ViewMetadata current,
+ *       Set&lt;Endpoint&gt; endpoints) {
+ *     return new CustomRESTViewOperations(client, path, headers, current, endpoints);
+ *   }
+ * };
+ *
+ * RESTSessionCatalog catalog = new RESTSessionCatalog(clientBuilder, ioBuilder, customFactory);
+ * </pre>
+ */ +public interface RESTOperationsFactory { + + /** + * Create a new {@link RESTTableOperations} instance for simple table operations. + * + *

<p>The default implementation creates a standard {@link RESTTableOperations} instance. + * + * @param client the REST client to use for communicating with the catalog server + * @param path the REST path for the table + * @param headers a supplier for additional HTTP headers to include in requests + * @param io the FileIO implementation for reading and writing table metadata and data files + * @param current the current table metadata + * @param endpoints the set of supported REST endpoints + * @return a new RESTTableOperations instance + */ + default RESTTableOperations createTableOperations( + RESTClient client, + String path, + Supplier<Map<String, String>> headers, + FileIO io, + TableMetadata current, + Set<Endpoint> endpoints) { + return new RESTTableOperations(client, path, headers, io, current, endpoints); + } + + /** + * Create a new {@link RESTTableOperations} instance for transaction-based operations (create or + * replace). + * + *

<p>This method is used when creating tables or replacing table metadata within a transaction. + * The default implementation creates a standard {@link RESTTableOperations} instance. + * + * @param client the REST client to use for communicating with the catalog server + * @param path the REST path for the table + * @param headers a supplier for additional HTTP headers to include in requests + * @param io the FileIO implementation for reading and writing table metadata and data files + * @param updateType the type of update being performed (CREATE, REPLACE, or SIMPLE) + * @param createChanges the list of metadata updates to apply during table creation or replacement + * @param current the current table metadata (may be null for CREATE operations) + * @param endpoints the set of supported REST endpoints + * @return a new RESTTableOperations instance + */ + default RESTTableOperations createTableOperationsForTransaction( + RESTClient client, + String path, + Supplier<Map<String, String>> headers, + FileIO io, + RESTTableOperations.UpdateType updateType, + List<MetadataUpdate> createChanges, + TableMetadata current, + Set<Endpoint> endpoints) { + return new RESTTableOperations( + client, path, headers, io, updateType, createChanges, current, endpoints); + } + + /** + * Create a new {@link RESTViewOperations} instance. + * + *

The default implementation creates a standard {@link RESTViewOperations} instance. + * + * @param client the REST client to use for communicating with the catalog server + * @param path the REST path for the view + * @param headers a supplier for additional HTTP headers to include in requests + * @param current the current view metadata + * @param endpoints the set of supported REST endpoints + * @return a new RESTViewOperations instance + */ + default RESTViewOperations createViewOperations( + RESTClient client, + String path, + Supplier> headers, + ViewMetadata current, + Set endpoints) { + return new RESTViewOperations(client, path, headers, current, endpoints); + } + + /** Default {@link RESTOperationsFactory} instance. */ + RESTOperationsFactory DEFAULT = new RESTOperationsFactory() {}; +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index b903f13adc09..a9fb958098b1 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -146,6 +146,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private final Function, RESTClient> clientBuilder; private final BiFunction, FileIO> ioBuilder; + private final RESTOperationsFactory operationsFactory; private FileIOTracker fileIOTracker = null; private AuthSession catalogAuth = null; private AuthManager authManager; @@ -167,15 +168,25 @@ public RESTSessionCatalog() { .uri(config.get(CatalogProperties.URI)) .withHeaders(RESTUtil.configHeaders(config)) .build(), + null, null); } public RESTSessionCatalog( Function, RESTClient> clientBuilder, BiFunction, FileIO> ioBuilder) { + this(clientBuilder, ioBuilder, null); + } + + public RESTSessionCatalog( + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder, + RESTOperationsFactory operationsFactory) { Preconditions.checkNotNull(clientBuilder, "Invalid 
client builder: null"); this.clientBuilder = clientBuilder; this.ioBuilder = ioBuilder; + this.operationsFactory = + operationsFactory != null ? operationsFactory : RESTOperationsFactory.DEFAULT; } @Override @@ -450,7 +461,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { RESTClient tableClient = client.withAuthSession(tableSession); RESTTableOperations ops = - new RESTTableOperations( + operationsFactory.createTableOperations( tableClient, paths.table(finalIdentifier), Map::of, @@ -529,7 +540,7 @@ public Table registerTable( AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession); RESTClient tableClient = client.withAuthSession(tableSession); RESTTableOperations ops = - new RESTTableOperations( + operationsFactory.createTableOperations( tableClient, paths.table(ident), Map::of, @@ -788,7 +799,7 @@ public Table create() { AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession); RESTClient tableClient = client.withAuthSession(tableSession); RESTTableOperations ops = - new RESTTableOperations( + operationsFactory.createTableOperations( tableClient, paths.table(ident), Map::of, @@ -815,7 +826,7 @@ public Transaction createTransaction() { RESTClient tableClient = client.withAuthSession(tableSession); RESTTableOperations ops = - new RESTTableOperations( + operationsFactory.createTableOperationsForTransaction( tableClient, paths.table(ident), Map::of, @@ -878,7 +889,7 @@ public Transaction replaceTransaction() { RESTClient tableClient = client.withAuthSession(tableSession); RESTTableOperations ops = - new RESTTableOperations( + operationsFactory.createTableOperationsForTransaction( tableClient, paths.table(ident), Map::of, @@ -1154,7 +1165,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) { ViewMetadata metadata = response.metadata(); RESTViewOperations ops = - new RESTViewOperations( + operationsFactory.createViewOperations( 
client.withAuthSession(tableSession), paths.view(identifier), Map::of, @@ -1333,7 +1344,7 @@ public View create() { Map tableConf = response.config(); AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession); RESTViewOperations ops = - new RESTViewOperations( + operationsFactory.createViewOperations( client.withAuthSession(tableSession), paths.view(identifier), Map::of, @@ -1424,7 +1435,7 @@ private View replace(LoadViewResponse response) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession); RESTViewOperations ops = - new RESTViewOperations( + operationsFactory.createViewOperations( client.withAuthSession(tableSession), paths.view(identifier), Map::of, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 00522fd6e178..e407d782982d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -44,10 +44,10 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.util.LocationUtil; -class RESTTableOperations implements TableOperations { +public class RESTTableOperations implements TableOperations { private static final String METADATA_FOLDER_NAME = "metadata"; - enum UpdateType { + public enum UpdateType { CREATE, REPLACE, SIMPLE @@ -63,7 +63,7 @@ enum UpdateType { private UpdateType updateType; private TableMetadata current; - RESTTableOperations( + public RESTTableOperations( RESTClient client, String path, Supplier> headers, @@ -73,7 +73,7 @@ enum UpdateType { this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); } - RESTTableOperations( + public RESTTableOperations( RESTClient client, String path, Supplier> headers, @@ -183,7 +183,7 
@@ public void commit(TableMetadata base, TableMetadata metadata) { * refresh fails. In case of refresh failure, the failure is recorded as suppressed on the * provided {@code original} exception to aid diagnostics. */ - private boolean reconcileOnSimpleUpdate( + protected boolean reconcileOnSimpleUpdate( List updates, CommitStateUnknownException original) { Long expectedSnapshotId = expectedSnapshotIdIfSnapshotAddOnly(updates); if (expectedSnapshotId == null) { @@ -241,7 +241,7 @@ private static Long expectedSnapshotIdIfSnapshotAddOnly(List upd return addedSnapshotId; } - private TableMetadata updateCurrentMetadata(LoadTableResponse response) { + protected TableMetadata updateCurrentMetadata(LoadTableResponse response) { // LoadTableResponse is used to deserialize the response, but config is not allowed by the REST // spec so it can be // safely ignored. there is no requirement to update config on refresh or commit. diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java index 466a8e66899b..3cff87010396 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java @@ -29,14 +29,14 @@ import org.apache.iceberg.view.ViewMetadata; import org.apache.iceberg.view.ViewOperations; -class RESTViewOperations implements ViewOperations { +public class RESTViewOperations implements ViewOperations { private final RESTClient client; private final String path; private final Supplier> headers; private final Set endpoints; private ViewMetadata current; - RESTViewOperations( + public RESTViewOperations( RESTClient client, String path, Supplier> headers, @@ -79,7 +79,7 @@ public void commit(ViewMetadata base, ViewMetadata metadata) { updateCurrentMetadata(response); } - private ViewMetadata updateCurrentMetadata(LoadViewResponse response) { + protected ViewMetadata 
updateCurrentMetadata(LoadViewResponse response) { if (!Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { this.current = response.metadata(); } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 6f7af7ae758a..5f4748f24ab5 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -41,14 +41,19 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.http.HttpHeaders; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.MetadataUpdate; @@ -56,6 +61,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -71,6 +77,7 @@ import org.apache.iceberg.exceptions.ServiceFailureException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -89,6 +96,7 @@ import 
org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.ViewMetadata; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.eclipse.jetty.server.Server; @@ -3066,6 +3074,101 @@ public void testCommitStateUnknownNotReconciled() { .satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty()); } + @Test + public void testFileIOAndOperationsBuilderInjection() throws IOException { + AtomicBoolean customTableOps = new AtomicBoolean(); + AtomicBoolean customTxnOps = new AtomicBoolean(); + AtomicBoolean customViewOps = new AtomicBoolean(); + AtomicBoolean customFileIO = new AtomicBoolean(); + + RESTOperationsFactory operationsFactory = + new RESTOperationsFactory() { + @Override + public RESTTableOperations createTableOperations( + RESTClient client, + String path, + Supplier> headers, + FileIO io, + TableMetadata current, + Set endpoints) { + customTableOps.set(true); + return RESTOperationsFactory.super.createTableOperations( + client, path, headers, io, current, endpoints); + } + + @Override + public RESTTableOperations createTableOperationsForTransaction( + RESTClient client, + String path, + Supplier> headers, + FileIO io, + RESTTableOperations.UpdateType updateType, + List createChanges, + TableMetadata current, + Set endpoints) { + customTxnOps.set(true); + return RESTOperationsFactory.super.createTableOperationsForTransaction( + client, path, headers, io, updateType, createChanges, current, endpoints); + } + + @Override + public RESTViewOperations createViewOperations( + RESTClient client, + String path, + Supplier> headers, + ViewMetadata current, + Set endpoints) { + customViewOps.set(true); + return RESTOperationsFactory.super.createViewOperations( + client, path, headers, current, endpoints); + } + }; + + BiFunction, FileIO> ioBuilder = + (context, config) -> { 
+ customFileIO.set(true); + return CatalogUtil.loadFileIO( + config.getOrDefault( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"), + config, + new Configuration()); + }; + + try (RESTCatalog catalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + (config) -> new RESTCatalogAdapter(backendCatalog), + ioBuilder, + operationsFactory)) { + catalog.setConf(new Configuration()); + catalog.initialize( + "test", + ImmutableMap.of( + CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO")); + + Namespace ns = Namespace.of("test_builder"); + catalog.createNamespace(ns); + + catalog.createTable(TableIdentifier.of(ns, "table1"), SCHEMA); + assertThat(customTableOps).isTrue(); + assertThat(customFileIO).isTrue(); + + catalog + .buildTable(TableIdentifier.of(ns, "table2"), SCHEMA) + .createTransaction() + .commitTransaction(); + assertThat(customTxnOps).isTrue(); + + catalog + .buildView(TableIdentifier.of(ns, "view1")) + .withSchema(SCHEMA) + .withDefaultNamespace(ns) + .withQuery("spark", "select * from ns.table") + .create(); + assertThat(customViewOps).isTrue(); + } + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);