
HIVE-29016: fixing cache handling for REST catalog (#5882)


Open: wants to merge 4 commits into master.
MetastoreConf.java
@@ -1871,10 +1871,15 @@ public enum ConfVars {
"hive.metastore.iceberg.catalog.servlet.auth", "jwt", new StringSetValidator("simple", "jwt"),
"HMS Iceberg Catalog servlet authentication method (simple or jwt)."
),
ICEBERG_CATALOG_CACHE_EXPIRY("metastore.iceberg.catalog.cache.expiry",
"hive.metastore.iceberg.catalog.cache.expiry", 60_000L,
ICEBERG_CATALOG_CACHE_EXPIRY("hive.metastore.catalog.cache.expiry",
"hive.metastore.catalog.cache.expiry", -1,
"HMS Iceberg Catalog cache expiry."
),
ICEBERG_CATALOG_EVENT_LISTENER_CLASS("hive.metastore.catalog.event.listener.class",
"hive.metastore.catalog.event.listener.class",
"org.apache.iceberg.rest.HMSEventListener",
"HMS Iceberg Catalog event listener class name."
),
HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
"hive.metastore.httpserver.threadpool.min", 8,
"HMS embedded HTTP server minimum number of threads."
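Note: the default expiry changes from 60_000L to -1, so the REST catalog cache is now off unless explicitly enabled. A minimal sketch of turning it on programmatically, mirroring the test setup later in this PR (the 60-second value is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

    Configuration conf = MetastoreConf.newMetastoreConf();
    // A positive expiry (in milliseconds) enables the HMSCachingCatalog; the new default of -1 disables it.
    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY, 60_000L);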
6 changes: 6 additions & 0 deletions standalone-metastore/metastore-rest-catalog/pom.xml
@@ -66,6 +66,12 @@
       <version>5.2</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents.client5</groupId>
+      <artifactId>httpclient5</artifactId>
+      <version>5.2.1</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
HMSCachingCatalog.java
@@ -24,30 +24,45 @@
 import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  * @param <CATALOG> the catalog class
  */
 public class HMSCachingCatalog<CATALOG extends Catalog & SupportsNamespaces> extends CachingCatalog implements SupportsNamespaces {
+  private static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class);
   protected final CATALOG nsCatalog;

   public HMSCachingCatalog(CATALOG catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+    super(catalog, false, expiration, Ticker.systemTicker());
     nsCatalog = catalog;
   }

   public CATALOG hmsUnwrap() {
     return nsCatalog;
   }

+  public void invalidateTable(String dbName, String tableName) {
+    super.invalidateTable(TableIdentifier.of(dbName, tableName));
+  }

Reviewer (Contributor): I think we can call l#60 here?

Author: This version of invalidateTable() is called by the listener, and we do want the invalidation performed here. The override at l#60 does nothing on purpose; we want all invalidations to be performed through the listener.

+  @Override
+  public void invalidateTable(TableIdentifier tableIdentifier) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Avoid invalidating table: {}", tableIdentifier);
+    }
+  }

Reviewer (saihemanth-cloudera, Contributor, Jun 21, 2025): We can then do super.invalidateTable() here? Also, why not do nsCatalog.invalidateTable() here?

Author: The idea is to try and have all actual invalidation performed through the listener; this override voids the default call on purpose.
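Note: the split above, where the TableIdentifier override is voided while the listener-facing overload really evicts, is the core of this design. A toy sketch of the same pattern in plain Java (illustrative names, not the PR's classes):

    import java.util.concurrent.ConcurrentHashMap;

    // Mirrors HMSCachingCatalog's two invalidation paths.
    final class ListenerOnlyCache {
      private final ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

      // Internal path (mirrors the TableIdentifier override): a deliberate no-op,
      // so catalog-internal code cannot evict behind the listener's back.
      void invalidateInternal(String key) {
        // no-op by design; the metastore event listener owns eviction
      }

      // Listener-driven path (mirrors invalidateTable(String, String)): really evicts.
      void invalidateFromListener(String dbName, String tableName) {
        cache.remove(dbName + "." + tableName);
      }
    }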

   @Override
   public void createNamespace(Namespace nmspc, Map<String, String> map) {
     nsCatalog.createNamespace(nmspc, map);

@@ -65,10 +80,6 @@ public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException {

   @Override
   public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException {
-    List<TableIdentifier> tables = listTables(nmspc);
-    for (TableIdentifier ident : tables) {
-      invalidateTable(ident);
-    }
     return nsCatalog.dropNamespace(nmspc);
   }

@@ -81,5 +92,16 @@ public boolean setProperties(Namespace nmspc, Map<String, String> map) throws NoSuchNamespaceException {
   public boolean removeProperties(Namespace nmspc, Set<String> set) throws NoSuchNamespaceException {
     return nsCatalog.removeProperties(nmspc, set);
   }

+  @Override
+  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) {
+    return nsCatalog.buildTable(identifier, schema);
+  }
+
+  public void invalidateNamespace(String namespace) {
+    Namespace ns = Namespace.of(namespace);
+    for (TableIdentifier table : listTables(ns)) {
+      // call super directly: the invalidateTable(TableIdentifier) override above is a deliberate no-op,
+      // and this namespace invalidation is itself part of the listener-driven path
+      super.invalidateTable(table);
+    }
+  }
 }
HMSEventListener.java (new file)
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.rest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.ReloadEvent;
import org.apache.iceberg.catalog.Catalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HMSEventListener is a Hive Metastore event listener that invalidates the cache
 * of the HMSCachingCatalog when certain events occur, such as altering or dropping a table.
 */
public class HMSEventListener extends MetaStoreEventListener {

Reviewer (Contributor): I'm new to this API. Does this work even when we set up HMS in HA mode with metastore.thrift.uri.selection=RANDOM?

Author: Good question, I would hope so; let me try to seek a definite answer.

Author: HMS only triggers MetaStoreEventListener for 'local' events, so HA would need to disable the cache (for now). A potential solution would be to actively poll events from each HMS instance: a dedicated thread that polls the NotificationEventRequest and applies the same invalidation logic, as sketched below.
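Note: a hedged sketch of the polling fallback the author describes, assuming IMetaStoreClient#getNextNotification(long, int, NotificationFilter) and the usual event-type strings; signatures should be verified against the Hive version in use:

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.iceberg.rest.HMSCachingCatalog;

    // Tails the notification log of one HMS instance and reuses the listener's invalidation logic.
    final class CacheInvalidationPoller implements Runnable {
      private final IMetaStoreClient client;
      private final HMSCachingCatalog<?> catalog;
      private long lastEventId;

      CacheInvalidationPoller(IMetaStoreClient client, HMSCachingCatalog<?> catalog, long startId) {
        this.client = client;
        this.catalog = catalog;
        this.lastEventId = startId;
      }

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            for (NotificationEvent e : client.getNextNotification(lastEventId, 100, null).getEvents()) {
              lastEventId = e.getEventId();
              switch (e.getEventType()) {
                case "ALTER_TABLE":
                case "DROP_TABLE":
                  catalog.invalidateTable(e.getDbName(), e.getTableName());
                  break;
                case "DROP_DATABASE":
                  catalog.invalidateNamespace(e.getDbName());
                  break;
                default:
                  // unrelated event types are ignored
              }
            }
            Thread.sleep(1_000L);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          } catch (Exception ex) {
            // transient HMS errors: retry on the next iteration
          }
        }
      }
    }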

Reviewer (Contributor): As I shared on the Jira, HiveCatalog may access two types of remote resources:

  • (A) Pointers from table names to the metadata location, in the backend RDBMS
  • (B) The content of metadata.json on Amazon S3

I'm wondering if we can update HiveCatalog to allow caching (B). The metadata.json is immutable, so we can safely cache it. I'd say the RDBMS lookups for the Iceberg pointers are fast enough, on the order of 10k RPS. A sketch of this idea follows.
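Note: a sketch of caching (B), assuming Caffeine (which Iceberg already uses for CachingCatalog) and Iceberg's TableMetadataParser/FileIO read path; the class and its names are illustrative. Because each commit writes metadata to a new location, a cache hit keyed by the full location can never be stale:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.io.FileIO;

    // Content cache keyed by the full metadata.json location; entries are immutable.
    final class MetadataJsonCache {
      private final Cache<String, TableMetadata> cache =
          Caffeine.newBuilder().maximumSize(10_000).build();

      TableMetadata read(FileIO io, String metadataLocation) {
        return cache.get(metadataLocation, loc -> TableMetadataParser.read(io, loc));
      }
    }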

  private static final Logger LOG = LoggerFactory.getLogger(HMSEventListener.class);

  /**
   * Constructor for HMSEventListener.
   *
   * @param config the configuration to use for the listener
   */
  public HMSEventListener(Configuration config) {
    super(config);
  }

  @Override
  public void onAlterTable(AlterTableEvent event) {
    Catalog catalog = HMSCatalogFactory.getLastCatalog();
    if (catalog instanceof HMSCachingCatalog) {
      HMSCachingCatalog<?> hmsCachingCatalog = (HMSCachingCatalog<?>) catalog;
      String dbName = event.getOldTable().getDbName();
      String tableName = event.getOldTable().getTableName();
      LOG.debug("onAlterTable: invalidating table cache for {}.{}", dbName, tableName);
      hmsCachingCatalog.invalidateTable(dbName, tableName);
    }
  }

  @Override
  public void onDropTable(DropTableEvent event) {
    Catalog catalog = HMSCatalogFactory.getLastCatalog();
    if (catalog instanceof HMSCachingCatalog) {
      HMSCachingCatalog<?> hmsCachingCatalog = (HMSCachingCatalog<?>) catalog;
      String dbName = event.getTable().getDbName();
      String tableName = event.getTable().getTableName();
      LOG.debug("onDropTable: invalidating table cache for {}.{}", dbName, tableName);
      hmsCachingCatalog.invalidateTable(dbName, tableName);
    }
  }

  @Override
  public void onReload(ReloadEvent reloadEvent) {
    Catalog catalog = HMSCatalogFactory.getLastCatalog();
    if (catalog instanceof HMSCachingCatalog) {
      HMSCachingCatalog<?> hmsCachingCatalog = (HMSCachingCatalog<?>) catalog;
      Table tableObj = reloadEvent.getTableObj();
      String dbName = tableObj.getDbName();
      String tableName = tableObj.getTableName();
      LOG.debug("onReload: invalidating table cache for {}.{}", dbName, tableName);
      hmsCachingCatalog.invalidateTable(dbName, tableName);
    }
  }

  @Override
  public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
    Catalog catalog = HMSCatalogFactory.getLastCatalog();
    if (catalog instanceof HMSCachingCatalog) {
      HMSCachingCatalog<?> hmsCachingCatalog = (HMSCachingCatalog<?>) catalog;
      String dbName = dbEvent.getDatabase().getName();
      LOG.debug("onDropDatabase: invalidating tables cache for {}", dbName);
      hmsCachingCatalog.invalidateNamespace(dbName);
    }
  }
}
@@ -160,6 +160,7 @@ public void setUp() throws Exception {
     NS = "hms" + RND.nextInt(100);
     conf = MetastoreConf.newMetastoreConf();
     MetaStoreTestUtils.setConfForStandloneMode(conf);
+    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY, 60_000L);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.CAPABILITY_CHECK, false);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
     // new 2024-10-02
@@ -25,14 +25,18 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.RenameTableRequest;
 import org.apache.iceberg.types.Types;
@@ -83,6 +87,7 @@ public void testCreateNamespaceHttp() throws Exception {
Assert.assertEquals("apache", database1.getParameters().get("owner"));
Assert.assertEquals("iceberg", database1.getParameters().get("group"));

Assert.assertSame(HMSCachingCatalog.class, catalog.getClass());
List<TableIdentifier> tis = catalog.listTables(Namespace.of(ns));
Assert.assertTrue(tis.isEmpty());

@@ -102,7 +107,6 @@ private Schema getTestSchema() {
         required(1, "id", Types.IntegerType.get(), "unique ID"),
         required(2, "data", Types.StringType.get()));
   }
-

   @Test
   public void testCreateTableTxnBuilder() throws Exception {

@@ -166,9 +170,9 @@ public void testCreateTableTxnBuilder() throws Exception {
     }
   }

-
   @Test
   public void testTableAPI() throws Exception {
+    Assert.assertSame(HMSCachingCatalog.class, catalog.getClass());
     URI iceUri = URI.create("http://hive@localhost:" + catalogPort + "/"+catalogPath+"/v1/");
     String jwt = generateJWT();
     Schema schema = getTestSchema();

@@ -210,4 +214,52 @@ public void testTableAPI() throws Exception {
     Assert.assertThrows(NoSuchTableException.class, () -> catalog.loadTable(rtableIdent));
   }

+  public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+  public static final String CATALOG_NAME = "iceberg.catalog";
+
+  private static Map<String, String> getCatalogPropertiesFromConf(Configuration conf, String catalogName) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    String keyPrefix = CATALOG_CONFIG_PREFIX + catalogName;
+    conf.forEach(config -> {
+      if (config.getKey().startsWith(keyPrefix)) {
+        catalogProperties.put(
+            config.getKey().substring(keyPrefix.length() + 1),
+            config.getValue());
+      }
+    });
+    return catalogProperties;
+  }
+
+  @Test
+  public void testBuildTable() throws Exception {
+    String cname = catalog.name();
+    URI iceUri = URI.create("http://localhost:" + catalogPort + "/"+catalogPath);
+    String jwt = generateJWT();
+    Schema schema = getTestSchema();
+    final String tblName = "tbl_" + Integer.toHexString(RND.nextInt(65536));
+    final TableIdentifier TBL = TableIdentifier.of(DB_NAME, tblName);
+    String location = temp.newFolder(TBL.toString()).toString();
+
+    Configuration configuration = new Configuration();
+    configuration.set("iceberg.catalog", cname);
+    configuration.set("iceberg.catalog."+cname+".type", "rest");
+    configuration.set("iceberg.catalog."+cname+".uri", iceUri.toString());
+    configuration.set("iceberg.catalog."+cname+".token", jwt);
+
+    String catalogName = configuration.get(CATALOG_NAME);
+    Assert.assertEquals(cname, catalogName);
+    Map<String, String> properties = getCatalogPropertiesFromConf(configuration, catalogName);
+    Assert.assertFalse(properties.isEmpty());
+    RESTCatalog restCatalog = (RESTCatalog) CatalogUtil.buildIcebergCatalog(catalogName, properties, configuration);
+    restCatalog.initialize(catalogName, properties);
+
+    restCatalog.buildTable(TBL, schema)
+        .withLocation(location)
+        .createTransaction()
+        .commitTransaction();
+    Table table = catalog.loadTable(TBL);
+    Assert.assertEquals(location, table.location());
+    Table restTable = restCatalog.loadTable(TBL);
+    Assert.assertEquals(location, restTable.location());
+  }
 }
HiveMetaStore.java
@@ -678,6 +678,30 @@ private static void constraintHttpMethods(ServletContextHandler ctxHandler, bool
     }
     ctxHandler.setSecurityHandler(securityHandler);
   }

+  /**
+   * Configures the metastore to propagate events to the Iceberg REST catalog, if one is configured.
+   * @param conf the configuration
+   */
+  private static void configureIcebergCacheHandling(Configuration conf) {
+    // If we start a REST catalog, we need to listen to events to maintain its consistency.
+    String eventListenerClass = MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_EVENT_LISTENER_CLASS);
+    if (eventListenerClass != null && !eventListenerClass.isEmpty()) {
+      // if the expiry is negative, no cache is used, so there is no need to register the listener
+      long expiry = MetastoreConf.getLongVar(conf, MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY);
+      // if the port is negative, no REST catalog is configured, so there is no need to register the listener
+      int icebergPort = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.ICEBERG_CATALOG_SERVLET_PORT);
+      if (icebergPort >= 0 && expiry > 0) {
+        String listeners = MetastoreConf.getVar(conf, ConfVars.EVENT_LISTENERS);
+        if (listeners == null || listeners.isEmpty()) {
+          MetastoreConf.setVar(conf, ConfVars.EVENT_LISTENERS, eventListenerClass);
+        } else {
+          MetastoreConf.setVar(conf, ConfVars.EVENT_LISTENERS, listeners + "," + eventListenerClass);
+        }
+      }
+    }
+  }
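Note: a minimal sketch of a configuration under which this method registers the listener; both conditions must hold (a non-negative servlet port and a positive expiry). Setting the port via setLongVar is an assumption, and the values are illustrative:

    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setLongVar(conf, ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY, 60_000L); // positive: cache enabled
    MetastoreConf.setLongVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PORT, 9001L);   // non-negative: REST catalog enabled
    configureIcebergCacheHandling(conf);
    // ConfVars.EVENT_LISTENERS now contains "org.apache.iceberg.rest.HMSEventListener".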

   /**
    * Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
    *

@@ -692,6 +716,8 @@ private static void constraintHttpMethods(ServletContextHandler ctxHandler, bool
    */
   public static void startMetaStore(int port, HadoopThriftAuthBridge bridge,
       Configuration conf, boolean startMetaStoreThreads, AtomicBoolean startedBackgroundThreads) throws Throwable {
+    // If we start an Iceberg REST catalog, we need to listen to events to maintain its consistency.
+    configureIcebergCacheHandling(conf);
     isMetaStoreRemote = true;
     String transportMode = MetastoreConf.getVar(conf, ConfVars.THRIFT_TRANSPORT_MODE, "binary");
     boolean isHttpTransport = transportMode.equalsIgnoreCase("http");