diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 0176128ed576..a6b5334c9d2b 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Function; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.PartitionSpec; @@ -39,6 +40,8 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.view.View; @@ -69,7 +72,15 @@ public RESTCatalog(Function, RESTClient> clientBuilder) { public RESTCatalog( SessionCatalog.SessionContext context, Function, RESTClient> clientBuilder) { - this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null); + this(context, clientBuilder, null); + } + + @VisibleForTesting + RESTCatalog( + SessionCatalog.SessionContext context, + Function, RESTClient> clientBuilder, + BiFunction, FileIO> ioBuilder) { + this.sessionCatalog = new RESTSessionCatalog(clientBuilder, ioBuilder); this.delegate = sessionCatalog.asCatalog(context); this.nsDelegate = (SupportsNamespaces) delegate; this.context = context; @@ -82,6 +93,11 @@ public void initialize(String name, Map props) { sessionCatalog.initialize(name, props); } + @VisibleForTesting + RESTSessionCatalog sessionCatalog() { + return sessionCatalog; + } + @Override public String name() { return sessionCatalog.name(); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index b903f13adc09..f1a6d82fcae9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -58,6 +58,7 @@ import org.apache.iceberg.io.StorageCredential; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.metrics.MetricsReporters; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -674,6 +675,11 @@ public boolean updateNamespaceMetadata( return !response.updated().isEmpty(); } + @VisibleForTesting + FileIOTracker fileIOTracker() { + return fileIOTracker; + } + @Override public void close() throws IOException { if (closeables != null) { diff --git a/core/src/test/java/org/apache/iceberg/io/FileIOTrackerTestUtil.java b/core/src/test/java/org/apache/iceberg/io/FileIOTrackerTestUtil.java new file mode 100644 index 000000000000..e947197df2a8 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/FileIOTrackerTestUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import com.github.benmanes.caffeine.cache.Cache; +import org.apache.iceberg.TableOperations; + +public abstract class FileIOTrackerTestUtil { + public static Cache trackerFrom(FileIOTracker tracker) { + return tracker.tracker(); + } + + public static void invalidate(FileIOTracker tracker, TableOperations ops) { + tracker.tracker().invalidate(ops); + tracker.tracker().cleanUp(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 59e91150eeae..dbb7a1f3ee3b 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -29,9 +29,11 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.Cache; import java.io.File; import java.io.IOException; import java.net.InetAddress; @@ -43,12 +45,15 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.http.HttpHeaders; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.MetadataUpdate; @@ -57,6 +62,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -72,6 +78,8 @@ import org.apache.iceberg.exceptions.ServiceFailureException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOTrackerTestUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -2996,6 +3004,138 @@ void testDifferentTableUUID() { .hasMessageMatching("Table UUID does not match: current=.* != refreshed=" + newUUID); } + @Test + public void testDefaultFileIOIsNotTracked() { + catalog().createNamespace(TABLE.namespace()); + + catalog().createTable(TABLE, SCHEMA); + + catalog().loadTable(TABLE); + + Cache tracker = + FileIOTrackerTestUtil.trackerFrom(catalog().sessionCatalog().fileIOTracker()); + + assertThat(tracker.estimatedSize()).isZero(); + } + + @Test + public void testCreateTableTracksFileIOFromIOBuilder() { + verifyFileIOFromIOBuilderIsTracked(catalog -> (BaseTable) catalog.createTable(TABLE, SCHEMA)); + } + + @Test + public void testLoadTableTracksFileIOFromIOBuilder() { + verifyFileIOFromIOBuilderIsTracked( + catalog -> { + // Use a different catalog to create the table to avoid populating the FileIOTracker in + // the catalog under test. + RESTCatalog otherCatalog = catalog(new RESTCatalogAdapter(backendCatalog)); + otherCatalog.createTable(TABLE, SCHEMA); + + return (BaseTable) catalog.loadTable(TABLE); + }); + } + + @Test + public void testRegisterTableTracksFileIOFromIOBuilder() { + verifyFileIOFromIOBuilderIsTracked( + catalog -> { + BaseTable table = (BaseTable) catalog.createTable(TABLE, SCHEMA); + + TableIdentifier otherTable = TableIdentifier.of(TABLE.namespace(), "other_table"); + + return (BaseTable) + catalog.registerTable( + otherTable, table.operations().current().metadataFileLocation()); + }); + } + + private void verifyFileIOFromIOBuilderIsTracked(Function func) { + FileIO fileIO = + Mockito.spy( + CatalogUtil.loadFileIO("org.apache.iceberg.io.ResolvingFileIO", Map.of(), null)); + + BiFunction, FileIO> ioBuilderMock = + Mockito.mock(BiFunction.class); + + // Make sure the catalog IO is different from the table IO + when(ioBuilderMock.apply(any(), any())) + .thenReturn( + CatalogUtil.loadFileIO("org.apache.iceberg.io.ResolvingFileIO", Map.of(), null), + fileIO); + + RESTCatalog catalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + config -> new RESTCatalogAdapter(backendCatalog), + ioBuilderMock); + catalog.initialize("test", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + + Cache tracker = + FileIOTrackerTestUtil.trackerFrom(catalog.sessionCatalog().fileIOTracker()); + + assertThat(tracker.estimatedSize()).isZero(); + + BaseTable table = func.apply(catalog); + + assertThat(tracker.asMap()).containsKeys(table.operations()); + + // Simulate cleanup of TableOps from the tracker + FileIOTrackerTestUtil.invalidate(catalog.sessionCatalog().fileIOTracker(), table.operations()); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> Mockito.verify(fileIO).close()); + } + + @Test + public void testCreateTransactionTracksFileIOFromIOBuilder() { + RESTCatalog catalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + config -> new RESTCatalogAdapter(backendCatalog), + (context, conf) -> + CatalogUtil.loadFileIO("org.apache.iceberg.io.ResolvingFileIO", Map.of(), null)); + catalog.initialize("test", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + + Cache tracker = + FileIOTrackerTestUtil.trackerFrom(catalog.sessionCatalog().fileIOTracker()); + + assertThat(tracker.estimatedSize()).isZero(); + + catalog.newCreateTableTransaction(TABLE, SCHEMA); + + assertThat(tracker.estimatedSize()).isOne(); + } + + @Test + public void testReplaceTransactionTracksFileIOFromIOBuilder() { + RESTCatalog catalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + config -> new RESTCatalogAdapter(backendCatalog), + (context, conf) -> + CatalogUtil.loadFileIO("org.apache.iceberg.io.ResolvingFileIO", Map.of(), null)); + catalog.initialize("test", Map.of()); + + catalog.createNamespace(TABLE.namespace()); + + catalog.createTable(TABLE, SCHEMA); + + Cache tracker = + FileIOTrackerTestUtil.trackerFrom(catalog.sessionCatalog().fileIOTracker()); + + assertThat(tracker.estimatedSize()).isOne(); + + catalog.newReplaceTableTransaction(TABLE, SCHEMA, false); + + assertThat(tracker.estimatedSize()).isEqualTo(2); + } + private RESTCatalog catalogWithResponseHeaders(Map respHeaders) { RESTCatalogAdapter adapter = new RESTCatalogAdapter(backendCatalog) {