Skip to content

Commit 77f1f5b

Browse files
authored
AWS: KeyManagementClient implementation that works with AWS KMS (#13136)
1 parent 29a87e7 commit 77f1f5b

File tree

4 files changed

+268
-2
lines changed

4 files changed

+268
-2
lines changed
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import java.nio.ByteBuffer;
24+
import java.util.Map;
25+
import org.apache.iceberg.encryption.KeyManagementClient;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
31+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.EnumSource;
34+
import org.junit.jupiter.params.provider.NullSource;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
import software.amazon.awssdk.services.kms.KmsClient;
38+
import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
39+
import software.amazon.awssdk.services.kms.model.CreateKeyResponse;
40+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
41+
import software.amazon.awssdk.services.kms.model.KeySpec;
42+
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
43+
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionResponse;
44+
45+
@EnabledIfEnvironmentVariables({
46+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_ACCESS_KEY_ID, matches = ".*"),
47+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SECRET_ACCESS_KEY, matches = ".*"),
48+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SESSION_TOKEN, matches = ".*"),
49+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_TEST_ACCOUNT_ID, matches = "\\d{12}")
50+
})
51+
public class TestKeyManagementClient {
52+
53+
private static final Logger LOG = LoggerFactory.getLogger(TestKeyManagementClient.class);
54+
55+
private static KmsClient kmsClient;
56+
private static String keyId;
57+
58+
@BeforeAll
59+
public static void beforeClass() {
60+
kmsClient = AwsClientFactories.defaultFactory().kms();
61+
CreateKeyRequest createKeyRequest =
62+
CreateKeyRequest.builder()
63+
.keySpec(KeySpec.SYMMETRIC_DEFAULT)
64+
.description(
65+
"Iceberg integration test key for " + TestKeyManagementClient.class.getName())
66+
.build();
67+
CreateKeyResponse response = kmsClient.createKey(createKeyRequest);
68+
keyId = response.keyMetadata().keyId();
69+
}
70+
71+
@AfterAll
72+
public static void afterClass() {
73+
// AWS KMS doesn't allow instant deletion. Keys can be put to pendingDeletion state instead,
74+
// with a minimum of 7 days until final removal.
75+
ScheduleKeyDeletionRequest deletionRequest =
76+
ScheduleKeyDeletionRequest.builder().keyId(keyId).pendingWindowInDays(7).build();
77+
78+
ScheduleKeyDeletionResponse deletionResponse = kmsClient.scheduleKeyDeletion(deletionRequest);
79+
LOG.info(
80+
"Deletion of test key {} will be finalized at {}", keyId, deletionResponse.deletionDate());
81+
82+
try {
83+
kmsClient.close();
84+
} catch (Exception e) {
85+
LOG.error("Error closing KMS client", e);
86+
}
87+
}
88+
89+
@Test
90+
public void testKeyWrapping() {
91+
try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
92+
keyManagementClient.initialize(ImmutableMap.of());
93+
94+
ByteBuffer key = ByteBuffer.wrap(new String("super-secret-table-master-key").getBytes());
95+
ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyId);
96+
97+
assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
98+
}
99+
}
100+
101+
@ParameterizedTest
102+
@NullSource
103+
@EnumSource(
104+
value = DataKeySpec.class,
105+
names = {"AES_128", "AES_256"})
106+
public void testKeyGeneration(DataKeySpec dataKeySpec) {
107+
try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
108+
Map<String, String> properties =
109+
dataKeySpec == null
110+
? ImmutableMap.of()
111+
: ImmutableMap.of(AwsProperties.KMS_DATA_KEY_SPEC, dataKeySpec.name());
112+
keyManagementClient.initialize(properties);
113+
KeyManagementClient.KeyGenerationResult result = keyManagementClient.generateKey(keyId);
114+
115+
assertThat(keyManagementClient.unwrapKey(result.wrappedKey(), keyId)).isEqualTo(result.key());
116+
assertThat(result.key().limit()).isEqualTo(expectedLength(dataKeySpec));
117+
}
118+
}
119+
120+
private static int expectedLength(DataKeySpec spec) {
121+
if (DataKeySpec.AES_128.equals(spec)) {
122+
return 128 / 8;
123+
} else {
124+
return 256 / 8;
125+
}
126+
}
127+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.Map;
23+
import org.apache.iceberg.encryption.KeyManagementClient;
24+
import software.amazon.awssdk.core.SdkBytes;
25+
import software.amazon.awssdk.services.kms.KmsClient;
26+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
27+
import software.amazon.awssdk.services.kms.model.DecryptRequest;
28+
import software.amazon.awssdk.services.kms.model.DecryptResponse;
29+
import software.amazon.awssdk.services.kms.model.EncryptRequest;
30+
import software.amazon.awssdk.services.kms.model.EncryptResponse;
31+
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
32+
import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest;
33+
import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse;
34+
35+
/**
36+
* Key management client implementation that uses AWS Key Management Service. To be used for
37+
* encrypting/decrypting keys with a KMS-managed master key, (by referencing its key ID), and for
38+
* the generation of new encryption keys.
39+
*/
40+
public class AwsKeyManagementClient implements KeyManagementClient {
41+
42+
private KmsClient kmsClient;
43+
private EncryptionAlgorithmSpec encryptionAlgorithmSpec;
44+
private DataKeySpec dataKeySpec;
45+
46+
@Override
47+
public void initialize(Map<String, String> properties) {
48+
AwsClientFactory clientFactory = AwsClientFactories.from(properties);
49+
this.kmsClient = clientFactory.kms();
50+
51+
AwsProperties awsProperties = new AwsProperties(properties);
52+
this.encryptionAlgorithmSpec = awsProperties.kmsEncryptionAlgorithmSpec();
53+
this.dataKeySpec = awsProperties.kmsDataKeySpec();
54+
}
55+
56+
@Override
57+
public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
58+
EncryptRequest request =
59+
EncryptRequest.builder()
60+
.keyId(wrappingKeyId)
61+
.encryptionAlgorithm(encryptionAlgorithmSpec)
62+
.plaintext(SdkBytes.fromByteBuffer(key))
63+
.build();
64+
65+
EncryptResponse result = kmsClient.encrypt(request);
66+
return result.ciphertextBlob().asByteBuffer();
67+
}
68+
69+
@Override
70+
public boolean supportsKeyGeneration() {
71+
return true;
72+
}
73+
74+
@Override
75+
public KeyGenerationResult generateKey(String wrappingKeyId) {
76+
GenerateDataKeyRequest request =
77+
GenerateDataKeyRequest.builder().keyId(wrappingKeyId).keySpec(dataKeySpec).build();
78+
79+
GenerateDataKeyResponse response = kmsClient.generateDataKey(request);
80+
KeyGenerationResult result =
81+
new KeyGenerationResult(
82+
response.plaintext().asByteBuffer(), response.ciphertextBlob().asByteBuffer());
83+
return result;
84+
}
85+
86+
@Override
87+
public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
88+
DecryptRequest request =
89+
DecryptRequest.builder()
90+
.keyId(wrappingKeyId)
91+
.encryptionAlgorithm(encryptionAlgorithmSpec)
92+
.ciphertextBlob(SdkBytes.fromByteBuffer(wrappedKey))
93+
.build();
94+
95+
DecryptResponse result = kmsClient.decrypt(request);
96+
return result.plaintext().asByteBuffer();
97+
}
98+
99+
@Override
100+
public void close() {
101+
if (kmsClient != null) {
102+
kmsClient.close();
103+
}
104+
}
105+
}

aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
4242
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
4343
import software.amazon.awssdk.services.glue.GlueClientBuilder;
44+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
45+
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
4446

4547
public class AwsProperties implements Serializable {
4648

@@ -206,6 +208,17 @@ public class AwsProperties implements Serializable {
206208
*/
207209
public static final String REST_SESSION_TOKEN = "rest.session-token";
208210

211+
/** Encryption algorithm used to encrypt/decrypt master table keys */
212+
public static final String KMS_ENCRYPTION_ALGORITHM_SPEC = "kms.encryption-algorithm-spec";
213+
214+
public static final EncryptionAlgorithmSpec KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT =
215+
EncryptionAlgorithmSpec.SYMMETRIC_DEFAULT;
216+
217+
/** Length of data key generated by KMS */
218+
public static final String KMS_DATA_KEY_SPEC = "kms.data-key-spec";
219+
220+
public static final DataKeySpec KMS_DATA_KEY_SPEC_DEFAULT = DataKeySpec.AES_256;
221+
209222
private final Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags;
210223

211224
private final String clientAssumeRoleArn;
@@ -230,6 +243,8 @@ public class AwsProperties implements Serializable {
230243
private String restAccessKeyId;
231244
private String restSecretAccessKey;
232245
private String restSessionToken;
246+
private EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec;
247+
private DataKeySpec kmsDataKeySpec;
233248

234249
public AwsProperties() {
235250
this.stsClientAssumeRoleTags = Sets.newHashSet();
@@ -252,6 +267,9 @@ public AwsProperties() {
252267
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
253268

254269
this.restSigningName = REST_SIGNING_NAME_DEFAULT;
270+
271+
this.kmsEncryptionAlgorithmSpec = KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT;
272+
this.kmsDataKeySpec = KMS_DATA_KEY_SPEC_DEFAULT;
255273
}
256274

257275
@SuppressWarnings("MethodLength")
@@ -293,6 +311,14 @@ public AwsProperties(Map<String, String> properties) {
293311
this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID);
294312
this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY);
295313
this.restSessionToken = properties.get(REST_SESSION_TOKEN);
314+
315+
this.kmsEncryptionAlgorithmSpec =
316+
EncryptionAlgorithmSpec.fromValue(
317+
properties.getOrDefault(
318+
KMS_ENCRYPTION_ALGORITHM_SPEC, KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT.toString()));
319+
this.kmsDataKeySpec =
320+
DataKeySpec.fromValue(
321+
properties.getOrDefault(KMS_DATA_KEY_SPEC, KMS_DATA_KEY_SPEC_DEFAULT.toString()));
296322
}
297323

298324
public Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags() {
@@ -402,6 +428,14 @@ public AwsCredentialsProvider restCredentialsProvider() {
402428
this.restAccessKeyId, this.restSecretAccessKey, this.restSessionToken);
403429
}
404430

431+
public EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec() {
432+
return this.kmsEncryptionAlgorithmSpec;
433+
}
434+
435+
public DataKeySpec kmsDataKeySpec() {
436+
return this.kmsDataKeySpec;
437+
}
438+
405439
private Set<software.amazon.awssdk.services.sts.model.Tag> toStsTags(
406440
Map<String, String> properties, String prefix) {
407441
return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()

core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Map;
2525

2626
/** A minimum client interface to connect to a key management service (KMS). */
27-
interface KeyManagementClient extends Serializable, Closeable {
27+
public interface KeyManagementClient extends Serializable, Closeable {
2828

2929
/**
3030
* Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID.
@@ -94,7 +94,7 @@ class KeyGenerationResult {
9494
private final ByteBuffer key;
9595
private final ByteBuffer wrappedKey;
9696

97-
KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
97+
public KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
9898
this.key = key;
9999
this.wrappedKey = wrappedKey;
100100
}

0 commit comments

Comments
 (0)