Skip to content

Commit

Permalink
solve some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
duanyyyyyyy committed Jan 21, 2025
1 parent 55b0ee0 commit 44f5cbd
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.starrocks.connector.ConnectorContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.hive.ConnectorProcessorName;
import com.starrocks.connector.hive.CatalogNameType;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
import com.starrocks.server.GlobalStateMgr;
Expand All @@ -37,14 +37,14 @@ public class DeltaLakeConnector implements Connector {
private final Map<String, String> properties;
private final CloudConfiguration cloudConfiguration;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final CatalogNameType catalogNameType;
private final DeltaLakeInternalMgr internalMgr;
private final DeltaLakeMetadataFactory metadataFactory;
private IDeltaLakeMetastore metastore;

public DeltaLakeConnector(ConnectorContext context) {
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "delta_lake");
this.catalogNameType = new CatalogNameType(catalogName, "delta_lake");
this.properties = context.getProperties();
this.cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
Expand Down Expand Up @@ -78,13 +78,13 @@ public CloudConfiguration getCloudConfiguration() {
public void shutdown() {
internalMgr.shutdown();
metadataFactory.metastoreCacheInvalidateCache();
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
}

public void onCreate() {
Optional<DeltaLakeCacheUpdateProcessor> updateProcessor = metadataFactory.getCacheUpdateProcessor();
updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(processorName, updateProcessor.get()));
.registerCacheUpdateProcessor(catalogNameType, updateProcessor.get()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@

package com.starrocks.connector.hive;

public class ConnectorProcessorName {
/**
* This class will be used for
*/
public class CatalogNameType {

private final String catalogName;
private final String connectorName;
private final String catalogType;

public ConnectorProcessorName(String catalogName, String connectorName) {
public CatalogNameType(String catalogName, String catalogType) {
this.catalogName = catalogName;
this.connectorName = connectorName;
this.catalogType = catalogType;
}

public String getCatalogName() {
return this.catalogName;
}

public String getConnectorName() {
return this.connectorName;
public String getCatalogType() {
return this.catalogType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {

private final Set<BaseTableInfo> registeredTableInfos = Sets.newConcurrentHashSet();

private final Map<ConnectorProcessorName, CacheUpdateProcessor> cacheUpdateProcessors =
private final Map<CatalogNameType, CacheUpdateProcessor> cacheUpdateProcessors =
new ConcurrentHashMap<>();

private final ExecutorService refreshRemoteFileExecutor;
Expand All @@ -58,16 +58,16 @@ public void registerTableInfo(BaseTableInfo tableInfo) {
registeredTableInfos.add(tableInfo);
}

public void registerCacheUpdateProcessor(ConnectorProcessorName processorName, CacheUpdateProcessor cache) {
LOG.info("register to update {} metadata cache from {} in the ConnectorTableMetadataProcessor",
processorName.getConnectorName(), processorName.getConnectorName());
cacheUpdateProcessors.put(processorName, cache);
public void registerCacheUpdateProcessor(CatalogNameType catalogNameType, CacheUpdateProcessor cache) {
LOG.info("register to update {}:{} metadata cache in the ConnectorTableMetadataProcessor",
catalogNameType.getCatalogName(), catalogNameType.getCatalogType());
cacheUpdateProcessors.put(catalogNameType, cache);
}

public void unRegisterCacheUpdateProcessor(ConnectorProcessorName processorName) {
LOG.info("unregister to update {} metadata cache from {} in the ConnectorTableMetadataProcessor",
processorName.getConnectorName(), processorName.getConnectorName());
cacheUpdateProcessors.remove(processorName);
public void unRegisterCacheUpdateProcessor(CatalogNameType catalogNameType) {
LOG.info("unregister to update {}:{} metadata cache in the ConnectorTableMetadataProcessor",
catalogNameType.getCatalogName(), catalogNameType.getCatalogType());
cacheUpdateProcessors.remove(catalogNameType);
}

public void registerCachingIcebergCatalog(String catalogName, IcebergCatalog icebergCatalog) {
Expand Down Expand Up @@ -102,11 +102,11 @@ protected void runAfterCatalogReady() {

private void refreshCatalogTable() {
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
List<ConnectorProcessorName> processorNames = Lists.newArrayList(cacheUpdateProcessors.keySet());
for (ConnectorProcessorName processorName : processorNames) {
String catalogName = processorName.getCatalogName();
LOG.info("Starting to refresh tables from {} in catalog {}", processorName.getConnectorName(), catalogName);
CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(processorName);
List<CatalogNameType> catalogNameTypes = Lists.newArrayList(cacheUpdateProcessors.keySet());
for (CatalogNameType catalogNameType : catalogNameTypes) {
String catalogName = catalogNameType.getCatalogName();
LOG.info("Starting to refresh tables from {}:{} metadata cache", catalogName, catalogNameType.getCatalogType());
CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogNameType);
if (updateProcessor == null) {
LOG.error("Failed to get cacheUpdateProcessor by catalog {}.", catalogName);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ public class HiveConnector implements Connector {
public static final String HIVE_METASTORE_CONNECTION_POOL_SIZE = "hive.metastore.connection.pool.size";
private final Map<String, String> properties;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final CatalogNameType catalogNameType;
private final HiveConnectorInternalMgr internalMgr;
private final HiveMetadataFactory metadataFactory;

public HiveConnector(ConnectorContext context) {
this.properties = context.getProperties();
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "hive_connector");
this.catalogNameType = new CatalogNameType(catalogName, "hive");
CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
this.internalMgr = new HiveConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
Expand Down Expand Up @@ -85,7 +85,7 @@ public void onCreate() {
internalMgr.isEnableBackgroundRefreshHiveMetadata()) {
updateProcessor
.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(processorName, updateProcessor.get()));
.registerCacheUpdateProcessor(catalogNameType, updateProcessor.get()));
}
}
}
Expand All @@ -95,6 +95,6 @@ public void shutdown() {
internalMgr.shutdown();
metadataFactory.getCacheUpdateProcessor().ifPresent(HiveCacheUpdateProcessor::invalidateAll);
GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor().unRegisterCacheUpdateProcessor(catalogName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.RemoteFileIO;
import com.starrocks.connector.hive.ConnectorProcessorName;
import com.starrocks.connector.hive.CatalogNameType;
import com.starrocks.connector.hive.IHiveMetastore;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
Expand All @@ -34,7 +34,7 @@ public class HudiConnector implements Connector {
public static final List<String> SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf");
private final Map<String, String> properties;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final CatalogNameType catalogNameType;
private final HudiConnectorInternalMgr internalMgr;
private final HudiMetadataFactory metadataFactory;

Expand All @@ -43,7 +43,7 @@ public HudiConnector(ConnectorContext context) {
CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "hudi_connector");
this.catalogNameType = new CatalogNameType(catalogName, "hudi");
this.internalMgr = new HudiConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
this.metadataFactory = createMetadataFactory(hdfsEnvironment);
onCreate();
Expand Down Expand Up @@ -77,6 +77,6 @@ public void onCreate() {
@Override
public void shutdown() {
internalMgr.shutdown();
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogNameType);
}
}

0 comments on commit 44f5cbd

Please sign in to comment.