HADOOP-18708: S3A: Support S3 Client Side Encryption (CSE) (#6884)
Add support for S3 client side encryption (CSE).

CSE can be configured in two modes:
- CSE-KMS where keys are provided by AWS KMS
- CSE-CUSTOM where custom keys are provided by implementing
  a custom keyring.

CSE requires an encryption library:

  amazon-s3-encryption-client-java.jar

This is _not_ included in the shaded bundle.jar
and is released separately.

The version used is currently 3.1.1.

Contributed by Syed Shameerur Rahman.
shameersss1 authored Nov 14, 2024
1 parent 9a743bd commit 2273278
Showing 53 changed files with 2,175 additions and 231 deletions.
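Both CSE modes plug into the existing S3A encryption settings. A minimal sketch of enabling CSE-KMS from Java follows, assuming the standard fs.s3a.encryption.algorithm / fs.s3a.encryption.key keys select the mode and KMS key (as with the existing SSE modes); the bucket name and key ARN are illustrative placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CseKmsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Select client-side encryption with AWS KMS managed keys.
    conf.set("fs.s3a.encryption.algorithm", "CSE-KMS");
    // KMS key used to generate/decrypt the per-object data keys (placeholder ARN).
    conf.set("fs.s3a.encryption.key",
        "arn:aws:kms:us-west-2:111122223333:key/example-key-id");
    // Reads and writes through this filesystem are now encrypted on the client.
    FileSystem fs = new Path("s3a://example-bucket/").getFileSystem(conf);
    System.out.println("S3A filesystem with CSE-KMS: " + fs.getUri());
  }
}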
@@ -57,6 +57,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
* It is possible to have parts uploaded in any order (or in parallel).
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param partNumber Index of the part relative to others.
+ * @param isLastPart is this the last part of the upload?
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
* stream after reading in the data.
@@ -67,6 +68,7 @@ CompletableFuture<UploadHandle> startUpload(Path filePath)
CompletableFuture<PartHandle> putPart(
UploadHandle uploadId,
int partNumber,
+ boolean isLastPart,
Path filePath,
InputStream inputStream,
long lengthInBytes)
@@ -77,7 +79,7 @@ CompletableFuture<PartHandle> putPart(
* @param uploadId Identifier from {@link #startUpload(Path)}.
* @param filePath Target path for upload (as {@link #startUpload(Path)}.
* @param handles non-empty map of part number to part handle.
- * from {@link #putPart(UploadHandle, int, Path, InputStream, long)}.
+ * from {@link #putPart(UploadHandle, int, boolean, Path, InputStream, long)}.
* @return unique PathHandle identifier for the uploaded file.
* @throws IOException IO failure
*/
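The new isLastPart flag threads through putPart() end to end. A minimal sketch of driving the updated API, with an illustrative destination path and a single 5 MB part (which is therefore also the last part):

import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class PutPartSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dest = new Path("s3a://example-bucket/dest/file");  // placeholder path
    FileSystem fs = dest.getFileSystem(conf);
    try (MultipartUploader uploader =
             fs.createMultipartUploader(dest).build()) {
      UploadHandle upload = uploader.startUpload(dest).get();
      byte[] part1 = new byte[5 * 1024 * 1024];
      // Single part, so it is also the last part: isLastPart == true.
      // The uploader closes the stream after reading it.
      PartHandle handle = uploader.putPart(upload, 1, true, dest,
          new ByteArrayInputStream(part1), part1.length).get();
      Map<Integer, PartHandle> parts = new HashMap<>();
      parts.put(1, handle);
      PathHandle fd = uploader.complete(upload, dest, parts).get();
      System.out.println("Completed upload: " + fd);
    }
  }
}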
@@ -101,7 +101,7 @@ protected void checkPartHandles(Map<Integer, PartHandle> partHandles) {

/**
* Check all the arguments to the
- * {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)}
+ * {@link MultipartUploader#putPart(UploadHandle, int, boolean, Path, InputStream, long)}
* operation.
* @param filePath Target path for upload (as {@link #startUpload(Path)}).
* @param inputStream Data for this part. Implementations MUST close this
@@ -111,7 +111,7 @@ public CompletableFuture<UploadHandle> startUpload(Path filePath)

@Override
public CompletableFuture<PartHandle> putPart(UploadHandle uploadId,
- int partNumber, Path filePath,
+ int partNumber, boolean isLastPart, Path filePath,
InputStream inputStream,
long lengthInBytes)
throws IOException {
@@ -249,7 +249,7 @@ public void testSingleUpload() throws Exception {
// was interpreted as an inconsistent write.
MultipartUploader completer = uploader0;
// and upload with uploader 1 to validate cross-uploader uploads
- PartHandle partHandle = putPart(file, uploadHandle, 1, payload);
+ PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload);
partHandles.put(1, partHandle);
PathHandle fd = complete(completer, uploadHandle, file,
partHandles);
@@ -317,12 +317,13 @@ protected PartHandle buildAndPutPart(
final Path file,
final UploadHandle uploadHandle,
final int index,
+ final boolean isLastPart,
final MessageDigest origDigest) throws IOException {
byte[] payload = generatePayload(index);
if (origDigest != null) {
origDigest.update(payload);
}
- return putPart(file, uploadHandle, index, payload);
+ return putPart(file, uploadHandle, index, isLastPart, payload);
}

/**
@@ -331,13 +332,15 @@ protected PartHandle buildAndPutPart(
* @param file destination
* @param uploadHandle handle
* @param index index of part
+ * @param isLastPart is this the last part of the upload?
* @param payload byte array of payload
* @return the part handle
* @throws IOException IO failure.
*/
protected PartHandle putPart(final Path file,
final UploadHandle uploadHandle,
final int index,
+ final boolean isLastPart,
final byte[] payload) throws IOException {
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
PartHandle partHandle;
@@ -347,7 +350,7 @@ protected PartHandle putPart(final Path file,
payload.length,
file)) {
partHandle = awaitFuture(getUploader(index)
- .putPart(uploadHandle, index, file,
+ .putPart(uploadHandle, index, isLastPart, file,
new ByteArrayInputStream(payload),
payload.length));
}
@@ -488,7 +491,7 @@ public void testMultipartUpload() throws Exception {
MessageDigest origDigest = DigestUtils.getMd5Digest();
int payloadCount = getTestPayloadCount();
for (int i = 1; i <= payloadCount; ++i) {
- PartHandle partHandle = buildAndPutPart(file, uploadHandle, i,
+ PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount,
origDigest);
partHandles.put(i, partHandle);
}
@@ -515,7 +518,7 @@ public void testMultipartUploadEmptyPart() throws Exception {
origDigest.update(payload);
InputStream is = new ByteArrayInputStream(payload);
PartHandle partHandle = awaitFuture(
- uploader.putPart(uploadHandle, 1, file, is, payload.length));
+ uploader.putPart(uploadHandle, 1, true, file, is, payload.length));
partHandles.put(1, partHandle);
completeUpload(file, uploadHandle, partHandles, origDigest, 0);
}
@@ -530,7 +533,7 @@ public void testUploadEmptyBlock() throws Exception {
Path file = methodPath();
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
- partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0]));
+ partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0]));
completeUpload(file, uploadHandle, partHandles, null, 0);
}

@@ -550,7 +553,8 @@ public void testMultipartUploadReverseOrder() throws Exception {
origDigest.update(payload);
}
for (int i = payloadCount; i > 0; --i) {
- partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
+     null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
payloadCount * partSizeInBytes());
@@ -574,7 +578,8 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
}
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = payloadCount; i > 0; i -= 2) {
- partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount,
+     null));
}
completeUpload(file, uploadHandle, partHandles, origDigest,
getTestPayloadCount() * partSizeInBytes());
@@ -591,7 +596,7 @@ public void testMultipartUploadAbort() throws Exception {
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
for (int i = 12; i > 10; i--) {
- partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null));
+ partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null));
}
abortUpload(uploadHandle, file);

Expand All @@ -601,7 +606,7 @@ public void testMultipartUploadAbort() throws Exception {

intercept(IOException.class,
() -> awaitFuture(
- uploader0.putPart(uploadHandle, 49, file, is, len)));
+ uploader0.putPart(uploadHandle, 49, true, file, is, len)));
intercept(IOException.class,
() -> complete(uploader0, uploadHandle, file, partHandles));

@@ -701,7 +706,8 @@ public void testPutPartEmptyUploadID() throws Exception {
byte[] payload = generatePayload(1);
InputStream is = new ByteArrayInputStream(payload);
intercept(IllegalArgumentException.class,
- () -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length));
+ () -> uploader0.putPart(emptyHandle, 1, true, dest, is,
+     payload.length));
}

/**
@@ -715,7 +721,7 @@ public void testCompleteEmptyUploadID() throws Exception {
UploadHandle emptyHandle =
BBUploadHandle.from(ByteBuffer.wrap(new byte[0]));
Map<Integer, PartHandle> partHandles = new HashMap<>();
- PartHandle partHandle = putPart(dest, realHandle, 1,
+ PartHandle partHandle = putPart(dest, realHandle, 1, true,
generatePayload(1, SMALL_FILE));
partHandles.put(1, partHandle);

@@ -743,7 +749,7 @@ public void testDirectoryInTheWay() throws Exception {
UploadHandle uploadHandle = startUpload(file);
Map<Integer, PartHandle> partHandles = new HashMap<>();
int size = SMALL_FILE;
- PartHandle partHandle = putPart(file, uploadHandle, 1,
+ PartHandle partHandle = putPart(file, uploadHandle, 1, true,
generatePayload(1, size));
partHandles.put(1, partHandle);

@@ -802,10 +808,10 @@ public void testConcurrentUploads() throws Throwable {
assertNotEquals("Upload handles match", upload1, upload2);

// put part 1
- partHandles1.put(partId1, putPart(file, upload1, partId1, payload1));
+ partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1));

// put part2
- partHandles2.put(partId2, putPart(file, upload2, partId2, payload2));
+ partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2));

// complete part u1. expect its size and digest to
// be as expected.
12 changes: 12 additions & 0 deletions hadoop-project/pom.xml
@@ -205,6 +205,7 @@
<surefire.fork.timeout>900</surefire.fork.timeout>
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version>
+ <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
@@ -1180,6 +1181,17 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+   <groupId>software.amazon.encryption.s3</groupId>
+   <artifactId>amazon-s3-encryption-client-java</artifactId>
+   <version>${amazon-s3-encryption-client-java.version}</version>
+   <exclusions>
+     <exclusion>
+       <groupId>*</groupId>
+       <artifactId>*</artifactId>
+     </exclusion>
+   </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
15 changes: 15 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -466,6 +466,16 @@
<bannedImport>org.apache.hadoop.mapred.**</bannedImport>
</bannedImports>
</restrictImports>
+ <restrictImports>
+   <includeTestCode>false</includeTestCode>
+   <reason>Restrict encryption client imports to encryption client factory</reason>
+   <exclusions>
+     <exclusion>org.apache.hadoop.fs.s3a.impl.EncryptionS3ClientFactory</exclusion>
+   </exclusions>
+   <bannedImports>
+     <bannedImport>software.amazon.encryption.s3.**</bannedImport>
+   </bannedImports>
+ </restrictImports>
</rules>
</configuration>
</execution>
@@ -510,6 +520,11 @@
<artifactId>bundle</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+   <groupId>software.amazon.encryption.s3</groupId>
+   <artifactId>amazon-s3-encryption-client-java</artifactId>
+   <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@@ -766,6 +766,35 @@ private Constants() {
public static final String S3_ENCRYPTION_CONTEXT =
"fs.s3a.encryption.context";

+ /**
+  * Client side encryption (CSE-CUSTOM): class name of the custom
+  * keyring which provides the cryptographic material.
+  * value:{@value}
+  */
+ public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME =
+     "fs.s3a.encryption.cse.custom.keyring.class.name";

+ /**
+  * Config to provide backward compatibility with the V1 encryption client.
+  * Enabling this configuration has two effects:
+  * 1. Unencrypted S3 objects will be read using the unencrypted/base S3
+  *    client when CSE is enabled.
+  * 2. The size of an encrypted object will be fetched from the object
+  *    header if present, or calculated using ranged S3 GET calls.
+  * value:{@value}
+  */
+ public static final String S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED =
+     "fs.s3a.encryption.cse.v1.compatibility.enabled";

+ /**
+  * Default value: {@value}.
+  */
+ public static final boolean S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT = false;

+ /**
+  * S3 CSE-KMS: region of the KMS key.
+  */
+ public static final String S3_ENCRYPTION_CSE_KMS_REGION = "fs.s3a.encryption.cse.kms.region";

/**
* List of custom Signers. The signer class will be loaded, and the signer
* name will be associated with this signer class in the S3 SDK.
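A sketch of how these new options might be wired up in client code, following the javadocs above; the keyring class name is a hypothetical placeholder, and the region value is illustrative:

import org.apache.hadoop.conf.Configuration;

public class CseOptionsSketch {
  public static Configuration cseConf() {
    Configuration conf = new Configuration();
    // CSE-CUSTOM: class implementing the custom keyring
    // ("com.example.CustomKeyring" is a hypothetical placeholder).
    conf.set("fs.s3a.encryption.cse.custom.keyring.class.name",
        "com.example.CustomKeyring");
    // Allow reading plaintext and V1-encrypted objects when CSE is enabled.
    conf.setBoolean("fs.s3a.encryption.cse.v1.compatibility.enabled", true);
    // CSE-KMS: region of the KMS key (illustrative value).
    conf.set("fs.s3a.encryption.cse.kms.region", "us-west-2");
    return conf;
  }
}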
@@ -41,6 +41,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+ import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3BaseClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient(
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();

- return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
-     .httpClientBuilder(httpClientBuilder)
-     .multipartConfiguration(multipartConfiguration)
-     .multipartEnabled(parameters.isMultipartCopy())
-     .build();
+ S3AsyncClientBuilder s3AsyncClientBuilder =
+     configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
+         .httpClientBuilder(httpClientBuilder);
+
+ // CSE multipart upload support is pending HADOOP-19326:
+ // disable SDK multipart operations when CSE is enabled.
+ if (!parameters.isClientSideEncryptionEnabled()) {
+   s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration)
+       .multipartEnabled(parameters.isMultipartCopy());
+ }
+
+ return s3AsyncClientBuilder.build();
}

@Override
@@ -373,7 +380,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, ClientT>, ClientT> void
* @param conf config to build the URI from.
* @return an endpoint uri
*/
- private static URI getS3Endpoint(String endpoint, final Configuration conf) {
+ protected static URI getS3Endpoint(String endpoint, final Configuration conf) {

boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS);

@@ -75,7 +75,6 @@
public class Listing extends AbstractStoreOperation {

private static final Logger LOG = S3AFileSystem.LOG;
- private final boolean isCSEEnabled;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();
@@ -86,7 +85,6 @@ public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
- this.isCSEEnabled = storeContext.isCSEEnabled();
}

/**
@@ -446,14 +444,17 @@ private boolean requestNextBatch() throws IOException {
* Build the next status batch from a listing.
* @param objects the next object listing
* @return true if this added any entries after filtering
+ * @throws IOException IO problems; this can only occur when CSE is enabled.
*/
- private boolean buildNextStatusBatch(S3ListResult objects) {
+ private boolean buildNextStatusBatch(S3ListResult objects) throws IOException {
// counters for debug logs
int added = 0, ignored = 0;
// list to fill in with results. Initial size will be list maximum.
List<S3AFileStatus> stats = new ArrayList<>(
objects.getS3Objects().size() +
objects.getCommonPrefixes().size());
+ String userName = getStoreContext().getUsername();
+ long blockSize = listingOperationCallbacks.getDefaultBlockSize(null);
// objects
for (S3Object s3Object : objects.getS3Objects()) {
String key = s3Object.key();
Expand All @@ -464,9 +465,9 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, s3Object,
- listingOperationCallbacks.getDefaultBlockSize(keyPath),
- getStoreContext().getUsername(),
- s3Object.eTag(), null, isCSEEnabled);
+ blockSize, userName, s3Object.eTag(),
+ null,
+ listingOperationCallbacks.getObjectSize(s3Object));
LOG.debug("Adding: {}", status);
stats.add(status);
added++;
Expand Down