diff --git a/pom.xml b/pom.xml
index 735d0f1c3..977fc50d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,7 @@
32.1.3-jre
2.0.9
2.2
- 0.17.0-SNAPSHOT
+ 0.18.0-SNAPSHOT
23.5.26
diff --git a/s3stream/pom.xml b/s3stream/pom.xml
index d43abdc41..b6518b2e4 100644
--- a/s3stream/pom.xml
+++ b/s3stream/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.automq.elasticstream
s3stream
- 0.17.0-SNAPSHOT
+ 0.18.0-SNAPSHOT
5.5.0
5.10.0
diff --git a/s3stream/src/main/java/com/automq/stream/s3/Config.java b/s3stream/src/main/java/com/automq/stream/s3/Config.java
index e0ec033d6..79750f404 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/Config.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/Config.java
@@ -24,8 +24,6 @@ public class Config {
private String region;
private String bucket;
private boolean forcePathStyle = false;
- private String accessKey;
- private String secretKey;
private String walPath = "/tmp/s3stream_wal";
private long walCacheSize = 200 * 1024 * 1024;
private long walCapacity = 1024L * 1024 * 1024;
@@ -207,14 +205,6 @@ public boolean objectLogEnable() {
return objectLogEnable;
}
- public String accessKey() {
- return accessKey;
- }
-
- public String secretKey() {
- return secretKey;
- }
-
public long networkBaselineBandwidth() {
return networkBaselineBandwidth;
}
@@ -403,16 +393,6 @@ public Config objectLogEnable(boolean s3ObjectLogEnable) {
return this;
}
- public Config accessKey(String s3AccessKey) {
- this.accessKey = s3AccessKey;
- return this;
- }
-
- public Config secretKey(String s3SecretKey) {
- this.secretKey = s3SecretKey;
- return this;
- }
-
public Config networkBaselineBandwidth(long networkBaselineBandwidth) {
this.networkBaselineBandwidth = networkBaselineBandwidth;
return this;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
index f0f2961a8..3c489fc2d 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java
@@ -60,7 +60,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
@@ -113,20 +113,20 @@ public class DefaultS3Operator implements S3Operator {
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);
- public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
- String secretKey) {
- this(endpoint, region, bucket, forcePathStyle, accessKey, secretKey, null, null, false);
+ public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
+ List credentialsProviders) {
+ this(endpoint, region, bucket, forcePathStyle, credentialsProviders, null, null, false);
}
- public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle, String accessKey,
- String secretKey,
+ public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
+ List credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter, boolean readWriteIsolate) {
this.maxMergeReadSparsityRate = Utils.getMaxMergeReadSparsityRate();
this.networkInboundBandwidthLimiter = networkInboundBandwidthLimiter;
this.networkOutboundBandwidthLimiter = networkOutboundBandwidthLimiter;
- this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey);
- this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, accessKey, secretKey) : writeS3Client;
+ this.writeS3Client = newS3Client(endpoint, region, forcePathStyle, credentialsProviders);
+ this.readS3Client = readWriteIsolate ? newS3Client(endpoint, region, forcePathStyle, credentialsProviders) : writeS3Client;
this.inflightWriteLimiter = new Semaphore(50);
this.inflightReadLimiter = readWriteIsolate ? new Semaphore(50) : inflightWriteLimiter;
this.bucket = bucket;
@@ -137,8 +137,7 @@ public DefaultS3Operator(String endpoint, String region, String bucket, boolean
.setRegion(region)
.setBucketName(bucket)
.setForcePathStyle(forcePathStyle)
- .setAccessKey(accessKey)
- .setSecretKey(secretKey)
+ .setCredentialsProviders(credentialsProviders)
.build();
LOGGER.info("You are using s3Context: {}", s3Context);
checkAvailable(s3Context);
@@ -649,26 +648,30 @@ private void checkAvailable(S3Utils.S3Context s3Context) {
}
}
- public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle, String accessKey,
- String secretKey) {
+ public S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle,
+ List credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
- builder.credentialsProvider(AwsCredentialsProviderChain.builder()
- .reuseLastProviderEnabled(true)
- .credentialsProviders(
- () -> AwsBasicCredentials.create(accessKey, secretKey),
- InstanceProfileCredentialsProvider.create(),
- AnonymousCredentialsProvider.create()
- ).build()
- );
+ builder.credentialsProvider(newCredentialsProviderChain(credentialsProviders));
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}
+ private AwsCredentialsProvider newCredentialsProviderChain(List credentialsProviders) {
+ List providers = new ArrayList<>(credentialsProviders);
+ // Add default providers to the end of the chain
+ providers.add(InstanceProfileCredentialsProvider.create());
+ providers.add(AnonymousCredentialsProvider.create());
+ return AwsCredentialsProviderChain.builder()
+ .reuseLastProviderEnabled(true)
+ .credentialsProviders(providers)
+ .build();
+ }
+
/**
* Acquire read permit, permit will auto release when cf complete.
*
@@ -813,8 +816,7 @@ public static class Builder {
private String region;
private String bucket;
private boolean forcePathStyle;
- private String accessKey;
- private String secretKey;
+ private List credentialsProviders;
private AsyncNetworkBandwidthLimiter inboundLimiter;
private AsyncNetworkBandwidthLimiter outboundLimiter;
private boolean readWriteIsolate;
@@ -839,13 +841,8 @@ public Builder forcePathStyle(boolean forcePathStyle) {
return this;
}
- public Builder accessKey(String accessKey) {
- this.accessKey = accessKey;
- return this;
- }
-
- public Builder secretKey(String secretKey) {
- this.secretKey = secretKey;
+ public Builder credentialsProviders(List credentialsProviders) {
+ this.credentialsProviders = credentialsProviders;
return this;
}
@@ -865,7 +862,7 @@ public Builder readWriteIsolate(boolean readWriteIsolate) {
}
public DefaultS3Operator build() {
- return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, accessKey, secretKey,
+ return new DefaultS3Operator(endpoint, region, bucket, forcePathStyle, credentialsProviders,
inboundLimiter, outboundLimiter, readWriteIsolate);
}
}
diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
index 4004c4605..d835ea18b 100644
--- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
+++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java
@@ -34,10 +34,11 @@
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
@@ -84,35 +85,25 @@ private static String range(long start, long end) {
}
private static S3AsyncClient newS3AsyncClient(String endpoint, String region, boolean forcePathStyle,
- String accessKey, String secretKey) {
+ List credentialsProviders) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
if (StringUtils.isNotBlank(endpoint)) {
builder.endpointOverride(URI.create(endpoint));
}
builder.serviceConfiguration(c -> c.pathStyleAccessEnabled(forcePathStyle));
- builder.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
+ builder.credentialsProvider(AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build());
builder.overrideConfiguration(b -> b.apiCallTimeout(Duration.ofMinutes(1))
.apiCallAttemptTimeout(Duration.ofSeconds(30)));
return builder.build();
}
- private static String hideSecret(String secret) {
- if (secret == null) {
- return null;
- }
- if (secret.length() < 6) {
- return "*".repeat(secret.length());
- }
- return secret.substring(0, 3) + "*".repeat(secret.length() - 6) + secret.substring(secret.length() - 3);
- }
-
private static abstract class S3CheckTask implements AutoCloseable {
protected final S3AsyncClient client;
protected final String bucketName;
private final String taskName;
public S3CheckTask(S3Context context, String taskName) {
- this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.accessKey, context.secretKey);
+ this.client = newS3AsyncClient(context.endpoint, context.region, context.forcePathStyle, context.credentialsProviders);
this.bucketName = context.bucketName;
this.taskName = taskName;
}
@@ -363,17 +354,16 @@ public void close() {
public static class S3Context {
private final String endpoint;
- private final String accessKey;
- private final String secretKey;
+ private final List credentialsProviders;
private final String bucketName;
private final String region;
private final boolean forcePathStyle;
- public S3Context(String endpoint, String accessKey, String secretKey, String bucketName, String region,
+ public S3Context(String endpoint, List credentialsProviders, String bucketName,
+ String region,
boolean forcePathStyle) {
this.endpoint = endpoint;
- this.accessKey = accessKey;
- this.secretKey = secretKey;
+ this.credentialsProviders = credentialsProviders;
this.bucketName = bucketName;
this.region = region;
this.forcePathStyle = forcePathStyle;
@@ -406,11 +396,13 @@ public List advices() {
}
}
}
- if (StringUtils.isBlank(accessKey)) {
- advises.add("accessKey is blank. Please supply a valid accessKey.");
+ if (credentialsProviders == null || credentialsProviders.isEmpty()) {
+ advises.add("no credentials provider is supplied. Please supply a credentials provider.");
}
- if (StringUtils.isBlank(secretKey)) {
- advises.add("secretKey is blank. Please supply a valid secretKey.");
+ try (AwsCredentialsProviderChain chain = AwsCredentialsProviderChain.builder().credentialsProviders(credentialsProviders).build()) {
+ chain.resolveCredentials();
+ } catch (SdkClientException e) {
+ advises.add("all provided credentials providers are invalid. Please supply a valid credentials provider. Error msg: " + e.getMessage());
}
if (StringUtils.isBlank(region)) {
advises.add("region is blank. Please supply a valid region.");
@@ -425,8 +417,7 @@ public List advices() {
public String toString() {
return "S3CheckContext{" +
"endpoint='" + endpoint + '\'' +
- ", accessKey='" + hideSecret(accessKey) + '\'' +
- ", secretKey='" + hideSecret(secretKey) + '\'' +
+ ", credentialsProviders=" + credentialsProviders +
", bucketName='" + bucketName + '\'' +
", region='" + region + '\'' +
", forcePathStyle=" + forcePathStyle +
@@ -435,8 +426,7 @@ public String toString() {
public static class Builder {
private String endpoint;
- private String accessKey;
- private String secretKey;
+ private List credentialsProviders;
private String bucketName;
private String region;
private boolean forcePathStyle;
@@ -446,13 +436,8 @@ public Builder setEndpoint(String endpoint) {
return this;
}
- public Builder setAccessKey(String accessKey) {
- this.accessKey = accessKey;
- return this;
- }
-
- public Builder setSecretKey(String secretKey) {
- this.secretKey = secretKey;
+ public Builder setCredentialsProviders(List credentialsProviders) {
+ this.credentialsProviders = credentialsProviders;
return this;
}
@@ -472,7 +457,7 @@ public Builder setForcePathStyle(boolean forcePathStyle) {
}
public S3Context build() {
- return new S3Context(endpoint, accessKey, secretKey, bucketName, region, forcePathStyle);
+ return new S3Context(endpoint, credentialsProviders, bucketName, region, forcePathStyle);
}
}
diff --git a/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java b/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java
index 1e9833f2d..b97c17d6b 100644
--- a/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java
+++ b/store/src/main/java/com/automq/rocketmq/store/MessageStoreBuilder.java
@@ -40,6 +40,8 @@
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.S3Operator;
+import java.util.List;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import static com.automq.rocketmq.store.MessageStoreImpl.KV_NAMESPACE_CHECK_POINT;
@@ -64,7 +66,7 @@ public static MessageStoreImpl build(StoreConfig storeConfig, S3StreamConfig s3S
// S3 object manager, such as trim expired messages, etc.
S3Operator operator = new DefaultS3Operator(s3StreamConfig.s3Endpoint(), s3StreamConfig.s3Region(), s3StreamConfig.s3Bucket(),
- s3StreamConfig.s3ForcePathStyle(), s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey());
+ s3StreamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(s3StreamConfig.s3AccessKey(), s3StreamConfig.s3SecretKey())));
S3ObjectOperator objectOperator = new S3ObjectOperatorImpl(operator);
TransactionService transactionService = new TransactionService(storeConfig, timerService);
diff --git a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
index fe29a751b..bb050ab33 100644
--- a/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
+++ b/store/src/main/java/com/automq/rocketmq/store/S3StreamStore.java
@@ -58,6 +58,7 @@
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
public class S3StreamStore implements StreamStore {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamStore.class);
@@ -92,7 +93,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
}
S3Operator defaultOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
- streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
+ streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
+ networkInboundLimiter, networkOutboundLimiter, true);
WriteAheadLog writeAheadLog = BlockWALService.builder(s3Config.walPath(), s3Config.walCapacity()).config(s3Config).build();
S3BlockCache blockCache = new DefaultS3BlockCache(s3Config, objectManager, defaultOperator);
@@ -102,7 +104,8 @@ public S3StreamStore(StoreConfig storeConfig, S3StreamConfig streamConfig, Store
// Build the compaction manager
S3Operator compactionOperator = new DefaultS3Operator(streamConfig.s3Endpoint(), streamConfig.s3Region(), streamConfig.s3Bucket(),
- streamConfig.s3ForcePathStyle(), streamConfig.s3AccessKey(), streamConfig.s3SecretKey(), networkInboundLimiter, networkOutboundLimiter, true);
+ streamConfig.s3ForcePathStyle(), List.of(() -> AwsBasicCredentials.create(streamConfig.s3AccessKey(), streamConfig.s3SecretKey())),
+ networkInboundLimiter, networkOutboundLimiter, true);
this.compactionManager = new CompactionManager(s3Config, objectManager, streamManager, compactionOperator);
this.streamClient = new S3StreamClient(streamManager, storage, objectManager, defaultOperator, s3Config, networkInboundLimiter, networkOutboundLimiter);
@@ -255,8 +258,6 @@ private Config configFrom(S3StreamConfig streamConfig) {
config.bucket(streamConfig.s3Bucket());
config.forcePathStyle(streamConfig.s3ForcePathStyle());
config.walPath(streamConfig.s3WALPath());
- config.accessKey(streamConfig.s3AccessKey());
- config.secretKey(streamConfig.s3SecretKey());
config.networkBaselineBandwidth(streamConfig.networkBaselineBandwidth());
config.refillPeriodMs(streamConfig.refillPeriodMs());