Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19197. S3A: Support AWS KMS Encryption Context #7193

Open
wants to merge 1 commit into
base: branch-3.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,7 @@ public class CommonConfigurationKeysPublic {
"fs.s3a.*.server-side-encryption.key",
"fs.s3a.encryption.algorithm",
"fs.s3a.encryption.key",
"fs.s3a.encryption.context",
"fs.azure\\.account.key.*",
"credential$",
"oauth.*secret",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,7 @@
fs.s3a.*.server-side-encryption.key
fs.s3a.encryption.algorithm
fs.s3a.encryption.key
fs.s3a.encryption.context
fs.s3a.secret.key
fs.s3a.*.secret.key
fs.s3a.session.key
Expand Down Expand Up @@ -1760,6 +1761,15 @@
</description>
</property>

<property>
<name>fs.s3a.encryption.context</name>
<description>Specific encryption context to use if fs.s3a.encryption.algorithm
has been set to 'SSE-KMS' or 'DSSE-KMS'. The value of this property is a set
of non-secret comma-separated key-value pairs of additional contextual
information about the data that are separated by equal operator (=).
</description>
</property>

<property>
<name>fs.s3a.signing-algorithm</name>
<description>Override the default signing algorithm so legacy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,16 @@ private Constants() {
public static final String S3_ENCRYPTION_KEY =
"fs.s3a.encryption.key";

/**
* Set S3-SSE encryption context.
* The value of this property is a set of non-secret comma-separated key-value pairs
* of additional contextual information about the data that are separated by equal
* operator (=).
* value:{@value}
*/
public static final String S3_ENCRYPTION_CONTEXT =
"fs.s3a.encryption.context";

/**
* List of custom Signers. The signer class will be loaded, and the signer
* name will be associated with this signer class in the S3 SDK.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.S3AEncryption;
import org.apache.hadoop.util.functional.RemoteIterators;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
Expand Down Expand Up @@ -1326,7 +1327,7 @@ static void patchSecurityCredentialProviders(Configuration conf) {
* @throws IOException on any IO problem
* @throws IllegalArgumentException bad arguments
*/
private static String lookupBucketSecret(
public static String lookupBucketSecret(
String bucket,
Configuration conf,
String baseKey)
Expand Down Expand Up @@ -1472,6 +1473,8 @@ public static EncryptionSecrets buildEncryptionSecrets(String bucket,
int encryptionKeyLen =
StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
String diagnostics = passwordDiagnostics(encryptionKey, "key");
String encryptionContext = S3AEncryption.getS3EncryptionContextBase64Encoded(bucket, conf,
encryptionMethod.requiresSecret());
switch (encryptionMethod) {
case SSE_C:
LOG.debug("Using SSE-C with {}", diagnostics);
Expand Down Expand Up @@ -1507,7 +1510,7 @@ public static EncryptionSecrets buildEncryptionSecrets(String bucket,
LOG.debug("Data is unencrypted");
break;
}
return new EncryptionSecrets(encryptionMethod, encryptionKey);
return new EncryptionSecrets(encryptionMethod, encryptionKey, encryptionContext);
}

/**
Expand Down Expand Up @@ -1700,6 +1703,21 @@ public static Map<String, String> getTrimmedStringCollectionSplitByEquals(
final Configuration configuration,
final String name) {
String valueString = configuration.get(name);
return getTrimmedStringCollectionSplitByEquals(valueString);
}

/**
* Get the equal op (=) delimited key-value pairs of the <code>name</code> property as
* a collection of pair of <code>String</code>s, trimmed of the leading and trailing whitespace
* after delimiting the <code>name</code> by comma and new line separator.
* If no such property is specified then empty <code>Map</code> is returned.
*
* @param valueString the string containing the key-value pairs.
* @return property value as a <code>Map</code> of <code>String</code>s, or empty
* <code>Map</code>.
*/
public static Map<String, String> getTrimmedStringCollectionSplitByEquals(
final String valueString) {
if (null == valueString) {
return new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,20 @@ public static Optional<String> getSSEAwsKMSKey(final EncryptionSecrets secrets)
return Optional.empty();
}
}

/**
* Gets the SSE-KMS context if present, else don't set it in the S3 request.
*
* @param secrets source of the encryption secrets.
* @return an optional AWS KMS encryption context to attach to a request.
*/
public static Optional<String> getSSEAwsKMSEncryptionContext(final EncryptionSecrets secrets) {
if ((secrets.getEncryptionMethod() == S3AEncryptionMethods.SSE_KMS
|| secrets.getEncryptionMethod() == S3AEncryptionMethods.DSSE_KMS)
&& secrets.hasEncryptionContext()) {
return Optional.of(secrets.getEncryptionContext());
} else {
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class EncryptionSecrets implements Writable, Serializable {
*/
private String encryptionKey = "";

/**
* Encryption context: base64-encoded UTF-8 string.
*/
private String encryptionContext = "";

/**
* This field isn't serialized/marshalled; it is rebuilt from the
* encryptionAlgorithm field.
Expand All @@ -84,23 +89,28 @@ public EncryptionSecrets() {
* Create a pair of secrets.
* @param encryptionAlgorithm algorithm enumeration.
* @param encryptionKey key/key reference.
* @param encryptionContext base64-encoded string with the encryption context key-value pairs.
* @throws IOException failure to initialize.
*/
public EncryptionSecrets(final S3AEncryptionMethods encryptionAlgorithm,
final String encryptionKey) throws IOException {
this(encryptionAlgorithm.getMethod(), encryptionKey);
final String encryptionKey,
final String encryptionContext) throws IOException {
this(encryptionAlgorithm.getMethod(), encryptionKey, encryptionContext);
}

/**
* Create a pair of secrets.
* @param encryptionAlgorithm algorithm name
* @param encryptionKey key/key reference.
* @param encryptionContext base64-encoded string with the encryption context key-value pairs.
* @throws IOException failure to initialize.
*/
public EncryptionSecrets(final String encryptionAlgorithm,
final String encryptionKey) throws IOException {
final String encryptionKey,
final String encryptionContext) throws IOException {
this.encryptionAlgorithm = encryptionAlgorithm;
this.encryptionKey = encryptionKey;
this.encryptionContext = encryptionContext;
init();
}

Expand All @@ -114,6 +124,7 @@ public void write(final DataOutput out) throws IOException {
new LongWritable(serialVersionUID).write(out);
Text.writeString(out, encryptionAlgorithm);
Text.writeString(out, encryptionKey);
Text.writeString(out, encryptionContext);
}

/**
Expand All @@ -132,6 +143,7 @@ public void readFields(final DataInput in) throws IOException {
}
encryptionAlgorithm = Text.readString(in, MAX_SECRET_LENGTH);
encryptionKey = Text.readString(in, MAX_SECRET_LENGTH);
encryptionContext = Text.readString(in);
init();
}

Expand Down Expand Up @@ -164,6 +176,10 @@ public String getEncryptionKey() {
return encryptionKey;
}

public String getEncryptionContext() {
return encryptionContext;
}

/**
* Does this instance have encryption options?
* That is: is the algorithm non-null.
Expand All @@ -181,6 +197,14 @@ public boolean hasEncryptionKey() {
return StringUtils.isNotEmpty(encryptionKey);
}

/**
* Does this instance have an encryption context?
* @return true if there's an encryption context.
*/
public boolean hasEncryptionContext() {
return StringUtils.isNotEmpty(encryptionContext);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -191,12 +215,13 @@ public boolean equals(final Object o) {
}
final EncryptionSecrets that = (EncryptionSecrets) o;
return Objects.equals(encryptionAlgorithm, that.encryptionAlgorithm)
&& Objects.equals(encryptionKey, that.encryptionKey);
&& Objects.equals(encryptionKey, that.encryptionKey)
&& Objects.equals(encryptionContext, that.encryptionContext);
}

@Override
public int hashCode() {
return Objects.hash(encryptionAlgorithm, encryptionKey);
return Objects.hash(encryptionAlgorithm, encryptionKey, encryptionContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ protected void copyEncryptionParameters(HeadObjectResponse srcom,
LOG.debug("Propagating SSE-KMS settings from source {}",
sourceKMSId);
copyObjectRequestBuilder.ssekmsKeyId(sourceKMSId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
return;
}

Expand All @@ -292,11 +294,15 @@ protected void copyEncryptionParameters(HeadObjectResponse srcom,
// Set the KMS key if present, else S3 uses AWS managed key.
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
break;
case DSSE_KMS:
copyObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(copyObjectRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(copyObjectRequestBuilder::ssekmsEncryptionContext);
break;
case SSE_C:
EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
Expand Down Expand Up @@ -386,11 +392,15 @@ private void putEncryptionParameters(PutObjectRequest.Builder putObjectRequestBu
// Set the KMS key if present, else S3 uses AWS managed key.
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(putObjectRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
break;
case DSSE_KMS:
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(putObjectRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(putObjectRequestBuilder::ssekmsEncryptionContext);
break;
case SSE_C:
EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
Expand Down Expand Up @@ -462,11 +472,15 @@ private void multipartUploadEncryptionParameters(
// Set the KMS key if present, else S3 uses AWS managed key.
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(mpuRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
break;
case DSSE_KMS:
mpuRequestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS_DSSE);
EncryptionSecretOperations.getSSEAwsKMSKey(encryptionSecrets)
.ifPresent(mpuRequestBuilder::ssekmsKeyId);
EncryptionSecretOperations.getSSEAwsKMSEncryptionContext(encryptionSecrets)
.ifPresent(mpuRequestBuilder::ssekmsEncryptionContext);
break;
case SSE_C:
EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AUtils;

import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CONTEXT;

/**
* Utility methods for S3A encryption properties.
*/
public final class S3AEncryption {

private static final Logger LOG = LoggerFactory.getLogger(S3AEncryption.class);

private S3AEncryption() {
}

/**
* Get any SSE context from a configuration/credential provider.
* @param bucket bucket to query for
* @param conf configuration to examine
* @return the encryption context value or ""
* @throws IOException if reading a JCEKS file raised an IOE
* @throws IllegalArgumentException bad arguments.
*/
public static String getS3EncryptionContext(String bucket, Configuration conf)
throws IOException {
// look up the per-bucket value of the encryption context
String encryptionContext = S3AUtils.lookupBucketSecret(bucket, conf, S3_ENCRYPTION_CONTEXT);
if (encryptionContext == null) {
// look up the global value of the encryption context
encryptionContext = S3AUtils.lookupPassword(null, conf, S3_ENCRYPTION_CONTEXT);
}
if (encryptionContext == null) {
// no encryption context, return ""
return "";
}
return encryptionContext;
}

/**
* Get any SSE context from a configuration/credential provider.
* This includes converting the values to a base64-encoded UTF-8 string
* holding JSON with the encryption context key-value pairs
* @param bucket bucket to query for
* @param conf configuration to examine
* @param propagateExceptions should IO exceptions be rethrown?
* @return the Base64 encryption context or ""
* @throws IllegalArgumentException bad arguments.
* @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
*/
public static String getS3EncryptionContextBase64Encoded(
String bucket,
Configuration conf,
boolean propagateExceptions) throws IOException {
try {
final String encryptionContextValue = getS3EncryptionContext(bucket, conf);
if (StringUtils.isBlank(encryptionContextValue)) {
return "";
}
final Map<String, String> encryptionContextMap = S3AUtils
.getTrimmedStringCollectionSplitByEquals(encryptionContextValue);
if (encryptionContextMap.isEmpty()) {
return "";
}
final String encryptionContextJson = new ObjectMapper().writeValueAsString(
encryptionContextMap);
return Base64.encodeBase64String(encryptionContextJson.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
if (propagateExceptions) {
throw e;
}
LOG.warn("Cannot retrieve {} for bucket {}",
S3_ENCRYPTION_CONTEXT, bucket, e);
return "";
}
}
}
Loading