Skip to content

Commit

Permalink
compute checksums for S3 uploads to support object locking
Browse files Browse the repository at this point in the history
  • Loading branch information
smiklosovic committed Mar 12, 2024
1 parent 8cdb00f commit fa48dad
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 5 deletions.
16 changes: 14 additions & 2 deletions src/main/java/com/instaclustr/esop/impl/hash/HashSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.InputStream;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
Expand Down Expand Up @@ -42,6 +43,8 @@ public HashAlgorithm convert(final String value) {
/**
 * Produces the textual form of a hash.
 *
 * <p>Both methods return the hash as a {@link String} and are declared to throw
 * {@link Exception}, so callers must be prepared to handle checked failures.
 */
public interface Hasher {

/**
 * Computes the hash of the content supplied by the given stream.
 *
 * @param is stream to read the content from
 * @return textual representation of the computed hash
 * @throws Exception if the hash cannot be computed
 */
String getHash(InputStream is) throws Exception;

/**
 * Converts an already computed digest into its textual representation.
 *
 * <p>Not every implementation supports this: in this file the SHA-based hasher
 * renders each byte as two lowercase hexadecimal characters, while another
 * implementation throws {@link UnsupportedOperationException}.
 *
 * @param digest raw digest bytes
 * @return textual representation of the digest
 * @throws Exception if the digest cannot be converted
 */
String getHash(byte[] digest) throws Exception;
}

private static class SHAHasher implements Hasher {
Expand All @@ -66,10 +69,14 @@ public String getHash(InputStream is) throws Exception

byte[] bytes = digest.digest();

return getHash(bytes);
}

@Override
public String getHash(byte[] digest) throws Exception {
final StringBuilder sb = new StringBuilder();

//This bytes[] has bytes in decimal format, convert it to hexadecimal format
for (final byte aByte : bytes) {
for (final byte aByte : digest) {
sb.append(Integer.toString((aByte & 0xff) + 0x100, 16).substring(1));
}

Expand All @@ -92,6 +99,11 @@ public String getHash(InputStream is) throws Exception

return Long.toString(checksum.getValue());
}

/**
 * Not supported by this hasher, which reports the value of a running checksum
 * rather than formatting raw digest bytes.
 *
 * @param digest ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public String getHash(byte[] digest) throws Exception {
    // Carry a message so the failure is self-explanatory in logs and stack traces.
    throw new UnsupportedOperationException("Hashing of a precomputed digest is not supported by this checksum-based hasher");
}
}

public enum HashAlgorithm {
Expand Down
53 changes: 50 additions & 3 deletions src/main/java/com/instaclustr/esop/s3/v2/BaseS3Backuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

import com.instaclustr.esop.impl.hash.HashSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,6 +29,7 @@
import software.amazon.awssdk.core.waiters.WaiterOverrideConfiguration;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
Expand All @@ -39,10 +43,11 @@
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingRequest;
import software.amazon.awssdk.services.s3.model.PutObjectTaggingResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.SdkPartType;
import software.amazon.awssdk.services.s3.model.StorageClass;
Expand Down Expand Up @@ -177,7 +182,9 @@ public void uploadText(String text, RemoteObjectReference objectReference) throw
byte[] bytes = text.getBytes(UTF_8);

s3Clients.getNonEncryptingClient()
.putObject(getPutObjectRequest(objectReference, bytes.length),
.putObject(getPutObjectRequest(objectReference,
bytes.length,
getDigest(prepareMessageDigest().digest(bytes))),
RequestBody.fromBytes(bytes));
}

Expand All @@ -192,18 +199,22 @@ public void uploadEncryptedText(String plainText, RemoteObjectReference objectRe
byte[] bytes = plainText.getBytes(UTF_8);

s3Clients.getEncryptingClient().get()
.putObject(getPutObjectRequest(objectReference, bytes.length),
.putObject(getPutObjectRequest(objectReference,
bytes.length,
getDigest(prepareMessageDigest().digest(bytes))),
RequestBody.fromBytes(bytes));
}

/**
 * Builds the request used to upload a single object with a SHA-256 checksum attached.
 *
 * <p>The object is written to the bucket of the configured storage location, under the
 * canonical path of the given reference, using the {@code STANDARD_IA} storage class.
 *
 * @param s3RemoteObjectReference reference whose canonical path becomes the object key
 * @param unencryptedSize         size of the payload; not read by this method
 * @param checksumSHA256          checksum value set on the request via {@code checksumSHA256}
 * @param tags                    tags to attach to the object, possibly none
 * @return the assembled put-object request
 */
private PutObjectRequest getPutObjectRequest(RemoteObjectReference s3RemoteObjectReference,
                                             long unencryptedSize,
                                             String checksumSHA256,
                                             Tag... tags) {
    final Tagging objectTagging = Tagging.builder().tagSet(tags).build();
    final String targetBucket = request.storageLocation.bucket;
    final String objectKey = s3RemoteObjectReference.canonicalPath;

    final PutObjectRequest.Builder putRequest = PutObjectRequest.builder();
    putRequest.bucket(targetBucket);
    putRequest.key(objectKey);
    putRequest.storageClass(StorageClass.STANDARD_IA);
    putRequest.tagging(objectTagging);
    putRequest.checksumSHA256(checksumSHA256);
    return putRequest.build();
}

Expand All @@ -217,12 +228,15 @@ private void uploadFile(S3Client s3Client,
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.tagging(tagging)
.checksumAlgorithm(ChecksumAlgorithm.SHA256)
.build();

CreateMultipartUploadResponse multipartUploadResponse = s3Client.createMultipartUpload(multipartUploadRequest);

String uploadId = multipartUploadResponse.uploadId();

MessageDigest sha256 = prepareMessageDigest();

try
{
long partSize = Long.parseLong(System.getProperty("upload.max.part.size", Long.toString(100 * 1024 * 1024)));
Expand All @@ -244,6 +258,7 @@ private void uploadFile(S3Client s3Client,
.key(objectReference.canonicalPath)
.uploadId(uploadId)
.partNumber(partNumber)
.checksumAlgorithm(ChecksumAlgorithm.SHA256)
.sdkPartType(partNumber == numberOfParts ? SdkPartType.LAST : SdkPartType.DEFAULT)
.build();

Expand All @@ -253,7 +268,10 @@ private void uploadFile(S3Client s3Client,
completedParts.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(partResponse.eTag())
.checksumSHA256(partResponse.checksumSHA256())
.build());

sha256.update(byteBuffer);
}

// Complete the multipart upload
Expand Down Expand Up @@ -284,6 +302,22 @@ private void uploadFile(S3Client s3Client,

logger.debug("Object under key " + objectReference.canonicalPath + " exists");

Tag checksumTag = Tag.builder()
.key("fullObjectChecksum")
.value(HashSpec.HashAlgorithm.SHA_256.getHasher().getHash(sha256.digest()))
.build();

PutObjectTaggingResponse putObjectTaggingResponse = s3Client.putObjectTagging(PutObjectTaggingRequest.builder()
.bucket(request.storageLocation.bucket)
.key(objectReference.canonicalPath)
.tagging(Tagging.builder().tagSet(checksumTag).build()).build());

if (!putObjectTaggingResponse.sdkHttpResponse().isSuccessful()) {
throw new RuntimeException(String.format("Unsuccessful tagging of %s with checksum, upload id %s", objectReference.canonicalPath, uploadId));
} else {
logger.debug("Tagged {} with {}", objectReference.canonicalPath, checksumTag.toString());
}

if (s3Clients.hasEncryptingClient()) {
try {
GetObjectAttributesResponse objectAttributes = s3Clients.getNonEncryptingClient()
Expand All @@ -295,6 +329,7 @@ private void uploadFile(S3Client s3Client,
.build());

manifestEntry.size = objectAttributes.objectSize();
manifestEntry.hash = Base64.getEncoder().encodeToString(sha256.digest());
}
catch (Throwable t) {
logger.warn("Unable to get attribute {} for key {} by GetObjectAttributes request. Please check your permissions.",
Expand Down Expand Up @@ -368,4 +403,16 @@ public void abortMultipartUpload(String uploadId,
}
}
}

/**
 * Creates a fresh SHA-256 {@link MessageDigest}.
 *
 * <p>A new instance is returned on every call, so callers can accumulate
 * independent digests without sharing state.
 *
 * @return a new SHA-256 message digest
 * @throws IllegalStateException if the runtime provides no SHA-256 implementation;
 *                               the original failure is attached as the cause
 */
private static MessageDigest prepareMessageDigest() {
    try {
        return MessageDigest.getInstance("SHA-256");
    } catch (java.security.NoSuchAlgorithmException e) {
        // Keep the cause so the underlying provider problem stays diagnosable.
        throw new IllegalStateException("Unable to get instance of SHA-256 message digest", e);
    }
}

/**
 * Encodes raw digest bytes as a Base64 string using the basic (non-URL) encoder.
 *
 * @param digest raw digest bytes
 * @return Base64 representation of {@code digest}
 */
private String getDigest(byte[] digest) {
    final Base64.Encoder base64 = Base64.getEncoder();
    final String encoded = base64.encodeToString(digest);
    return encoded;
}
}

0 comments on commit fa48dad

Please sign in to comment.