From 6bd25d4dc039f227b64cca55606691d8d23d676d Mon Sep 17 00:00:00 2001 From: Raymond Lam Date: Mon, 28 Oct 2024 09:25:14 -0700 Subject: [PATCH] [openhouse] Implement TableOperations-specific FileIO instance --- .../java/openhouse-java-runtime/build.gradle | 2 + .../javaclient/OpenHouseCatalog.java | 48 +++++++++++++++---- 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/integrations/java/openhouse-java-runtime/build.gradle b/integrations/java/openhouse-java-runtime/build.gradle index 238dffdf..f0f62a29 100644 --- a/integrations/java/openhouse-java-runtime/build.gradle +++ b/integrations/java/openhouse-java-runtime/build.gradle @@ -23,6 +23,7 @@ ext { sparkVersion = '3.1.1' springVersion = '2.7.8' hadoopVersion = '2.10.0' + caffeineVersion = '2.9.3' } dependencies { @@ -41,6 +42,7 @@ dependencies { exclude group: 'com.zaxxer', module: 'HikariCP-java7' exclude group: 'org.apache.commons', module: 'commons-lang3' } + implementation("com.github.ben-manes.caffeine:caffeine:" + caffeineVersion) } // Following codeblock completely relocates contents of the jar diff --git a/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseCatalog.java b/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseCatalog.java index 3d75a86b..9836361a 100644 --- a/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseCatalog.java +++ b/integrations/java/openhouse-java-runtime/src/main/java/com/linkedin/openhouse/javaclient/OpenHouseCatalog.java @@ -2,6 +2,9 @@ import static com.linkedin.openhouse.javaclient.OpenHouseTableOperations.*; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; import com.linkedin.openhouse.client.ssl.HttpConnectionStrategy; import com.linkedin.openhouse.client.ssl.TablesApiClientFactory; import com.linkedin.openhouse.javaclient.api.SupportsGrantRevoke; @@ -21,6 +24,7 @@ import com.linkedin.openhouse.tables.client.model.GetAllTablesResponseBody; import com.linkedin.openhouse.tables.client.model.GetTableResponseBody; import com.linkedin.openhouse.tables.client.model.UpdateAclPoliciesRequestBody; +import java.io.Closeable; import java.net.MalformedURLException; import java.util.Collections; import java.util.List; @@ -68,7 +72,7 @@ */ @Slf4j public class OpenHouseCatalog extends BaseMetastoreCatalog - implements Configurable, SupportsNamespaces, SupportsGrantRevoke { + implements Configurable, SupportsNamespaces, SupportsGrantRevoke, Closeable { private TableApi tableApi; @@ -88,6 +92,8 @@ public class OpenHouseCatalog extends BaseMetastoreCatalog protected Map properties; + private Cache fileIOCloser; + private static final String DEFAULT_CLUSTER = "local"; private static final String CLUSTER_PROPERTY = "cluster"; @@ -130,6 +136,7 @@ public void initialize(String name, Map properties) { this.fileIO = loadFileIO(properties); this.cluster = properties.getOrDefault(CLUSTER_PROPERTY, DEFAULT_CLUSTER); + this.fileIOCloser = newFileIOCloser(); } protected FileIO loadFileIO(Map properties) { @@ -235,13 +242,17 @@ public void renameTable(TableIdentifier from, TableIdentifier to) { @Override public TableOperations newTableOps(TableIdentifier tableIdentifier) { - return OpenHouseTableOperations.builder() - .tableIdentifier(tableIdentifier) - .fileIO(fileIO) - .tableApi(tableApi) - .snapshotApi(snapshotApi) - .cluster(cluster) - .build(); + FileIO tableOperationsLocalFileIO = loadFileIO(properties); + TableOperations openHouseTableOperations = + OpenHouseTableOperations.builder() + .tableIdentifier(tableIdentifier) + .fileIO(tableOperationsLocalFileIO) + .tableApi(tableApi) + .snapshotApi(snapshotApi) + .cluster(cluster) + .build(); + fileIOCloser.put(openHouseTableOperations, openHouseTableOperations.io()); + return openHouseTableOperations; } /** @@ -592,4 +603,25 @@ private TableMetadata createStagedMetadata() { return new StaticTableOperations(tableLocation, fileIO).refresh(); } } + + @Override + public void close() { + if (fileIOCloser != null) { + fileIOCloser.invalidateAll(); + fileIOCloser.cleanUp(); + } + } + + private Cache newFileIOCloser() { + return Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener) + (ops, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } }