Skip to content

Commit

Permalink
support credential cache
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Dec 25, 2024
1 parent c37d0f9 commit f32da01
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
public class CredentialConstants {
public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";
public static final String CREDENTIAL_PROVIDERS = "credential-providers";
public static final String CREDENTIAL_MAX_CACHE_TIME = "credential-max-cache-time-ms";
public static final String CREDENTIAL_MAX_CACHE_SIZE = "credential-max-cache-size";
public static final String S3_TOKEN_CREDENTIAL_PROVIDER = "s3-token";
public static final String S3_TOKEN_EXPIRE_IN_SECS = "s3-token-expire-in-secs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,27 @@ public CatalogCredentialContext(String userName) {
public String getUserName() {
return userName;
}

@Override
public int hashCodeIgnoreUser() {
return 9999;
}

@Override
public boolean equalsIgnoreUser(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof CatalogCredentialContext)) {
return false;
}
return true;
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("User name: ").append(userName);
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

package org.apache.gravitino.credential;

import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.exceptions.NoSuchCredentialException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,20 +34,21 @@ public class CatalogCredentialManager implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(CatalogCredentialManager.class);

private CredentialCacheManager<CredentialCacheKey> cacheManager;

private final String catalogName;
private final Map<String, CredentialProvider> credentialProviders;

public CatalogCredentialManager(String catalogName, Map<String, String> catalogProperties) {
this.catalogName = catalogName;
this.credentialProviders = CredentialUtils.loadCredentialProviders(catalogProperties);
this.cacheManager = new CredentialCacheManager();
cacheManager.initialize(catalogProperties);
}

public Credential getCredential(String credentialType, CredentialContext context) {
// todo: add credential cache
Preconditions.checkState(
credentialProviders.containsKey(credentialType),
String.format("Credential %s not found", credentialType));
return credentialProviders.get(credentialType).getCredential(context);
CredentialCacheKey credentialCacheKey = new CredentialCacheKey(credentialType, context);
return cacheManager.getCredential(credentialCacheKey, cacheKey -> getCredential(cacheKey));
}

@Override
Expand All @@ -67,4 +68,15 @@ public void close() {
}
});
}

public Credential getCredential(CredentialCacheKey credentialCacheKey) {
String credentialType = credentialCacheKey.getCredentialType();
CredentialContext context = credentialCacheKey.getCredentialContext();
LOG.info("try get credential, credential type: {}, context: {}", credentialType, context);
CredentialProvider credentialProvider = credentialProviderMap.get(credentialType);
if (credentialProvider == null) {
throw new NoSuchCredentialException("No such credential: %s", credentialType);
}
return credentialProvider.getCredential(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import java.util.Objects;
import lombok.Getter;

@Getter
public class CredentialCacheKey {

private String credentialType;
private CredentialContext credentialContext;

public CredentialCacheKey(String credentialType, CredentialContext credentialContext) {
this.credentialType = credentialType;
this.credentialContext = credentialContext;
}

@Override
public int hashCode() {
return Objects.hashCode(credentialType) + credentialContext.hashCodeIgnoreUser();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof CredentialCacheKey)) {
return false;
}
CredentialCacheKey that = (CredentialCacheKey) o;
return Objects.equals(credentialType, that.credentialType)
&& credentialContext.equalsIgnoreUser(that.credentialContext);
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder
.append("credentialType: ")
.append(credentialType)
.append("credentialContext: ")
.append(credentialContext);
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.gravitino.catalog.CatalogManager;
import org.checkerframework.checker.index.qual.NonNegative;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CredentialCacheManager<T> {
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);

// Calculates the credential expire time in the cache.
static class CredentialExpireTimeCaculator<T> implements Expiry<T, Credential> {

private long credentialMaxCacheTimeInMs = 0;

public CredentialExpireTimeCaculator(long credentialMaxCacheTimeInMs) {
this.credentialMaxCacheTimeInMs = credentialMaxCacheTimeInMs;
}

// Set expire time after add a credential in the cache.
@Override
public long expireAfterCreate(
@NonNull T key, @NonNull Credential credential, long currentTime) {
long credentialExpireTime = credential.expireTimeInMs();
long timeToExpire = credentialExpireTime - System.currentTimeMillis();
if (timeToExpire <= 0) {
return 0;
}

long idealExpireTime = Math.min(timeToExpire / 2, credentialMaxCacheTimeInMs);
LOG.info("credential expire time: {}", idealExpireTime);
return TimeUnit.MILLISECONDS.toNanos(idealExpireTime);
}

// not change expire time after update credential, this should not happen.
@Override
public long expireAfterUpdate(
@NonNull T key,
@NonNull Credential value,
long currentTime,
@NonNegative long currentDuration) {
return currentDuration;
}

// not change expire time after read credential.
@Override
public long expireAfterRead(
@NonNull T key,
@NonNull Credential value,
long currentTime,
@NonNegative long currentDuration) {
return currentDuration;
}
}

private Cache<T, Credential> credentialCache;

public void initialize(Map<String, String> catalogProperties) {
try {
long cache_size =
Long.valueOf(
catalogProperties.getOrDefault(
CredentialConstants.CREDENTIAL_MAX_CACHE_SIZE, "1000"));
long cache_time =
Long.valueOf(
catalogProperties.getOrDefault(
CredentialConstants.CREDENTIAL_MAX_CACHE_TIME, "60000"));

this.credentialCache =
Caffeine.newBuilder()
.expireAfter(new CredentialExpireTimeCaculator(cache_time))
.maximumSize(cache_size)
.removalListener(
(k, credential, c) -> {
LOG.info("credential expire {}.", k);
})
.build();
} catch (NumberFormatException e) {
throw new RuntimeException(e);
}
}

public Credential getCredential(T cacheKey, Function<T, Credential> credentialSupplier) {
return credentialCache.get(cacheKey, key -> credentialSupplier.apply(cacheKey));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@

/** Contains credential context information to get credential from a credential provider. */
public interface CredentialContext {

/**
* Providing the username.
*
* @return A string identifying user name.
*/
String getUserName();

int hashCodeIgnoreUser();

boolean equalsIgnoreUser(Object o);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.gravitino.credential;

import com.google.common.base.Preconditions;
import java.util.Objects;
import java.util.Set;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -48,11 +49,41 @@ public String getUserName() {
return userName;
}

@Override
public int hashCodeIgnoreUser() {
return Objects.hash(writePaths, readPaths);
}

@Override
public boolean equalsIgnoreUser(Object o) {
if (this == o) {
return true;
}
if (o == null || !(o instanceof PathBasedCredentialContext)) {
return false;
}
PathBasedCredentialContext that = (PathBasedCredentialContext) o;
return Objects.equals(writePaths, that.writePaths) && Objects.equals(readPaths, that.readPaths);
}

public Set<String> getWritePaths() {
return writePaths;
}

public Set<String> getReadPaths() {
return readPaths;
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder
.append("User name: ")
.append(userName)
.append(", write path: ")
.append(writePaths)
.append(", read path: ")
.append(readPaths);
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,33 @@ public class CredentialConfig {
new ImmutableMap.Builder<String, PropertyEntry<?>>()
.put(
CredentialConstants.CREDENTIAL_PROVIDERS,
PropertyEntry.booleanPropertyEntry(
PropertyEntry.stringPropertyEntry(
CredentialConstants.CREDENTIAL_PROVIDERS,
"Credential providers for the Gravitino catalog, schema, fileset, table, etc.",
false /* required */,
false /* immutable */,
null /* default value */,
false /* hidden */,
false /* reserved */))
.put(
CredentialConstants.CREDENTIAL_MAX_CACHE_TIME,
PropertyEntry.longPropertyEntry(
CredentialConstants.CREDENTIAL_MAX_CACHE_TIME,
"Max cache time for the credential.",
false /* required */,
false /* immutable */,
300_000L /* default value */,
false /* hidden */,
false /* reserved */))
.put(
CredentialConstants.CREDENTIAL_MAX_CACHE_SIZE,
PropertyEntry.longPropertyEntry(
CredentialConstants.CREDENTIAL_MAX_CACHE_SIZE,
"Max cache size for the credential.",
false /* required */,
false /* immutable */,
2000 /* default value */,
false /* hidden */,
false /* reserved */))
.build();
}
Loading

0 comments on commit f32da01

Please sign in to comment.