Merge large-file caching fix from integration for release
RayPlante committed Mar 4, 2025
2 parents b2de2bc + f768c2f commit 268f767
Showing 9 changed files with 282 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration.yml
@@ -5,7 +5,7 @@ on:
     branches: [integration]
 jobs:
   testall:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -5,7 +5,7 @@ on:
     branches: [main]
 jobs:
   testall:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
2 changes: 1 addition & 1 deletion .github/workflows/source.yml
@@ -9,7 +9,7 @@ on:
       - '.github/workflows/source.yml'
 jobs:
   buildtest:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
 # strategy:
 # matrix:
 # java: [ '8' ]
2 changes: 1 addition & 1 deletion .github/workflows/testall.yml
@@ -2,7 +2,7 @@ name: testall
 on: [pull_request, workflow_dispatch]
 jobs:
   testall:
-    runs-on: ubuntu-20.04
+    runs-on: ubuntu-22.04
     steps:
       - name: Checkout repository
         uses: actions/checkout@v4
AWSS3CacheVolume.java
@@ -20,18 +20,25 @@
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

import org.json.JSONException;
import java.util.List;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkServiceException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -47,6 +54,8 @@
import gov.nist.oar.distrib.StorageVolumeException;
import gov.nist.oar.distrib.cachemgr.CacheObject;
import gov.nist.oar.distrib.cachemgr.CacheVolume;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

/**
* an implementation of the CacheVolume interface that stores its data
Expand All @@ -66,6 +75,7 @@ public class AWSS3CacheVolume implements CacheVolume {
public final String name;
protected S3Client s3client = null;
protected String baseurl = null;
private static final Logger logger = LoggerFactory.getLogger(AWSS3CacheVolume.class);

/**
* create the storage instance
@@ -283,9 +293,16 @@ public boolean exists(String name) throws StorageVolumeException {
}

/**
* save a copy of the named object to this storage volume. If an object
* Saves a copy of the named object to this storage volume. If an object
* already exists in the volume with this name, it will be replaced.
* <p>
* <strong>Note:</strong> This implementation now uses multipart uploads instead of a single
* PUT operation. This change is necessary because AWS S3 has a hard limit of 5GB per single PUT.
* Multipart uploads also impose a maximum of 10,000 parts per upload, so developers should
* pay careful attention to the part size used. In this implementation, each part is set to 50MB,
* but if a file is extremely large, consider adjusting the part size to avoid exceeding the
* 10,000 parts limit. <strong>TODO:</strong> Make the part size configurable.
* <p>
* This implementation will look for three metadata properties that will be
* incorporated into
* the S3 transfer request for robustness:
@@ -318,63 +335,122 @@ public void saveAs(InputStream from, String name, JSONObject md) throws StorageV
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("AWSS3CacheVolume.saveAs(): must provide name");
}

long size = -1L;
String contentType = null;
String contentMD5 = null;

// Extract metadata
if (md != null) {
try {
size = md.getLong("size");
} catch (Exception e) {
// ignore, size is required
logger.warn("Failed to retrieve size from metadata, size is required.");
}
contentType = md.optString("contentType", null);
contentMD5 = md.optString("contentMD5", null);
}

if (size <= 0) {
throw new IllegalArgumentException("AWSS3CacheVolume.saveAs(): metadata must include size property");
}


logger.info("Starting upload: {} (Size: {} bytes)", name, size);

try {
// Validate MD5 checksum if provided
// If an MD5 checksum is provided, validate it first.
if (contentMD5 != null) {
InputStream markableInputStream = from.markSupported() ? from : new BufferedInputStream(from);
markableInputStream.mark((int) size); // Mark the stream for reset
markableInputStream.mark((int) size);
String calculatedMD5 = calculateMD5(markableInputStream, size);
if (!calculatedMD5.equals(contentMD5)) {
throw new StorageVolumeException("MD5 checksum mismatch for object: " + s3name(name));
}
markableInputStream.reset(); // Reset the stream for the actual upload
from = markableInputStream; // Ensure the validated stream is used
markableInputStream.reset();
from = markableInputStream;
}

// Prepare the PutObjectRequest
PutObjectRequest.Builder putRequestBuilder = PutObjectRequest.builder()
// Build the CreateMultipartUpload request
CreateMultipartUploadRequest.Builder createRequestBuilder = CreateMultipartUploadRequest.builder()
.bucket(bucket)
.key(s3name(name))
.contentLength(size);

.key(s3name(name));

if (contentType != null) {
putRequestBuilder.contentType(contentType);
createRequestBuilder.contentType(contentType);
}
if (contentMD5 != null) {
putRequestBuilder.contentMD5(contentMD5);
}

// Add Content-Disposition header (e.g., file name for web servers)
if (name.endsWith("/")) {
name = name.substring(0, name.length() - 1);

// Start multipart upload
CreateMultipartUploadResponse createResponse = s3client.createMultipartUpload(createRequestBuilder.build());
String uploadId = createResponse.uploadId();

List<CompletedPart> completedParts = new ArrayList<>();
final int partSize = 50 * 1024 * 1024; // 50MB
byte[] buffer = new byte[partSize];
int partNumber = 1;

try {
while (true) {
int totalRead = 0;

// Make sure we read exactly partSize (50MB) before uploading
while (totalRead < partSize) {
int bytesRead = from.read(buffer, totalRead, partSize - totalRead);
if (bytesRead == -1) {
break; // End of stream
}
totalRead += bytesRead;
}

if (totalRead == 0) {
break; // No more data to upload
}

// Upload only when we have a reasonable part size
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, totalRead);

UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucket)
.key(s3name(name))
.uploadId(uploadId)
.partNumber(partNumber)
.contentLength((long) totalRead)
.build();

UploadPartResponse uploadPartResponse = s3client.uploadPart(
uploadPartRequest,
RequestBody.fromByteBuffer(byteBuffer));

completedParts.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.eTag())
.build());

partNumber++;
}
} catch (Exception e) {
s3client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
.bucket(bucket)
.key(s3name(name))
.uploadId(uploadId)
.build());
throw e;
}
String[] nameFields = name.split("/");
putRequestBuilder.contentDisposition(nameFields[nameFields.length - 1]);

// Perform the upload
s3client.putObject(putRequestBuilder.build(), RequestBody.fromInputStream(from, size));

// Update metadata if provided

// Complete the multipart upload
CompletedMultipartUpload completedMultipartUpload = CompletedMultipartUpload.builder()
.parts(completedParts)
.build();

s3client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(bucket)
.key(s3name(name))
.uploadId(uploadId)
.multipartUpload(completedMultipartUpload)
.build());

logger.info("Multipart upload completed successfully for {}. Total parts uploaded: {}", name, completedParts.size());

// Update metadata if provided.
if (md != null) {
CacheObject co = get(name);
long modifiedTime = co.getLastModified();
@@ -386,14 +462,15 @@ public void saveAs(InputStream from, String name, JSONObject md) throws StorageV
}
}
} catch (S3Exception e) {
if (e.awsErrorDetails() != null && e.awsErrorDetails().errorCode().equals("InvalidDigest")) {
if (e.awsErrorDetails() != null && "InvalidDigest".equals(e.awsErrorDetails().errorCode())) {
logger.error("MD5 checksum mismatch for {}", s3name(name));
throw new StorageVolumeException("MD5 checksum mismatch for object: " + s3name(name), e);
}
throw new StorageVolumeException("Failed to upload object: " + s3name(name) + " (" + e.getMessage() + ")",
e);
logger.error("Failed to upload object {}: {}", s3name(name), e.getMessage());
throw new StorageVolumeException("Failed to upload object: " + s3name(name) + " (" + e.getMessage() + ")", e);
} catch (Exception e) {
throw new StorageVolumeException("Unexpected error saving object " + s3name(name) + ": " + e.getMessage(),
e);
logger.error("Unexpected error saving object {}: {}", s3name(name), e.getMessage());
throw new StorageVolumeException("Unexpected error saving object " + s3name(name) + ": " + e.getMessage(), e);
}
}
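The new saveAs() javadoc notes that S3 allows at most 10,000 parts per multipart upload, so the hard-coded 50 MB part size caps a single object at roughly 500 GB. Until the TODO to make the part size configurable is addressed, one way to stay inside the limits is to derive the part size from the object size. The sketch below is illustrative only and not part of this commit: the class and method names are invented, and it assumes only S3's documented limits (5 MB minimum part size, 10,000 parts per upload).

// Illustrative sketch (not part of this commit): choose a part size that keeps
// an object of the given size within S3's 10,000-part limit while respecting
// the 5 MB minimum part size.
public final class PartSizeCalculator {
    private static final long MIN_PART_SIZE = 5L * 1024 * 1024;       // S3 minimum for all but the last part
    private static final long DEFAULT_PART_SIZE = 50L * 1024 * 1024;  // matches the value hard-coded in saveAs()
    private static final int MAX_PARTS = 10_000;

    /** Return a part size large enough that objectSize fits in at most MAX_PARTS parts. */
    public static long choosePartSize(long objectSize) {
        long partSize = Math.max(DEFAULT_PART_SIZE, MIN_PART_SIZE);
        // Double the part size until the resulting part count fits under the cap.
        while ((objectSize + partSize - 1) / partSize > MAX_PARTS) {
            partSize *= 2;
        }
        return partSize;
    }
}

Starting from the 50 MB default and doubling keeps the part count under 10,000 for objects up to S3's 5 TB object-size ceiling while staying far below the 5 GB per-part maximum.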

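The MD5 branch in saveAs() compares the caller-supplied contentMD5 against calculateMD5(markableInputStream, size), a private helper whose body is collapsed out of this diff. Purely as an assumption about what such a helper might look like (the actual implementation in AWSS3CacheVolume may differ, for example in how the digest is encoded), here is a sketch that streams the marked input through MessageDigest and Base64-encodes the result, the form S3 expects for a Content-MD5 header. It reuses the MessageDigest, Base64, and InputStream imports already present at the top of the file.

// Assumed sketch of the collapsed calculateMD5() helper; not the committed code.
private String calculateMD5(InputStream in, long size) throws StorageVolumeException {
    try {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        byte[] buf = new byte[8192];
        long remaining = size;
        while (remaining > 0) {
            int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (n < 0)
                break;                       // stream ended before the declared size
            md5.update(buf, 0, n);
            remaining -= n;
        }
        // S3's Content-MD5 header carries the Base64 encoding of the raw 128-bit digest.
        return Base64.getEncoder().encodeToString(md5.digest());
    } catch (Exception ex) {
        throw new StorageVolumeException("Failed to compute MD5: " + ex.getMessage(), ex);
    }
}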
@@ -73,11 +73,17 @@ public void testResolveComponentID() throws OARServiceException {
assertEquals("1491_optSortSphEvaluated20160701.cdf.sha256", rec.getString("filepath"));
assertEquals(6, reslvr.getCacheSize());

rec = reslvr.resolveComponentID("ark:/88434/mds00hw91v#doi:10.18434/T4SW26");
assertNotNull(rec, "Failed to find component");
assertEquals("#doi:10.18434/T4SW26", rec.getString("@id"));
assertEquals("https://doi.org/10.18434/T4SW26", rec.getString("accessURL"));
assertEquals(6, reslvr.getCacheSize());

// NOTE: This test has been temporarily disabled because it depends on live records metadata from the RMM service,
// which changes frequently. In this specific test, we are checking that a DOI component can be resolved.
// But the DOI component is now of type "nrd:Hidden" and so it is excluded from the cache, which causes
// the test to fail. For more stable unit tests, we should consider using fixed test data.
//
// rec = reslvr.resolveComponentID("ark:/88434/mds00hw91v/#doi:10.18434/T4SW26");
// assertNotNull(rec, "Failed to find component");
// assertEquals("#doi:10.18434/T4SW26", rec.getString("@id"));
// assertEquals("https://doi.org/10.18434/T4SW26", rec.getString("accessURL"));
// assertEquals(6, reslvr.getCacheSize());

rec = reslvr.resolveComponentID("ark:/88434/mds00hw91v/cmps/goober");
assertNull(rec, "Found bogus component");
@@ -29,6 +29,7 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@@ -509,6 +510,32 @@ public void getSaveObject() throws StorageVolumeException {
assertTrue(objectExists(bucket, objname2));
}

/**
* Verify that a large file (defined as larger than 5MB) can be saved
* via the saveAs() method. This test makes sure that the underlying
* S3 multipart upload is working.
*/
@Test
public void testSaveAsLargeFile() throws StorageVolumeException {
// Define a file size larger than 5MB, for example 20MB.
long fileSize = 20L * 1024 * 1024;
JSONObject md = new JSONObject();
md.put("size", fileSize);
md.put("contentType", "application/octet-stream");

// Create a simulated large input stream. Here it just returns the byte 'a'
try (InputStream is = new LargeInputStream(fileSize, (byte) 'a')) {
// Call upload method
s3cv.saveAs(is, "large-file.dat", md);
} catch (IOException e) {
fail("Unexpected IOException: " + e.getMessage());
}

// Verify that the object exists in the bucket.
String key = folder + "/large-file.dat";
assertTrue(objectExists(bucket, key), "Large file was not uploaded");
}

@Test
public void testRedirectForUnsupported()
throws StorageVolumeException, UnsupportedOperationException, IOException
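testSaveAsLargeFile() reads its 20 MB payload from a LargeInputStream helper that does not appear in this hunk. The following is only a guess at that helper, inferred from the constructor call new LargeInputStream(fileSize, (byte) 'a') and from the java.util.Arrays import added earlier in this file: an InputStream that repeats a single byte value for a fixed length.

// Assumed sketch of the LargeInputStream test helper; the real class may differ.
// Relies on the java.util.Arrays and java.io.InputStream imports in the test class.
static class LargeInputStream extends InputStream {
    private final long length;
    private final byte value;
    private long position = 0;

    LargeInputStream(long length, byte value) {
        this.length = length;
        this.value = value;
    }

    @Override
    public int read() {
        if (position >= length)
            return -1;                       // end of the simulated data
        position++;
        return value & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (position >= length)
            return -1;
        int n = (int) Math.min(len, length - position);
        Arrays.fill(b, off, off + n, value); // fill the requested slice with the repeated byte
        position += n;
        return n;
    }
}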
(Diffs for the remaining changed files were not loaded.)
