Skip to content

Commit c47488c

Browse files
committed
AWS: KeyManagementClient implementation that works with AWS KMS
To be used in table encryption for: - wrapping/unwrapping encryption keys - generating data keys (available specs: AES_256, AES_128) Added integration test for verification.
1 parent d32ba35 commit c47488c

File tree

4 files changed

+265
-2
lines changed

4 files changed

+265
-2
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws;
20+
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import java.nio.ByteBuffer;
24+
import java.util.Map;
25+
import org.apache.iceberg.encryption.KeyManagementClient;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
27+
import org.junit.jupiter.api.AfterAll;
28+
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Test;
30+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
31+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
34+
import software.amazon.awssdk.services.kms.KmsClient;
35+
import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
36+
import software.amazon.awssdk.services.kms.model.CreateKeyResponse;
37+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
38+
import software.amazon.awssdk.services.kms.model.KeySpec;
39+
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
40+
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionResponse;
41+
42+
@EnabledIfEnvironmentVariables({
43+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_ACCESS_KEY_ID, matches = ".*"),
44+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SECRET_ACCESS_KEY, matches = ".*"),
45+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_SESSION_TOKEN, matches = ".*"),
46+
@EnabledIfEnvironmentVariable(named = AwsIntegTestUtil.AWS_TEST_ACCOUNT_ID, matches = "\\d{12}")
47+
})
48+
public class TestKeyManagementClient {
49+
50+
private static final Logger LOG = LoggerFactory.getLogger(TestKeyManagementClient.class);
51+
52+
private static KmsClient kmsClient;
53+
private static String keyId;
54+
55+
@BeforeAll
56+
public static void beforeClass() {
57+
kmsClient = AwsClientFactories.defaultFactory().kms();
58+
CreateKeyRequest createKeyRequest =
59+
CreateKeyRequest.builder()
60+
.keySpec(KeySpec.SYMMETRIC_DEFAULT)
61+
.description(
62+
"Iceberg integration test key for " + TestKeyManagementClient.class.getName())
63+
.build();
64+
CreateKeyResponse response = kmsClient.createKey(createKeyRequest);
65+
keyId = response.keyMetadata().keyId();
66+
}
67+
68+
@Test
69+
public void testKeyWrapping() {
70+
AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient();
71+
try {
72+
keyManagementClient.initialize(ImmutableMap.of());
73+
74+
ByteBuffer key = ByteBuffer.wrap(new String("super-secret-table-master-key").getBytes());
75+
ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyId);
76+
77+
assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
78+
} finally {
79+
keyManagementClient.close();
80+
}
81+
}
82+
83+
@Test
84+
public void testKeyGeneration() {
85+
testKeyGenerationWithDataKeySpec(null);
86+
testKeyGenerationWithDataKeySpec(DataKeySpec.AES_128);
87+
testKeyGenerationWithDataKeySpec(DataKeySpec.AES_256);
88+
}
89+
90+
private void testKeyGenerationWithDataKeySpec(DataKeySpec dataKeySpec) {
91+
AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient();
92+
try {
93+
Map<String, String> properties =
94+
dataKeySpec == null
95+
? ImmutableMap.of()
96+
: ImmutableMap.of(AwsProperties.KMS_DATA_KEY_SPEC, dataKeySpec.name());
97+
keyManagementClient.initialize(properties);
98+
KeyManagementClient.KeyGenerationResult result = keyManagementClient.generateKey(keyId);
99+
100+
assertThat(keyManagementClient.unwrapKey(result.wrappedKey(), keyId)).isEqualTo(result.key());
101+
assertThat(result.key().limit())
102+
.isEqualTo(DataKeySpec.AES_128.equals(dataKeySpec) ? 128 / 8 : 256 / 8);
103+
} finally {
104+
keyManagementClient.close();
105+
}
106+
}
107+
108+
@AfterAll
109+
public static void afterClass() {
110+
// AWS KMS doesn't allow instant deletion. Keys can be put to pendingDeletion state instead,
111+
// with a minimum of 7 days until final removal.
112+
ScheduleKeyDeletionRequest deletionRequest =
113+
ScheduleKeyDeletionRequest.builder().keyId(keyId).pendingWindowInDays(7).build();
114+
115+
ScheduleKeyDeletionResponse deletionResponse = kmsClient.scheduleKeyDeletion(deletionRequest);
116+
LOG.info(
117+
"Deletion of test key {} will be finalized at {}", keyId, deletionResponse.deletionDate());
118+
119+
try {
120+
kmsClient.close();
121+
} catch (Exception e) {
122+
LOG.error("Error closing KMS client", e);
123+
}
124+
}
125+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.aws;
20+
21+
import java.nio.ByteBuffer;
22+
import java.util.Map;
23+
import org.apache.iceberg.encryption.KeyManagementClient;
24+
import software.amazon.awssdk.core.SdkBytes;
25+
import software.amazon.awssdk.services.kms.KmsClient;
26+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
27+
import software.amazon.awssdk.services.kms.model.DecryptRequest;
28+
import software.amazon.awssdk.services.kms.model.DecryptResponse;
29+
import software.amazon.awssdk.services.kms.model.EncryptRequest;
30+
import software.amazon.awssdk.services.kms.model.EncryptResponse;
31+
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
32+
import software.amazon.awssdk.services.kms.model.GenerateDataKeyRequest;
33+
import software.amazon.awssdk.services.kms.model.GenerateDataKeyResponse;
34+
35+
/**
36+
* Key management client implementation that uses AWS Key Management Service. To be used for
37+
* encrypting/decrypting keys with a KMS-managed master key, (by referencing its key ID), and for
38+
* the generation of new encryption keys.
39+
*/
40+
public class AwsKeyManagementClient implements KeyManagementClient {
41+
42+
private KmsClient kmsClient;
43+
private EncryptionAlgorithmSpec encryptionAlgorithmSpec;
44+
private DataKeySpec dataKeySpec;
45+
46+
public AwsKeyManagementClient() {}
47+
48+
@Override
49+
public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
50+
EncryptRequest request =
51+
EncryptRequest.builder()
52+
.keyId(wrappingKeyId)
53+
.encryptionAlgorithm(encryptionAlgorithmSpec)
54+
.plaintext(SdkBytes.fromByteBuffer(key))
55+
.build();
56+
57+
EncryptResponse result = kmsClient.encrypt(request);
58+
return result.ciphertextBlob().asByteBuffer();
59+
}
60+
61+
@Override
62+
public boolean supportsKeyGeneration() {
63+
return true;
64+
}
65+
66+
@Override
67+
public KeyGenerationResult generateKey(String wrappingKeyId) {
68+
GenerateDataKeyRequest request =
69+
GenerateDataKeyRequest.builder().keyId(wrappingKeyId).keySpec(dataKeySpec).build();
70+
71+
GenerateDataKeyResponse response = kmsClient.generateDataKey(request);
72+
KeyGenerationResult result =
73+
new KeyGenerationResult(
74+
response.plaintext().asByteBuffer(), response.ciphertextBlob().asByteBuffer());
75+
return result;
76+
}
77+
78+
@Override
79+
public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
80+
DecryptRequest request =
81+
DecryptRequest.builder()
82+
.keyId(wrappingKeyId)
83+
.encryptionAlgorithm(encryptionAlgorithmSpec)
84+
.ciphertextBlob(SdkBytes.fromByteBuffer(wrappedKey))
85+
.build();
86+
87+
DecryptResponse result = kmsClient.decrypt(request);
88+
return result.plaintext().asByteBuffer();
89+
}
90+
91+
@Override
92+
public void initialize(Map<String, String> properties) {
93+
AwsClientFactory clientFactory = AwsClientFactories.from(properties);
94+
this.kmsClient = clientFactory.kms();
95+
96+
AwsProperties awsProperties = new AwsProperties(properties);
97+
this.encryptionAlgorithmSpec = awsProperties.kmsEncryptionAlgorithmSpec();
98+
this.dataKeySpec = awsProperties.kmsDataKeySpec();
99+
}
100+
101+
@Override
102+
public void close() {
103+
if (kmsClient != null) {
104+
kmsClient.close();
105+
}
106+
}
107+
}

aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
4242
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
4343
import software.amazon.awssdk.services.glue.GlueClientBuilder;
44+
import software.amazon.awssdk.services.kms.model.DataKeySpec;
45+
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
4446

4547
public class AwsProperties implements Serializable {
4648

@@ -206,6 +208,17 @@ public class AwsProperties implements Serializable {
206208
*/
207209
public static final String REST_SESSION_TOKEN = "rest.session-token";
208210

211+
/** Encryption algorithm used to encrypt/decrypt master table keys */
212+
public static final String KMS_ENCRYPTION_ALGORITHM_SPEC = "kms.encryption-algorithm-spec";
213+
214+
public static final String KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT =
215+
EncryptionAlgorithmSpec.SYMMETRIC_DEFAULT.toString();
216+
217+
/** Length of data key generated by KMS */
218+
public static final String KMS_DATA_KEY_SPEC = "kms.data-key-spec";
219+
220+
public static final String KMS_DATA_KEY_SPEC_DEFAULT = DataKeySpec.AES_256.toString();
221+
209222
private final Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags;
210223

211224
private final String clientAssumeRoleArn;
@@ -230,6 +243,8 @@ public class AwsProperties implements Serializable {
230243
private String restAccessKeyId;
231244
private String restSecretAccessKey;
232245
private String restSessionToken;
246+
private String kmsEncryptionAlgorithmSpec;
247+
private String kmsDataKeySpec;
233248

234249
public AwsProperties() {
235250
this.stsClientAssumeRoleTags = Sets.newHashSet();
@@ -252,6 +267,9 @@ public AwsProperties() {
252267
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
253268

254269
this.restSigningName = REST_SIGNING_NAME_DEFAULT;
270+
271+
this.kmsEncryptionAlgorithmSpec = KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT;
272+
this.kmsDataKeySpec = KMS_DATA_KEY_SPEC_DEFAULT;
255273
}
256274

257275
@SuppressWarnings("MethodLength")
@@ -293,6 +311,11 @@ public AwsProperties(Map<String, String> properties) {
293311
this.restAccessKeyId = properties.get(REST_ACCESS_KEY_ID);
294312
this.restSecretAccessKey = properties.get(REST_SECRET_ACCESS_KEY);
295313
this.restSessionToken = properties.get(REST_SESSION_TOKEN);
314+
315+
this.kmsEncryptionAlgorithmSpec =
316+
properties.getOrDefault(
317+
KMS_ENCRYPTION_ALGORITHM_SPEC, KMS_ENCRYPTION_ALGORITHM_SPEC_DEFAULT);
318+
this.kmsDataKeySpec = properties.getOrDefault(KMS_DATA_KEY_SPEC, KMS_DATA_KEY_SPEC_DEFAULT);
296319
}
297320

298321
public Set<software.amazon.awssdk.services.sts.model.Tag> stsClientAssumeRoleTags() {
@@ -402,6 +425,14 @@ public AwsCredentialsProvider restCredentialsProvider() {
402425
this.restAccessKeyId, this.restSecretAccessKey, this.restSessionToken);
403426
}
404427

428+
public EncryptionAlgorithmSpec kmsEncryptionAlgorithmSpec() {
429+
return EncryptionAlgorithmSpec.fromValue(this.kmsEncryptionAlgorithmSpec);
430+
}
431+
432+
public DataKeySpec kmsDataKeySpec() {
433+
return DataKeySpec.fromValue(this.kmsDataKeySpec);
434+
}
435+
405436
private Set<software.amazon.awssdk.services.sts.model.Tag> toStsTags(
406437
Map<String, String> properties, String prefix) {
407438
return PropertyUtil.propertiesWithPrefix(properties, prefix).entrySet().stream()

core/src/main/java/org/apache/iceberg/encryption/KeyManagementClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Map;
2525

2626
/** A minimum client interface to connect to a key management service (KMS). */
27-
interface KeyManagementClient extends Serializable, Closeable {
27+
public interface KeyManagementClient extends Serializable, Closeable {
2828

2929
/**
3030
* Wrap a secret key, using a wrapping/master key which is stored in KMS and referenced by an ID.
@@ -94,7 +94,7 @@ class KeyGenerationResult {
9494
private final ByteBuffer key;
9595
private final ByteBuffer wrappedKey;
9696

97-
KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
97+
public KeyGenerationResult(ByteBuffer key, ByteBuffer wrappedKey) {
9898
this.key = key;
9999
this.wrappedKey = wrappedKey;
100100
}

0 commit comments

Comments
 (0)