diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java index b34747e0f6972f..226d3ca54b0821 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java @@ -19,7 +19,7 @@ import com.starrocks.connector.ConnectorContext; import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.HdfsEnvironment; -import com.starrocks.connector.hive.ConnectorProcessorName; +import com.starrocks.connector.hive.CatalogNameType; import com.starrocks.credential.CloudConfiguration; import com.starrocks.credential.CloudConfigurationFactory; import com.starrocks.server.GlobalStateMgr; @@ -37,14 +37,14 @@ public class DeltaLakeConnector implements Connector { private final Map properties; private final CloudConfiguration cloudConfiguration; private final String catalogName; - private final ConnectorProcessorName processorName; + private final CatalogNameType catalogNameType; private final DeltaLakeInternalMgr internalMgr; private final DeltaLakeMetadataFactory metadataFactory; private IDeltaLakeMetastore metastore; public DeltaLakeConnector(ConnectorContext context) { this.catalogName = context.getCatalogName(); - this.processorName = new ConnectorProcessorName(catalogName, "delta_lake"); + this.catalogNameType = new CatalogNameType(catalogName, "delta_lake"); this.properties = context.getProperties(); this.cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); @@ -78,13 +78,13 @@ public CloudConfiguration getCloudConfiguration() { public void shutdown() { internalMgr.shutdown(); metadataFactory.metastoreCacheInvalidateCache(); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType); } public void onCreate() { Optional updateProcessor = metadataFactory.getCacheUpdateProcessor(); updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() - .registerCacheUpdateProcessor(processorName, updateProcessor.get())); + .registerCacheUpdateProcessor(catalogNameType, updateProcessor.get())); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/CatalogNameType.java similarity index 74% rename from fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java rename to fe/fe-core/src/main/java/com/starrocks/connector/hive/CatalogNameType.java index 0a4444483c9e9d..470bdb661af7ca 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/CatalogNameType.java @@ -14,22 +14,25 @@ package com.starrocks.connector.hive; -public class ConnectorProcessorName { +/** + * This class will be used for + */ +public class CatalogNameType { private final String catalogName; - private final String connectorName; + private final String catalogType; - public ConnectorProcessorName(String catalogName, String connectorName) { + public CatalogNameType(String catalogName, String catalogType) { this.catalogName = catalogName; - this.connectorName = connectorName; + this.catalogType = catalogType; } public String getCatalogName() { return this.catalogName; } - public String getConnectorName() { - return this.connectorName; + public String getCatalogType() { + return this.catalogType; } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java index 6b134980b54e95..647a7c945ce780 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java @@ -48,7 +48,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon { private final Set registeredTableInfos = Sets.newConcurrentHashSet(); - private final Map cacheUpdateProcessors = + private final Map cacheUpdateProcessors = new ConcurrentHashMap<>(); private final ExecutorService refreshRemoteFileExecutor; @@ -58,16 +58,16 @@ public void registerTableInfo(BaseTableInfo tableInfo) { registeredTableInfos.add(tableInfo); } - public void registerCacheUpdateProcessor(ConnectorProcessorName processorName, CacheUpdateProcessor cache) { - LOG.info("register to update {} metadata cache from {} in the ConnectorTableMetadataProcessor", - processorName.getConnectorName(), processorName.getConnectorName()); - cacheUpdateProcessors.put(processorName, cache); + public void registerCacheUpdateProcessor(CatalogNameType catalogNameType, CacheUpdateProcessor cache) { + LOG.info("register to update {}:{} metadata cache in the ConnectorTableMetadataProcessor", + catalogNameType.getCatalogName(), catalogNameType.getCatalogType()); + cacheUpdateProcessors.put(catalogNameType, cache); } - public void unRegisterCacheUpdateProcessor(ConnectorProcessorName processorName) { - LOG.info("unregister to update {} metadata cache from {} in the ConnectorTableMetadataProcessor", - processorName.getConnectorName(), processorName.getConnectorName()); - cacheUpdateProcessors.remove(processorName); + public void unRegisterCacheUpdateProcessor(CatalogNameType catalogNameType) { + LOG.info("unregister to update {}:{} metadata cache in the ConnectorTableMetadataProcessor", + catalogNameType.getCatalogName(), catalogNameType.getCatalogType()); + cacheUpdateProcessors.remove(catalogNameType); } public void registerCachingIcebergCatalog(String catalogName, IcebergCatalog icebergCatalog) { @@ -102,11 +102,11 @@ protected void runAfterCatalogReady() { private void refreshCatalogTable() { MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr(); - List processorNames = Lists.newArrayList(cacheUpdateProcessors.keySet()); - for (ConnectorProcessorName processorName : processorNames) { - String catalogName = processorName.getCatalogName(); - LOG.info("Starting to refresh tables from {} in catalog {}", processorName.getConnectorName(), catalogName); - CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(processorName); + List catalogNameTypes = Lists.newArrayList(cacheUpdateProcessors.keySet()); + for (CatalogNameType catalogNameType : catalogNameTypes) { + String catalogName = catalogNameType.getCatalogName(); + LOG.info("Starting to refresh tables from {}:{} metadata cache", catalogName, catalogNameType.getCatalogType()); + CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogNameType); if (updateProcessor == null) { LOG.error("Failed to get cacheUpdateProcessor by catalog {}.", catalogName); continue; diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java index 27821543a62df8..9d067f3cae3851 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java @@ -34,14 +34,14 @@ public class HiveConnector implements Connector { public static final String HIVE_METASTORE_CONNECTION_POOL_SIZE = "hive.metastore.connection.pool.size"; private final Map properties; private final String catalogName; - private final ConnectorProcessorName processorName; + private final CatalogNameType catalogNameType; private final HiveConnectorInternalMgr internalMgr; private final HiveMetadataFactory metadataFactory; public HiveConnector(ConnectorContext context) { this.properties = context.getProperties(); this.catalogName = context.getCatalogName(); - this.processorName = new ConnectorProcessorName(catalogName, "hive_connector"); + this.catalogNameType = new CatalogNameType(catalogName, "hive"); CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); this.internalMgr = new HiveConnectorInternalMgr(catalogName, properties, hdfsEnvironment); @@ -85,7 +85,7 @@ public void onCreate() { internalMgr.isEnableBackgroundRefreshHiveMetadata()) { updateProcessor .ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() - .registerCacheUpdateProcessor(processorName, updateProcessor.get())); + .registerCacheUpdateProcessor(catalogNameType, updateProcessor.get())); } } } @@ -95,6 +95,6 @@ public void shutdown() { internalMgr.shutdown(); metadataFactory.getCacheUpdateProcessor().ifPresent(HiveCacheUpdateProcessor::invalidateAll); GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor().unRegisterCacheUpdateProcessor(catalogName); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java index 670358094202b8..135744e9c61f16 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java @@ -20,7 +20,7 @@ import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.HdfsEnvironment; import com.starrocks.connector.RemoteFileIO; -import com.starrocks.connector.hive.ConnectorProcessorName; +import com.starrocks.connector.hive.CatalogNameType; import com.starrocks.connector.hive.IHiveMetastore; import com.starrocks.credential.CloudConfiguration; import com.starrocks.credential.CloudConfigurationFactory; @@ -34,7 +34,7 @@ public class HudiConnector implements Connector { public static final List SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf"); private final Map properties; private final String catalogName; - private final ConnectorProcessorName processorName; + private final CatalogNameType catalogNameType; private final HudiConnectorInternalMgr internalMgr; private final HudiMetadataFactory metadataFactory; @@ -43,7 +43,7 @@ public HudiConnector(ConnectorContext context) { CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); this.catalogName = context.getCatalogName(); - this.processorName = new ConnectorProcessorName(catalogName, "hudi_connector"); + this.catalogNameType = new CatalogNameType(catalogName, "hudi"); this.internalMgr = new HudiConnectorInternalMgr(catalogName, properties, hdfsEnvironment); this.metadataFactory = createMetadataFactory(hdfsEnvironment); onCreate(); @@ -77,6 +77,6 @@ public void onCreate() { @Override public void shutdown() { internalMgr.shutdown(); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType); } } \ No newline at end of file