diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java index 5e4eda26c7f1d..c41a03b94fdfb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java @@ -57,6 +57,7 @@ CompletableFuture startUpload(Path filePath) * It is possible to have parts uploaded in any order (or in parallel). * @param uploadId Identifier from {@link #startUpload(Path)}. * @param partNumber Index of the part relative to others. + * @param isLastPart is the part the last part of the upload? * @param filePath Target path for upload (as {@link #startUpload(Path)}). * @param inputStream Data for this part. Implementations MUST close this * stream after reading in the data. @@ -67,6 +68,7 @@ CompletableFuture startUpload(Path filePath) CompletableFuture putPart( UploadHandle uploadId, int partNumber, + boolean isLastPart, Path filePath, InputStream inputStream, long lengthInBytes) @@ -77,7 +79,7 @@ CompletableFuture putPart( * @param uploadId Identifier from {@link #startUpload(Path)}. * @param filePath Target path for upload (as {@link #startUpload(Path)}. * @param handles non-empty map of part number to part handle. - * from {@link #putPart(UploadHandle, int, Path, InputStream, long)}. + * from {@link #putPart(UploadHandle, int, boolean, Path, InputStream, long)}. * @return unique PathHandle identifier for the uploaded file. * @throws IOException IO failure */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java index f9ae9f55cc17f..f3502600a84fd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractMultipartUploader.java @@ -101,7 +101,7 @@ protected void checkPartHandles(Map partHandles) { /** * Check all the arguments to the - * {@link MultipartUploader#putPart(UploadHandle, int, Path, InputStream, long)} + * {@link MultipartUploader#putPart(UploadHandle, int, boolean, Path, InputStream, long)} * operation. * @param filePath Target path for upload (as {@link #startUpload(Path)}). * @param inputStream Data for this part. 
Implementations MUST close this diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java index 28a4bce0489cd..09d9079cff3de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileSystemMultipartUploader.java @@ -111,7 +111,7 @@ public CompletableFuture startUpload(Path filePath) @Override public CompletableFuture putPart(UploadHandle uploadId, - int partNumber, Path filePath, + int partNumber, boolean isLastPart, Path filePath, InputStream inputStream, long lengthInBytes) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java index 7420b47a98495..16482915bdf7f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java @@ -249,7 +249,7 @@ public void testSingleUpload() throws Exception { // was interpreted as an inconsistent write. MultipartUploader completer = uploader0; // and upload with uploader 1 to validate cross-uploader uploads - PartHandle partHandle = putPart(file, uploadHandle, 1, payload); + PartHandle partHandle = putPart(file, uploadHandle, 1, true, payload); partHandles.put(1, partHandle); PathHandle fd = complete(completer, uploadHandle, file, partHandles); @@ -317,12 +317,13 @@ protected PartHandle buildAndPutPart( final Path file, final UploadHandle uploadHandle, final int index, + final boolean isLastPart, final MessageDigest origDigest) throws IOException { byte[] payload = generatePayload(index); if (origDigest != null) { origDigest.update(payload); } - return putPart(file, uploadHandle, index, payload); + return putPart(file, uploadHandle, index, isLastPart, payload); } /** @@ -331,6 +332,7 @@ protected PartHandle buildAndPutPart( * @param file destination * @param uploadHandle handle * @param index index of part + * @param isLastPart is last part of the upload ? * @param payload byte array of payload * @return the part handle * @throws IOException IO failure. 
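For context, a minimal sketch (not part of this patch) of how a caller might drive the updated putPart() signature, flagging the final part; the filesystem, destination path and block list here are assumptions:

```java
// Illustrative sketch only: upload a file in parts through the public
// MultipartUploader API, marking the last part so that stores which need it
// (for example an encrypting client) can finalize the object.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

public final class MultipartUploadSketch {

  private MultipartUploadSketch() {
  }

  /** Upload the given blocks to dest, flagging the last part. */
  public static PathHandle upload(FileSystem fs, Path dest, List<byte[]> blocks)
      throws IOException {
    try (MultipartUploader uploader =
             fs.createMultipartUploader(dest).build()) {
      UploadHandle upload = awaitFuture(uploader.startUpload(dest));
      Map<Integer, PartHandle> parts = new HashMap<>();
      for (int i = 1; i <= blocks.size(); i++) {
        byte[] data = blocks.get(i - 1);
        parts.put(i, awaitFuture(uploader.putPart(
            upload, i, i == blocks.size(),    // isLastPart on the final block
            dest, new ByteArrayInputStream(data), data.length)));
      }
      return awaitFuture(uploader.complete(upload, dest, parts));
    }
  }
}
```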
@@ -338,6 +340,7 @@ protected PartHandle buildAndPutPart( protected PartHandle putPart(final Path file, final UploadHandle uploadHandle, final int index, + final boolean isLastPart, final byte[] payload) throws IOException { ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); PartHandle partHandle; @@ -347,7 +350,7 @@ protected PartHandle putPart(final Path file, payload.length, file)) { partHandle = awaitFuture(getUploader(index) - .putPart(uploadHandle, index, file, + .putPart(uploadHandle, index, isLastPart, file, new ByteArrayInputStream(payload), payload.length)); } @@ -488,7 +491,7 @@ public void testMultipartUpload() throws Exception { MessageDigest origDigest = DigestUtils.getMd5Digest(); int payloadCount = getTestPayloadCount(); for (int i = 1; i <= payloadCount; ++i) { - PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, + PartHandle partHandle = buildAndPutPart(file, uploadHandle, i, i == payloadCount, origDigest); partHandles.put(i, partHandle); } @@ -515,7 +518,7 @@ public void testMultipartUploadEmptyPart() throws Exception { origDigest.update(payload); InputStream is = new ByteArrayInputStream(payload); PartHandle partHandle = awaitFuture( - uploader.putPart(uploadHandle, 1, file, is, payload.length)); + uploader.putPart(uploadHandle, 1, true, file, is, payload.length)); partHandles.put(1, partHandle); completeUpload(file, uploadHandle, partHandles, origDigest, 0); } @@ -530,7 +533,7 @@ public void testUploadEmptyBlock() throws Exception { Path file = methodPath(); UploadHandle uploadHandle = startUpload(file); Map partHandles = new HashMap<>(); - partHandles.put(1, putPart(file, uploadHandle, 1, new byte[0])); + partHandles.put(1, putPart(file, uploadHandle, 1, true, new byte[0])); completeUpload(file, uploadHandle, partHandles, null, 0); } @@ -550,7 +553,8 @@ public void testMultipartUploadReverseOrder() throws Exception { origDigest.update(payload); } for (int i = payloadCount; i > 0; --i) { - partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount, + null)); } completeUpload(file, uploadHandle, partHandles, origDigest, payloadCount * partSizeInBytes()); @@ -574,7 +578,8 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() } Map partHandles = new HashMap<>(); for (int i = payloadCount; i > 0; i -= 2) { - partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == payloadCount, + null)); } completeUpload(file, uploadHandle, partHandles, origDigest, getTestPayloadCount() * partSizeInBytes()); @@ -591,7 +596,7 @@ public void testMultipartUploadAbort() throws Exception { UploadHandle uploadHandle = startUpload(file); Map partHandles = new HashMap<>(); for (int i = 12; i > 10; i--) { - partHandles.put(i, buildAndPutPart(file, uploadHandle, i, null)); + partHandles.put(i, buildAndPutPart(file, uploadHandle, i, i == 12, null)); } abortUpload(uploadHandle, file); @@ -601,7 +606,7 @@ public void testMultipartUploadAbort() throws Exception { intercept(IOException.class, () -> awaitFuture( - uploader0.putPart(uploadHandle, 49, file, is, len))); + uploader0.putPart(uploadHandle, 49, true, file, is, len))); intercept(IOException.class, () -> complete(uploader0, uploadHandle, file, partHandles)); @@ -701,7 +706,8 @@ public void testPutPartEmptyUploadID() throws Exception { byte[] payload = generatePayload(1); InputStream is = new ByteArrayInputStream(payload); 
intercept(IllegalArgumentException.class, - () -> uploader0.putPart(emptyHandle, 1, dest, is, payload.length)); + () -> uploader0.putPart(emptyHandle, 1, true, dest, is, + payload.length)); } /** @@ -715,7 +721,7 @@ public void testCompleteEmptyUploadID() throws Exception { UploadHandle emptyHandle = BBUploadHandle.from(ByteBuffer.wrap(new byte[0])); Map partHandles = new HashMap<>(); - PartHandle partHandle = putPart(dest, realHandle, 1, + PartHandle partHandle = putPart(dest, realHandle, 1, true, generatePayload(1, SMALL_FILE)); partHandles.put(1, partHandle); @@ -743,7 +749,7 @@ public void testDirectoryInTheWay() throws Exception { UploadHandle uploadHandle = startUpload(file); Map partHandles = new HashMap<>(); int size = SMALL_FILE; - PartHandle partHandle = putPart(file, uploadHandle, 1, + PartHandle partHandle = putPart(file, uploadHandle, 1, true, generatePayload(1, size)); partHandles.put(1, partHandle); @@ -802,10 +808,10 @@ public void testConcurrentUploads() throws Throwable { assertNotEquals("Upload handles match", upload1, upload2); // put part 1 - partHandles1.put(partId1, putPart(file, upload1, partId1, payload1)); + partHandles1.put(partId1, putPart(file, upload1, partId1, false, payload1)); // put part2 - partHandles2.put(partId2, putPart(file, upload2, partId2, payload2)); + partHandles2.put(partId2, putPart(file, upload2, partId2, true, payload2)); // complete part u1. expect its size and digest to // be as expected. diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index fedd3f633af47..e28ddff1921dc 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -205,6 +205,7 @@ 900 1.12.720 2.25.53 + 3.1.1 1.0.1 2.7.1 1.11.2 @@ -1180,6 +1181,17 @@ + + software.amazon.encryption.s3 + amazon-s3-encryption-client-java + ${amazon-s3-encryption-client-java.version} + + + * + * + + + org.apache.mina mina-core diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index eddcbd81fad1b..d28704b7c334e 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -466,6 +466,16 @@ org.apache.hadoop.mapred.** + + false + Restrict encryption client imports to encryption client factory + + org.apache.hadoop.fs.s3a.impl.EncryptionS3ClientFactory + + + software.amazon.encryption.s3.** + + @@ -510,6 +520,11 @@ bundle compile + + software.amazon.encryption.s3 + amazon-s3-encryption-client-java + provided + org.assertj assertj-core diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 67854e65720fd..4d52ee0d2f9c5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -766,6 +766,35 @@ private Constants() { public static final String S3_ENCRYPTION_CONTEXT = "fs.s3a.encryption.context"; + /** + * Client side encryption (CSE-CUSTOM) with custom cryptographic material manager class name. + * Custom keyring class name for CSE-KMS. + * value:{@value} + */ + public static final String S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME = + "fs.s3a.encryption.cse.custom.keyring.class.name"; + + /** + * Config to provide backward compatibility with V1 encryption client. + * Enabling this configuration will invoke the followings + * 1. Unencrypted s3 objects will be read using unencrypted/base s3 client when CSE is enabled. + * 2. 
Size of encrypted object will be fetched from object header if present or + * calculated using ranged S3 GET calls. + * value:{@value} + */ + public static final String S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED = + "fs.s3a.encryption.cse.v1.compatibility.enabled"; + + /** + * Default value : {@value}. + */ + public static final boolean S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT = false; + + /** + * S3 CSE-KMS KMS region config. + */ + public static final String S3_ENCRYPTION_CSE_KMS_REGION = "fs.s3a.encryption.cse.kms.region"; + /** * List of custom Signers. The signer class will be loaded, and the signer * name will be associated with this signer class in the S3 SDK. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index 0b18ce999f813..556bd3510752e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -41,6 +41,7 @@ import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.s3accessgrants.plugin.S3AccessGrantsPlugin; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; import software.amazon.awssdk.services.s3.S3BaseClientBuilder; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3Configuration; @@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient( .thresholdInBytes(parameters.getMultiPartThreshold()) .build(); - return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) - .httpClientBuilder(httpClientBuilder) - .multipartConfiguration(multipartConfiguration) - .multipartEnabled(parameters.isMultipartCopy()) - .build(); + S3AsyncClientBuilder s3AsyncClientBuilder = + configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket) + .httpClientBuilder(httpClientBuilder); + + // multipart upload pending with HADOOP-19326. + if (!parameters.isClientSideEncryptionEnabled()) { + s3AsyncClientBuilder.multipartConfiguration(multipartConfiguration) + .multipartEnabled(parameters.isMultipartCopy()); + } + + return s3AsyncClientBuilder.build(); } @Override @@ -373,7 +380,7 @@ private , ClientT> void * @param conf config to build the URI from. 
* @return an endpoint uri */ - private static URI getS3Endpoint(String endpoint, final Configuration conf) { + protected static URI getS3Endpoint(String endpoint, final Configuration conf) { boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index e0868a2e13087..d8f68b4840339 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -75,7 +75,6 @@ public class Listing extends AbstractStoreOperation { private static final Logger LOG = S3AFileSystem.LOG; - private final boolean isCSEEnabled; static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N = new AcceptAllButS3nDirs(); @@ -86,7 +85,6 @@ public Listing(ListingOperationCallbacks listingOperationCallbacks, StoreContext storeContext) { super(storeContext); this.listingOperationCallbacks = listingOperationCallbacks; - this.isCSEEnabled = storeContext.isCSEEnabled(); } /** @@ -446,14 +444,17 @@ private boolean requestNextBatch() throws IOException { * Build the next status batch from a listing. * @param objects the next object listing * @return true if this added any entries after filtering + * @throws IOException IO problems. This can happen only when CSE is enabled. */ - private boolean buildNextStatusBatch(S3ListResult objects) { + private boolean buildNextStatusBatch(S3ListResult objects) throws IOException { // counters for debug logs int added = 0, ignored = 0; // list to fill in with results. Initial size will be list maximum. List stats = new ArrayList<>( objects.getS3Objects().size() + objects.getCommonPrefixes().size()); + String userName = getStoreContext().getUsername(); + long blockSize = listingOperationCallbacks.getDefaultBlockSize(null); // objects for (S3Object s3Object : objects.getS3Objects()) { String key = s3Object.key(); @@ -464,9 +465,9 @@ private boolean buildNextStatusBatch(S3ListResult objects) { // Skip over keys that are ourselves and old S3N _$folder$ files if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) { S3AFileStatus status = createFileStatus(keyPath, s3Object, - listingOperationCallbacks.getDefaultBlockSize(keyPath), - getStoreContext().getUsername(), - s3Object.eTag(), null, isCSEEnabled); + blockSize, userName, s3Object.eTag(), + null, + listingOperationCallbacks.getObjectSize(s3Object)); LOG.debug("Adding: {}", status); stats.add(status); added++; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 741a78a0537f2..7b249a11c07d4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -39,6 +39,7 @@ import javax.annotation.Nonnull; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.PutObjectRequest; @@ -76,6 +77,7 @@ import org.apache.hadoop.util.Progressable; import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; import static 
org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent.*; @@ -1002,11 +1004,19 @@ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, uploadData.getSize(), CONTENT_TYPE_OCTET_STREAM); - request = writeOperationHelper.newUploadPartRequestBuilder( + UploadPartRequest.Builder requestBuilder = writeOperationHelper.newUploadPartRequestBuilder( key, uploadId, currentPartNumber, - size).build(); + isLast, + size); + request = requestBuilder.build(); + } catch (SdkException aws) { + // catch and translate + IOException e = translateException("upload", key, aws); + // failure to start the upload. + noteUploadFailure(e); + throw e; } catch (IOException e) { // failure to prepare the upload. noteUploadFailure(e); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3ab19dfa1914f..9846953911171 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -74,7 +74,6 @@ import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.NoSuchBucketException; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; @@ -116,9 +115,10 @@ import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; -import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.BaseS3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation; import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl; +import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ClientManager; import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl; @@ -126,6 +126,9 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation; import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder; +import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations; +import org.apache.hadoop.fs.s3a.impl.CSEV1CompatibleS3AFileSystemOperations; +import org.apache.hadoop.fs.s3a.impl.CSEMaterials; import org.apache.hadoop.fs.s3a.impl.DeleteOperation; import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy; import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl; @@ -147,6 +150,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.impl.StoreContextFactory; import org.apache.hadoop.fs.s3a.impl.UploadContentProviders; +import org.apache.hadoop.fs.s3a.impl.CSEUtils; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; @@ -212,7 +216,6 @@ import org.apache.hadoop.util.Preconditions; import 
org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.RateLimitingFactory; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.functional.CallableRaisingIOE; @@ -244,12 +247,10 @@ import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE; -import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403_FORBIDDEN; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND; @@ -464,6 +465,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, */ private ArnResource accessPoint; + /** + * Handler for certain filesystem operations. + */ + private S3AFileSystemOperations fsHandler; + + /** * Does this S3A FS instance have multipart upload enabled? */ @@ -622,11 +629,12 @@ public void initialize(URI name, Configuration originalConf) invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); - // If CSE-KMS method is set then CSE is enabled. - isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod() - .equals(getS3EncryptionAlgorithm().getMethod()); - LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled); - setCSEGauge(); + // If encryption method is set to CSE-KMS or CSE-CUSTOM then CSE is enabled. + isCSEEnabled = CSEUtils.isCSEEnabled(getS3EncryptionAlgorithm().getMethod()); + + // Create the appropriate fsHandler instance using a factory method + fsHandler = createFileSystemHandler(); + fsHandler.setCSEGauge((IOStatisticsStore) getIOStatistics()); // Username is the current user at the time the FS was instantiated. owner = UserGroupInformation.getCurrentUser(); username = owner.getShortUserName(); @@ -821,6 +829,26 @@ public void initialize(URI name, Configuration originalConf) } } + /** + * Creates and returns an instance of the appropriate S3AFileSystemOperations. + * Creation is based on the client-side encryption (CSE) settings. + * + * @return An instance of the appropriate S3AFileSystemOperations implementation. + */ + private S3AFileSystemOperations createFileSystemHandler() { + if (isCSEEnabled) { + if (getConf().getBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, + S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED_DEFAULT)) { + return new CSEV1CompatibleS3AFileSystemOperations(); + } else { + return new CSES3AFileSystemOperations(); + } + } else { + return new BaseS3AFileSystemOperations(); + } + } + + /** + * Create the S3AStore instance. + * This is protected so that tests can override it.
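As a hedged illustration of the configuration that drives the factory method above, the following settings would select the V1-compatible CSE handler; the key ARN, region and bucket are placeholders, not values defined by this patch:

```java
// Illustrative only: with these settings createFileSystemHandler() would return
// CSEV1CompatibleS3AFileSystemOperations. Key ARN, region and bucket are placeholders.
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CseConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.encryption.algorithm", "CSE-KMS");
    conf.set("fs.s3a.encryption.key",
        "arn:aws:kms:eu-west-1:111122223333:key/example-key-id");
    conf.set("fs.s3a.encryption.cse.kms.region", "eu-west-1");
    conf.setBoolean("fs.s3a.encryption.cse.v1.compatibility.enabled", true);
    try (FileSystem fs =
             FileSystem.newInstance(URI.create("s3a://example-bucket/"), conf)) {
      System.out.println("Created " + fs.getUri());
    }
  }
}
```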
@@ -862,22 +890,6 @@ private VectoredIOContext populateVectoredIOContext(Configuration conf) { .build(); } - /** - * Set the client side encryption gauge to 0 or 1, indicating if CSE is - * enabled through the gauge or not. - */ - private void setCSEGauge() { - IOStatisticsStore ioStatisticsStore = - (IOStatisticsStore) getIOStatistics(); - if (isCSEEnabled) { - ioStatisticsStore - .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L); - } else { - ioStatisticsStore - .setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L); - } - } - /** * Test bucket existence in S3. * When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0, @@ -1123,9 +1135,11 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I credentials = createAWSCredentialProviderList(fsURI, conf); } LOG.debug("Using credential provider {}", credentials); - Class s3ClientFactoryClass = conf.getClass( - S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, - S3ClientFactory.class); + + S3ClientFactory clientFactory = fsHandler.getS3ClientFactory(conf); + S3ClientFactory unencryptedClientFactory = fsHandler.getUnencryptedS3ClientFactory(conf); + CSEMaterials cseMaterials = fsHandler.getClientSideEncryptionMaterials(conf, bucket, + getS3EncryptionAlgorithm()); S3ClientFactory.S3ClientCreationParameters parameters = new S3ClientFactory.S3ClientCreationParameters() @@ -1146,17 +1160,21 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I .withExpressCreateSession( conf.getBoolean(S3EXPRESS_CREATE_SESSION, S3EXPRESS_CREATE_SESSION_DEFAULT)) .withChecksumValidationEnabled( - conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT)); + conf.getBoolean(CHECKSUM_VALIDATION, CHECKSUM_VALIDATION_DEFAULT)) + .withClientSideEncryptionEnabled(isCSEEnabled) + .withClientSideEncryptionMaterials(cseMaterials) + .withKMSRegion(conf.get(S3_ENCRYPTION_CSE_KMS_REGION)); - S3ClientFactory clientFactory = ReflectionUtils.newInstance(s3ClientFactoryClass, conf); // this is where clients and the transfer manager are created on demand. - return createClientManager(clientFactory, parameters, getDurationTrackerFactory()); + return createClientManager(clientFactory, unencryptedClientFactory, parameters, + getDurationTrackerFactory()); } /** * Create the Client Manager; protected to allow for mocking. * Requires {@link #unboundedThreadPool} to be initialized. * @param clientFactory (reflection-bonded) client factory. + * @param unencryptedClientFactory (reflection-bonded) client factory. * @param clientCreationParameters parameters for client creation. * @param durationTrackerFactory factory for duration tracking. * @return a client manager instance. 
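Since setCSEGauge() has moved out of S3AFileSystem and into the handler, the gauge remains observable through the filesystem's IOStatistics; a small sketch, assuming an already-initialized S3A instance:

```java
// Illustrative only: read back the CSE gauge now set by the S3AFileSystemOperations handler.
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;

public final class CseGaugeSketch {

  private CseGaugeSketch() {
  }

  /** @return true if the filesystem reports client-side encryption as enabled. */
  public static boolean isCseGaugeSet(S3AFileSystem fs) {
    Long gauge = fs.getIOStatistics().gauges()
        .get(Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol());
    return gauge != null && gauge == 1L;
  }
}
```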
@@ -1164,9 +1182,11 @@ private ClientManager createClientManager(URI fsURI, boolean dtEnabled) throws I @VisibleForTesting protected ClientManager createClientManager( final S3ClientFactory clientFactory, + final S3ClientFactory unencryptedClientFactory, final S3ClientFactory.S3ClientCreationParameters clientCreationParameters, final DurationTrackerFactory durationTrackerFactory) { return new ClientManagerImpl(clientFactory, + unencryptedClientFactory, clientCreationParameters, durationTrackerFactory ); @@ -1942,10 +1962,11 @@ public GetObjectRequest.Builder newGetRequestBuilder(final String key) { } @Override - public ResponseInputStream getObject(GetObjectRequest request) { + public ResponseInputStream getObject(GetObjectRequest request) throws + IOException { // active the audit span used for the operation try (AuditSpan span = auditSpan.activate()) { - return getS3Client().getObject(request); + return fsHandler.getObject(store, request, getRequestFactory()); } } @@ -2773,6 +2794,18 @@ public long getDefaultBlockSize(Path path) { return S3AFileSystem.this.getDefaultBlockSize(path); } + /** + * Get the S3 object size. + * If the object is encrypted, the unpadded size will be returned. + * @param s3Object S3object + * @return plaintext S3 object size + * @throws IOException IO problems + */ + @Override + public long getObjectSize(S3Object s3Object) throws IOException { + return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null); + } + @Override public int getMaxKeys() { return S3AFileSystem.this.getMaxKeys(); @@ -3039,39 +3072,7 @@ protected HeadObjectResponse getObjectMetadata(String key, ChangeTracker changeTracker, Invoker changeInvoker, String operation) throws IOException { - HeadObjectResponse response = changeInvoker.retryUntranslated("GET " + key, true, - () -> { - HeadObjectRequest.Builder requestBuilder = - getRequestFactory().newHeadObjectRequestBuilder(key); - incrementStatistic(OBJECT_METADATA_REQUESTS); - DurationTracker duration = getDurationTrackerFactory() - .trackDuration(ACTION_HTTP_HEAD_REQUEST.getSymbol()); - try { - LOG.debug("HEAD {} with change tracker {}", key, changeTracker); - if (changeTracker != null) { - changeTracker.maybeApplyConstraint(requestBuilder); - } - HeadObjectResponse headObjectResponse = getS3Client() - .headObject(requestBuilder.build()); - if (changeTracker != null) { - changeTracker.processMetadata(headObjectResponse, operation); - } - return headObjectResponse; - } catch (AwsServiceException ase) { - if (!isObjectNotFound(ase)) { - // file not found is not considered a failure of the call, - // so only switch the duration tracker to update failure - // metrics on other exception outcomes. - duration.failed(); - } - throw ase; - } finally { - // update the tracker. - duration.close(); - } - }); - incrementReadOperations(); - return response; + return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation); } /** @@ -4084,14 +4085,7 @@ S3AFileStatus s3GetFileStatus(final Path path, // look for the simple file HeadObjectResponse meta = getObjectMetadata(key); LOG.debug("Found exact file: normal file {}", key); - long contentLength = meta.contentLength(); - // check if CSE is enabled, then strip padded length. 
- if (isCSEEnabled && - meta.metadata().get(AWSHeaders.CRYPTO_CEK_ALGORITHM) != null - && contentLength >= CSE_PADDING_LENGTH) { - contentLength -= CSE_PADDING_LENGTH; - } - return new S3AFileStatus(contentLength, + return new S3AFileStatus(meta.contentLength(), meta.lastModified().toEpochMilli(), path, getDefaultBlockSize(path), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index cfdc361234f9f..147cd7567a301 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -1405,11 +1405,15 @@ public interface InputStreamCallbacks extends Closeable { /** * Execute the request. + * When CSE is enabled with reading of unencrypted data, The object is checked if it is + * encrypted and if so, the request is made with encrypted S3 client. If the object is + * not encrypted, the request is made with unencrypted s3 client. * @param request the request * @return the response + * @throws IOException on any failure. */ @Retries.OnceRaw - ResponseInputStream getObject(GetObjectRequest request); + ResponseInputStream getObject(GetObjectRequest request) throws IOException; /** * Submit some asynchronous work, for example, draining a stream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java index aed4442716963..ab8785e01dafd 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java @@ -27,6 +27,7 @@ import java.util.concurrent.CancellationException; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -35,6 +36,8 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; @@ -43,8 +46,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.ChangeTracker; import org.apache.hadoop.fs.s3a.impl.ClientManager; import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException; +import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; @@ -193,6 +198,39 @@ Map.Entry deleteObjects(DeleteObjectsRequest de Map.Entry> deleteObject( DeleteObjectRequest request) throws SdkException; + /** + * Performs a HEAD request on an S3 object to retrieve its 
metadata. + * + * @param key The S3 object key to perform the HEAD operation on + * @param changeTracker Tracks changes to the object's metadata across operations + * @param changeInvoker The invoker responsible for executing the HEAD request with retries + * @param fsHandler Handler for filesystem-level operations and configurations + * @param operation Description of the operation being performed for tracking purposes + * @return HeadObjectResponse containing the object's metadata + * @throws IOException If the HEAD request fails, object doesn't exist, or other I/O errors occur + */ + @Retries.RetryRaw + HeadObjectResponse headObject(String key, + ChangeTracker changeTracker, + Invoker changeInvoker, + S3AFileSystemOperations fsHandler, + String operation) throws IOException; + + /** + * Retrieves a specific byte range of an S3 object as a stream. + * + * @param key The S3 object key to retrieve + * @param start The starting byte position (inclusive) of the range to retrieve + * @param end The ending byte position (inclusive) of the range to retrieve + * @return A ResponseInputStream containing the requested byte range of the S3 object + * @throws IOException If the object cannot be retrieved or other I/O errors occur + * @see GetObjectResponse For additional metadata about the retrieved object + */ + @Retries.RetryRaw + ResponseInputStream getRangedS3Object(String key, + long start, + long end) throws IOException; + /** * Upload part of a multi-partition file. * Increments the write and put counters. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 69390a8cc724e..a1da63329a9ea 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -80,6 +80,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.audit.AuditIntegration.maybeTranslateAuditException; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeProcessEncryptionClientException; import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.instantiationException; import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isAbstract; import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.isNotInstanceOf; @@ -184,6 +185,8 @@ public static IOException translateException(@Nullable String operation, path = "/"; } + exception = maybeProcessEncryptionClientException(exception); + if (!(exception instanceof AwsServiceException)) { // exceptions raised client-side: connectivity, auth, network problems... Exception innerCause = containsInterruptedException(exception); @@ -529,7 +532,7 @@ public static String stringify(AwsServiceException e) { * @param owner owner of the file * @param eTag S3 object eTag or null if unavailable * @param versionId S3 object versionId or null if unavailable - * @param isCSEEnabled is client side encryption enabled? + * @param size s3 object size * @return a status entry */ public static S3AFileStatus createFileStatus(Path keyPath, @@ -538,12 +541,7 @@ public static S3AFileStatus createFileStatus(Path keyPath, String owner, String eTag, String versionId, - boolean isCSEEnabled) { - long size = s3Object.size(); - // check if cse is enabled; strip out constant padding length.
- if (isCSEEnabled && size >= CSE_PADDING_LENGTH) { - size -= CSE_PADDING_LENGTH; - } + long size) { return createFileStatus(keyPath, objectRepresentsDirectory(s3Object.key()), size, Date.from(s3Object.lastModified()), blockSize, owner, eTag, versionId); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index e82eb4c9182e1..cda09b622eaff 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.impl.CSEMaterials; import org.apache.hadoop.fs.s3a.statistics.StatisticsFromAwsSdk; import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ENDPOINT; @@ -118,6 +119,23 @@ final class S3ClientCreationParameters { */ private StatisticsFromAwsSdk metrics; + /** + * Is CSE enabled? + * The default value is {@value}. + */ + private Boolean isCSEEnabled = false; + + /** + * KMS region. + * This is only used if CSE is enabled. + */ + private String kmsRegion; + + /** + * Client side encryption materials. + */ + private CSEMaterials cseMaterials; + /** * Use (deprecated) path style access. */ @@ -428,6 +446,56 @@ public S3ClientCreationParameters withRegion( return this; } + /** + * Set the client side encryption flag. + * + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withClientSideEncryptionEnabled(final boolean value) { + this.isCSEEnabled = value; + return this; + } + + /** + * Set the KMS client region. + * This is required for CSE-KMS + * + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withKMSRegion(final String value) { + this.kmsRegion = value; + return this; + } + + /** + * Get the client side encryption flag. + * @return client side encryption flag + */ + public boolean isClientSideEncryptionEnabled() { + return this.isCSEEnabled; + } + + /** + * Set the client side encryption materials. + * + * @param value new value + * @return the builder + */ + public S3ClientCreationParameters withClientSideEncryptionMaterials(final CSEMaterials value) { + this.cseMaterials = value; + return this; + } + + /** + * Get the client side encryption materials. + * @return client side encryption materials + */ + public CSEMaterials getClientSideEncryptionMaterials() { + return this.cseMaterials; + } + /** * Get the region. * @return invoker @@ -436,6 +504,14 @@ public String getRegion() { return region; } + /** + * Get the KMS region. + * @return Configured KMS region. + */ + public String getKmsRegion() { + return kmsRegion; + } + /** * Should s3express createSession be called? * @return true if the client should enable createSession. 
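A sketch of how the new CSE-related creation parameters might be populated by a caller; CSEMaterials is the helper class introduced later in this patch, the KMS key id and region are placeholders, and `conf` is assumed to be an existing Configuration:

```java
// Illustrative fragment only: wiring the new CSE fields of S3ClientCreationParameters.
// "conf" is an existing Configuration; the key id and region are placeholders.
CSEMaterials cseMaterials = new CSEMaterials()
    .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
    .withKmsKeyId("arn:aws:kms:eu-west-1:111122223333:key/example-key-id")
    .withConf(conf);

S3ClientFactory.S3ClientCreationParameters parameters =
    new S3ClientFactory.S3ClientCreationParameters()
        .withClientSideEncryptionEnabled(true)
        .withClientSideEncryptionMaterials(cseMaterials)
        .withKMSRegion("eu-west-1");
```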
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java index b7387fc12e140..a7a87bdfcfb29 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java @@ -469,6 +469,7 @@ public void abortMultipartCommit(String destKey, String uploadId) * @param destKey destination key of ongoing operation * @param uploadId ID of ongoing upload * @param partNumber current part number of the upload + * @param isLastPart is this the last part? * @param size amount of data * @return the request builder. * @throws IllegalArgumentException if the parameters are invalid. @@ -480,6 +481,7 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( String destKey, String uploadId, int partNumber, + boolean isLastPart, long size) throws IOException { return once("upload part request", destKey, withinAuditSpan(getAuditSpan(), () -> @@ -487,6 +489,7 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( destKey, uploadId, partNumber, + isLastPart, size))); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java index 68709c40f45ca..93d2506a4f3ad 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java @@ -190,6 +190,7 @@ void abortMultipartCommit(String destKey, String uploadId) * @param destKey destination key of ongoing operation * @param uploadId ID of ongoing upload * @param partNumber current part number of the upload + * @param isLastPart is this the last part of the upload? * @param size amount of data * @return the request builder. * @throws IllegalArgumentException if the parameters are invalid @@ -199,6 +200,7 @@ UploadPartRequest.Builder newUploadPartRequestBuilder( String destKey, String uploadId, int partNumber, + boolean isLastPart, long size) throws IOException; /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 73ad137a86d3c..c69e3394c3dd3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -203,6 +203,7 @@ CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder( * @param destKey destination key of ongoing operation * @param uploadId ID of ongoing upload * @param partNumber current part number of the upload + * @param isLastPart is this the last part? * @param size amount of data * @return the request builder. * @throws PathIOException if the part number is out of range.
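The isLastPart flag has to travel from the output stream or committer down to the SDK request builder because, when client-side encryption is enabled, the encrypting client needs to know which part completes the object before it can finish writing it. A minimal sketch of the new call shape; `helper`, `destKey`, `uploadId`, `size`, `body` and `statistics` are assumed to exist in the calling code:

```java
// Illustrative fragment only: a single-part upload through WriteOperationHelper,
// with the part explicitly flagged as the last one.
UploadPartRequest request = helper.newUploadPartRequestBuilder(
    destKey, uploadId, 1, true /* isLastPart */, size).build();
UploadPartResponse response = helper.uploadPart(request, body, statistics);
```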
@@ -211,6 +212,7 @@ UploadPartRequest.Builder newUploadPartRequestBuilder( String destKey, String uploadId, int partNumber, + boolean isLastPart, long size) throws PathIOException; /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java index f33d94ce84fef..14bd4cc2f7da1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java @@ -638,17 +638,19 @@ private List uploadFileData( for (int partNumber = 1; partNumber <= numParts; partNumber++) { progress.progress(); int size = (int)Math.min(length - offset, uploadPartSize); - UploadPartRequest part = writeOperations.newUploadPartRequestBuilder( + UploadPartRequest.Builder partBuilder = writeOperations.newUploadPartRequestBuilder( destKey, uploadId, partNumber, - size).build(); + partNumber == numParts, + size); // Create a file content provider starting at the current offset. RequestBody body = RequestBody.fromContentProvider( UploadContentProviders.fileContentProvider(localFile, offset, size), size, CONTENT_TYPE_OCTET_STREAM); - UploadPartResponse response = writeOperations.uploadPart(part, body, statistics); + UploadPartResponse response = writeOperations.uploadPart(partBuilder.build(), body, + statistics); offset += uploadPartSize; parts.add(CompletedPart.builder() .partNumber(partNumber) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java index aaca3b9b194d6..fe7b45147986f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java @@ -83,6 +83,11 @@ public interface AWSHeaders { */ String CRYPTO_CEK_ALGORITHM = "x-amz-cek-alg"; + /** + * Header for unencrypted content length of an object: {@value}. + */ + String UNENCRYPTED_CONTENT_LENGTH = "x-amz-unencrypted-content-length"; + /** * Headers in request indicating that the requester must be charged for data * transfer. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BaseS3AFileSystemOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BaseS3AFileSystemOperations.java new file mode 100644 index 0000000000000..b06ff970d5a6e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BaseS3AFileSystemOperations.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.util.ReflectionUtils; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED; + +/** + * An implementation of the {@link S3AFileSystemOperations} interface. + * This handles certain filesystem operations when s3 client side encryption is disabled. + */ +public class BaseS3AFileSystemOperations implements S3AFileSystemOperations { + + /** + * Constructs a new instance of {@code BaseS3AFileSystemOperations}. + */ + public BaseS3AFileSystemOperations() { + } + + /** + * Retrieves an object from the S3. + * + * @param store The S3AStore object representing the S3 bucket. + * @param request The GetObjectRequest containing the details of the object to retrieve. + * @param factory The RequestFactory used to create the GetObjectRequest. + * @return A ResponseInputStream containing the GetObjectResponse. + * @throws IOException If an error occurs while retrieving the object. + */ + @Override + public ResponseInputStream getObject(S3AStore store, + GetObjectRequest request, + RequestFactory factory) throws IOException { + return store.getOrCreateS3Client().getObject(request); + } + + /** + * Set the client side encryption gauge to 0. + * @param ioStatisticsStore The IOStatisticsStore of the filesystem. + */ + @Override + public void setCSEGauge(IOStatisticsStore ioStatisticsStore) { + ioStatisticsStore.setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L); + } + + /** + * Retrieves the client-side encryption materials for the given bucket and encryption algorithm. + * + * @param conf The Hadoop configuration object. + * @param bucket The name of the S3 bucket. + * @param algorithm The client-side encryption algorithm to use. + * @return null. + */ + @Override + public CSEMaterials getClientSideEncryptionMaterials(Configuration conf, String bucket, + S3AEncryptionMethods algorithm) { + return null; + } + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return The S3 client factory instance. + */ + @Override + public S3ClientFactory getS3ClientFactory(Configuration conf) { + Class s3ClientFactoryClass = conf.getClass( + S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, + S3ClientFactory.class); + return ReflectionUtils.newInstance(s3ClientFactoryClass, conf); + } + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return null. + */ + @Override + public S3ClientFactory getUnencryptedS3ClientFactory(Configuration conf) { + return null; + } + + + /** + * Return the size of S3 object. + * + * @param key The key (path) of the object in the S3 bucket. 
+ * @param length The expected length of the object. + * @param store The S3AStore object representing the S3 bucket. + * @param response The HeadObjectResponse containing the metadata of the object. + * @return The size of the object in bytes. + */ + @Override + public long getS3ObjectSize(String key, long length, S3AStore store, + HeadObjectResponse response) throws IOException { + return length; + } + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java new file mode 100644 index 0000000000000..adbeb441fbca4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import org.apache.hadoop.conf.Configuration; + +/** + * This class is for storing information about key type and corresponding key + * to be used for client side encryption. + */ +public class CSEMaterials { + + /** + * Enum for CSE key types. + */ + public enum CSEKeyType { + /** + * AWS KMS keys are used for encryption and decryption. + */ + KMS, + /** + * Custom cryptographic manager class is used for encryption and decryption. + */ + CUSTOM + } + + /** + * The KMS key Id. + */ + private String kmsKeyId; + + /** + * Custom cryptographic manager class name. + */ + private String customKeyringClassName; + + private Configuration conf; + + /** + * The CSE key type to use. + */ + private CSEKeyType cseKeyType; + + /** + * Kms key id to use. + * @param value new value + * @return the builder + */ + public CSEMaterials withKmsKeyId( + final String value) { + kmsKeyId = value; + return this; + } + + /** + * Custom cryptographic class name to use. + * @param value cryptographic manager class name + * @return the builder + */ + public CSEMaterials withCustomCryptographicClassName( + final String value) { + customKeyringClassName = value; + return this; + } + + /** + * Configuration. + * @param value configuration + * @return the builder + */ + public CSEMaterials withConf( + final Configuration value) { + conf = value; + return this; + } + + + /** + * Get the Kms key id to use. + * @return the kms key id. + */ + public String getKmsKeyId() { + return kmsKeyId; + } + + public Configuration getConf() { + return conf; + } + + /** + * Get the custom cryptographic class name. + * @return custom keyring class name + */ + public String getCustomKeyringClassName() { + return customKeyringClassName; + } + + /** + * CSE key type to use.
+ * @param value new value + * @return the builder + */ + public CSEMaterials withCSEKeyType( + final CSEKeyType value) { + cseKeyType = value; + return this; + } + + /** + * Get the CSE key type. + * @return CSE key type + */ + public CSEKeyType getCseKeyType() { + return cseKeyType; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSES3AFileSystemOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSES3AFileSystemOperations.java new file mode 100644 index 0000000000000..4cfab1b415972 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSES3AFileSystemOperations.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.util.ReflectionUtils; + +import static org.apache.hadoop.fs.s3a.Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; + +/** + * An implementation of the {@link S3AFileSystemOperations} interface. + * This handles certain filesystem operations when s3 client side encryption is enabled. + */ +public class CSES3AFileSystemOperations implements S3AFileSystemOperations { + + /** + * Constructs a new instance of {@code CSES3AFileSystemOperations}. + */ + public CSES3AFileSystemOperations() { + } + + /** + * Retrieves an object from the S3 using encrypted S3 client. + * + * @param store The S3AStore object representing the S3 bucket. + * @param request The GetObjectRequest containing the details of the object to retrieve. + * @param factory The RequestFactory used to create the GetObjectRequest. + * @return A ResponseInputStream containing the GetObjectResponse. + * @throws IOException If an error occurs while retrieving the object. + */ + @Override + public ResponseInputStream getObject(S3AStore store, + GetObjectRequest request, + RequestFactory factory) throws IOException { + return store.getOrCreateS3Client().getObject(request); + } + + /** + * Set the client side encryption gauge 1. + * @param ioStatisticsStore The IOStatisticsStore of the filesystem. 
+ */ + @Override + public void setCSEGauge(IOStatisticsStore ioStatisticsStore) { + ioStatisticsStore.setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L); + } + + /** + * Retrieves the cse materials. + * + * @param conf The Hadoop configuration object. + * @param bucket The name of the S3 bucket. + * @param algorithm The client-side encryption algorithm to use. + * @return The client-side encryption materials. + * @throws IOException If an error occurs while retrieving the encryption materials. + */ + @Override + public CSEMaterials getClientSideEncryptionMaterials(Configuration conf, String bucket, + S3AEncryptionMethods algorithm) throws IOException { + return CSEUtils.getClientSideEncryptionMaterials(conf, bucket, algorithm); + } + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return The S3 client factory instance. + */ + @Override + public S3ClientFactory getS3ClientFactory(Configuration conf) { + return ReflectionUtils.newInstance(EncryptionS3ClientFactory.class, conf); + } + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return null + */ + @Override + public S3ClientFactory getUnencryptedS3ClientFactory(Configuration conf) { + return null; + } + + + /** + * Retrieves the unencrypted length of an object in the S3 bucket. + * + * @param key The key (path) of the object in the S3 bucket. + * @param length The length of the object. + * @param store The S3AStore object representing the S3 bucket. + * @param response The HeadObjectResponse containing the metadata of the object. + * @return The unencrypted size of the object in bytes. + * @throws IOException If an error occurs while retrieving the object size. + */ + @Override + public long getS3ObjectSize(String key, long length, S3AStore store, + HeadObjectResponse response) throws IOException { + long unencryptedLength = length - CSE_PADDING_LENGTH; + if (unencryptedLength >= 0) { + return unencryptedLength; + } + return length; + } + + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java new file mode 100644 index 0000000000000..ddb0040cbcb58 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.util.Preconditions; + +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM; +import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS; +import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM; +import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH; +import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; + +/** + * S3 client side encryption (CSE) utility class. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class CSEUtils { + + private CSEUtils() { + } + + /** + * Checks if Client-Side Encryption (CSE) is enabled based on the encryption method. + * + * Validates if the provided encryption method matches either CSE-KMS or CSE-Custom + * encryption methods. These are the two supported client-side encryption methods. + * + * @param encryptionMethod The encryption method to check (case-sensitive) + * @return true if the encryption method is either CSE-KMS or CSE-Custom, + * false otherwise + * @see S3AEncryptionMethods#CSE_KMS + * @see S3AEncryptionMethods#CSE_CUSTOM + */ + public static boolean isCSEEnabled(String encryptionMethod) { + return CSE_KMS.getMethod().equals(encryptionMethod) || + CSE_CUSTOM.getMethod().equals(encryptionMethod); + } + + /** + * Checks if an S3 object is encrypted by examining its metadata. + * + * This method performs a HEAD request on the object and checks for the presence + * of encryption metadata (specifically the CEK algorithm indicator). + * + * @param store The S3AStore instance used to access the S3 object + * @param key The key (path) of the S3 object to check + * @return true if the object is encrypted (has CEK algorithm metadata), + * false otherwise + * @throws IOException If there's an error accessing the object metadata or + * communicating with S3 + */ + public static boolean isObjectEncrypted(S3AStore store, + String key) throws IOException { + HeadObjectResponse headObjectResponse = store.headObject(key, + null, + null, + null, + "getObjectMetadata"); + + if (headObjectResponse.hasMetadata() && + headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) { + return true; + } + return false; + } + + /** + * Determines the actual unencrypted length of an S3 object. + * + * This method uses a three-step process to determine the object's unencrypted length: + * 1. If the object is not encrypted, returns the original content length + * 2. If encrypted, attempts to read the unencrypted length from object metadata + * 3. 
If metadata is unavailable, calculates length by performing a ranged GET operation + * + * @param store The S3AStore instance used to access the S3 object + * @param key The key (path) of the S3 object + * @param contentLength The encrypted object's content length + * @param headObjectResponse The object's metadata from a HEAD request, may be null + * @return The length of the object's unencrypted content + * @throws IOException If there's an error: + * - accessing the object or its metadata + * - parsing the unencrypted length from metadata + * - performing the ranged GET operation + * - computing the unencrypted length + */ + public static long getUnencryptedObjectLength(S3AStore store, + String key, + long contentLength, + HeadObjectResponse headObjectResponse) throws IOException { + + // if object is unencrypted, return the actual size + if (!isObjectEncrypted(store, key)) { + return contentLength; + } + + // check if unencrypted content length metadata is present or not. + if (headObjectResponse != null) { + String plaintextLength = headObjectResponse.metadata().get(UNENCRYPTED_CONTENT_LENGTH); + if (headObjectResponse.hasMetadata() && plaintextLength != null + && !plaintextLength.isEmpty()) { + return Long.parseLong(plaintextLength); + } + } + + // identify the length by doing a ranged GET operation. + if (contentLength >= CSE_PADDING_LENGTH) { + long minPlaintextLength = contentLength - CSE_PADDING_LENGTH; + if (minPlaintextLength < 0) { + minPlaintextLength = 0; + } + try (InputStream is = store.getRangedS3Object(key, minPlaintextLength, contentLength)) { + int i = 0; + while (is.read() != -1) { + i++; + } + return minPlaintextLength + i; + } catch (Exception e) { + throw new IOException("Failed to compute unencrypted length", e); + } + } + return contentLength; + } + + /** + * Creates encryption materials for client-side encryption based on the specified algorithm. + * + * Supports two types of client-side encryption: + *
    + *
+ * <ul>
+ *   <li>CSE_KMS: Uses AWS KMS for key management</li>
+ *   <li>CSE_CUSTOM: Uses a custom cryptographic implementation</li>
+ * </ul>
+ * + * @param conf The configuration containing encryption settings + * @param bucket The S3 bucket name for which encryption materials are being created + * @param algorithm The encryption algorithm to use (CSE_KMS or CSE_CUSTOM) + * @return CSEMaterials configured with the appropriate encryption settings + * @throws IOException If there's an error retrieving encryption configuration + * @throws IllegalArgumentException If: + * - KMS key ID is null or empty (for CSE_KMS) + * - Custom crypto class name is null or empty (for CSE_CUSTOM) + * - Unsupported encryption algorithm is specified + */ + public static CSEMaterials getClientSideEncryptionMaterials(Configuration conf, + String bucket, + S3AEncryptionMethods algorithm) throws IOException { + switch (algorithm) { + case CSE_KMS: + String kmsKeyId = getS3EncryptionKey(bucket, conf, true); + Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(), + "KMS keyId cannot be null or empty"); + return new CSEMaterials() + .withCSEKeyType(CSEMaterials.CSEKeyType.KMS) + .withConf(conf) + .withKmsKeyId(kmsKeyId); + case CSE_CUSTOM: + String customCryptoClassName = conf.getTrimmed(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME); + Preconditions.checkArgument(customCryptoClassName != null && + !customCryptoClassName.isEmpty(), + "CSE custom cryptographic class name cannot be null or empty"); + return new CSEMaterials() + .withCSEKeyType(CSEMaterials.CSEKeyType.CUSTOM) + .withConf(conf) + .withCustomCryptographicClassName(customCryptoClassName); + default: + throw new IllegalArgumentException("Invalid client side encryption algorithm." + + " Only CSE-KMS and CSE-CUSTOM are supported"); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemOperations.java new file mode 100644 index 0000000000000..3a540c2e5f343 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemOperations.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.util.ReflectionUtils; + +import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted; + +/** + * An extension of the {@link CSES3AFileSystemOperations} class. + * This handles certain file system operations when client-side encryption is enabled with v1 client + * compatibility. + * {@link org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}. + */ +public class CSEV1CompatibleS3AFileSystemOperations extends CSES3AFileSystemOperations { + + /** + * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemOperations}. + */ + public CSEV1CompatibleS3AFileSystemOperations() { + } + + /** + * Retrieves an object from the S3. + * If the S3 object is encrypted, it uses the encrypted S3 client to retrieve the object else + * it uses the unencrypted S3 client. + * + * @param store The S3AStore object representing the S3 bucket. + * @param request The GetObjectRequest containing the details of the object to retrieve. + * @param factory The RequestFactory used to create the GetObjectRequest. + * @return A ResponseInputStream containing the GetObjectResponse. + * @throws IOException If an error occurs while retrieving the object. + */ + @Override + public ResponseInputStream getObject(S3AStore store, + GetObjectRequest request, + RequestFactory factory) throws IOException { + boolean isEncrypted = isObjectEncrypted(store, request.key()); + return isEncrypted ? store.getOrCreateS3Client().getObject(request) + : store.getOrCreateUnencryptedS3Client().getObject(request); + } + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return The S3 client factory instance. + */ + @Override + public S3ClientFactory getUnencryptedS3ClientFactory(Configuration conf) { + Class s3ClientFactoryClass = conf.getClass( + S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, + S3ClientFactory.class); + return ReflectionUtils.newInstance(s3ClientFactoryClass, conf); + } + + /** + * Retrieves the unencrypted length of an object in the S3 bucket. + * + * @param key The key (path) of the object in the S3 bucket. + * @param length The length of the object. + * @param store The S3AStore object representing the S3 bucket. + * @param response The HeadObjectResponse containing the metadata of the object. + * @return The unencrypted size of the object in bytes. + * @throws IOException If an error occurs while retrieving the object size. 
+ */ + @Override + public long getS3ObjectSize(String key, long length, S3AStore store, + HeadObjectResponse response) throws IOException { + return CSEUtils.getUnencryptedObjectLength(store, key, length, response); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java index 7fadac8623d50..ad7afc732387f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java @@ -61,6 +61,14 @@ S3TransferManager getOrCreateTransferManager() */ S3AsyncClient getOrCreateAsyncClient() throws IOException; + /** + * Get or create an unencrypted S3 client. + * This is used for unencrypted operations when CSE is enabled with V1 compatibility. + * @return unencrypted S3 client + * @throws IOException on any failure + */ + S3Client getOrCreateUnencryptedS3Client() throws IOException; + /** * Get the AsyncS3Client, raising a failure to create as an UncheckedIOException. * @return the S3 client diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java index 24c37cc564a09..44383e381248f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java @@ -62,6 +62,11 @@ public class ClientManagerImpl implements ClientManager { */ private final S3ClientFactory clientFactory; + /** + * Client factory to invoke for unencrypted client. + */ + private final S3ClientFactory unencryptedClientFactory; + /** * Closed flag. */ @@ -85,6 +90,12 @@ public class ClientManagerImpl implements ClientManager { /** Async client is used for transfer manager. */ private final LazyAutoCloseableReference s3AsyncClient; + /** + * Unencrypted S3 client. + * This is used for unencrypted operations when CSE is enabled with V1 compatibility. + */ + private final LazyAutoCloseableReference unencryptedS3Client; + /** Transfer manager. */ private final LazyAutoCloseableReference transferManager; @@ -95,18 +106,22 @@ public class ClientManagerImpl implements ClientManager { *

* It does disable noisy logging from the S3 Transfer Manager. * @param clientFactory client factory to invoke + * @param unencryptedClientFactory client factory to invoke * @param clientCreationParameters creation parameters. * @param durationTrackerFactory duration tracker. */ public ClientManagerImpl( final S3ClientFactory clientFactory, + final S3ClientFactory unencryptedClientFactory, final S3ClientFactory.S3ClientCreationParameters clientCreationParameters, final DurationTrackerFactory durationTrackerFactory) { this.clientFactory = requireNonNull(clientFactory); + this.unencryptedClientFactory = unencryptedClientFactory; this.clientCreationParameters = requireNonNull(clientCreationParameters); this.durationTrackerFactory = requireNonNull(durationTrackerFactory); this.s3Client = new LazyAutoCloseableReference<>(createS3Client()); this.s3AsyncClient = new LazyAutoCloseableReference<>(createAyncClient()); + this.unencryptedS3Client = new LazyAutoCloseableReference<>(createUnencryptedS3Client()); this.transferManager = new LazyAutoCloseableReference<>(createTransferManager()); // fix up SDK logging. @@ -135,6 +150,17 @@ private CallableRaisingIOE createAyncClient() { () -> clientFactory.createS3AsyncClient(getUri(), clientCreationParameters)); } + /** + * Create the function to create the unencrypted S3 client. + * @return a callable which will create the client. + */ + private CallableRaisingIOE createUnencryptedS3Client() { + return trackDurationOfOperation( + durationTrackerFactory, + STORE_CLIENT_CREATION.getSymbol(), + () -> unencryptedClientFactory.createS3Client(getUri(), clientCreationParameters)); + } + /** * Create the function to create the Transfer Manager. * @return a callable which will create the component. @@ -182,6 +208,18 @@ public synchronized S3Client getOrCreateAsyncS3ClientUnchecked() throws Unchecke return s3Client.get(); } + /** + * Get or create an unencrypted S3 client. + * This is used for unencrypted operations when CSE is enabled with V1 compatibility. + * @return unencrypted S3 client + * @throws IOException on any failure + */ + @Override + public synchronized S3Client getOrCreateUnencryptedS3Client() throws IOException { + checkNotClosed(); + return unencryptedS3Client.eval(); + } + @Override public synchronized S3TransferManager getOrCreateTransferManager() throws IOException { checkNotClosed(); @@ -213,6 +251,7 @@ public synchronized void close() { l.add(closeAsync(transferManager)); l.add(closeAsync(s3AsyncClient)); l.add(closeAsync(s3Client)); + l.add(closeAsync(unencryptedS3Client)); // once all are queued, await their completion // and swallow any exception. @@ -261,6 +300,7 @@ public String toString() { "closed=" + closed.get() + ", s3Client=" + s3Client + ", s3AsyncClient=" + s3AsyncClient + + ", unencryptedS3Client=" + unencryptedS3Client + ", transferManager=" + transferManager + '}'; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java new file mode 100644 index 0000000000000..5817d924febcc --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/EncryptionS3ClientFactory.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; +import java.net.URI; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.awssdk.services.kms.KmsClientBuilder; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.encryption.s3.S3AsyncEncryptionClient; +import software.amazon.encryption.s3.S3EncryptionClient; +import software.amazon.encryption.s3.materials.CryptographicMaterialsManager; +import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager; +import software.amazon.encryption.s3.materials.Keyring; +import software.amazon.encryption.s3.materials.KmsKeyring; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.functional.LazyAtomicReference; + +import static org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable; + +/** + * Factory class to create encrypted s3 client and encrypted async s3 client. + */ +public class EncryptionS3ClientFactory extends DefaultS3ClientFactory { + + /** + * Encryption client class name. + * value: {@value} + */ + private static final String ENCRYPTION_CLIENT_CLASSNAME = + "software.amazon.encryption.s3.S3EncryptionClient"; + + /** + * Encryption client availability. + */ + private static final LazyAtomicReference ENCRYPTION_CLIENT_AVAILABLE = + LazyAtomicReference.lazyAtomicReferenceFromSupplier( + EncryptionS3ClientFactory::checkForEncryptionClient + ); + + + /** + * S3Client to be wrapped by encryption client. + */ + private S3Client s3Client; + + /** + * S3AsyncClient to be wrapped by encryption client. + */ + private S3AsyncClient s3AsyncClient; + + /** + * Checks if {@link #ENCRYPTION_CLIENT_CLASSNAME} is available in the class path. + * @return true if available, false otherwise. + */ + private static boolean checkForEncryptionClient() { + try { + ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader(); + cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME); + LOG.debug("encryption client class {} found", ENCRYPTION_CLIENT_CLASSNAME); + return true; + } catch (Exception e) { + LOG.debug("encryption client class {} not found", ENCRYPTION_CLIENT_CLASSNAME, e); + return false; + } + } + + /** + * Is the Encryption client available? + * @return true if it was found in the classloader + */ + private static synchronized boolean isEncryptionClientAvailable() { + return ENCRYPTION_CLIENT_AVAILABLE.get(); + } + + /** + * Creates both synchronous and asynchronous encrypted s3 clients. + * Synchronous client is wrapped by encryption client first and then + * Asynchronous client is wrapped by encryption client. 
+ * @param uri S3A file system URI + * @param parameters parameter object + * @return encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3Client createS3Client(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + + s3Client = super.createS3Client(uri, parameters); + s3AsyncClient = super.createS3AsyncClient(uri, parameters); + + return createS3EncryptionClient(parameters); + } + + /** + * Create async encrypted s3 client. + * @param uri S3A file system URI + * @param parameters parameter object + * @return async encrypted s3 client + * @throws IOException IO failures + */ + @Override + public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters parameters) + throws IOException { + if (!isEncryptionClientAvailable()) { + throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null, + "No encryption client available"); + } + return createS3AsyncEncryptionClient(parameters); + } + + /** + * Creates an S3EncryptionClient instance based on the provided parameters. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @return An instance of S3EncryptionClient. + * @throws IOException If an error occurs during the creation of the S3EncryptionClient. + */ + private S3Client createS3EncryptionClient(S3ClientCreationParameters parameters) + throws IOException { + CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials(); + Preconditions.checkArgument(s3AsyncClient != null, + "S3 async client not initialized"); + Preconditions.checkArgument(s3Client != null, + "S3 client not initialized"); + Preconditions.checkArgument(parameters != null, + "S3ClientCreationParameters is not initialized"); + + S3EncryptionClient.Builder s3EncryptionClientBuilder = + S3EncryptionClient.builder() + .wrappedAsyncClient(s3AsyncClient) + .wrappedClient(s3Client) + // this is required for doing S3 ranged GET calls + .enableLegacyUnauthenticatedModes(true) + // this is required for backward compatibility with older encryption clients + .enableLegacyWrappingAlgorithms(true); + + switch (cseMaterials.getCseKeyType()) { + case KMS: + Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials); + CryptographicMaterialsManager kmsCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(kmsKeyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager); + break; + case CUSTOM: + Keyring keyring; + try { + keyring = + getKeyringProvider(cseMaterials.getCustomKeyringClassName(), cseMaterials.getConf()); + } catch (RuntimeException e) { + throw new IOException("Failed to instantiate a custom keyring provider: " + e, e); + } + CryptographicMaterialsManager customCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(keyring) + .build(); + s3EncryptionClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager); + break; + default: + break; + } + return s3EncryptionClientBuilder.build(); + } + + /** + * Creates KmsKeyring instance based on the provided S3ClientCreationParameters and CSEMaterials. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @param cseMaterials The CSEMaterials containing the KMS key ID and other encryption materials. 
+ * @return A KmsKeyring instance configured with the appropriate KMS client and wrapping key ID. + */ + private Keyring createKmsKeyring(S3ClientCreationParameters parameters, + CSEMaterials cseMaterials) { + KmsClientBuilder kmsClientBuilder = KmsClient.builder(); + if (parameters.getCredentialSet() != null) { + kmsClientBuilder.credentialsProvider(parameters.getCredentialSet()); + } + // check if kms region is configured. + if (parameters.getKmsRegion() != null) { + kmsClientBuilder.region(Region.of(parameters.getKmsRegion())); + } else if (parameters.getRegion() != null) { + // fallback to s3 region if kms region is not configured. + kmsClientBuilder.region(Region.of(parameters.getRegion())); + } else if (parameters.getEndpoint() != null) { + // fallback to s3 endpoint config if both kms region and s3 region is not set. + String endpointStr = parameters.getEndpoint(); + URI endpoint = getS3Endpoint(endpointStr, cseMaterials.getConf()); + kmsClientBuilder.endpointOverride(endpoint); + } + return KmsKeyring.builder() + .kmsClient(kmsClientBuilder.build()) + .wrappingKeyId(cseMaterials.getKmsKeyId()) + .build(); + } + + /** + * Creates an S3AsyncEncryptionClient instance based on the provided parameters. + * + * @param parameters The S3ClientCreationParameters containing the necessary configuration. + * @return An instance of S3AsyncEncryptionClient. + * @throws IOException If an error occurs during the creation of the S3AsyncEncryptionClient. + */ + private S3AsyncClient createS3AsyncEncryptionClient(S3ClientCreationParameters parameters) + throws IOException { + Preconditions.checkArgument(s3AsyncClient != null, + "S3 async client not initialized"); + Preconditions.checkArgument(parameters != null, + "S3ClientCreationParameters is not initialized"); + + S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder = + S3AsyncEncryptionClient.builder() + .wrappedClient(s3AsyncClient) + // this is required for doing S3 ranged GET calls + .enableLegacyUnauthenticatedModes(true) + // this is required for backward compatibility with older encryption clients + .enableLegacyWrappingAlgorithms(true); + + CSEMaterials cseMaterials = parameters.getClientSideEncryptionMaterials(); + switch (cseMaterials.getCseKeyType()) { + case KMS: + Keyring kmsKeyring = createKmsKeyring(parameters, cseMaterials); + CryptographicMaterialsManager kmsCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(kmsKeyring) + .build(); + s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(kmsCryptoMaterialsManager); + break; + case CUSTOM: + Keyring keyring; + try { + keyring = + getKeyringProvider(cseMaterials.getCustomKeyringClassName(), cseMaterials.getConf()); + } catch (RuntimeException e) { + throw new IOException("Failed to instantiate a custom keyring provider: " + e, e); + } + CryptographicMaterialsManager customCryptoMaterialsManager = + DefaultCryptoMaterialsManager.builder() + .keyring(keyring) + .build(); + s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(customCryptoMaterialsManager); + break; + default: + break; + } + return s3EncryptionAsyncClientBuilder.build(); + } + + /** + * Creates and returns a Keyring provider instance based on the given class name. + * + *

This method attempts to instantiate a Keyring provider using reflection. It first tries + * to create an instance using the standard ReflectionUtils.newInstance method. If that fails, + * it falls back to an alternative instantiation method, which is primarily used for testing + * purposes (specifically for CustomKeyring.java). + * + * @param className The fully qualified class name of the Keyring provider to instantiate. + * @param conf The Configuration object to be passed to the Keyring provider constructor. + * @return An instance of the specified Keyring provider. + * @throws RuntimeException If unable to create the Keyring provider instance. + */ + private Keyring getKeyringProvider(String className, Configuration conf) { + Class keyringProviderClass = getCustomKeyringProviderClass(className); + try { + return ReflectionUtils.newInstance(keyringProviderClass, conf); + } catch (Exception e) { + LOG.warn("Failed to create Keyring provider", e); + // This is for testing purposes to support CustomKeyring.java + try { + return ReflectionUtils.newInstance(keyringProviderClass, conf, + new Class[] {Configuration.class}, conf); + } catch (Exception ex) { + throw new RuntimeException("Failed to create Keyring provider", ex); + } + } + } + + /** + * Retrieves the Class object for the custom Keyring provider based on the provided class name. + * + * @param className The fully qualified class name of the custom Keyring provider implementation. + * @return The Class object representing the custom Keyring provider implementation. + * @throws IllegalArgumentException If the provided class name is null or empty, + * or if the specified class is not found. + */ + private Class getCustomKeyringProviderClass(String className) { + Preconditions.checkArgument(className !=null && !className.isEmpty(), + "Custom Keyring class name is null or empty"); + try { + return Class.forName(className).asSubclass(Keyring.class); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + "Custom CryptographicMaterialsManager class " + className + "not found", e); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java index 7934a5c7d4d5c..cd24b61340c3f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ErrorTranslation.java @@ -22,6 +22,7 @@ import java.lang.reflect.Constructor; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.exception.SdkException; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.s3a.HttpChannelEOFException; @@ -63,6 +64,12 @@ public final class ErrorTranslation { private static final String SHADED_NO_HTTP_RESPONSE_EXCEPTION = "software.amazon.awssdk.thirdparty.org.apache.http.NoHttpResponseException"; + /** + * S3 encryption client exception class name: {@value}. + */ + private static final String S3_ENCRYPTION_CLIENT_EXCEPTION = + "software.amazon.encryption.s3.S3EncryptionClientException"; + /** * Private constructor for utility class. */ @@ -105,6 +112,54 @@ private static Throwable getInnermostThrowable(Throwable thrown, Throwable outer return getInnermostThrowable(thrown.getCause(), thrown); } + /** + * Attempts to extract the underlying SdkException from an S3 encryption client exception. + * + *

This method is designed to handle exceptions that may be wrapped within + * S3EncryptionClientExceptions. It performs the following steps: + *

    + *
+ * <ol>
+ *   <li>Checks if the input exception is null.</li>
+ *   <li>Verifies if the exception contains the S3EncryptionClientException signature.</li>
+ *   <li>Examines the cause chain to find the most relevant SdkException.</li>
+ * </ol>
+ * + *

The method aims to unwrap nested exceptions to provide more meaningful + * error information, particularly in the context of S3 encryption operations. + * + * @param exception The SdkException to analyze. This may be a wrapper exception + * containing a more specific underlying cause. + * @return The extracted SdkException if found within the exception chain, + * or the original exception if no relevant nested exception is found. + * Returns null if the input exception is null. + * + * @see SdkException + * @see AwsServiceException + */ + public static SdkException maybeProcessEncryptionClientException( + SdkException exception) { + if (exception == null) { + return null; + } + + // check if the exception contains S3EncryptionClientException + if (!exception.toString().contains(S3_ENCRYPTION_CLIENT_EXCEPTION)) { + return exception; + } + + Throwable cause = exception.getCause(); + if (!(cause instanceof SdkException)) { + return exception; + } + + // get the actual sdk exception. + SdkException sdkCause = (SdkException) cause; + if (sdkCause.getCause() instanceof AwsServiceException) { + return (SdkException) sdkCause.getCause(); + } + + return sdkCause; + } + /** * Translate an exception if it or its inner exception is an * IOException. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java index 3865c391d6ddb..2771e02bd6eb5 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java @@ -325,19 +325,6 @@ private Map retrieveHeaders( md.contentEncoding()); maybeSetHeader(headers, XA_CONTENT_LANGUAGE, md.contentLanguage()); - // If CSE is enabled, use the unencrypted content length. - // TODO: CSE is not supported yet, add these headers in during CSE work. -// if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null -// && md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH) != null) { -// maybeSetHeader(headers, XA_CONTENT_LENGTH, -// md.getUserMetaDataOf(Headers.UNENCRYPTED_CONTENT_LENGTH)); -// } else { -// maybeSetHeader(headers, XA_CONTENT_LENGTH, -// md.contentLength()); -// } -// maybeSetHeader(headers, XA_CONTENT_MD5, -// md.getContentMD5()); - // TODO: Add back in else block during CSE work. maybeSetHeader(headers, XA_CONTENT_LENGTH, md.contentLength()); if (md.sdkHttpResponse() != null && md.sdkHttpResponse().headers() != null diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java index f4d7a4349e484..200e9a386b359 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.services.s3.model.S3Object; + import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Retries; @@ -105,6 +107,15 @@ S3ListRequest createListObjectsRequest( */ long getDefaultBlockSize(Path path); + /** + * Get the S3 object size. + * If the object is encrypted, the unpadded size will be returned. 
+ * @param s3Object S3object + * @return plaintext S3 object size + * @throws IOException IO problems + */ + long getObjectSize(S3Object s3Object) throws IOException; + /** * Get the maximum key count. * @return a value, valid after initialization diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index e9f7d707286a8..00d9368aa58f1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -44,6 +44,7 @@ import software.amazon.awssdk.services.s3.model.MetadataDirective; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.SdkPartType; import software.amazon.awssdk.services.s3.model.ServerSideEncryption; import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; @@ -588,6 +589,7 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( String destKey, String uploadId, int partNumber, + boolean isLastPart, long size) throws PathIOException { checkNotNull(uploadId); checkArgument(size >= 0, "Invalid partition size %s", size); @@ -609,6 +611,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder( .uploadId(uploadId) .partNumber(partNumber) .contentLength(size); + if (isLastPart) { + builder.sdkPartType(SdkPartType.LAST); + } uploadPartEncryptionParameters(builder); // Set the request timeout for the part upload diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemOperations.java new file mode 100644 index 0000000000000..8437d80897040 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemOperations.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl; + +import java.io.IOException; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; +import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; + +/** + * An interface that helps map from object store semantics to that of the fileystem. + * This specially supports encrypted stores. + */ +public interface S3AFileSystemOperations { + + /** + * Retrieves an object from the S3. + * + * @param store The S3AStore object representing the S3 bucket. + * @param request The GetObjectRequest containing the details of the object to retrieve. + * @param factory The RequestFactory used to create the GetObjectRequest. + * @return A ResponseInputStream containing the GetObjectResponse. + * @throws IOException If an error occurs while retrieving the object. + */ + ResponseInputStream getObject(S3AStore store, GetObjectRequest request, + RequestFactory factory) throws IOException; + + /** + * Set the client side encryption gauge to 0 or 1, indicating if CSE is enabled. + * @param ioStatisticsStore The IOStatisticsStore of the filesystem. + */ + void setCSEGauge(IOStatisticsStore ioStatisticsStore); + + /** + * Retrieves the client-side encryption materials for the given bucket and encryption algorithm. + * + * @param conf The Hadoop configuration object. + * @param bucket The name of the S3 bucket. + * @param algorithm The client-side encryption algorithm to use. + * @return The client-side encryption materials. + * @throws IOException If an error occurs while retrieving the encryption materials. + */ + CSEMaterials getClientSideEncryptionMaterials(Configuration conf, String bucket, + S3AEncryptionMethods algorithm) throws IOException; + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return The S3 client factory instance. + */ + S3ClientFactory getS3ClientFactory(Configuration conf); + + /** + * Retrieves the S3 client factory for the specified class and configuration. + * + * @param conf The Hadoop configuration object. + * @return The S3 client factory instance. + */ + S3ClientFactory getUnencryptedS3ClientFactory(Configuration conf); + + + /** + * Retrieves the unencrypted length of an object in the S3 bucket. + * + * @param key The key (path) of the object in the S3 bucket. + * @param length The length of the object. + * @param store The S3AStore object representing the S3 bucket. + * @param response The HeadObjectResponse containing the metadata of the object. + * @return The unencrypted size of the object in bytes. + * @throws IOException If an error occurs while retrieving the object size. 
+ */ + long getS3ObjectSize(String key, long length, S3AStore store, + HeadObjectResponse response) throws IOException; + +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java index 58e38c2873bd0..e00319c3dc581 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java @@ -139,6 +139,7 @@ public CompletableFuture startUpload( public CompletableFuture putPart( final UploadHandle uploadId, final int partNumber, + final boolean isLastPart, final Path filePath, final InputStream inputStream, final long lengthInBytes) @@ -154,7 +155,7 @@ public CompletableFuture putPart( return context.submit(new CompletableFuture<>(), () -> { UploadPartRequest request = writeOperations.newUploadPartRequestBuilder(key, - uploadIdString, partNumber, lengthInBytes).build(); + uploadIdString, partNumber, isLastPart, lengthInBytes).build(); RequestBody body = RequestBody.fromInputStream(inputStream, lengthInBytes); UploadPartResponse response = writeOperations.uploadPart(request, body, statistics); statistics.partPut(lengthInBytes); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index 385023598c559..db07881345500 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -32,6 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -42,6 +43,10 @@ import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import software.amazon.awssdk.services.s3.model.ObjectIdentifier; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.S3Error; @@ -59,11 +64,13 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3AStorageStatistics; import org.apache.hadoop.fs.s3a.S3AStore; +import org.apache.hadoop.fs.s3a.S3AUtils; import org.apache.hadoop.fs.s3a.Statistic; import org.apache.hadoop.fs.s3a.UploadInfo; import org.apache.hadoop.fs.s3a.api.RequestFactory; import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A; import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext; +import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.store.audit.AuditSpanSource; @@ -75,11 +82,13 @@ import static 
org.apache.hadoop.fs.s3a.S3AUtils.extractException; import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength; import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException; +import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_HEAD_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.IGNORED_ERRORS; import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_PART_PUT; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_BULK_DELETE_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUEST; +import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_METADATA_REQUESTS; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_BYTES; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_BYTES_PENDING; import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_PUT_REQUESTS_ACTIVE; @@ -90,6 +99,7 @@ import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.util.Preconditions.checkArgument; @@ -247,6 +257,11 @@ public S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException return clientManager.getOrCreateAsyncS3ClientUnchecked(); } + @Override + public S3Client getOrCreateUnencryptedS3Client() throws IOException { + return clientManager.getOrCreateUnencryptedS3Client(); + } + @Override public DurationTrackerFactory getDurationTrackerFactory() { return durationTrackerFactory; @@ -540,6 +555,102 @@ public Map.Entry deleteObjects( } } + /** + * Performs a HEAD request on an S3 object to retrieve its metadata. 
+ * + * @param key The S3 object key to perform the HEAD operation on + * @param changeTracker Tracks changes to the object's metadata across operations + * @param changeInvoker The invoker responsible for executing the HEAD request with retries + * @param fsHandler Handler for filesystem-level operations and configurations + * @param operation Description of the operation being performed for tracking purposes + * @return HeadObjectResponse containing the object's metadata + * @throws IOException If the HEAD request fails, object doesn't exist, or other I/O errors occur + */ + @Override + @Retries.RetryRaw + public HeadObjectResponse headObject(String key, + ChangeTracker changeTracker, + Invoker changeInvoker, + S3AFileSystemOperations fsHandler, + String operation) throws IOException { + HeadObjectResponse response = getStoreContext().getInvoker() + .retryUntranslated("HEAD " + key, true, + () -> { + HeadObjectRequest.Builder requestBuilder = + getRequestFactory().newHeadObjectRequestBuilder(key); + incrementStatistic(OBJECT_METADATA_REQUESTS); + DurationTracker duration = + getDurationTrackerFactory().trackDuration(ACTION_HTTP_HEAD_REQUEST.getSymbol()); + try { + LOG.debug("HEAD {} with change tracker {}", key, changeTracker); + if (changeTracker != null) { + changeTracker.maybeApplyConstraint(requestBuilder); + } + HeadObjectResponse headObjectResponse = + getS3Client().headObject(requestBuilder.build()); + if (fsHandler != null) { + long length = + fsHandler.getS3ObjectSize(key, headObjectResponse.contentLength(), this, + headObjectResponse); + // overwrite the content length + headObjectResponse = headObjectResponse.toBuilder().contentLength(length).build(); + } + if (changeTracker != null) { + changeTracker.processMetadata(headObjectResponse, operation); + } + return headObjectResponse; + } catch (AwsServiceException ase) { + if (!isObjectNotFound(ase)) { + // file not found is not considered a failure of the call, + // so only switch the duration tracker to update failure + // metrics on other exception outcomes. + duration.failed(); + } + throw ase; + } finally { + // update the tracker. + duration.close(); + } + }); + incrementReadOperations(); + return response; + } + + /** + * Retrieves a specific byte range of an S3 object as a stream. + * + * @param key The S3 object key to retrieve + * @param start The starting byte position (inclusive) of the range to retrieve + * @param end The ending byte position (inclusive) of the range to retrieve + * @return A ResponseInputStream containing the requested byte range of the S3 object + * @throws IOException If the object cannot be retrieved other I/O errors occur + * @see GetObjectResponse For additional metadata about the retrieved object + */ + @Override + @Retries.RetryRaw + public ResponseInputStream getRangedS3Object(String key, + long start, + long end) throws IOException { + final GetObjectRequest request = getRequestFactory().newGetObjectRequestBuilder(key) + .range(S3AUtils.formatRange(start, end)) + .build(); + DurationTracker duration = getDurationTrackerFactory() + .trackDuration(ACTION_HTTP_GET_REQUEST); + ResponseInputStream objectRange; + try { + objectRange = getStoreContext().getInvoker() + .retryUntranslated("GET Ranged Object " + key, true, + () -> getS3Client().getObject(request)); + + } catch (IOException ex) { + duration.failed(); + throw ex; + } finally { + duration.close(); + } + return objectRange; + } + /** * {@inheritDoc}. 
*/ diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md index 7b9e8d0412efd..1d598f37599d5 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md @@ -680,9 +680,9 @@ client-side and then transmit it over to S3 storage. The same encrypted data is then transmitted over to client while reading and then decrypted on the client-side. -S3-CSE, uses `AmazonS3EncryptionClientV2.java` as the AmazonS3 client. The -encryption and decryption is done by AWS SDK. As of July 2021, Only CSE-KMS -method is supported. +S3-CSE, uses `S3EncryptionClient.java` (V3) as the AmazonS3 client. The +encryption and decryption is done by AWS SDK. Both CSE-KMS and CSE-CUSTOM +methods are supported. A key reason this feature (HADOOP-13887) has been unavailable for a long time is that the AWS S3 client pads uploaded objects with a 16 byte footer. This @@ -703,11 +703,30 @@ shorter than the length of files listed with other clients -including S3A clients where S3-CSE has not been enabled. ### Features - -- Supports client side encryption with keys managed in AWS KMS. +- Supports client side encryption with keys managed in AWS KMS (CSE-KMS) +- Supports client side encryption with custom keys by +implementing custom [Keyring](https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/choose-keyring.html) (CSE-CUSTOM) +- Backward compatible with older encryption clients +like `AmazonS3EncryptionClient.java`(V1) and `AmazonS3EncryptionClientV2.java`(V2) - encryption settings propagated into jobs through any issued delegation tokens. - encryption information stored as headers in the uploaded object. +### Compatibility Issues +- The V1 client support reading unencrypted S3 objects, whereas the V3 client does not. +- Unlike the V2 and V3 clients, which always append 16 bytes to a file, +the V1 client appends extra bytes to the next multiple of 16. +For example, if the unencrypted object size is 28 bytes, +the V1 client pads an extra 4 bytes to make it a multiple of 16. + +Note: Inorder to workaround the above compatibility issues +set `fs.s3a.encryption.cse.v1.compatibility.enabled=true` + +Note: The V1 client supports storing encryption metadata in a separate file with +the suffix "fileName".instruction. However, these instruction files are not +skipped and will lead to exceptions or unknown issues. +Therefore, it is recommended not to use S3A client-side encryption (CSE) +when instruction files are used to store encryption metadata. + ### Limitations - Performance will be reduced. All encrypt/decrypt is now being done on the @@ -722,11 +741,13 @@ clients where S3-CSE has not been enabled. NIST. ### Setup +#### 1. CSE-KMS - Generate an AWS KMS Key ID from AWS console for your bucket, with same region as the storage bucket. - If already created, [view the kms key ID by these steps.](https://docs.aws.amazon.com/kms/latest/developerguide/find-cmk-id-arn.html) - Set `fs.s3a.encryption.algorithm=CSE-KMS`. - Set `fs.s3a.encryption.key=`. +- Set `fs.s3a.encryption.cse.kms.region=`. KMS_KEY_ID: @@ -755,6 +776,34 @@ S3-CSE to work. fs.s3a.encryption.key ${KMS_KEY_ID} + + + fs.s3a.encryption.cse.kms.region + ${KMS_REGION} + +``` + +#### 2. CSE-CUSTOM +- Set `fs.s3a.encryption.algorithm=CSE-CUSTOM`. +- Set +`fs.s3a.encryption.cse.custom.cryptographic.material.manager.class.name=`. 
+ +Example for custom keyring implementation +``` +public class CustomKeyring implements Keyring { + public CustomKeyring() { + } + + @Override + public EncryptionMaterials onEncrypt(EncryptionMaterials encryptionMaterials) { + // custom code + } + + @Override + public DecryptionMaterials onDecrypt(DecryptionMaterials decryptionMaterials, + List list) { + // custom code + } ``` ## Troubleshooting Encryption diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md index 4856b0f576026..88f5fcf3d0213 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md @@ -1051,15 +1051,25 @@ file using configured SSE-C keyB into that structure. ## S3 Client Side Encryption +### java.lang.NoClassDefFoundError: software/amazon/encryption/s3/S3EncryptionClient + +With the move to the V2 AWS SDK, CSE is implemented via +[amazon-s3-encryption-client-java](https://github.com/aws/amazon-s3-encryption-client-java/tree/v3.1.1) +which is not packaged in AWS SDK V2 bundle jar and needs to be added separately. + +Fix: add amazon-s3-encryption-client-java jar version 3.1.1 to the class path. + ### Instruction file not found for S3 object -Reading an unencrypted file would fail when read through CSE enabled client. +Reading an unencrypted file would fail when read through CSE enabled client by default. ``` -java.lang.SecurityException: Instruction file not found for S3 object with bucket name: ap-south-cse, key: unencryptedData.txt - +software.amazon.encryption.s3.S3EncryptionClientException: Instruction file not found! +Please ensure the object you are attempting to decrypt has been encrypted +using the S3 Encryption Client. ``` CSE enabled client should read encrypted data only. +Fix: set `fs.s3a.encryption.cse.v1.compatibility.enabled=true` ### CSE-KMS method requires KMS key ID KMS key ID is required for CSE-KMS to encrypt data, not providing one leads @@ -1107,49 +1117,8 @@ Key 'arn:aws:kms:ap-south-1:152813717728:key/' does not exist(Service: AWSKMS; Status Code: 400; Error Code: NotFoundException; Request ID: 279db85d-864d-4a38-9acd-d892adb504c0; Proxy: null) ``` -While generating the KMS Key ID make sure to generate it in the same region - as your bucket. - -### Unable to perform range get request: Range get support has been disabled - -If Range get is not supported for a CSE algorithm or is disabled: -``` -java.lang.SecurityException: Unable to perform range get request: Range get support has been disabled. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html - -``` -Range gets must be enabled for CSE to work. - -### WARNING: Range gets do not provide authenticated encryption properties even when used with an authenticated mode (AES-GCM). - -The S3 Encryption Client is configured to support range get requests. This - warning would be shown everytime S3-CSE is used. -``` -2021-07-14 12:54:09,525 [main] WARN s3.AmazonS3EncryptionClientV2 -(AmazonS3EncryptionClientV2.java:warnOnRangeGetsEnabled(401)) - The S3 -Encryption Client is configured to support range get requests. Range gets do -not provide authenticated encryption properties even when used with an -authenticated mode (AES-GCM). 
See https://docs.aws.amazon.com/general/latest -/gr/aws_sdk_cryptography.html -``` -We can Ignore this warning since, range gets must be enabled for S3-CSE to -get data. - -### WARNING: If you don't have objects encrypted with these legacy modes, you should disable support for them to enhance security. - -The S3 Encryption Client is configured to read encrypted data with legacy -encryption modes through the CryptoMode setting, and we would see this -warning for all S3-CSE request. - -``` -2021-07-14 12:54:09,519 [main] WARN s3.AmazonS3EncryptionClientV2 -(AmazonS3EncryptionClientV2.java:warnOnLegacyCryptoMode(409)) - The S3 -Encryption Client is configured to read encrypted data with legacy -encryption modes through the CryptoMode setting. If you don't have objects -encrypted with these legacy modes, you should disable support for them to -enhance security. See https://docs.aws.amazon.com/general/latest/gr/aws_sdk_cryptography.html -``` -We can ignore this, since this CryptoMode setting(CryptoMode.AuthenticatedEncryption) -is required for range gets to work. +If the S3 bucket region is different from the KMS key region, +set `fs.s3a.encryption.cse.kms.region=`. ### `software.amazon.awssdk.services.kms.mode.InvalidKeyUsageException: You cannot generate a data key with an asymmetric CMK` diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java new file mode 100644 index 0000000000000..564e9351feef8 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.util.List; + +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kms.KmsClient; +import software.amazon.encryption.s3.materials.DecryptionMaterials; +import software.amazon.encryption.s3.materials.EncryptedDataKey; +import software.amazon.encryption.s3.materials.EncryptionMaterials; +import software.amazon.encryption.s3.materials.Keyring; +import software.amazon.encryption.s3.materials.KmsKeyring; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION; +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_KMS_REGION; + +/** + * Custom Keyring implementation. + * This is used for testing {@link ITestS3AClientSideEncryptionCustom}.
+ */ +public class CustomKeyring implements Keyring { + private final KmsClient kmsClient; + private final Configuration conf; + private final KmsKeyring kmsKeyring; + + + public CustomKeyring(Configuration conf) throws IOException { + this.conf = conf; + String bucket = S3ATestUtils.getFsName(conf); + kmsClient = KmsClient.builder() + .region(Region.of(conf.get(S3_ENCRYPTION_CSE_KMS_REGION, AWS_S3_DEFAULT_REGION))) + .credentialsProvider(new TemporaryAWSCredentialsProvider( + new Path(bucket).toUri(), conf)) + .build(); + kmsKeyring = KmsKeyring.builder() + .kmsClient(kmsClient) + .wrappingKeyId(S3AUtils.getS3EncryptionKey(bucket, conf)) + .build(); + } + + @Override + public EncryptionMaterials onEncrypt(EncryptionMaterials encryptionMaterials) { + return kmsKeyring.onEncrypt(encryptionMaterials); + } + + @Override + public DecryptionMaterials onDecrypt(DecryptionMaterials decryptionMaterials, + List list) { + return kmsKeyring.onDecrypt(decryptionMaterials, list); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java index 4094b22eb1926..5069f949ea221 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java @@ -18,24 +18,34 @@ package org.apache.hadoop.fs.s3a; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.assertj.core.api.Assertions; import org.junit.Test; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.api.RequestFactory; +import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl; import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.store.audit.AuditSpan; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; @@ -44,13 +54,16 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE; import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM; +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestFileSystem; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetAllEncryptionPropertiesForBucket; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -232,13 +245,8 @@ public void testEncryptionEnabledAndDisabledFS() throws Exception { // CSE enabled FS trying to read unencrypted data would face an exception. try (FSDataInputStream in = cseEnabledFS.open(unEncryptedFilePath)) { - FileStatus encryptedFSFileStatus = - cseEnabledFS.getFileStatus(unEncryptedFilePath); - assertEquals("Mismatch in content length bytes", SMALL_FILE_SIZE, - encryptedFSFileStatus.getLen()); - - intercept(SecurityException.class, "", - "SecurityException should be thrown", + intercept(FileNotFoundException.class, "Instruction file not found!", + "FileNotFoundException should be thrown", () -> { in.read(new byte[SMALL_FILE_SIZE]); return "Exception should be raised if unencrypted data is read by " @@ -266,6 +274,133 @@ public void testEncryptionEnabledAndDisabledFS() throws Exception { } } + /** + * Test to check if unencrypted objects are read with V1 client compatibility. + */ + @Test + public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws Exception { + maybeSkipTest(); + // initialize base s3 client. + Configuration conf = new Configuration(getConfiguration()); + unsetAllEncryptionPropertiesForBucket(conf); + + Path file = methodPath(); + + try (S3AFileSystem nonCseFs = createTestFileSystem(conf)) { + nonCseFs.initialize(getFileSystem().getUri(), conf); + + // write unencrypted file + ContractTestUtils.writeDataset(nonCseFs, file, new byte[SMALL_FILE_SIZE], + SMALL_FILE_SIZE, SMALL_FILE_SIZE, true); + } + + Configuration cseConf = new Configuration(getConfiguration()); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + + // create filesystem with cse enabled and v1 compatibility. + try (S3AFileSystem cseFs = createTestFileSystem(cseConf)) { + cseFs.initialize(getFileSystem().getUri(), cseConf); + + // read unencrypted file. It should not throw any exception. + try (FSDataInputStream in = cseFs.open(file)) { + in.read(new byte[SMALL_FILE_SIZE]); + } + } + } + + /** + * Tests the size of an encrypted object when with V1 compatibility and custom header length. 
+ */ + @Test + public void testSizeOfEncryptedObjectFromHeaderWithV1Compatibility() throws Exception { + maybeSkipTest(); + Configuration cseConf = new Configuration(getConfiguration()); + unsetAllEncryptionPropertiesForBucket(cseConf); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + try (S3AFileSystem fs = createTestFileSystem(cseConf)) { + fs.initialize(getFileSystem().getUri(), cseConf); + + Path filePath = methodPath(); + Path file = new Path(filePath, "file"); + String key = fs.pathToKey(file); + + // write object with random content length header + Map metadata = new HashMap<>(); + metadata.put(AWSHeaders.UNENCRYPTED_CONTENT_LENGTH, "10"); + try (AuditSpan span = span()) { + RequestFactory factory = RequestFactoryImpl.builder() + .withBucket(fs.getBucket()) + .build(); + PutObjectRequest.Builder putObjectRequestBuilder = + factory.newPutObjectRequestBuilder(key, + null, SMALL_FILE_SIZE, false); + putObjectRequestBuilder.contentLength(Long.parseLong(String.valueOf(SMALL_FILE_SIZE))); + putObjectRequestBuilder.metadata(metadata); + fs.putObjectDirect(putObjectRequestBuilder.build(), + PutObjectOptions.deletingDirs(), + new S3ADataBlocks.BlockUploadData(new byte[SMALL_FILE_SIZE], null), + null); + + // check if fetched file length matches with the header. + assertFileLength(fs, file, 10); + } + } + } + + /** + * Tests the size of an unencrypted object when using V1 compatibility mode. + */ + @Test + public void testSizeOfUnencryptedObjectWithV1Compatibility() throws Exception { + maybeSkipTest(); + Configuration conf = new Configuration(getConfiguration()); + unsetAllEncryptionPropertiesForBucket(conf); + conf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, false); + Path file = methodPath(); + try (S3AFileSystem fs = createTestFileSystem(conf)) { + fs.initialize(getFileSystem().getUri(), conf); + + // Unencrypted data written to a path. + ContractTestUtils.writeDataset(fs, file, new byte[SMALL_FILE_SIZE], SMALL_FILE_SIZE, + SMALL_FILE_SIZE, true); + + // check the file size + assertFileLength(fs, file, SMALL_FILE_SIZE); + } + + // initialize encrypted s3 client with support for reading unencrypted objects + Configuration cseConf = new Configuration(getConfiguration()); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + + try (S3AFileSystem cseFs = createTestFileSystem(cseConf)) { + cseFs.initialize(getFileSystem().getUri(), cseConf); + // check the file size + assertFileLength(cseFs, file, SMALL_FILE_SIZE); + } + } + + /** + * Tests the size of an encrypted object when using V1 compatibility mode. 
+ */ + @Test + public void testSizeOfEncryptedObjectWithV1Compatibility() throws Exception { + maybeSkipTest(); + Configuration cseConf = new Configuration(getConfiguration()); + unsetAllEncryptionPropertiesForBucket(cseConf); + cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true); + try (S3AFileSystem fs = createTestFileSystem(cseConf)) { + fs.initialize(getFileSystem().getUri(), cseConf); + + // write encrypted file + Path file = methodPath(); + ContractTestUtils.writeDataset(fs, file, new byte[SMALL_FILE_SIZE], SMALL_FILE_SIZE, + SMALL_FILE_SIZE, true); + // check the file size + assertFileLength(fs, file, SMALL_FILE_SIZE); + } + } + + @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); @@ -294,6 +429,24 @@ protected void validateEncryptionForFileSize(int len) throws IOException { rm(getFileSystem(), path, false, false); } + /** + * Asserts that the length of a file in the given FileSystem matches the expected value. + * + *

This method retrieves the FileStatus of the specified file and compares its length + * to the expected value. It uses AssertJ for the assertion, which provides a detailed + * error message if the assertion fails. + * + * @param fs The FileSystem instance containing the file to be checked. + * @param path The Path to the file whose length is to be verified. + * @param expected The expected length of the file in bytes. + */ + private void assertFileLength(FileSystem fs, Path path, long expected) throws IOException { + FileStatus fileStatus = fs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .describedAs("Length of %s status: %s", path, fileStatus) + .isEqualTo(expected); + } + /** * Skip tests if certain conditions are met. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java new file mode 100644 index 0000000000000..8065ce1b3a759 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.util.Map; + +import org.assertj.core.api.Assertions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.impl.AWSHeaders; +import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; + +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetAllEncryptionPropertiesForBucket; + +/** + * Tests to verify Custom S3 client side encryption CSE-CUSTOM. + */ +public class ITestS3AClientSideEncryptionCustom extends ITestS3AClientSideEncryption { + + private static final String KMS_KEY_WRAP_ALGO = "kms+context"; + /** + * Creating custom configs for CSE-CUSTOM testing. + * + * @return Configuration. + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + unsetAllEncryptionPropertiesForBucket(conf); + conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME, + CustomKeyring.class.getCanonicalName()); + return conf; + } + + @Override + protected void maybeSkipTest() throws IOException { + skipIfEncryptionTestsDisabled(getConfiguration()); + // skip the test if CSE-CUSTOM is not set. 
+ skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_CUSTOM); + } + + + @Override + protected void assertEncrypted(Path path) throws IOException { + Map fsXAttrs = getFileSystem().getXAttrs(path); + String xAttrPrefix = "header."; + + // Assert KeyWrap Algo + Assertions.assertThat(processHeader(fsXAttrs, + xAttrPrefix + AWSHeaders.CRYPTO_KEYWRAP_ALGORITHM)) + .describedAs("Key wrap algo") + .isEqualTo(KMS_KEY_WRAP_ALGO); + } + + /** + * A method to process a FS xAttribute Header key by decoding it. + * + * @param fsXAttrs Map of String(Key) and bytes[](Value) to represent fs + * xAttr. + * @param headerKey Key value of the header we are trying to process. + * @return String representing the value of key header provided. + */ + private String processHeader(Map fsXAttrs, + String headerKey) { + return HeaderProcessing.decodeBytes(fsXAttrs.get(headerKey)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java index 967ba885bc90f..24115177f35a2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java @@ -32,6 +32,7 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkClient; import software.amazon.awssdk.core.client.config.SdkClientConfiguration; import software.amazon.awssdk.core.client.config.SdkClientOption; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; @@ -42,6 +43,7 @@ import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.model.StsException; +import software.amazon.encryption.s3.S3EncryptionClient; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; @@ -370,6 +372,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() conf = new Configuration(); skipIfCrossRegionClient(conf); + unsetEncryption(conf); conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true)); assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false)); @@ -411,6 +414,7 @@ public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty() public void testDefaultUserAgent() throws Exception { conf = new Configuration(); skipIfCrossRegionClient(conf); + unsetEncryption(conf); fs = S3ATestUtils.createTestFileSystem(conf); assertNotNull(fs); S3Client s3 = getS3Client("User Agent"); @@ -425,6 +429,7 @@ public void testDefaultUserAgent() throws Exception { public void testCustomUserAgent() throws Exception { conf = new Configuration(); skipIfCrossRegionClient(conf); + unsetEncryption(conf); conf.set(Constants.USER_AGENT_PREFIX, "MyApp"); fs = S3ATestUtils.createTestFileSystem(conf); assertNotNull(fs); @@ -446,7 +451,10 @@ public void testRequestTimeout() throws Exception { Duration timeout = Duration.ofSeconds(120); conf.set(REQUEST_TIMEOUT, timeout.getSeconds() + "s"); fs = S3ATestUtils.createTestFileSystem(conf); - S3Client s3 = getS3Client("Request timeout (ms)"); + SdkClient s3 = getS3Client("Request timeout (ms)"); + if (s3 instanceof S3EncryptionClient) { + s3 = ((S3EncryptionClient) s3).delegate(); + } SdkClientConfiguration clientConfiguration = getField(s3, SdkClientConfiguration.class, "clientConfiguration"); 
Assertions.assertThat(clientConfiguration.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT)) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java index 2be3fe8889624..2c191a2d1865f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; @@ -40,7 +41,8 @@ protected Configuration createConfiguration() { Configuration c = new Configuration(); String kmsKey = S3AUtils.getS3EncryptionKey(getTestBucketName(c), c); // skip the test if SSE-KMS or KMS key not set. - if (StringUtils.isBlank(kmsKey)) { + if (StringUtils.isBlank(kmsKey) || !SSE_KMS.getMethod() + .equals(c.get(S3_ENCRYPTION_ALGORITHM))) { skip(S3_ENCRYPTION_KEY + " is not set for " + SSE_KMS.getMethod()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java index 80b061de03183..07caeb02f416a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEndpointRegion.java @@ -52,6 +52,7 @@ import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT; import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; +import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.ERROR_ENDPOINT_WITH_FIPS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -483,7 +484,8 @@ public void testCentralEndpointAndNullRegionWithCRUD() throws Throwable { newConf, ENDPOINT, AWS_REGION, - FIPS_ENDPOINT); + FIPS_ENDPOINT, + S3_ENCRYPTION_ALGORITHM); newConf.set(ENDPOINT, CENTRAL_ENDPOINT); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java index 20e5c12ec79cf..0c6acb4bb08f8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java @@ -92,7 +92,7 @@ public static IdKey createPartUpload(S3AFileSystem fs, String key, int len, InputStream in = new ByteArrayInputStream(data); String uploadId = writeHelper.initiateMultiPartUpload(key, PutObjectOptions.keepingDirs()); UploadPartRequest req = writeHelper.newUploadPartRequestBuilder(key, uploadId, - partNo, len).build(); + partNo, true, len).build(); RequestBody body = RequestBody.fromInputStream(in, len); UploadPartResponse response = writeHelper.uploadPart(req, body, null); LOG.debug("uploaded part etag 
{}, upid {}", response.eTag(), uploadId); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 3a3f875f5f0d5..66b25a054741e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -635,6 +635,36 @@ public static void print(Logger log, S3ATestUtils.MetricDiff... metrics) { } } + /** + * Unset encryption options. + * @param conf configuration + */ + public static void unsetEncryption(Configuration conf) { + removeBaseAndBucketOverrides(conf, S3_ENCRYPTION_ALGORITHM); + } + + /** + * Removes all encryption-related properties for a specific S3 bucket from given configuration. + * + *

This method unsets various encryption settings specific to the test bucket. It removes + * bucket-specific overrides for multiple encryption-related properties, including both + * client-side and server-side encryption settings. + * + * @param conf The Configuration object from which to remove the encryption properties. + * This object will be modified by this method. + */ + public static void unsetAllEncryptionPropertiesForBucket(Configuration conf) { + removeBucketOverrides(getTestBucketName(conf), + conf, + S3_ENCRYPTION_ALGORITHM, + S3_ENCRYPTION_KEY, + SERVER_SIDE_ENCRYPTION_ALGORITHM, + SERVER_SIDE_ENCRYPTION_KEY, + S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME, + S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, + S3_ENCRYPTION_CSE_KMS_REGION); + } + /** * Print all metrics in a list, then reset them. * @param log log to print the metrics to. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java index 5caa7c8785534..dc1cb2f54bfa6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java @@ -107,11 +107,11 @@ public void testWriteOperationHelperPartLimits() throws Throwable { // first one works String key = "destKey"; woh.newUploadPartRequestBuilder(key, - "uploadId", 1, 1024); + "uploadId", 1, false, 1024); // but ask past the limit and a PathIOE is raised intercept(PathIOException.class, key, () -> woh.newUploadPartRequestBuilder(key, - "uploadId", 50000, 1024)); + "uploadId", 50000, true, 1024)); } static class StreamClosedException extends ClosedIOException { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java index 71be373b407ed..7488de41ce638 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java @@ -34,6 +34,7 @@ import java.util.UUID; import java.util.stream.Collectors; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.util.Sets; import org.assertj.core.api.Assertions; import org.junit.FixMethodOrder; @@ -80,6 +81,7 @@ import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID; import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS; +import static org.apache.hadoop.mapred.JobConf.MAPRED_TASK_ENV; /** * Test an MR Job with all the different committers. 
@@ -338,6 +340,8 @@ public void test_200_execute() throws Exception { @Override protected void applyCustomConfigOptions(final JobConf jobConf) throws IOException { + jobConf.set(MAPRED_TASK_ENV, "AWS_REGION=" + jobConf.get(Constants.AWS_REGION)); + jobConf.set("yarn.app.mapreduce.am.env", "AWS_REGION=" + jobConf.get(Constants.AWS_REGION)); committerTestBinding.applyCustomConfigOptions(jobConf); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index fbad671e1fa66..1724006a83198 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -13,7 +13,6 @@ */ package org.apache.hadoop.fs.s3a.fileContext; -import java.io.IOException; import java.net.URI; import org.slf4j.Logger; @@ -24,7 +23,6 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AEncryptionMethods; import org.apache.hadoop.fs.s3a.S3ATestUtils; import org.apache.hadoop.fs.s3a.auth.STSClientFactory; @@ -32,12 +30,6 @@ import org.junit.Assert; import org.junit.Before; -import static org.apache.hadoop.fs.s3a.S3ATestConstants.KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm; -import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey; -import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH; - /** * S3a implementation of FCStatisticsBaseTest. */ @@ -73,30 +65,12 @@ protected void verifyReadBytes(FileSystem.Statistics stats) { /** * A method to verify the bytes written. - *
- * NOTE: if Client side encryption is enabled, expected bytes written - * should increase by 16(padding of data) + bytes for the key ID set + 94(KMS - * key generation) in case of storage type CryptoStorageMode as - * ObjectMetadata(Default). If Crypto Storage mode is instruction file then - * add additional bytes as that file is stored separately and would account - * for bytes written. - * * @param stats Filesystem statistics. */ @Override - protected void verifyWrittenBytes(FileSystem.Statistics stats) - throws IOException { + protected void verifyWrittenBytes(FileSystem.Statistics stats) { //No extra bytes are written - long expectedBlockSize = blockSize; - if (S3AEncryptionMethods.CSE_KMS.getMethod() - .equals(getEncryptionAlgorithm(getTestBucketName(conf), conf) - .getMethod())) { - String keyId = getS3EncryptionKey(getTestBucketName(conf), conf); - // Adding padding length and KMS key generation bytes written. - expectedBlockSize += CSE_PADDING_LENGTH + keyId.getBytes().length + - KMS_KEY_GENERATION_REQUEST_PARAMS_BYTES_WRITTEN; - } - Assert.assertEquals("Mismatch in bytes written", expectedBlockSize, + Assert.assertEquals("Mismatch in bytes written", blockSize, stats.getBytesWritten()); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java index 9ca12e4f31a60..a4cc5cadc5da0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java @@ -145,6 +145,7 @@ public void teardown() throws Exception { */ @Test public void testGeneratePoolTimeouts() throws Throwable { + skipIfClientSideEncryption(); AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); Configuration conf = timingOutConfiguration(); Path path = methodPath(); @@ -186,6 +187,7 @@ public void testGeneratePoolTimeouts() throws Throwable { */ @Test public void testObjectUploadTimeouts() throws Throwable { + skipIfClientSideEncryption(); AWSClientConfig.setMinimumOperationDuration(Duration.ZERO); final Path dir = methodPath(); Path file = new Path(dir, "file"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestClientManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestClientManager.java index 857df58f42bb1..a807cf6c4cbf0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestClientManager.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestClientManager.java @@ -119,6 +119,7 @@ private StubS3ClientFactory factory(final InvocationRaisingIOE invocationRaising private ClientManager manager(final StubS3ClientFactory factory) { return new ClientManagerImpl( factory, + null, new S3ClientFactory.S3ClientCreationParameters() .withPathUri(uri), StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java index 3a4994897a6b9..60a3a165e171c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestErrorTranslation.java @@ -30,15 +30,18 @@ import org.junit.Test; import 
software.amazon.awssdk.awscore.retry.conditions.RetryOnErrorCodeCondition; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.retry.RetryPolicyContext; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.encryption.s3.S3EncryptionClientException; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException; import org.apache.hadoop.test.AbstractHadoopTestBase; import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeExtractIOException; +import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.maybeProcessEncryptionClientException; import static org.apache.hadoop.test.LambdaTestUtils.intercept; -import static org.junit.Assert.assertTrue; /** * Unit tests related to the {@link ErrorTranslation} class. @@ -128,8 +131,34 @@ public void testNoConstructorExtraction() throws Throwable { }); } + @Test + public void testEncryptionClientExceptionExtraction() throws Throwable { + intercept(NoSuchKeyException.class, () -> { + throw maybeProcessEncryptionClientException( + new S3EncryptionClientException("top", + new S3EncryptionClientException("middle", NoSuchKeyException.builder().build()))); + }); + } + + @Test + public void testNonEncryptionClientExceptionExtraction() throws Throwable { + intercept(SdkException.class, () -> { + throw maybeProcessEncryptionClientException( + sdkException("top", sdkException("middle", NoSuchKeyException.builder().build()))); + }); + } + + @Test + public void testEncryptionClientExceptionExtractionWithRTE() throws Throwable { + intercept(S3EncryptionClientException.class, () -> { + throw maybeProcessEncryptionClientException( + new S3EncryptionClientException("top", new UnsupportedOperationException())); + }); + } + + - public static final class NoConstructorIOE extends IOException { + public static final class NoConstructorIOE extends IOException { public static final String MESSAGE = "no-arg constructor"; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index b864cd3b63982..c779062f518f7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -200,11 +200,13 @@ public void testMultipartUploadRequest() throws Throwable { String path = "path"; String id = "1"; - a(factory.newUploadPartRequestBuilder(path, id, 1, 0)); - a(factory.newUploadPartRequestBuilder(path, id, 2, 128_000_000)); + a(factory.newUploadPartRequestBuilder(path, id, 1, false, 0)); + a(factory.newUploadPartRequestBuilder(path, id, 2, false, + 128_000_000)); // partNumber is past the limit intercept(PathIOException.class, () -> - factory.newUploadPartRequestBuilder(path, id, 3, 128_000_000)); + factory.newUploadPartRequestBuilder(path, id, 3, true, + 128_000_000)); assertThat(countRequests.counter.get()) .describedAs("request preparation count") @@ -241,7 +243,9 @@ public void testDefaultUploadTimeouts() throws Throwable { .withMultipartPartCountLimit(2) .build(); final UploadPartRequest upload = - factory.newUploadPartRequestBuilder("path", "id", 2, 128_000_000).build(); + factory.newUploadPartRequestBuilder("path", "id", 2, + true, 128_000_000) + .build(); 
assertApiTimeouts(DEFAULT_PART_UPLOAD_TIMEOUT, upload); } @@ -266,7 +270,7 @@ public void testUploadTimeouts() throws Throwable { // multipart part final UploadPartRequest upload = factory.newUploadPartRequestBuilder(path, - "1", 3, 128_000_000) + "1", 3, false, 128_000_000) .build(); assertApiTimeouts(partDuration, upload); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java index 2a8cfc663a53a..504f701d00084 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.services.s3.model.S3Object; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileStatus; import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus; @@ -70,6 +72,12 @@ public long getDefaultBlockSize(Path path) { return 0; } + + @Override + public long getObjectSize(S3Object s3Object) throws IOException { + return 0; + } + @Override public int getMaxKeys() { return 0;