Skip to content

Commit

Permalink
xx
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Dec 27, 2024
1 parent eab6fb1 commit 3177994
Show file tree
Hide file tree
Showing 18 changed files with 121 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.gravitino.credential;

public class CredentialConstants {
public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";
@Deprecated public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";
public static final String CREDENTIAL_PROVIDERS = "credential-providers";
public static final String CREDENTIAL_CACHE_EXPIRE_RATIO = "credential-cache-expire-ratio";
public static final String CREDENTIAL_CACHE_MAX_SIZE = "credential-cache-max-size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ public class CredentialConfig extends Config {
false /* reserved */))
.build();

public static final ConfigEntry<String> CREDENTIAL_PROVIDERS =
new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDERS)
.doc("Credential providers, separated by comma.")
.version(ConfigConstants.VERSION_0_8_0)
.stringConf()
.create();

public static final ConfigEntry<Double> CREDENTIAL_CACHE_EXPIRE_RATIO =
new ConfigBuilder(CredentialConstants.CREDENTIAL_CACHE_EXPIRE_RATIO)
.doc(
Expand Down
3 changes: 2 additions & 1 deletion dev/docker/iceberg-rest-server/rewrite_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
"GRAVITINO_IO_IMPL" : "io-impl",
"GRAVITINO_URI" : "uri",
"GRAVITINO_WAREHOUSE" : "warehouse",
"GRAVITINO_CREDENTIAL_PROVIDER_TYPE" : "credential-provider-type",
"GRAVITINO_CREDENTIAL_PROVIDER_TYPE" : "credential-providers",
"GRAVITINO_CREDENTIAL_PROVIDERS" : "credential-providers",
"GRAVITINO_GCS_CREDENTIAL_FILE_PATH" : "gcs-credential-file-path",
"GRAVITINO_S3_ACCESS_KEY" : "s3-access-key-id",
"GRAVITINO_S3_SECRET_KEY" : "s3-secret-access-key",
Expand Down
99 changes: 52 additions & 47 deletions docs/iceberg-rest-service.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.OverwriteDefaultConfig;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.config.CredentialConfig;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.gravitino.storage.S3Properties;

public class IcebergConfig extends CredentialConfig implements OverwriteDefaultConfig {
public class IcebergConfig extends Config implements OverwriteDefaultConfig {

public static final String ICEBERG_CONFIG_PREFIX = "gravitino.iceberg-rest.";
@VisibleForTesting public static final String ICEBERG_EXTENSION_PACKAGES = "extension-packages";
Expand Down Expand Up @@ -239,9 +239,13 @@ public class IcebergConfig extends CredentialConfig implements OverwriteDefaultC
.toSequence()
.createWithDefault(Collections.emptyList());

@Deprecated
public static final ConfigEntry<String> CREDENTIAL_PROVIDER_TYPE =
new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDER_TYPE)
.doc("The credential provider type for Iceberg")
.doc(
String.format(
"Deprecated, please use %s instead, The credential provider type for Iceberg",
CredentialConstants.CREDENTIAL_PROVIDERS))
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand All @@ -255,7 +259,8 @@ public String getCatalogBackendName() {
}

public IcebergConfig(Map<String, String> properties) {
super(properties);
super(false);
loadFromMap(properties, k -> true);
}

public IcebergConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,19 @@
package org.apache.gravitino.iceberg.common.ops;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
import org.apache.gravitino.utils.IsolatedClassLoader;
import org.apache.gravitino.utils.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.Transaction;
Expand Down Expand Up @@ -74,14 +70,7 @@ public class IcebergCatalogWrapper implements AutoCloseable {
private SupportsNamespaces asNamespaceCatalog;
private final IcebergCatalogBackend catalogBackend;
private String catalogUri = null;
private Map<String, String> catalogConfigToClients;
private Map<String, String> catalogPropertiesMap;
private static final Set<String> catalogPropertiesToClientKeys =
ImmutableSet.of(
IcebergConstants.IO_IMPL,
IcebergConstants.AWS_S3_REGION,
IcebergConstants.ICEBERG_S3_ENDPOINT,
IcebergConstants.ICEBERG_OSS_ENDPOINT);

public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
this.catalogBackend =
Expand All @@ -101,10 +90,6 @@ public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
if (catalog instanceof SupportsNamespaces) {
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
}
this.catalogConfigToClients =
MapUtils.getFilteredMap(
icebergConfig.getIcebergCatalogProperties(),
key -> catalogPropertiesToClientKeys.contains(key));

this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties();
}
Expand Down Expand Up @@ -311,14 +296,7 @@ private void closePostgreSQLCatalogResource() {
// Some io and security configuration should pass to Iceberg REST client
private LoadTableResponse injectTableConfig(Supplier<LoadTableResponse> supplier) {
LoadTableResponse loadTableResponse = supplier.get();
return LoadTableResponse.builder()
.withTableMetadata(loadTableResponse.tableMetadata())
.addAllConfig(getCatalogConfigToClient())
.build();
}

private Map<String, String> getCatalogConfigToClient() {
return catalogConfigToClients;
return LoadTableResponse.builder().withTableMetadata(loadTableResponse.tableMetadata()).build();
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.CatalogCredentialManager;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.CredentialPropertyUtils;
import org.apache.gravitino.credential.PathBasedCredentialContext;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.utils.MapUtils;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
Expand All @@ -41,13 +44,26 @@
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;

/** */
public class CatalogWrapper extends IcebergCatalogWrapper {
/** Process Iceberg REST specific operations, like credential vending. */
public class CatalogWrapperForREST extends IcebergCatalogWrapper {

private final CatalogCredentialManager catalogCredentialManager;

public CatalogWrapper(String catalogName, IcebergConfig config) {
private Map<String, String> catalogConfigToClients;

private static final Set<String> catalogPropertiesToClientKeys =
ImmutableSet.of(
IcebergConstants.IO_IMPL,
IcebergConstants.AWS_S3_REGION,
IcebergConstants.ICEBERG_S3_ENDPOINT,
IcebergConstants.ICEBERG_OSS_ENDPOINT);

public CatalogWrapperForREST(String catalogName, IcebergConfig config) {
super(config);
this.catalogConfigToClients =
MapUtils.getFilteredMap(
config.getIcebergCatalogProperties(),
key -> catalogPropertiesToClientKeys.contains(key));
// To compatibility with old version
Map<String, String> catalogProperties = normalizeCredentialProperties(config.getAllConfig());
this.catalogCredentialManager = new CatalogCredentialManager(catalogName, catalogProperties);
Expand Down Expand Up @@ -78,6 +94,10 @@ public void close() {
}
}

private Map<String, String> getCatalogConfigToClient() {
return catalogConfigToClients;
}

private LoadTableResponse injectCredentialConfig(
TableIdentifier tableIdentifier, LoadTableResponse loadTableResponse) {
TableMetadata tableMetadata = loadTableResponse.tableMetadata();
Expand Down Expand Up @@ -106,10 +126,12 @@ private LoadTableResponse injectCredentialConfig(
return LoadTableResponse.builder()
.withTableMetadata(loadTableResponse.tableMetadata())
.addAllConfig(loadTableResponse.config())
.addAllConfig(getCatalogConfigToClient())
.addAllConfig(credentialConfig)
.build();
}

@SuppressWarnings("Deperecated")
private Map<String, String> normalizeCredentialProperties(Map<String, String> properties) {
HashMap<String, String> normalizedProperties = new HashMap<>(properties);
String credentialProviderType = properties.get(CredentialConstants.CREDENTIAL_PROVIDER_TYPE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class IcebergCatalogWrapperManager implements AutoCloseable {

public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class);

private final Cache<String, CatalogWrapper> icebergCatalogWrapperCache;
private final Cache<String, CatalogWrapperForREST> icebergCatalogWrapperCache;

private final IcebergConfigProvider configProvider;

Expand Down Expand Up @@ -72,32 +72,33 @@ public IcebergCatalogWrapperManager(
* ([^/]*\/), end with /
* @return the instance of IcebergCatalogWrapper.
*/
public CatalogWrapper getOps(String rawPrefix) {
public CatalogWrapperForREST getOps(String rawPrefix) {
String catalogName = IcebergRestUtils.getCatalogName(rawPrefix);
return getCatalogWrapper(catalogName);
}

public CatalogWrapper getCatalogWrapper(String catalogName) {
CatalogWrapper catalogWrapper =
public CatalogWrapperForREST getCatalogWrapper(String catalogName) {
CatalogWrapperForREST catalogWrapperForREST =
icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName));
// Reload conf to reset UserGroupInformation or icebergTableOps will always use
// Simple auth.
catalogWrapper.reloadHadoopConf();
return catalogWrapper;
catalogWrapperForREST.reloadHadoopConf();
return catalogWrapperForREST;
}

private CatalogWrapper createCatalogWrapper(String catalogName) {
private CatalogWrapperForREST createCatalogWrapper(String catalogName) {
Optional<IcebergConfig> icebergConfig = configProvider.getIcebergCatalogConfig(catalogName);
if (!icebergConfig.isPresent()) {
throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName);
}
return createCatalogWrapper(catalogName, icebergConfig.get());
}

// Overriding this method to create a new CatalogWrapper for test;
// Overriding this method to create a new CatalogWrapperForREST for test;
@VisibleForTesting
protected CatalogWrapper createCatalogWrapper(String catalogName, IcebergConfig icebergConfig) {
return new CatalogWrapper(catalogName, icebergConfig);
protected CatalogWrapperForREST createCatalogWrapper(
String catalogName, IcebergConfig icebergConfig) {
return new CatalogWrapperForREST(catalogName, icebergConfig);
}

private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private Map<String, String> getADLSConfig() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
CredentialConstants.ADLS_TOKEN_CREDENTIAL_PROVIDER_TYPE);
configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private Map<String, String> getADLSConfig() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
CredentialConstants.AZURE_ACCOUNT_KEY_CREDENTIAL_PROVIDER_TYPE);
configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private Map<String, String> getGCSConfig() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
CredentialConstants.GCS_TOKEN_CREDENTIAL_PROVIDER_TYPE);
configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private Map<String, String> getOSSConfig() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
CredentialConstants.OSS_TOKEN_CREDENTIAL_PROVIDER);
configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + OSSProperties.GRAVITINO_OSS_REGION, region);
configMap.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private Map<String, String> getOSSConfig() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
OSSSecretKeyCredential.OSS_SECRET_KEY_CREDENTIAL_TYPE);
configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + OSSProperties.GRAVITINO_OSS_ENDPOINT, endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private Map<String, String> getS3Config() {
Map configMap = new HashMap<String, String>();

configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
IcebergConfig.ICEBERG_CONFIG_PREFIX + CredentialConstants.CREDENTIAL_PROVIDERS,
CredentialConstants.S3_TOKEN_CREDENTIAL_PROVIDER);
configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + S3Properties.GRAVITINO_S3_REGION, region);
configMap.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class TestIcebergCatalogWrapperManager {
public class TestIcebergCatalogWrapperManagerForREST {

private static final String DEFAULT_CATALOG = "memory";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.gravitino.iceberg.service.rest;

import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.service.CatalogWrapper;
import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
Expand All @@ -32,7 +32,7 @@
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

// Used to override registerTable
public class CatalogWrapperForTest extends CatalogWrapper {
public class CatalogWrapperForTest extends CatalogWrapperForREST {
public CatalogWrapperForTest(String catalogName, IcebergConfig icebergConfig) {
super(catalogName, icebergConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import java.util.Map;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.service.CatalogWrapper;
import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;

Expand All @@ -33,7 +33,8 @@ public IcebergCatalogWrapperManagerForTest(
}

@Override
public CatalogWrapper createCatalogWrapper(String catalogName, IcebergConfig icebergConfig) {
public CatalogWrapperForREST createCatalogWrapper(
String catalogName, IcebergConfig icebergConfig) {
return new CatalogWrapperForTest(catalogName, icebergConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public static ResourceConfig getIcebergResourceConfig(
StaticIcebergConfigProvider.class.getName());
catalogConf.put(String.format("%s.catalog-backend-name", catalogConfigPrefix), PREFIX);
catalogConf.put(
CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE);
CredentialConstants.CREDENTIAL_PROVIDERS, DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE);
IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(catalogConf);
configProvider.initialize(catalogConf);
// used to override register table interface
Expand Down

0 comments on commit 3177994

Please sign in to comment.