Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import static org.assertj.core.api.Assertions.assertThat;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.NullSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
import software.amazon.awssdk.services.kms.model.CreateKeyResponse;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.KeySpec;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionResponse;

/**
 * Integration test for {@link AwsKeyManagementClient} that talks to a real AWS KMS endpoint.
 *
 * <p>The test only runs when the AWS credential and account environment variables are present. A
 * symmetric KMS key is created once before all tests and scheduled for deletion afterwards.
 */
@EnabledIfEnvironmentVariables({
  @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_ACCESS_KEY_ID, matches = ".*"),
  @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SECRET_ACCESS_KEY, matches = ".*"),
  @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SESSION_TOKEN, matches = ".*"),
  @EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_TEST_ACCOUNT_ID, matches = "\\d{12}")
})
public class TestKeyManagementClient {

  private static final Logger LOG = LoggerFactory.getLogger(TestKeyManagementClient.class);

  private static KmsClient kmsClient;
  private static String keyId;

  /** Creates the KMS client and a fresh symmetric key that all tests in this class share. */
  @BeforeAll
  public static void beforeClass() {
    kmsClient = AwsClientFactories.defaultFactory().kms();
    CreateKeyRequest createKeyRequest =
        CreateKeyRequest.builder()
            .keySpec(KeySpec.SYMMETRIC_DEFAULT)
            .description(
                "Iceberg integration test key for " + TestKeyManagementClient.class.getName())
            .build();
    CreateKeyResponse response = kmsClient.createKey(createKeyRequest);
    keyId = response.keyMetadata().keyId();
  }

  /**
   * Schedules the test key for deletion and closes the KMS client.
   *
   * <p>The client is closed even when scheduling the deletion fails, and nothing is attempted for
   * resources that were never created (setup may have failed part-way).
   */
  @AfterAll
  public static void afterClass() {
    if (kmsClient == null) {
      // Setup never got as far as creating the client, so there is nothing to clean up.
      return;
    }

    try {
      if (keyId != null) {
        // AWS KMS doesn't allow instant deletion. Keys can be put to pendingDeletion state
        // instead, with a minimum of 7 days until final removal.
        ScheduleKeyDeletionRequest deletionRequest =
            ScheduleKeyDeletionRequest.builder().keyId(keyId).pendingWindowInDays(7).build();

        ScheduleKeyDeletionResponse deletionResponse =
            kmsClient.scheduleKeyDeletion(deletionRequest);
        LOG.info(
            "Deletion of test key {} will be finalized at {}",
            keyId,
            deletionResponse.deletionDate());
      }
    } finally {
      try {
        kmsClient.close();
      } catch (Exception e) {
        LOG.error("Error closing KMS client", e);
      }
    }
  }

  /** Wrapping a key and unwrapping the result must yield the original key bytes. */
  @Test
  public void testKeyWrapping() {
    try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
      keyManagementClient.initialize(ImmutableMap.of());

      // Encode with an explicit charset so the key bytes do not depend on the platform default.
      ByteBuffer key =
          ByteBuffer.wrap("super-secret-table-master-key".getBytes(StandardCharsets.UTF_8));
      ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyId);

      assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
    }
  }

  /**
   * A generated key must unwrap to its own plaintext and have the length implied by the data key
   * spec.
   *
   * @param dataKeySpec the spec to configure, or {@code null} to leave the property unset
   */
  @ParameterizedTest
  @NullSource
  @EnumSource(
      value = DataKeySpec.class,
      names = {"AES_128", "AES_256"})
  public void testKeyGeneration(DataKeySpec dataKeySpec) {
    try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
      Map<String, String> properties =
          dataKeySpec == null
              ? ImmutableMap.of()
              : ImmutableMap.of(AwsProperties.KMS_DATA_KEY_SPEC, dataKeySpec.name());
      keyManagementClient.initialize(properties);
      KeyManagementClient.KeyGenerationResult result = keyManagementClient.generateKey(keyId);

      assertThat(keyManagementClient.unwrapKey(result.wrappedKey(), keyId))
          .isEqualTo(result.key());
      assertThat(result.key().limit()).isEqualTo(expectedLength(dataKeySpec));
    }
  }

  /**
   * Returns the expected key length in bytes for the given spec.
   *
   * @param spec the configured data key spec, possibly {@code null}
   * @return 16 for {@code AES_128}, otherwise 32 (including the {@code null} case)
   */
  private static int expectedLength(DataKeySpec spec) {
    if (DataKeySpec.AES_128.equals(spec)) {
      return 128 / 8;
    } else {
      return 256 / 8;
    }
  }
}
105 changes: 105 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsKeyManagementClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.aws;

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.encryption.KeyManagementClient;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.DecryptRequest;
import software.amazon.awssdk.services.kms.model.DecryptResponse;
import software.amazon.awssdk.services.kms.model.EncryptRequest;
import software.amazon.awssdk.services.kms.model.EncryptResponse;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest;
import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse;

/**
* Key management client implementation that uses AWS Key Management Service. To be used for
* encrypting/decrypting keys with a KMS-managed master key (referenced by its key ID), and for
* generating new encryption keys.
*/
public class AwsKeyManagementClient implements KeyManagementClient {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will need to be serializable so that it can be used in a distributed setting (since an EncryptionManager instance can also be serialized, and generally we'd expect an EncryptionManager to also contain an instance of KeyManagementClient).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, KeyManagementClient does implement Serializable as is (though looking at the implementation, I wonder how necessary that is - it's transient in the StandardEncryptionManager both now and after #7770, cc @ggershinsky)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @smaheshwar-pltr points out, the class implements Serializable via its interface, but I also have doubts about whether that's actually needed. KMS interactions for table encryption only concern the master key, which I'd expect to take place on driver / coordinator nodes, encrypting/decrypting the metadata json or manifest lists. Everything else down to the data files will not require a KMS.


// KMS client obtained from the client factory in initialize() and released in close().
private KmsClient kmsClient;
// Algorithm sent with KMS encrypt/decrypt requests when wrapping and unwrapping keys.
private EncryptionAlgorithmSpec encryptionAlgorithmSpec;
// Key spec sent with KMS generate-data-key requests.
private DataKeySpec dataKeySpec;

/**
 * Sets up the KMS client and reads the KMS-related settings from the given properties.
 *
 * @param properties configuration used to build the AWS client factory and {@link AwsProperties}
 */
@Override
public void initialize(Map<String, String> properties) {
  this.kmsClient = AwsClientFactories.from(properties).kms();

  AwsProperties kmsConfig = new AwsProperties(properties);
  this.dataKeySpec = kmsConfig.kmsDataKeySpec();
  this.encryptionAlgorithmSpec = kmsConfig.kmsEncryptionAlgorithmSpec();
}

/**
 * Encrypts the given key through KMS with the referenced wrapping key.
 *
 * @param key the key material to encrypt
 * @param wrappingKeyId ID of the KMS key used for encryption
 * @return the ciphertext blob returned by KMS
 */
@Override
public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
  EncryptRequest.Builder encryptRequest =
      EncryptRequest.builder()
          .keyId(wrappingKeyId)
          .encryptionAlgorithm(encryptionAlgorithmSpec)
          .plaintext(SdkBytes.fromByteBuffer(key));

  SdkBytes ciphertext = kmsClient.encrypt(encryptRequest.build()).ciphertextBlob();
  return ciphertext.asByteBuffer();
}

/**
* Reports that this client can generate keys; see {@link #generateKey(String)}.
*
* @return always {@code true}
*/
@Override
public boolean supportsKeyGeneration() {
return true;
}

/**
 * Asks KMS to generate a new data key under the referenced wrapping key, using the configured
 * data key spec.
 *
 * @param wrappingKeyId ID of the KMS key under which the data key is generated
 * @return the generated key in plaintext form together with its KMS ciphertext blob
 */
@Override
public KeyGenerationResult generateKey(String wrappingKeyId) {
  GenerateDataKeyResponse generated =
      kmsClient.generateDataKey(
          GenerateDataKeyRequest.builder().keyId(wrappingKeyId).keySpec(dataKeySpec).build());

  ByteBuffer plaintextKey = generated.plaintext().asByteBuffer();
  ByteBuffer wrappedKey = generated.ciphertextBlob().asByteBuffer();
  return new KeyGenerationResult(plaintextKey, wrappedKey);
}

/**
 * Decrypts a wrapped key through KMS with the referenced wrapping key.
 *
 * @param wrappedKey the ciphertext blob to decrypt
 * @param wrappingKeyId ID of the KMS key used for decryption
 * @return the plaintext returned by KMS
 */
@Override
public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
  DecryptRequest.Builder decryptRequest =
      DecryptRequest.builder()
          .keyId(wrappingKeyId)
          .encryptionAlgorithm(encryptionAlgorithmSpec)
          .ciphertextBlob(SdkBytes.fromByteBuffer(wrappedKey));

  SdkBytes plaintext = kmsClient.decrypt(decryptRequest.build()).plaintext();
  return plaintext.asByteBuffer();
}

/** Closes the underlying KMS client; does nothing if the client was never initialized. */
@Override
public void close() {
  if (kmsClient == null) {
    return;
  }

  kmsClient.close();
}
}
34 changes: 34 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;

public class AwsProperties implements Serializable {

Expand Down Expand Up @@ -206,6 +208,17 @@ public class AwsProperties implements Serializable {
*/
public static final String REST_SESSION_TOKEN = "rest.session-token";

/**
* Property key selecting the encryption algorithm used to encrypt/decrypt master table keys. The
* value is parsed with {@link EncryptionAlgorithmSpec#fromValue(String)}.
*/
public static final String KMS_ENCRYPTION_ALGORITHM_SPEC = "kms.encryption-algorithm-spec";

/** Algorithm used when {@link #KMS_ENCRYPTION_ALGORITHM_SPEC} is not set. */
public static final EncryptionAlgorithmSpec KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT =
EncryptionAlgorithmSpec.SYMMETRIC_DEFAULT;

/**
* Property key selecting the spec (and thereby the length) of data keys generated by KMS. The
* value is parsed with {@link DataKeySpec#fromValue(String)}.
*/
public static final String KMS_DATA_KEY_SPEC = "kms.data-key-spec";

/** Data key spec used when {@link #KMS_DATA_KEY_SPEC} is not set. */
public static final DataKeySpec KMS_DATA_KEY_SPEC_DEFAULT = DataKeySpec.AES_256;

private final Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags;

private final String clientAssumeRoleArn;
Expand All @@ -230,6 +243,8 @@ public class AwsProperties implements Serializable {
private String restAccessKeyId;
private String restSecretAccessKey;
private String restSessionToken;
private EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec;
private DataKeySpec kmsDataKeySpec;

public AwsProperties() {
this.stsClientAssumeRoleTags = Sets.newHashSet();
Expand All @@ -252,6 +267,9 @@ public AwsProperties() {
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;

this.restSigningName = REST_SIGNING_NAME_DEFAULT;

this.kmsEncryptionAlgorithmSpec = KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT;
this.kmsDataKeySpec = KMS_DATA_KEY_SPEC_DEFAULT;
}

@SuppressWarnings("MethodLength")
Expand Down Expand Up @@ -293,6 +311,14 @@ public AwsProperties(Map<String, String> properties) {
this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID);
this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY);
this.restSessionToken = properties.get(REST_SESSION_TOKEN);

this.kmsEncryptionAlgorithmSpec =
EncryptionAlgorithmSpec.fromValue(
properties.getOrDefault(
KMS_ENCRYPTION_ALGORITHM_SPEC, KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT.toString()));
this.kmsDataKeySpec =
DataKeySpec.fromValue(
properties.getOrDefault(KMS_DATA_KEY_SPEC, KMS_DATA_KEY_SPEC_DEFAULT.toString()));
}

public Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags() {
Expand Down Expand Up @@ -402,6 +428,14 @@ public AwsCredentialsProvider restCredentialsProvider() {
this.restAccessKeyId, this.restSecretAccessKey, this.restSessionToken);
}

/**
* Returns the KMS encryption algorithm spec held by this instance.
*
* @return the value configured under {@link #KMS_ENCRYPTION_ALGORITHM_SPEC}, or {@link
*     #KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT} when the property was not supplied
*/
public EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec() {
return this.kmsEncryptionAlgorithmSpec;
}

/**
* Returns the KMS data key spec held by this instance.
*
* @return the value configured under {@link #KMS_DATA_KEY_SPEC}, or {@link
*     #KMS_DATA_KEY_SPEC_DEFAULT} when the property was not supplied
*/
public DataKeySpec kmsDataKeySpec() {
return this.kmsDataKeySpec;
}

private Set<software.amazon.awssdk.services.sts.model.Tag> toStsTags(
Map<String, String> properties, String prefix) {
return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Map;

/** A minimum client interface to connect to a key management service (KMS). */
interface KeyManagementClient extends Serializable, Closeable {
public interface KeyManagementClient extends Serializable, Closeable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will necessarily need to be public so that implementations can reside in other modules, so I do agree with this change.

It'd be good if @rdblue can confirm that or if some other implementation pattern was intended here.

Copy link
Contributor

@smaheshwar-pltr smaheshwar-pltr Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I believe that it being public lets catalogs / table operations in other packages use the KMS client - e.g. #13066 / #13225.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely looks like something which needs to be public


/**
* Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID.
Expand Down Expand Up @@ -94,7 +94,7 @@ class KeyGenerationResult {
private final ByteBuffer key;
private final ByteBuffer wrappedKey;

KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
public KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
this.key = key;
this.wrappedKey = wrappedKey;
}
Expand Down